什么是 RabbitMQ?其有什么特点?

RabbitMQ 是一款开源的消息中间件,由 Erlang 语言开发,天生就适合分布式场景并能在分布式应用中传递消息。RabbitMQ 的显著特点如下:

  1. 消息传递模式:RabbitMQ 支持多种消息传递模式,如发布/订阅、点对点和工作队列等,使其更灵活适用于各种消息通信场景。
  2. 消息路由和交换机:RabbitMQ 引入交换机的概念,允许将消息根据内容、路由键等路由到一个或多个队列。
  3. 消息确认机制:RabbitMQ 支持消息确认机制,消费者可确认已成功处理的消息,确保消息在传递后不会被重复消费。
  4. 可扩展性:RabbitMQ 可通过添加节点和集群增加吞吐量和可用性。
  5. 多种编程语言支持:RabbitMQ 支持多种客户端库和插件,支持多种编程语言,如 Java、Python、Ruby、Node.js 等。
  6. 消息持久化:RabbitMQ 支持消息和队列持久化,确保消息在 RabbitMQ 重启后不会丢失。
  7. 灵活的插件系统:RabbitMQ 具有丰富的插件系统以便可以拓展功能,包括管理插件、数据复制插件、分布式部署插件等。
  8. 出色的 WEB 管理界面:RabbitMQ 提供了一个易于使用的 Web 管理界面,用于监视和管理队列、交换机、连接和用户权限等。

RabbitMQ 有哪些核心组件?

  1. Virtual Host:虚拟主机,是 RabbitMQ 的逻辑容器,用于隔离不同环境或不同应用程序的信息流。每个虚拟主机都有独立的队列、交换机的等设置。
  2. Connection 连接:管理和维护与 RabbitMQ 服务器的连接,生产者、消费者通过这个连接和 Broker 建立物理网络连接。
  3. broker:RabbitMQ 服务器,负责消息的接收和分发的应用。
  4. Channel 通道:是在 Connection 内创建的轻量级的通信通道,用于进行消息的传输和交互。应用程序通过 Channel 进行消息的发送和接收。通常一个 Connection 可建立多个 Channel。
  5. Exchange:是消息的中转站,负责接收生产者的消息并将消息根据特定规则路由到指定队列。
  6. Queue:是消息的存储位置。每个队列都有唯一的名称,消息从交换机路由到队列中,接着等待消费者获取和处理。
  7. Binding:是交换机和队列的关联规则,定义了消息如何从交换机路由到特定的队列。

此外,生产者和消费者也是RabbitMQ的核心组件,生产者负责发送消息到Exchange或者 Queue,消费者负责从Queue中订阅和处理消息。

img

RabbitMQ 交换机类型

  1. Direct(直连交换机):只有当消息的路由键和队列绑定的路由键完全相同时,消息才会被路由给指定队列。

  2. Fanout(扇出/广播交换机):将消息路由给所有绑定了该交换机的队列,所以 RoutingKey 有了等于没有。

  3. Topic(话题交换机):根据消息的路由键和队列绑定的路由键的匹配程度,将消息路由给指定队列。

  4. Topic中,将routingkey通过”.”来分为多个部分

  5. “*”:代表一个部分

  6. “#”:代表0个或多个部分(如果绑定的路由键为 “#” 时,则接受所有消息,因为路由键所有都匹配)

  7. Headers(头部交换机):Headers 匹配 AMQP 消息的 header 而不是 RoutingKey,其余功能和 Topic 差不多,但是性能差很多。

消费方指定的headers中必须包含一个”x-match”的键。

键”x-match”的值有2个

  1. x-match = all :表示所有的键值对都匹配才能接受到消息
  2. x-match = any :表示只要有键值对匹配就能接受到消息

img

RabbitMQ 支持哪些消息模式?

RabbitMQ 支持多种消息传递模式,这些模式允许应用程序在不同场景下进行灵活的消息通信。

  • WorkQueue 工作队列机制:生产者将消息发送到队列中,多个消费者同时消费队列上的消息,消息会均匀的分配给多个消费者消费
  • Publish/Subscribe 机制:生产者只负责将消息发送至交换机,交换机将消息转发至所有订阅的队列,并由对应的消费者消费
  • Routing 基于内容路由机制:再发布/订阅的基础上,增加一个 RoutingKey,交换机根据 RoutingKey 判断要把消息发给哪个队列
  • Topic 基于话题路由机制:在基于内容路由机制的基础上,对 ROutingKey 增加模糊匹配的功能

RabbitMQ 如何实现消息的持久化?

  1. 消息持久化:在发送消息时,将 delivery_mode 属性设置为 2,就可将消息标记为持久化
  2. 队列持久化: RabbitMQ 的队列有三种类型:经典队列、流式队列、仲裁队列。其中经典队列只需将 durable 设为 true 即可实现队列持久化,其余两种队列默认持久化保存
  3. 交换机持久化:声明交换机时将 durable 属性设置为 true 即可

RabbitMQ 是如何实现死信队列的?

在 RabbitMQ 中,实现死信队列只需要给正常队列增加三个核心参数即可:

  1. dead-letter-exchange:指定当前队列对应的死信队列
  2. dead-letter-routing-key:指定消息转入死信队列时的路由键
  3. message-ttl:消息在队列中的过期时间。

接下来,就可以往正常队列中发送消息。如果消息满足了某些条件,就会成为死信,并被重新发送到对应的死信队列中。而此时,RabbitMQ 会在消息的头部添加一些与死信相关的补充信息,例如时间、成为死信的原因、原队列等。应用程序可以按需处理这些补充的信息。

最后,死信队列中的消息都是正常业务处理失败的消息,应用程序需要创建一个消费者来专门处理这些被遗漏的消息。例如记录日志、发送警报等。这样才能保证业务数据的完整性。

RabbitMQ 中如何进行事务处理?

RabbitMQ 提供了事务处理机制,允许生产者在发送消息时将操作包装在一个事务中,以确保消息的可靠性传递。在 RabbitMQ 中,事务是通过通道(Channel)来实现的。可以通过以下步骤进行事务处理:

  1. 开启事务:在生产者端,可以通过调用 Channel 的 tx_select 方法来开启一个事务。这将启动一个新的事务,并将所有后续的消息发布操作放在该事务内。
  2. 发送消息:接下来在事务中,可以正常发送消息。如果消息发送失败,事务会自动回滚。
  3. 提交事务:如果事务中所有消息发送成功后,需要提交事务。可以通过调用 Channel 的tx_commit方法提交事务。
  4. 处理异常:如果在事务过程中发生异常,可以使用 try/catch 快来捕获异常。然后在异常处理过程中,调用 Channel 的 tx_rollback 方法来回滚 RabbitMQ 相关的事务操作。

需要注意的是,RabbitMQ 的事务处理是基于存储过程的,它可以保证在事务中的操作要么全部成功,要么全部失败。但是,由于 RabbitMQ 是一个异步的消息队列系统,事务处理可能会对其性能产生影响。因此,需要根据具体的应用场景和需求来权衡是否需要使用事务以及如何使用事务。

RabbitMQ 如何保证消息不丢失

消息丢失原因

从下述流程我们可以得知:消息从生产者到达消费者,经过两次网络传输,并且在 RabbitMQ 服务器中进行路由。

因此我们能知道整个流程中可能会出现三种消息丢失场景:

  • 生产者发送消息到 RabbitMQ 服务器的过程中出现消息丢失。 可能是网络波动未收到消息,又或者是服务器宕机。
  • RabbitMQ 服务器消息持久化出现消息丢失。 消息发送到 RabbitMQ 之后,未能及时存储完成持久化,RabbitMQ 服务器出现宕机重启,消息出现丢失。
  • 消费者拉取消息过程以及拿到消息后出现消息丢失。 消费者从 RabbitMQ 服务器获取到消息过程出现网络波动等问题可能出现消息丢失;消费者拿到消息后但是消费者未能正常消费,导致丢失,可能是消费者出现处理异常又或者是消费者宕机。

img

解决方案

1.1. 生产者 confirm 消息确认机制

生产者通过 Confirm 模式发送消息,它会等待 RabbitMQ 的确认,以确保消息被正确投递到指定的队列中。

消息正确投递到队列中,返回 ack,否则返回 nack。如果交换机没有绑定队列,也可能发生消息丢失。

使用方法:

  • 生产者通过 confirm.select 方法将 Channel 设置为 Confirm 模式。
  • 发送消息后,通过添加 add_confirm_listener 方法,监听消息的确认状态。

1.2. 消息持久化机制

持久化机制是指将消息存储到磁盘,以保证在 RabbitMQ 服务器宕机或重启时,消息不会丢失。

使用方法:

  • 生产者通过将消息的 delivery_mode 属性设置为 2,将消息标记为持久化。
  • 队列也需要进行持久化设置,确保队列在 RabbitMQ 服务器重启后仍然存在。经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。

注意事项:

  • 持久化机制会影响性能,因此在需要确保消息不丢失的场景下使用。

1.3. 消费者 ack 事务机制

ACK 事务机制用于确保消息被正确消费。当消息被消费者成功处理后,消费者发送确认(ACK)给 RabbitMQ,告知消息可以被移除。这个过程是自动处理的,也可以关闭进行手工发送 ACK。

使用方法:

  • 在 RabbitMQ 中,ACK 机制默认是开启的。当消息被消费者接收后,会立即从队列中删除,除非消费者发生异常。
  • 可以手动开启 ACK 机制,通过将 auto_ack 参数设置为 False,手动控制消息的 ACK。

注意事项:

  • ACK 机制可以确保消息不会被重复处理,但如果消费者发生异常或者未发送 ACK,消息可能会被重复投递。

RabbitMQ 中如何解决消息堆积问题

消息堆积原因

img

解决方案

  1. 消费者处理消息的速度太慢
  • 增加消费者数量:通过水平扩展,增加消费者的数量来提高处理能力。
  • 优化消费者性能:提高消费者处理消息的效率,例如优化代码、增加资源。
  • **消息预取限制(prefetch count)**:调整消费者的预取数量以避免一次处理过多消息而导致处理缓慢。
  1. 队列的容量太小
  • 增加队列的容量:调整队列设置以允许更多消息存储。
  1. 网络故障
  • 监控和告警:通过监控网络状况并设置告警,确保在网络故障时快速发现并解决问题。
  • 持久化和高可用性:确保消息和队列的持久化以避免消息丢失,并使用镜像队列提高可用性。
  1. 消费者故障
  • 使用死信队列:将无法处理的消息转移到死信队列,防止堵塞主队列。
  • 容错机制:实现消费者的自动重启和错误处理逻辑。
  1. 队列配置不当
  • 优化队列配置:检查并优化消息确认模式、队列长度限制和其他相关配置。
  1. 消息大小
  • 消息分片:将大型消息分割成小的消息片段,加快处理速度。
  1. 业务逻辑复杂或耗时
  • 优化业务逻辑:简化消费者中的业务逻辑,减少处理每个消息所需的时间。
  1. 消息产生速度快于消费速度
  • 使用消息限流:控制消息的生产速度,确保它不会超过消费者的处理能力。
  • 负载均衡:确保消息在消费者之间公平分配,避免个别消费者过载。
  1. 其他配置优化
  • 消息优先级:使用消息优先级确保高优先级消息优先处理。
  • 调整RabbitMQ配置:优化RabbitMQ服务的配置,如文件描述符限制、内存使用限制等。

RabbitMQ中如何保证消息不被重复消费

1. 什么情况会导致消息被重复消费呢

  1. 生产者:生产者可能会重复推送一条数据到 MQ 中,比如 Controller 接口被重复调用了 2 次,没有做接口幂等性导致的;

  2. MQ:在消费者消费完准备响应 ack 消息消费成功时,MQ 突然挂了,导致 MQ 以为消费者还未消费该条数据,MQ 恢复后再次推送了该条消息,导致了重复消费。

  3. 消费者:消费者已经消费完消息,正准备但是还未响应给ack消息到时,此时消费者挂了,服务重启后 MQ 以为消费者还没有消费该消息,再次推送了该条消息。

2. 解决方案

2.1 使用数据库唯一键约束

缺点:局限性很大,仅仅只能用在我们数据新增场景,并且性能也比较低

2.2 使用乐观锁

假设是更新订单状态,在发送的消息的时候带上修改字段的版本号
缺点:如果说更新字段比较多,并且更新场景比较多,可能会导致数据库字段增加并且还有可能出现多条消息同时在队列中此时他们修改字段版本号一致,排在后续的消息无法被消费

2.3 简单的消息去重,插入消费记录,增加数据库判断

优点:很多场景下的确能起到不错的效果
缺点:
1这个消费者的代码执行需要1秒,重复消息在执行期间(假设100毫秒)内到达(例如生产者快速重发,Broker重启等),增加校验的地方是不是还是没数据(因为上一条消息还没消费完,没有记录)
2那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到非幂等安全的业务代码中,从而引发重复消费的问题

2.4 并发消息去重基于消息幂等表

●缺点:如果说第一次消息投递异常没有消费成功,并且没有将消息状态给置为成功或者没有删除消息表记录,此时延时消费每次执行下列都是一直处于消费中,最后消费就会被视为消费失败而被投递到死信Topic中
●方案:插入的消息表必须要带一个最长消费过期时间,例如10分钟
●上述方案只需要一个存储的中心媒介,那我们可以选择更灵活的存储中心媒介,比如Redis。使用Redis有两个好处:
○性能上损耗更低
○上面我们讲到的超时时间可以直接利用Redis本身的ttl实现

3.总结

  1. 利用数据库唯一键约束
  2. 可以利用我们的乐观锁
  3. 插入消费记录

不丢和不重是矛盾的(在分布式场景下),总的来说,开发者根据业务的实际需求来选择相应的方式即可。