智能BI笔记

使用后端万用模板时的注意事项

  1. 更改pom文件的项目名name,包名com.hjj、工件名lingxibi和Spring Boot版本(包名要全局更改)
  2. 检查和更改项目的JDK版本
  3. 更改swagger接口文档的包扫描范围
  4. 更改yml文件的端口,项目名spring.application.name和数据库的信息,如url、username、password
  5. 更改Spring Boot启动类@MapperScan的包扫描路径
  6. 修改banner.txt文件

项目后端初始化注意事项

  1. 使用MyBatisX插件生成代码时,将生成的代码转移到对应的包中(原来包中已存在UserService、User等类根据需求去取舍user表中的字段和User类的变量)
  2. 将自增ID改为IdType.AUTO
1
2
@TableId(value = "id", type = IdType.AUTO)
private Long id;
  1. 后端启动项目端口冲突问题原因:Windows Hyper-V虚拟化平台占用了端口先使用:netsh interface ipv4 show excludedportrange protocol:=tcp查看被占用的端口,然后选择一个没被占用的端口启动项目禁用hyper -v

后端出现的问题

  1. 单元测试显示AIManager空指针异常,在Test测试类上加上@SpringBootTest注解,保证能够扫描到全部的包,使得能够加载SpringBoot中的Bean
  2. 因为运算符短路原则而出现的问题queryWrapper.eq(id > 0 && id != null, “id”, id);会报错,而queryWrapper.eq( id != null && id > 0, “id”, id);并不会报错
1
2
3
4
5
6
7
8
9
10
11
在你的代码中,有一段条件判断语句:

queryWrapper.eq(id > 0 && id != null, "id", id);
这行代码的逻辑是:如果id > 0并且id不为null,则使用queryWrapper.eq()方法添加查询条件。由于Java的逻辑运算符是短路的,即在第一个条件为false时不再执行后续的条件判断,因此在这里,id > 0为false时,不会执行id != null的判断。

因此,当id为null时,id > 0为false,但由于短路,后续的id != null并不会被执行,所以条件判断结果为true。

如果你想要根据id是否为null来进行条件判断,可以简化成:

queryWrapper.eq(id != null, "id", id);
这样就会根据id是否为null来决定是否添加查询条件。

AI调教和使用

AI接口普遍都有输入字数限制,尽可能压缩数据,能够允许多传一些数据

如何向 AI 提词(prompt)?

AI提词技巧1

持续输入,持续优化

第一次问:

我给你三行数据,请你帮我分析网站的增长趋势,数据如下:

第一行:日期:1号,用户数:10人第二行:日期:2号,用户数:20人第三行:日期:3号,用户数:30人

AI提词技巧2

数据压缩(内容压缩,比如把很长的内容提取关键词,也可以让AI来做)

我给你三行数据,请你帮我分析网站的增长趋势。数据如下:

表头:日期,用户数1号,102号,203号,30

使用csv对excel文件的数据进行提取和压缩

easyexcel:https://easyexcel.opensource.alibaba.com/docs/current/quickstart/read(开源)

如何调用AI

输入:

系统预设(提前告诉他职责、功能、回复格式要求) + 分析目标 + 压缩后的数据(预设很重要

最简单的系统预设:假如你是一个数据分析师,接下来我会给你我的分析目标和原始数据,请告诉我分析结论。

AI提问技巧

如果想要 AI 更好地理解我们的输入,给我们预期的、精确格式的输出,我们就需要严格控制咱们得提问词。

  1. 系统预设:你是一个数据分析刊师和前端开发专家,接下来我会按照以下固定格式给你提供内容:分析需求:{数据分析的需求和目标}原始数据:{csv格式的原始数据,用,作为分隔符}请根据以上内容,帮我生成数据分析结论和可视化图表代码分析需求:分析网站用户的增长情况原始数据:日期,用户数1号,102号,203号,30
  2. 控制输出格式(便于AI返回的内容能够更加方便地为我们所用)

3种调用AI的方式

  1. 直接调用OpenAI 或者其他 AI 原始大模型官网的接口https://platform.openai.com/docs/api-reference1)在请求头中指定OPENAI API KEYAuthorization:Bearer OPENAI API KEY2)找到你要使用的接口,比如Al对话接口:https:/platform.openai.com/docs/api-reference/,chat/create3)按照接口文档的示例,构造HTTP请求
  2. 使用云服务商提供的,封装后的AI接口Azure云优点:本地都能用缺点:依然要钱,可能更贵
  3. 利用鱼聪明 AI 提供的开放SDK

怎么生成图表

AI无法生成现成的图表,但是AI可以生成代码 ==> 可以把代码利用前端组件库(Echarts)在网页进行展示

你是一个数据分析刊师和前端开发专家,接下来我会按照以下固定格式给你提供内容:分析需求:{数据分析的需求和目标}原始数据:{csv格式的原始数据,用,作为分隔符}请根据这两部分内容,按照以下格式生成内容(此外不要输出任何多余的开头、结尾、注释)

【【【【【【{前端Echarts V5 的 option 配置对象js代码,合理地将数据进行可视化}【【【【【【{ 明确的数据分析结论、越详细越好,不要生成多余的注释 }

安全性

如果用户上传一个超大的文件怎么办?比如100G?

只要涉及到用户自主上传的操作,一定要校验文件(图像)

  • 文件大小
  • 文件后缀(但可以将含有非法内容改后缀名)
  • 文件的内容(成本较高)
  • 文件的合规性(比如敏感内容,建议用第三方的审核功能)

扩展点:接入腾讯云的图片万象数据审核(COS 对象存储的审核功能)

数据存储

现状:我们把每个图表的原始数据全部存放在了同一个数据表(Chart表)的字段里

问题:

  1. 如果用户上传的原始数量很大、图表数日益增多,查询Chart表就会很慢
  2. 对于BI平台,用户是可以查看原始数据的、对原始数据进行简单查询的需求。现在如果把所有数据存放在一个字段(列)中,查询时,只能取出这个列的所有内容

解决方案: ==> 分库分表

把每个图表的原始数据(chartData)单独保存为一个新的数据表,而不是都存在一个字段里,这样:

  1. 存储时,能够分开存储,互不影响,也能增加安全性,(如果把每个用户上传的图表按列作为字段都存在同一个数据库表中,而不是每个图表生成一个对应的数据库表的话,会出现性能问题。因为如果有甲、乙两个人都在查数据,但是甲查得早而且查得数据量特别大,乙查的迟但查得数据量少,这就会导致乙查询时少数据量时反而耗时很长,影响其他用户查询)
  2. 查询时,可以使用多中SQL灵活取出需要的字段,查询性能更快

实现

分开存储:

  1. 存储图表信息时,不把数据存储为某个数据库表单个字段,而是新建一个chart_(图表d)的数据表通过图表id,数据列名,数据类型等字段,生成一下SQL语句,并执行即可
1
2
3
4
5
6
--auto-generated definition--
create table chart_1659210482555121666
(
日期 int nul1,
用户数 int nul1
);

分开查询:

  1. 以前直接查询图表,取chartData字段;现在改为读取chart_{图表id}的数据表select * from chart_{图标id}

分库分表

水平分表、垂直分库

分表:数据量大时可以拆分,前1w用户1个表,后1w用户1个表。对于BI来说,每个图表信息(chartData)为一个表

分库:根据业务分库,比如用户是同一个库,订单支付信息相关的一个库,一般根据业务来

具体实现:MyBatis的动态SQL(根据代码灵活地生成SQL)

  1. 想想我们分表建表时,哪些部分需要动态生成,比如数据库表名,字段名
  2. 在mapper.xml文件中定义SQL语句
1
2
3
<select id="queryChartData" parameterType="String" resultMap="hashmap">
select * from chart_#{cahrtId}
</select>

或者:

1
2
3
<select id="queryChartData" parameterType="string" resultMap="hashmap">
${querySql}
</select>

缺点:有SQL注入风险

1
2
3
public interface ChartMapper extends BaseMapper<Chart> {
List<Map<String, Object>> queryChartData(@Param("querySql") String querySql);
}

测试用例:

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
public class ChartMapperTest {
@Resource
ChartMapper chartMapper;

@Test
public void queryChartData() {
String chartId = "1750880023067357186";
String querySql = String.format("select * from chart_%s", chartId);
List<Map<String, Object>> resultData = chartMapper.queryChartData(querySql);
System.out.println(resultData);
}
}

比如:select * from chart_12345 where id=1 or 1=1;

限流

限流的几种算法

推荐文章:https://juejin.cn/post/6967742960540581918

  1. 固定窗口限流(食堂1小时只允许10个用户拿汉堡且1小时只能做10个,结果59分钟来了10个人,第1小时1分钟又来10个人,那么汉堡就不够)单位时间内允许部分操作1小时只允许10个用户操作优点:最简单缺点:可能出现流量突刺,流量高峰比如:前59分钟没有1个操作,第59分钟来了10操作;第1小时1分钟又来了10个操作。相当于2分钟内执行了20个操作,服务器仍然有高峰危险。
  2. 滑动窗口限流(每十分钟做一个汉堡,前一个小时汉堡被抢光了,1小时10分钟来了个新用户就能抢到1小时10分钟刚做的汉堡)单位时间内允许部分操作,但是这个单位时间是滑动的,需要指定一个滑动单位滑动单位1min:开始前:0s 1h 2h一分钟后:1min 1h1min 2h1min 优点:能够解决上述流量突刺的问题,因为第59分钟时,限流窗口是59~1小时59分,这个时间段内只能接收10次请求,只要还在这个窗口内,更多的操作就会被拒绝缺点:实现较复杂,限流效果和你的滑动单位有关,滑动单位越小,限流效果越好,但往往很难选取一个特别合适的滑动单位。
  3. 漏桶限流(推荐)(大家排好队一个一个拿汉堡,前面拿完后面才能拿,大家拿汉堡的速率一样)以固定的速率处理请求(流水),当请求(水)桶满了后,拒绝请求(禁止灌水)。每秒处理10个请求,桶的容量为10,每0.1秒固定处理一次请求,如果1秒内来了10个请求,都可以处理完,但如果1秒内来了11个请求,最后那个请求就会溢出水桶,被拒绝。优点:能够一定程度上应对流量突刺;能够以固定速率处理请求,保证服务器的安全****缺点:没有办法迅速处理一批请求,只能一个一个按顺序来(固定速率的缺点) 使用场景:希望服务器以恒定速率处理请求
  4. 令牌桶限流(推荐)(快餐店提前做好10个汉堡,开抢时前十个人能够同时得到汉堡,不用排队,但是剩下的10个人只能等待下一批的10个汉堡做好才能拿)管理员先生成一批令牌,每秒生成10个令牌;当用户要操作前,先去拿一个令牌,有令牌的人就有资格同时执行操作,没有令牌就等着优点:能够并发处理同时的请求需要考虑的问题:还是存在时间单位的选取问题使用场景:服务器允许并发操作,希望性能高一些

因为生成图表和结论是调用AI的,但调用AI是要成本的,那怎么防止用户疯狂刷量,防止破产呢?

解决问题:

  1. 控制成本 ==> 限制用户调用总次数
  2. 由于服务器同时刻处理的请求为10个,用户在短时间内疯狂使用(比如使用jmeter多次请求接口),导致服务器资源被占满,其他用户无法使用 ==> 限流

思考限流阈值多大合适?比如限制单个用户在每秒只能用几次?

限流的实现

  1. 本地限流(单机限流)每个服务器单独限流,一般适用于单体项目,即你的项目只有一个服务器
  • Guava RateLimiter实现令牌桶算法

  • 令牌桶算法是一种常用的限流算法。它通过维护一个固定容量的令牌桶,每个请求需要获取一个令牌才能被处理。当令牌桶中的令牌不足时,新的请求将被限制。

  • Java 中可以使用Guava库的RateLimiter类实现令牌桶算法。例如:

  • 计数器算法

  • 计数器算法是一种简单的限流方法,通过计数器记录一定时间内的请求次数,当请求次数达到设定的阈值时,后续请求被拒绝。

  • 可以使用 AtomicInteger 或 AtomicLong 进行计数,并结合定时任务或定时器来定时重置计数器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterExample {
private static final RateLimiter rateLimiter = RateLimiter.create(10.0); // 每秒生成10个令牌

public void processRequest() {
if (rateLimiter.tryAcquire()) {
// 处理请求逻辑
} else {
// 请求被限流
}
}
}
import java.util.concurrent.atomic.AtomicInteger;

public class CounterLimiter {
private static final int MAX_REQUESTS = 10;
private static final int RESET_INTERVAL = 60 * 1000; // 重置计数器的时间间隔,这里设置为1分钟
private static final AtomicInteger requestCounter = new AtomicInteger(0);

public boolean allowRequest() {
int currentRequests = requestCounter.incrementAndGet();
if (currentRequests > MAX_REQUESTS) {
return false; // 请求超过限制
}
// 处理请求逻辑

// 定时任务或定时器重置计数器
// resetCounter();

return true;
}

private void resetCounter() {
requestCounter.set(0);
}
}
  1. 分布式限流(多机限流)如果你的项目部署在多个服务器上,那么需采用分布式限流

  2. 把用户的使用频率等数据放到一个集中的存储进行统计,比如Redis,这样无论用户的请求落到了哪台服务器,都以集中的数据存储内的数据为准(Redisson)

  3. 在网关集中进行限流和统计(比如Sentinel、Spring Cloud Gateway)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import redis.clients.jedis.Jedis;

public class DistributedRateLimiter {
private static final String REDIS_KEY = "request_counter";
private static final int MAX_REQUESTS = 100;

public boolean allowRequest() {
try (Jedis jedis = new Jedis("localhost")) {
Long currentRequests = jedis.incr(REDIS_KEY);

if (currentRequests > MAX_REQUESTS) {
return false; // 请求超过限制
}

// 处理请求逻辑

return true;
}
}
}

Redisson 限流实现

Redisson内置了一个限流工具类,可以帮助你利用Redis来存储和统计。

RedisLimiterManager:什么是Manager?专门提供 RedisLimiterManager 限流基础服务的(提供了通用的能力

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedissonConfig {

private String host;
private String port;

@Bean
public RedissonClient redissonClient(){
// 1. 创建配置
Config config = new Config();
String redisAddress = String.format("redis://%s:%s", host, port);
config.useSingleServer().setAddress(redisAddress).setDatabase(3);//设置单个服务器,设置地址,选择数据库
// 2. 创建实例
RedissonClient redisson = Redisson.create(config);
return redisson;
}
}
/**
* 专门提供 RedisLimiter 限流基础服务(提供了通用的能力)
*/
@Service
public class RedisLimiterManager {

@Resource
private RedissonClient redissonClient;

/**
* 限流操作
*
* @param key 区分不同的限流器,比如不同的用户 id 应该分别统计
*/
public void doRateLimit(String key) {
// 创建一个名称为user_limiter的限流器,每秒最多访问2次
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);
rateLimiter.trySetRate(RateType.OVERALL, 2, 1, RateIntervalUnit.SECONDS); // OVERALL类型:不管有多少台服务器都是放在一起统计的
// 每当一个操作来了后,请求一个令牌
boolean canOp = rateLimiter.tryAcquire(1);// 令牌请求数,处理请求需消耗的令牌
if (!canOp) {
throw new BusinessException(ErrorCode.TOO_MANY_REQUEST);
}
}
}

在需要限流的地方直接调用doRateLimit()方法就好了

限流粒度

  1. 针对某个方法限流,即最多允许同时XX个操作使用这个方法
  2. 针对某个用户限流,比如单个用户最多执行XX次操作
  3. 针对某个用户x方法限流,比如当个用户单位时间内最多执行x次这个方法

第五期计划

  1. 系统问题分析
  2. 异步化的思路
  3. 线程池的理论和实战
  4. 前端后端的异步化改造

系统问题分

问题场景:调用的服务处理能力有限,或者接口的处理(或返回)时长较长时,就应该考虑异步化了。

  1. 用户等待时间有点儿长(因为要等AI生成)
  2. 业务服务器可能会有很多请求在处理,导致系统资源紧张,严重时导致服务器宕机或者无法处理新的请求
  3. 调用的第三方服务(A!能力)的处理能力是有限的,比如每3秒只能处理1个请求,会导致A!处理不过来,严重时A!可能会对咱们的后台系统拒绝

异步化

new Thread?

同步:一件事情做完,在做另一件事情。(烧水后处理工作)

异步:不用等一件事做完,就可以做另一件事情。等第一件事完成后,可以收到一个通知,通知你这件事做好了,你可以再进行后续的处理(烧水的时候,水壶有一个蜂鸣器;烧水时人可以同时处理工作。水烧好后,人能听到声音,就知道水烧好了)

业务流程分析

标准异步化的业务流程

  1. 当用户要进行耗时很长的操作时,点击提交后,不需要在界面傻等,而是应该把这个任务保存到数据库中记录下来
  2. 用户要执行新任务时:a. 任务提交成功:
  • 如果程序有多余的空闲线程,可以立刻去做这个任务
  • 如果程序线程都在繁忙,无法继续处理,那就放在等待队列中
  1. b. 任务提交失败:比如我们的程序所有线程都在忙,任务队列满了
  • 拒绝这个任务,再也不执行
  • 通过保存到数据库中的记录来看到提交失败的任务,并且在程序空闲的时候,可以将任务从数据库中捞到程序里,再去执行
  1. 我们的程序(线程)从任务队列中取出任务依次执行,每完成一件事要修改一下任务的状态
  2. 用户可以查询任务的执行状态,或者在任务执行成功或者失败时能得到通知(发邮件、系统消息提示框、短信)
  3. 如果我们要执行的任务非常复杂,包含很多环节,在每一个小任务完成时,要在程序(数据库中)记录一下任务的执行状态(进度)

标准异步化的业务流程

  1. 用户点击智能分析页的提交按钮时,先把图表立刻保存到数据库中(作为一个任务)
  2. 用户可以在图表列表页面查看所有图表(已生成的、生成中、生成失败的)的信息和状态
  3. 用户可以修改生成失败的图表信息,点击重新生成

问题:

  1. 任务队列的最大容量应该设置为多少?
  2. 程序怎么从任务队列中取出任务并执行?这个任务队列的流程怎么实现?

线程池

为什么需要线程池?

  1. 线程的管理比较复杂(比如什么时候新增线程、什么时候减少空闲线程)
  2. 任务存取比较复杂(什么时候接受任务、什么时候拒绝任务,怎么保证大家不抢到同一个任务)

线程池的作用:帮助你轻松管理线程,协调任务的执行过程

线程池的实现

不用自己写,如果是在Spring中,可以用ThreadPoolTaskExecutor配合@Async注解来实现(不太建议)

如果是在Java中,可以使用JUC并发编程包的ThreadPoolExecutor来实现非常灵活的自定义线程池

线程池参数:

1
2
3
4
5
6
7
8
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

怎么确定线程池参数呢?结合业务场景和系统资源去测试调整,不断优化

回归到我们的业务,要考虑系统最脆弱的环节(系统的瓶颈)在哪里?现有条件:比如A!生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队。

回归到我们的业务,要考虑系统最脆弱的环节(系统的瓶颈)在哪里?现有条件:比如AI生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队。

具体参数的介绍:

corePoolSize(核心线程数=>正式员工数):正常情况下,我们的系统应该能同时工作的线程数(随时就绪的状态)maximumPoolSize(最大线程数=>哪怕任务再多,你也最多招这些人):极限情况下,我们的线程池最多有多少个线程?keepAliveTime(空闲线程存活时间):非核心线程在没有任务的情况下,过多久要删除(理解为开除临时工),从而释放无用的线程资源。TimeUnit unit(空闲线程存活时间的单位):分钟、秒workQueue(工作队列):用于存放给线程执行的任务,存在一个队列的长度threadFactory(线程工厂):控制每个线程的生成、线程的属性(比如线程名)RejectedExecutionHandler(拒绝策略):任务队列满的时候,我们采取什么措施,比如抛异常、不抛异常、自定义策略

资源隔离策略:比如重要的任务(VIP)一个队列,普通的任务一个队列,保证这个两个队列互不干扰

线程池的工作机制

本项目中如何设置线程池的参数

线程池的参数如何设置?现有条件:比如A!生成能力的并发是只允许4个任务同时去执行,AI能力允许20个任务排队。

corePoolSize(核心线程数=>正式员工数):正常情况下,可以设置为4maximumPoolSize:设置为极限情况,设置为<=4keepAliveTime(空闲线程存活时间):一般设置为秒级或者分钟级TimeUnit unit(空闲线程存活时间的单位):分钟、秒workQueue(工作队列):结合实际请况去设置,可以设置为20threadFactory(线程工厂):控制每个线程的生成、线程的属性(比如线程名)RejectedExecutionHandler(拒绝策略):抛异常,标记数据库的任务状态为”任务满了已拒绝”

一般情况下,任务分为IO密集型计算密集型两种。计算密集型:吃CPU,比如音视频处理、图像处理、数学计算等,一般是设置corePoolSize为CPU的核数+1。maximumPoolSize一般设置为CPU核心数的2~3倍。(空余线程),可以让每个线程都能利用好CPU的每个核,而且线程之间不用频繁切换(减少打架、减少开销)IO密集型:吃带宽内存/硬盘的读写资源,corePoolSize可以设置大一点,一般经验值是2n左右,但是建议以IO的能力为主。

导入百万数据到数据库是属于IO密集型还是CPU密集型? ==> IO密集型

自定义线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Configuration
public class ThreadPoolExecutorConfig {

@Bean
public ThreadPoolExecutor threadPoolExecutor() {
ThreadFactory threadFactory = new ThreadFactory() {
private int count = 1;
@Override
public Thread newThread(@NotNull Runnable r) {
Thread thread = new Thread();
thread.setName("线程" + count);
return thread;
}
};

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 100,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(4), threadFactory);
return threadPoolExecutor;
}
}

开发

实现工作流程

  1. 给chart表新增任务状态字段(比如排队中、执行中、已完成、失败),任务执行信息字段(用于记录任务执行中、或者失败的一些信息)
  2. 用户点击智能分析页的提交按钮时,先把图表立刻保存到数据库中,然后提交任务
  3. 任务:先修改图表任务状态为“执行中”。等执行成功后,修改为“已完成”、保存执行结果;执行失败后,状态修改为“失败”,记录任务失败信息。
  4. 用户可以在图表管理页面查看所有图表(已生成的、生成中的、生成失败)的信息和状态用户可以修改生成失败的图表信息,点击重新生成

任务执行逻辑

先修改任务状态为执行中,减少重复执行的风险、同时让用户知道执行状态

优化点

  1. guava Retrying重试
  2. 提前考虑到AI生成错误的情况,在后端进行异常处理(比如AI说了多余的话,提取正确的字符串)
  3. 如果说任务根本没提交到队列中(或者队列已经满了),是不是可以用定时任务把失败状态的图表放在队列中(补偿)
  4. 给任务的执行增加一个超时时间,超时后自动标记为失败
  5. 反向压力:https:/zhuanlan.zhihu.com/p/404993753,通过调用的服务状态来选择当前系统的策略(比如根据AI服务的当前任务队列数来控制咱们系统的核心线程数),从而最大化利用系统资源
  6. 我的图表页面增加一个刷新,定时自动刷新的按钮,保证获取到图表的最新状态
  7. 任务执行成功或失败,给用户发送消息通知(实时:websocket、server side event)

第六期计划

  1. 分析系统现在的不足
  2. 分布式消息队列
  3. 分布式消息队列RabbitMQ
  4. 项目扩展

分析系统化现状的不足

单机系统的不足

已经经过了同步到异步化的改造

现状:目前的异步是通过本地的线程池实现的。

  1. 无法集中限制,只能单机限制假如A!服务限制只能有2个用户同时使用,单个线程池可以限制最大核心线程数为2来实现。假设系统用量增大,改为分布式,多台服务器,每个服务器都要有2个线程,就有可能有2N个线程,超过了A!服务的限制。解决方案:在一个集中的地方去下发和管理任务(比如集中存储当前正在执行的任务数)
  2. 任务由于是放在内存中执行的,重启时可能会丢失虽然可以人工从数据库捞出来再重试,但是其实需要额外开发(比如定时任务),这种重试的场景是非常典型的,其实是不需要我们开发者过于关心、或者自己实现的。解决方案:把任务放在一个可以持久化存储的硬盘
  3. 优化:如果你的系统功能越来越多,长耗时任务越来越多,系统会越来越复杂(比如要开多个线程池、资源可能会出现项目抢占)。其实我们可以把长耗时、消耗很多的任务把它单独抽成一个程序,不要影响主业务。服务拆分(应用解耦)其实我们可以把长耗时、消耗很多的任务把它单独抽成一个程序,不要影响主业务。解决方案:可以有一个中间人,让中间人帮我们去连接两个系统(例如核心系统和智能生成业务)

分布式消息队列

中间件

连接多个系统、帮助多个系统紧密协作的技术、中间人

Redis、消息队列、分布式存储Etcd

消息队列

存储消息的队列关键词:存储、消息、队列存储:存数据消息:某种数据结构,比如字符串、对象、二进制数据、jso等等队列:先进先出的数据结构

可以理解为消息队列是特殊的数据库

应用场景(作用):在多个不同的系统、应用之间实现消息的传输。不需要考虑传输应用的编程语言、系统、框架等等。可以让jva开发的应用发消息,让php开发的应用收消息,这样就不用把所有代码写到同一个项目里(应用解耦)。

消息队列的模型

生产者:Producer,,类比为快递员,发送消息的人(客户端)消费者:Consumer,类比为取快递的人,接受读取消息的人(客户端)消息:Message,类比为快递,就是生产者要传输给消费者的数据消息队列:Queue

为什么不直接传输,要用消息队列?生产者不用关心你的消费者要不要消费、什么时候消费,我只需要把东西给消息队列,我的工作就算完成了。

生产者和消费者实现了解耦,互不影响。

为什么要用消息队列?

  1. 异步处理生产者发送完消息之后,可以继续忙别的,消费者什么时候消费都可以,不会产生阻塞
  2. 削峰填谷原本:1s内来了1w个请求,但服务器1s只能处理5k个氢气,那可以先把用户的请求放在消息队列中,消费者(实际执行操作的应用)可以按照自己的需求,慢慢去取现在:把1w的请求放在消息队列中,系统按照恒定的速率慢慢执行,从而保护系统的稳定性和安全性

分布式消息队列的优势

  1. 数据持久化:可以把消息存储到硬盘中,服务器重启也不会丢失
  2. 可扩展性:根据需求,随时增加(或减少)节点,搭建MQ集群
  3. 应用解耦:可以连接各个不同语言、框架开发的系统,让这些系统能够灵活传输读取数据

应用解耦的优点:

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中间用户感受不到物流系统的故障,提升系统的可用性。

发布订阅

如果一个非常大的系统要给其他子系统发送通知,最简单直接的方式是大系统直接依次调用(通知)小系统:问题:

  1. 每次发通知都要调用很多系统,很麻烦、有可能失败
  2. 新出现的项目(大项目感知不到的项目)无法得到通知

解决方案:大的核心系统始终往一个地方(消息队列)发消息,其他的系统都去订阅这个消息队列(读取消息队列中的消息)

应用场景

  1. 耗时的场景(异步)
  2. 高并发的场景(异步,削峰填谷)
  3. 分布式系统协作(尤其是跨团队,跨业务协作,应用解耦)
  4. 强稳定性的场景(比如金融业务,持久化、可靠性、削峰填谷)

消息队列的缺点

也叫分布式场景下需要考虑的问题

  1. 给系统引入额外的中间件,系统更复杂,额外维护中间件,额外的费用(部署、维护)成本
  2. 消息丢失、消息的顺序性、重复消费、数据的一致性(分布式系统就要考虑)

主流分布式消息队列选型

主流技术

  1. ActiveMQ
  2. RabbitMQ
  3. kafka
  4. RocketMQ
  5. zeroMQ
  6. pulsar
  7. Apache InLong(Tube)

技术对比

技术选型指标:

  • 吞吐量:IO、并发
  • 时效性:类似延迟,消息的发送,到达时间
  • 可用性:系统可用的比率(比如1年365天宕机1s,可用率大概 X 个9)
  • 可靠性:消息不会丢失(比如不丢失消息(订单)
技术名称 吞吐量 时效性 可用性 可靠性 优势 应用场景
ActiveMQ w级 简单易学 中小型企业、项目
RabbitMQ w级 极高(us) 生态好(基本什么语言都支持、时效性高,易学) 适合绝大多数分布式的应用
kafka 10w级 高(ms以内) 极高 极高 吞吐量大、可靠性、可用性,强大的数据流处理能力 适用于大规模处理数据的场景,比如构建日志手机系统、实时数据流传输、事件流收集传输
RocketMQ 10w级 高(ms) 极高 极高 吞吐量大、可靠性、可用性,可扩展性 适用于金融、电商等对可靠性要求较高的场景,适合大规模的消息处理。
pulsar 10w级 高(ms) 极高 极高 可靠性、可用性很高,新型(技术架构先进),基于发布订阅模型 适合大规模、高并发的分布式系统(云原生)。适合实时分析、事件流处理、loT数据处理等。

RabbitMQ 入门实战

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>

特点:生态好,好学习、易于理解,时效性强,支持很多不同语言的客户端,扩展性、可用性都很不错。学习性价比非常高的消息队列,适用于绝大多数中小规模分布式系统。

基本概念:

AMQP协议:https://www.rabbitmq.com/tutorials/amqp-concepts.html

高级消息队列协议(Advanced Message Queue Protocol))

生产者:发消息到某个交换机消费者:从某个队列中取消息交换机(Exchange):负责把消息转发到对应的队列队列(Queue):存储消息的路由(Routes):转发,就是怎么把消息从一个地方转到另一个地方(比如从生产者转发到某个队列)Channel频道:理解为操作消息队列的client(比如jdbcClient、redisClient),提供了和消息队列server建立通信的传输方法(为了复用连接,提高传输效率)

多消费者

使用场景:多个消费者同时去接收并处理任务(尤其是每个消费者的处理能力有限)

一个生产者给一个队列发消息,多个消费者从这个队列取消息。1对多。

消息确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Channel.basicAck(long deliveryTag, boolean multiple)

deliveryTag: 是消息的唯一标识符,通常由RabbitMQ在消息被投递时提供。
multiple: 如果设置为true,则表示确认所有小于或等于deliveryTag的消息。这可以用于批量确认。
basicAck方法用于确认一条或多条消息已经被成功处理。当消费者处理完一条消息并且准备告诉RabbitMQ可以安全地从队列中删除该消息时,它会调用这个方法。

Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag: 同上,消息的唯一标识符。
multiple: 同上,用于批量确认。
requeue: 如果设置为true,则消息会被重新放回队列中,以便其他消费者可以处理它。如果设置为false,消息会被RabbitMQ删除或发送到死信队列(如果配置了的话)。
basicNack方法用于告知RabbitMQ一条或多条消息处理失败。如果requeue参数设置为true,这些消息会被重新加入队列以供再次消费。

Channel.basicReject(long deliveryTag, boolean requeue)

deliveryTag: 消息的唯一标识符。
requeue: 同上,决定消息是否被重新放回队列。
basicReject方法用于拒绝单条消息。与basicNack不同,basicReject只适用于单条消息,而不是多条。如果消费者不能或不想处理某条消息,它可以调用这个方法。

channel.basicQos(1); // 限制消费者从队列中预取的消息数量;控制单个消费者的处理任务积压数,每个消费者最多同时处理1个任务

2个小技巧:1.使用Scanner接受用户输入,便于快速发送多条消息2,使用for循环创建多个消费者,便于快速验证队列模型工作机制

交换机

场景:多个机器同时去接受并处理任务(尤其是每个机器的处理能力有限)一个生产者给多个队列发消息,1个生产者对多个队列。交换机的作用:提供消息转发功能,类似于网络路由器要解决的问题:怎么把消息转发到不同的队列上,使消费者从不同的队列消费

Direct交换机

绑定:可以让交换机和队列进行关联,可以指定让交互机把什么样的消息发送给哪个队列routingKey:路由键,控制消息要转发给哪个队列的

特点:消息会根据路由键转发到指定的队列场景:特定的消息只交给特定的系统(程序)来处理

比如发日志的场景,希望用独立的程序来处理不同级别的日志,比如C1系统处理eror日志,C2系统处理其他级别的日志

topic交换机

特点:消息会根据一个模糊的路由键转发到指定的队列场景:特定的一类消息可以交给特定的一类系统(程序)来处理绑定关系:可以模糊匹配多个绑定

  • *****:匹配一个单词,比如*.orange,那么a.orange、b.orange都能匹配
  • #:匹配0个或多个单词,比如a.#,那么a.a、a.b、a.a.a都能匹配

应用场景:

老板要下发一个任务,让多个组来处理

注意,这里的匹配和MySQL的like的%不一样,只能按照单词来匹配,每个’,分隔单词,如果是’#.’,其实可以忽略,匹配0个词也Ok

Headers 交换机

Headers Exchange 的路由规则不是基于路由键(routing key),而是基于消息的 header 属性进行匹配。

RPC

支持用消息队列来模拟 RPC 的调用,但是一般没必要,直接用 Dubbo、GRPC等RPC框架就好了

核心特性

消息过期机制

可以给每条消息制定一个有效期,一短时间内违背消费者处理,就过期了

示例场景:消费者(库存系统)挂了,一个订单15分钟还未被库存系统处理,这个订单就已经失效了,哪怕库存系统再恢复,其实也不同扣减库存

适用场景:清理过期数据、模拟延迟队列的实现

  1. 给队列中的所有消息指定过期时间
  2. 给某条消息指定过期时间注意:如果消息已经发到消费者手上,但是未确认,消息是不会过期的。相反,如果消息未发送到消费者手上,并且消息达到过期时间,那么消息就会过期。

死信队列

官方文档:https:/www.rabbitmq.com/dlx.html为了保证消息的可靠性,比如每条消息都成功消费,需要提供一个容措机制,即:失败的消息怎么处理?死信:过期的消息、拒收的消息、消息队列满了、处理失败的消息的统称死信队列:专门处理死信的队列(注意,它就是一个普通队列,只不过是专门用来处理死信的,你甚至可以理解这个队列的名称叫“死信队列”)死信交换机:专门给死信队列转发消息的交换机(注意,它就是一个普通交换机,只不过是专门给死信队列发消息而已,理解为这个交换机的名称就叫“死信交换机”)。也存在路由绑定死信可以通过死信交换机绑定到死信队列。

示例场景:

RabbitMQ重点知识

也是面试重点

  1. 消息队列的概念、模型、应用场景
  2. 交换机的类别、路由绑定的关系
  3. 消息可靠性a. 消息确认机制(ack、nack、reject)b. 消息持久化(durable)c. 消息过期机制d. 死信队列
  4. 延迟队列(类似死信队列)
  5. 顺序消费、消费幂等性(本次不讲)
  6. 可扩展性(仅作了解)a. 集群b. 故障的恢复机制c. 镜像
  7. 运维监控告警(仅作了解)

RabbitMQ 项目实战

怎么在项目中使用RabbitMQ?

  1. 使用官方的客户端(优点:兼容性好,换语言成本低,比较灵活;缺点:太灵活,要自己处理某些事情)
  2. 使用封装好的客户端、比如Spring Boot RabbitMQ Starter优点:简单医用,直接配置直接用缺点:封装的太好了,没学过的话就不知道怎么用。不够灵活,被框架限制死了

根据场景来选择,没有绝对的优劣(类似JDBC和MyBatis)

有技术和英文水平的话,建议看官方文档,不要看过期博客

注意试用版本要和Spring Boot版本一致

生产者代码

1
2
3
4
5
6
7
8
9
@Component
public class MyMessageProducer {
@Resource
private RabbitTemplate rabbitTemplate;

public void sendMessage(String exchange, String routineKey, String message) {
rabbitTemplate.convertAndSend(exchange, routineKey, message);
}
}

消费者队列

1
2
3
4
5
6
7
8
9
10
11
@Component
public class MyMessageConsumer {
@Resource
private RabbitTemplate rabbitTemplate;

@SneakyThrows
@RabbitListener(queues = {"code_queue"}, ackMode = "MANUAL")
public void receiveMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
channel.basicAck(deliveryTag, false);
}
}

单元测试

1
2
3
4
5
6
7
8
9
10
@SpringBootTest
class MyMessageProducerTest {
@Resource
private MyMessageProducer myMessageProducer;

@Test
void sendMessage() {
myMessageProducer.sendMessage("code_exchange", "my_routineKey", "你好呀");
}
}

BI项目通过RabbitMQ改造

以前是把任务提交到线程池,然后在线程池提交中编写处理程序的代码,线程池内排队。如果程序中断了,任务就没了,就丢了。

改造后的流程:

  1. 把任务提交改为向队列发送消息
  2. 写一个专门接收消息的队列,处理任务
  3. 如果程序中断了,消息未被确认,还会重发么?
  4. 现在,消息全部集中发送到消息队列,可以部署多个后端,都从一个地发取任务,实现分布式负载均衡

验证发现,如果程序中中断了,没有ack也没有nack(服务中断,没有任何响应),那么这条消息会重新入队,从而实现了每个任务都会执行。