BI 项目笔记
智能BI笔记
使用后端万用模板时的注意事项
- 更改pom文件的项目名name,包名
com.hjj 、工件名lingxibi 和Spring Boot版本(包名要全局更改) - 检查和更改项目的JDK版本
- 更改swagger接口文档的包扫描范围
- 更改yml文件的端口,项目名spring.application.name和数据库的信息,如url、username、password
- 更改Spring Boot启动类@MapperScan的包扫描路径
- 修改banner.txt文件
项目后端初始化注意事项
- 使用MyBatisX插件生成代码时,将生成的代码转移到对应的包中(原来包中已存在UserService、User等类根据需求去取舍user表中的字段和User类的变量)
- 将自增ID改为IdType.AUTO
1 | @TableId(value = "id", type = IdType.AUTO) |
- 后端启动项目端口冲突问题原因:Windows Hyper-V虚拟化平台占用了端口先使用:netsh interface ipv4 show excludedportrange protocol:=tcp查看被占用的端口,然后选择一个没被占用的端口启动项目禁用hyper -v
后端出现的问题
- 单元测试显示AIManager空指针异常,在Test测试类上加上@SpringBootTest注解,保证能够扫描到全部的包,使得能够加载SpringBoot中的Bean
- 因为运算符短路原则而出现的问题queryWrapper.eq(id > 0 && id != null, “id”, id);会报错,而queryWrapper.eq( id != null && id > 0, “id”, id);并不会报错
1 | 在你的代码中,有一段条件判断语句: |
AI调教和使用
AI接口普遍都有输入字数限制,尽可能压缩数据,能够允许多传一些数据
如何向 AI 提词(prompt)?
AI提词技巧1
持续输入,持续优化
第一次问:
我给你三行数据,请你帮我分析网站的增长趋势,数据如下:
第一行:日期:1号,用户数:10人第二行:日期:2号,用户数:20人第三行:日期:3号,用户数:30人
AI提词技巧2
数据压缩(内容压缩,比如把很长的内容提取关键词,也可以让AI来做)
我给你三行数据,请你帮我分析网站的增长趋势。数据如下:
表头:日期,用户数1号,102号,203号,30
使用csv对excel文件的数据进行提取和压缩
easyexcel:https://easyexcel.opensource.alibaba.com/docs/current/quickstart/read(开源)
如何调用AI
输入:
系统预设(提前告诉他职责、功能、回复格式要求) + 分析目标 + 压缩后的数据(预设很重要)
最简单的系统预设:假如你是一个数据分析师,接下来我会给你我的分析目标和原始数据,请告诉我分析结论。
AI提问技巧
如果想要 AI 更好地理解我们的输入,给我们预期的、精确格式的输出,我们就需要严格控制咱们得提问词。
- 系统预设:你是一个数据分析刊师和前端开发专家,接下来我会按照以下固定格式给你提供内容:分析需求:{数据分析的需求和目标}原始数据:{csv格式的原始数据,用,作为分隔符}请根据以上内容,帮我生成数据分析结论和可视化图表代码分析需求:分析网站用户的增长情况原始数据:日期,用户数1号,102号,203号,30
- 控制输出格式(便于AI返回的内容能够更加方便地为我们所用)
3种调用AI的方式
- 直接调用OpenAI 或者其他 AI 原始大模型官网的接口https://platform.openai.com/docs/api-reference1)在请求头中指定OPENAI API KEYAuthorization:Bearer OPENAI API KEY2)找到你要使用的接口,比如Al对话接口:https:/platform.openai.com/docs/api-reference/,chat/create3)按照接口文档的示例,构造HTTP请求
- 使用云服务商提供的,封装后的AI接口Azure云优点:本地都能用缺点:依然要钱,可能更贵
- 利用鱼聪明 AI 提供的开放SDK
怎么生成图表
AI无法生成现成的图表,但是AI可以生成代码 ==> 可以把代码利用前端组件库(Echarts)在网页进行展示
你是一个数据分析刊师和前端开发专家,接下来我会按照以下固定格式给你提供内容:分析需求:{数据分析的需求和目标}原始数据:{csv格式的原始数据,用,作为分隔符}请根据这两部分内容,按照以下格式生成内容(此外不要输出任何多余的开头、结尾、注释)
【【【【【【{前端Echarts V5 的 option 配置对象js代码,合理地将数据进行可视化}【【【【【【{ 明确的数据分析结论、越详细越好,不要生成多余的注释 }
安全性
如果用户上传一个超大的文件怎么办?比如100G?
只要涉及到用户自主上传的操作,一定要校验文件(图像)
- 文件大小
- 文件后缀(但可以将含有非法内容改后缀名)
- 文件的内容(成本较高)
- 文件的合规性(比如敏感内容,建议用第三方的审核功能)
扩展点:接入腾讯云的图片万象数据审核(COS 对象存储的审核功能)
数据存储
现状:我们把每个图表的原始数据全部存放在了同一个数据表(Chart表)的字段里
问题:
- 如果用户上传的原始数量很大、图表数日益增多,查询Chart表就会很慢
- 对于BI平台,用户是可以查看原始数据的、对原始数据进行简单查询的需求。现在如果把所有数据存放在一个字段(列)中,查询时,只能取出这个列的所有内容
解决方案: ==> 分库分表
把每个图表的原始数据(chartData)单独保存为一个新的数据表,而不是都存在一个字段里,这样:
- 存储时,能够分开存储,互不影响,也能增加安全性,(如果把每个用户上传的图表按列作为字段都存在同一个数据库表中,而不是每个图表生成一个对应的数据库表的话,会出现性能问题。因为如果有甲、乙两个人都在查数据,但是甲查得早而且查得数据量特别大,乙查的迟但查得数据量少,这就会导致乙查询时少数据量时反而耗时很长,影响其他用户查询)
- 查询时,可以使用多中SQL灵活取出需要的字段,查询性能更快
实现
分开存储:
- 存储图表信息时,不把数据存储为某个数据库表单个字段,而是新建一个chart_(图表d)的数据表通过图表id,数据列名,数据类型等字段,生成一下SQL语句,并执行即可
1 | --auto-generated definition-- |
分开查询:
- 以前直接查询图表,取chartData字段;现在改为读取chart_{图表id}的数据表select * from chart_{图标id}
分库分表
水平分表、垂直分库
分表:数据量大时可以拆分,前1w用户1个表,后1w用户1个表。对于BI来说,每个图表信息(chartData)为一个表
分库:根据业务分库,比如用户是同一个库,订单支付信息相关的一个库,一般根据业务来
具体实现:MyBatis的动态SQL(根据代码灵活地生成SQL)
- 想想我们分表建表时,哪些部分需要动态生成,比如数据库表名,字段名
- 在mapper.xml文件中定义SQL语句
1 | <select id="queryChartData" parameterType="String" resultMap="hashmap"> |
或者:
1 | <select id="queryChartData" parameterType="string" resultMap="hashmap"> |
缺点:有SQL注入风险
1 | public interface ChartMapper extends BaseMapper<Chart> { |
测试用例:
1 | @SpringBootTest |
比如:select * from chart_12345 where id=1 or 1=1;
限流
限流的几种算法
推荐文章:https://juejin.cn/post/6967742960540581918
- 固定窗口限流(食堂1小时只允许10个用户拿汉堡且1小时只能做10个,结果59分钟来了10个人,第1小时1分钟又来10个人,那么汉堡就不够)单位时间内允许部分操作1小时只允许10个用户操作优点:最简单缺点:可能出现流量突刺,流量高峰比如:前59分钟没有1个操作,第59分钟来了10操作;第1小时1分钟又来了10个操作。相当于2分钟内执行了20个操作,服务器仍然有高峰危险。
- 滑动窗口限流(每十分钟做一个汉堡,前一个小时汉堡被抢光了,1小时10分钟来了个新用户就能抢到1小时10分钟刚做的汉堡)单位时间内允许部分操作,但是这个单位时间是滑动的,需要指定一个滑动单位滑动单位1min:开始前:0s 1h 2h一分钟后:1min 1h1min 2h1min 优点:能够解决上述流量突刺的问题,因为第59分钟时,限流窗口是59~1小时59分,这个时间段内只能接收10次请求,只要还在这个窗口内,更多的操作就会被拒绝缺点:实现较复杂,限流效果和你的滑动单位有关,滑动单位越小,限流效果越好,但往往很难选取一个特别合适的滑动单位。
- 漏桶限流(推荐)(大家排好队一个一个拿汉堡,前面拿完后面才能拿,大家拿汉堡的速率一样)以固定的速率处理请求(流水),当请求(水)桶满了后,拒绝请求(禁止灌水)。每秒处理10个请求,桶的容量为10,每0.1秒固定处理一次请求,如果1秒内来了10个请求,都可以处理完,但如果1秒内来了11个请求,最后那个请求就会溢出水桶,被拒绝。优点:能够一定程度上应对流量突刺;能够以固定速率处理请求,保证服务器的安全****缺点:没有办法迅速处理一批请求,只能一个一个按顺序来(固定速率的缺点) 使用场景:希望服务器以恒定速率处理请求
- 令牌桶限流(推荐)(快餐店提前做好10个汉堡,开抢时前十个人能够同时得到汉堡,不用排队,但是剩下的10个人只能等待下一批的10个汉堡做好才能拿)管理员先生成一批令牌,每秒生成10个令牌;当用户要操作前,先去拿一个令牌,有令牌的人就有资格同时执行操作,没有令牌就等着优点:能够并发处理同时的请求需要考虑的问题:还是存在时间单位的选取问题使用场景:服务器允许并发操作,希望性能高一些
因为生成图表和结论是调用AI的,但调用AI是要成本的,那怎么防止用户疯狂刷量,防止破产呢?
解决问题:
- 控制成本 ==> 限制用户调用总次数
- 由于服务器同时刻处理的请求为10个,用户在短时间内疯狂使用(比如使用jmeter多次请求接口),导致服务器资源被占满,其他用户无法使用 ==> 限流
思考限流阈值多大合适?比如限制单个用户在每秒只能用几次?
限流的实现
- 本地限流(单机限流)每个服务器单独限流,一般适用于单体项目,即你的项目只有一个服务器
Guava RateLimiter实现令牌桶算法
令牌桶算法是一种常用的限流算法。它通过维护一个固定容量的令牌桶,每个请求需要获取一个令牌才能被处理。当令牌桶中的令牌不足时,新的请求将被限制。
Java 中可以使用Guava库的RateLimiter类实现令牌桶算法。例如:
计数器算法
计数器算法是一种简单的限流方法,通过计数器记录一定时间内的请求次数,当请求次数达到设定的阈值时,后续请求被拒绝。
可以使用 AtomicInteger 或 AtomicLong 进行计数,并结合定时任务或定时器来定时重置计数器。
1 | import com.google.common.util.concurrent.RateLimiter; |
分布式限流(多机限流)如果你的项目部署在多个服务器上,那么需采用分布式限流
把用户的使用频率等数据放到一个集中的存储进行统计,比如Redis,这样无论用户的请求落到了哪台服务器,都以集中的数据存储内的数据为准(Redisson)
在网关集中进行限流和统计(比如Sentinel、Spring Cloud Gateway)
1 | import redis.clients.jedis.Jedis; |
Redisson 限流实现
Redisson内置了一个限流工具类,可以帮助你利用Redis来存储和统计。
RedisLimiterManager:什么是Manager?专门提供 RedisLimiterManager 限流基础服务的(提供了通用的能力)
1 | @Configuration |
在需要限流的地方直接调用doRateLimit()方法就好了
限流粒度
- 针对某个方法限流,即最多允许同时XX个操作使用这个方法
- 针对某个用户限流,比如单个用户最多执行XX次操作
- 针对某个用户x方法限流,比如当个用户单位时间内最多执行x次这个方法
第五期计划
- 系统问题分析
- 异步化的思路
- 线程池的理论和实战
- 前端后端的异步化改造
系统问题分
问题场景:调用的服务处理能力有限,或者接口的处理(或返回)时长较长时,就应该考虑异步化了。
- 用户等待时间有点儿长(因为要等AI生成)
- 业务服务器可能会有很多请求在处理,导致系统资源紧张,严重时导致服务器宕机或者无法处理新的请求
- 调用的第三方服务(A!能力)的处理能力是有限的,比如每3秒只能处理1个请求,会导致A!处理不过来,严重时A!可能会对咱们的后台系统拒绝
异步化
new Thread?
同步:一件事情做完,在做另一件事情。(烧水后处理工作)
异步:不用等一件事做完,就可以做另一件事情。等第一件事完成后,可以收到一个通知,通知你这件事做好了,你可以再进行后续的处理(烧水的时候,水壶有一个蜂鸣器;烧水时人可以同时处理工作。水烧好后,人能听到声音,就知道水烧好了)
业务流程分析
标准异步化的业务流程
- 当用户要进行耗时很长的操作时,点击提交后,不需要在界面傻等,而是应该把这个任务保存到数据库中记录下来
- 用户要执行新任务时:a. 任务提交成功:
- 如果程序有多余的空闲线程,可以立刻去做这个任务
- 如果程序线程都在繁忙,无法继续处理,那就放在等待队列中
- b. 任务提交失败:比如我们的程序所有线程都在忙,任务队列满了
- 拒绝这个任务,再也不执行
- 通过保存到数据库中的记录来看到提交失败的任务,并且在程序空闲的时候,可以将任务从数据库中捞到程序里,再去执行
- 我们的程序(线程)从任务队列中取出任务依次执行,每完成一件事要修改一下任务的状态
- 用户可以查询任务的执行状态,或者在任务执行成功或者失败时能得到通知(发邮件、系统消息提示框、短信)
- 如果我们要执行的任务非常复杂,包含很多环节,在每一个小任务完成时,要在程序(数据库中)记录一下任务的执行状态(进度)
标准异步化的业务流程
- 用户点击智能分析页的提交按钮时,先把图表立刻保存到数据库中(作为一个任务)
- 用户可以在图表列表页面查看所有图表(已生成的、生成中、生成失败的)的信息和状态
- 用户可以修改生成失败的图表信息,点击重新生成
问题:
- 任务队列的最大容量应该设置为多少?
- 程序怎么从任务队列中取出任务并执行?这个任务队列的流程怎么实现?
线程池
为什么需要线程池?
- 线程的管理比较复杂(比如什么时候新增线程、什么时候减少空闲线程)
- 任务存取比较复杂(什么时候接受任务、什么时候拒绝任务,怎么保证大家不抢到同一个任务)
线程池的作用:帮助你轻松管理线程,协调任务的执行过程
线程池的实现
不用自己写,如果是在Spring中,可以用ThreadPoolTaskExecutor配合@Async注解来实现(不太建议)
如果是在Java中,可以使用JUC并发编程包的ThreadPoolExecutor来实现非常灵活的自定义线程池
线程池参数:
1 | public ThreadPoolExecutor(int corePoolSize, |
怎么确定线程池参数呢?结合业务场景和系统资源去测试调整,不断优化
回归到我们的业务,要考虑系统最脆弱的环节(系统的瓶颈)在哪里?现有条件:比如A!生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队。
回归到我们的业务,要考虑系统最脆弱的环节(系统的瓶颈)在哪里?现有条件:比如AI生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队。
具体参数的介绍:
corePoolSize(核心线程数=>正式员工数):正常情况下,我们的系统应该能同时工作的线程数(随时就绪的状态)maximumPoolSize(最大线程数=>哪怕任务再多,你也最多招这些人):极限情况下,我们的线程池最多有多少个线程?keepAliveTime(空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除(理解为开除临时工),从而释放无用的线程资源。TimeUnit unit(空闲线程存活时间的单位):分钟、秒workQueue(工作队列):用于存放给线程执行的任务,存在一个队列的长度threadFactory(线程工厂):控制每个线程的生成、线程的属性(比如线程名)RejectedExecutionHandler(拒绝策略):任务队列满的时候,我们采取什么措施,比如抛异常、不抛异常、自定义策略
资源隔离策略:比如重要的任务(VIP)一个队列,普通的任务一个队列,保证这个两个队列互不干扰
线程池的工作机制
本项目中如何设置线程池的参数
线程池的参数如何设置?现有条件:比如A!生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队。
corePoolSize(核心线程数=>正式员工数):正常情况下,可以设置为4maximumPoolSize:设置为极限情况,设置为<=4keepAliveTime(空闲线程存活时间):一般设置为秒级或者分钟级TimeUnit unit(空闲线程存活时间的单位):分钟、秒workQueue(工作队列):结合实际请况去设置,可以设置为20threadFactory(线程工厂):控制每个线程的生成、线程的属性(比如线程名)RejectedExecutionHandler(拒绝策略):抛异常,标记数据库的任务状态为”任务满了已拒绝”
一般情况下,任务分为IO密集型和计算密集型两种。计算密集型:吃CPU,比如音视频处理、图像处理、数学计算等,一般是设置corePoolSize为CPU的核数+1。maximumPoolSize一般设置为CPU核心数的2~3倍。(空余线程),可以让每个线程都能利用好CPU的每个核,而且线程之间不用频繁切换(减少打架、减少开销)IO密集型:吃带宽内存/硬盘的读写资源,corePoolSize可以设置大一点,一般经验值是2n左右,但是建议以IO的能力为主。
导入百万数据到数据库是属于IO密集型还是CPU密集型? ==> IO密集型
自定义线程池
1 | @Configuration |
开发
实现工作流程
- 给chart表新增任务状态字段(比如排队中、执行中、已完成、失败),任务执行信息字段(用于记录任务执行中、或者失败的一些信息)
- 用户点击智能分析页的提交按钮时,先把图表立刻保存到数据库中,然后提交任务
- 任务:先修改图表任务状态为“执行中”。等执行成功后,修改为“已完成”、保存执行结果;执行失败后,状态修改为“失败”,记录任务失败信息。
- 用户可以在图表管理页面查看所有图表(已生成的、生成中的、生成失败)的信息和状态用户可以修改生成失败的图表信息,点击重新生成
任务执行逻辑
先修改任务状态为执行中,减少重复执行的风险、同时让用户知道执行状态
优化点
- guava Retrying重试
- 提前考虑到AI生成错误的情况,在后端进行异常处理(比如AI说了多余的话,提取正确的字符串)
- 如果说任务根本没提交到队列中(或者队列已经满了),是不是可以用定时任务把失败状态的图表放在队列中(补偿)
- 给任务的执行增加一个超时时间,超时后自动标记为失败
- 反向压力:https:/zhuanlan.zhihu.com/p/404993753,通过调用的服务状态来选择当前系统的策略(比如根据AI服务的当前任务队列数来控制咱们系统的核心线程数),从而最大化利用系统资源。
- 我的图表页面增加一个刷新,定时自动刷新的按钮,保证获取到图表的最新状态
- 任务执行成功或失败,给用户发送消息通知(实时:websocket、server side event)
第六期计划
- 分析系统现在的不足
- 分布式消息队列
- 分布式消息队列RabbitMQ
- 项目扩展
分析系统化现状的不足
单机系统的不足
已经经过了同步到异步化的改造
现状:目前的异步是通过本地的线程池实现的。
- 无法集中限制,只能单机限制假如A!服务限制只能有2个用户同时使用,单个线程池可以限制最大核心线程数为2来实现。假设系统用量增大,改为分布式,多台服务器,每个服务器都要有2个线程,就有可能有2N个线程,超过了A!服务的限制。解决方案:在一个集中的地方去下发和管理任务(比如集中存储当前正在执行的任务数)
- 任务由于是放在内存中执行的,重启时可能会丢失虽然可以人工从数据库捞出来再重试,但是其实需要额外开发(比如定时任务),这种重试的场景是非常典型的,其实是不需要我们开发者过于关心、或者自己实现的。解决方案:把任务放在一个可以持久化存储的硬盘
- 优化:如果你的系统功能越来越多,长耗时任务越来越多,系统会越来越复杂(比如要开多个线程池、资源可能会出现项目抢占)。其实我们可以把长耗时、消耗很多的任务把它单独抽成一个程序,不要影响主业务。服务拆分(应用解耦)其实我们可以把长耗时、消耗很多的任务把它单独抽成一个程序,不要影响主业务。解决方案:可以有一个中间人,让中间人帮我们去连接两个系统(例如核心系统和智能生成业务)
分布式消息队列
中间件
连接多个系统、帮助多个系统紧密协作的技术、中间人
Redis、消息队列、分布式存储Etcd
消息队列
存储消息的队列关键词:存储、消息、队列存储:存数据消息:某种数据结构,比如字符串、对象、二进制数据、jso等等队列:先进先出的数据结构
可以理解为消息队列是特殊的数据库
应用场景(作用):在多个不同的系统、应用之间实现消息的传输。不需要考虑传输应用的编程语言、系统、框架等等。可以让jva开发的应用发消息,让php开发的应用收消息,这样就不用把所有代码写到同一个项目里(应用解耦)。
消息队列的模型
生产者:Producer,,类比为快递员,发送消息的人(客户端)消费者:Consumer,类比为取快递的人,接受读取消息的人(客户端)消息:Message,类比为快递,就是生产者要传输给消费者的数据消息队列:Queue
为什么不直接传输,要用消息队列?生产者不用关心你的消费者要不要消费、什么时候消费,我只需要把东西给消息队列,我的工作就算完成了。
生产者和消费者实现了解耦,互不影响。
为什么要用消息队列?
- 异步处理生产者发送完消息之后,可以继续忙别的,消费者什么时候消费都可以,不会产生阻塞
- 削峰填谷原本:1s内来了1w个请求,但服务器1s只能处理5k个氢气,那可以先把用户的请求放在消息队列中,消费者(实际执行操作的应用)可以按照自己的需求,慢慢去取现在:把1w的请求放在消息队列中,系统按照恒定的速率慢慢执行,从而保护系统的稳定性和安全性
分布式消息队列的优势
- 数据持久化:可以把消息存储到硬盘中,服务器重启也不会丢失
- 可扩展性:根据需求,随时增加(或减少)节点,搭建MQ集群
- 应用解耦:可以连接各个不同语言、框架开发的系统,让这些系统能够灵活传输读取数据
应用解耦的优点:
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中间用户感受不到物流系统的故障,提升系统的可用性。
发布订阅
如果一个非常大的系统要给其他子系统发送通知,最简单直接的方式是大系统直接依次调用(通知)小系统:问题:
- 每次发通知都要调用很多系统,很麻烦、有可能失败
- 新出现的项目(大项目感知不到的项目)无法得到通知
解决方案:大的核心系统始终往一个地方(消息队列)发消息,其他的系统都去订阅这个消息队列(读取消息队列中的消息)
应用场景
- 耗时的场景(异步)
- 高并发的场景(异步,削峰填谷)
- 分布式系统协作(尤其是跨团队,跨业务协作,应用解耦)
- 强稳定性的场景(比如金融业务,持久化、可靠性、削峰填谷)
消息队列的缺点
也叫分布式场景下需要考虑的问题
- 给系统引入额外的中间件,系统更复杂,额外维护中间件,额外的费用(部署、维护)成本
- 消息丢失、消息的顺序性、重复消费、数据的一致性(分布式系统就要考虑)
主流分布式消息队列选型
主流技术
- ActiveMQ
- RabbitMQ
- kafka
- RocketMQ
- zeroMQ
- pulsar
- Apache InLong(Tube)
技术对比
技术选型指标:
- 吞吐量:IO、并发
- 时效性:类似延迟,消息的发送,到达时间
- 可用性:系统可用的比率(比如1年365天宕机1s,可用率大概 X 个9)
- 可靠性:消息不会丢失(比如不丢失消息(订单)
技术名称 | 吞吐量 | 时效性 | 可用性 | 可靠性 | 优势 | 应用场景 |
---|---|---|---|---|---|---|
ActiveMQ | w级 | 高 | 高 | 高 | 简单易学 | 中小型企业、项目 |
RabbitMQ | w级 | 极高(us) | 高 | 高 | 生态好(基本什么语言都支持、时效性高,易学) | 适合绝大多数分布式的应用 |
kafka | 10w级 | 高(ms以内) | 极高 | 极高 | 吞吐量大、可靠性、可用性,强大的数据流处理能力 | 适用于大规模处理数据的场景,比如构建日志手机系统、实时数据流传输、事件流收集传输 |
RocketMQ | 10w级 | 高(ms) | 极高 | 极高 | 吞吐量大、可靠性、可用性,可扩展性 | 适用于金融、电商等对可靠性要求较高的场景,适合大规模的消息处理。 |
pulsar | 10w级 | 高(ms) | 极高 | 极高 | 可靠性、可用性很高,新型(技术架构先进),基于发布订阅模型 | 适合大规模、高并发的分布式系统(云原生)。适合实时分析、事件流处理、loT数据处理等。 |
RabbitMQ 入门实战
1 | <dependency> |
特点:生态好,好学习、易于理解,时效性强,支持很多不同语言的客户端,扩展性、可用性都很不错。学习性价比非常高的消息队列,适用于绝大多数中小规模分布式系统。
基本概念:
AMQP协议:https://www.rabbitmq.com/tutorials/amqp-concepts.html
高级消息队列协议(Advanced Message Queue Protocol))
生产者:发消息到某个交换机消费者:从某个队列中取消息交换机(Exchange):负责把消息转发到对应的队列队列(Queue):存储消息的路由(Routes):转发,就是怎么把消息从一个地方转到另一个地方(比如从生产者转发到某个队列)Channel频道:理解为操作消息队列的client(比如jdbcClient、redisClient),提供了和消息队列server建立通信的传输方法(为了复用连接,提高传输效率)
多消费者
使用场景:多个消费者同时去接收并处理任务(尤其是每个消费者的处理能力有限)
一个生产者给一个队列发消息,多个消费者从这个队列取消息。1对多。
消息确认
1 | Channel.basicAck(long deliveryTag, boolean multiple) |
channel.basicQos(1); // 限制消费者从队列中预取的消息数量;控制单个消费者的处理任务积压数,每个消费者最多同时处理1个任务
2个小技巧:1.使用Scanner接受用户输入,便于快速发送多条消息2,使用for循环创建多个消费者,便于快速验证队列模型工作机制
交换机
场景:多个机器同时去接受并处理任务(尤其是每个机器的处理能力有限)一个生产者给多个队列发消息,1个生产者对多个队列。交换机的作用:提供消息转发功能,类似于网络路由器要解决的问题:怎么把消息转发到不同的队列上,使消费者从不同的队列消费
Direct交换机
绑定:可以让交换机和队列进行关联,可以指定让交互机把什么样的消息发送给哪个队列routingKey:路由键,控制消息要转发给哪个队列的
特点:消息会根据路由键转发到指定的队列场景:特定的消息只交给特定的系统(程序)来处理
比如发日志的场景,希望用独立的程序来处理不同级别的日志,比如C1系统处理eror日志,C2系统处理其他级别的日志
topic交换机
特点:消息会根据一个模糊的路由键转发到指定的队列场景:特定的一类消息可以交给特定的一类系统(程序)来处理绑定关系:可以模糊匹配多个绑定
- *****:匹配一个单词,比如*.orange,那么a.orange、b.orange都能匹配
- #:匹配0个或多个单词,比如a.#,那么a.a、a.b、a.a.a都能匹配
应用场景:
老板要下发一个任务,让多个组来处理
注意,这里的匹配和MySQL的like的%不一样,只能按照单词来匹配,每个’,分隔单词,如果是’#.’,其实可以忽略,匹配0个词也Ok
Headers 交换机
Headers Exchange 的路由规则不是基于路由键(routing key),而是基于消息的 header 属性进行匹配。
RPC
支持用消息队列来模拟 RPC 的调用,但是一般没必要,直接用 Dubbo、GRPC等RPC框架就好了
核心特性
消息过期机制
可以给每条消息制定一个有效期,一短时间内违背消费者处理,就过期了
示例场景:消费者(库存系统)挂了,一个订单15分钟还未被库存系统处理,这个订单就已经失效了,哪怕库存系统再恢复,其实也不同扣减库存
适用场景:清理过期数据、模拟延迟队列的实现
- 给队列中的所有消息指定过期时间
- 给某条消息指定过期时间注意:如果消息已经发到消费者手上,但是未确认,消息是不会过期的。相反,如果消息未发送到消费者手上,并且消息达到过期时间,那么消息就会过期。
死信队列
官方文档:https:/www.rabbitmq.com/dlx.html为了保证消息的可靠性,比如每条消息都成功消费,需要提供一个容措机制,即:失败的消息怎么处理?死信:过期的消息、拒收的消息、消息队列满了、处理失败的消息的统称死信队列:专门处理死信的队列(注意,它就是一个普通队列,只不过是专门用来处理死信的,你甚至可以理解这个队列的名称叫“死信队列”)死信交换机:专门给死信队列转发消息的交换机(注意,它就是一个普通交换机,只不过是专门给死信队列发消息而已,理解为这个交换机的名称就叫“死信交换机”)。也存在路由绑定死信可以通过死信交换机绑定到死信队列。
示例场景:
RabbitMQ重点知识
也是面试重点
- 消息队列的概念、模型、应用场景
- 交换机的类别、路由绑定的关系
- 消息可靠性a. 消息确认机制(ack、nack、reject)b. 消息持久化(durable)c. 消息过期机制d. 死信队列
- 延迟队列(类似死信队列)
- 顺序消费、消费幂等性(本次不讲)
- 可扩展性(仅作了解)a. 集群b. 故障的恢复机制c. 镜像
- 运维监控告警(仅作了解)
RabbitMQ 项目实战
怎么在项目中使用RabbitMQ?
- 使用官方的客户端(优点:兼容性好,换语言成本低,比较灵活;缺点:太灵活,要自己处理某些事情)
- 使用封装好的客户端、比如Spring Boot RabbitMQ Starter优点:简单医用,直接配置直接用缺点:封装的太好了,没学过的话就不知道怎么用。不够灵活,被框架限制死了
根据场景来选择,没有绝对的优劣(类似JDBC和MyBatis)
有技术和英文水平的话,建议看官方文档,不要看过期博客
注意试用版本要和Spring Boot版本一致
生产者代码
1 | @Component |
消费者队列
1 | @Component |
单元测试
1 | @SpringBootTest |
BI项目通过RabbitMQ改造
以前是把任务提交到线程池,然后在线程池提交中编写处理程序的代码,线程池内排队。如果程序中断了,任务就没了,就丢了。
改造后的流程:
- 把任务提交改为向队列发送消息
- 写一个专门接收消息的队列,处理任务
- 如果程序中断了,消息未被确认,还会重发么?
- 现在,消息全部集中发送到消息队列,可以部署多个后端,都从一个地发取任务,实现分布式负载均衡
验证发现,如果程序中中断了,没有ack也没有nack(服务中断,没有任何响应),那么这条消息会重新入队,从而实现了每个任务都会执行。