RabbitMQ 之 Queue

摘要

  • 本文介绍 RabbitMQ 的 Queue 的基本概念和用法。

  • Zookeeper官网

  • 本文使用的 RabbitMQ 版本为 4.1.4。

Classic Queue(经典队列)

  • RabbitMQ 经典队列(原始队列类型)是一种通用队列类型。实际上它是在 3.8.x 版本之前唯一的队列类型。

  • 经典队列适用于数据安全不是优先事项的用例,因为存储在经典队列中的数据不会被复制。 经典队列使用非复制的 FIFO 队列实现。

  • 经典队列不适合积累太多的消息,如果队列中积累的消息太多了,会严重影响客户端生产消息以及消费消息的性能。因此,经典队列主要用在数据量比较小,并且生产消息和消费消息的速度比较稳定的业务场景。比如内部系统之间的服务调用。

  • RabbitMQ 4.0 删除了对经典队列版本 1 的支持。

  • 参数说明(每个版本可能都有变化,具体以页面显示为准)

参数名称 配置参数名 数据类型 作用说明 备注 / 使用场景
Auto expire x-expires 整数(毫秒) 队列在 指定时间内无人使用(无消费者、无发布、无访问) 时自动删除 类似“队列空闲过期时间”,节省资源
Message TTL x-message-ttl 整数(毫秒) 消息的 存活时间,超过时间后消息会被丢弃或发送到死信队列 用于限制消息时效性,如延迟消息或短期缓存
Overflow behaviour x-overflow 字符串(drop-headreject-publish 当队列达到 最大长度最大字节数 时的行为 - drop-head:丢弃最早的消息
- reject-publish:拒绝新的消息
Single active consumer x-single-active-consumer 布尔值(true/false) 是否启用 单活消费者模式,一次只允许一个消费者消费队列 用于严格顺序消费,保证某个消息不会被多个消费者同时处理
Dead letter exchange (DLX) x-dead-letter-exchange 字符串 指定队列的 死信交换机,用于接收无法消费或过期的消息 常用于失败重试、消息补偿场景
Dead letter routing key x-dead-letter-routing-key 字符串 消息转发到 DLX 时的 路由键 可以灵活转发到不同队列
Max length x-max-length 整数 队列中 最大消息条数 超过时按照 Overflow behaviour 处理
Max length bytes x-max-length-bytes 整数(字节) 队列中 消息总字节数上限 超过时按照 Overflow behaviour 处理,适合大消息场景
Maximum priority x-max-priority 整数 启用优先级队列时,队列可设置的 最大优先级值 消息优先级范围是 0 到这个值,优先级高的消息先被消费
Leader locator x-leader-locator 字符串(client-localbalanced 设置在集群节点上声明队列时,队列主节点(Leader)的选取规则 client-local(默认):选择客户端所在节点作为Leader
balanced:在节点间均衡Leader分布,用于 HA 队列优化

Quorum Queue(仲裁队列)

  • 仲裁队列(Quorum Queue) 是 RabbitMQ 从3.8.0版本之后引入的一种现代队列类型,也是目前官方比较推荐的一种对列类型。

  • 其基于 Raft 共识算法 实现 持久化、复制和高可用。它保证 数据安全性、可靠的主节点选举,即使在升级或集群波动期间也能保持高可用性。

  • 仲裁队列支持 毒消息处理、至少一次死信投递 以及 AMQP 修改(AMQP.modified)的处理结果。

  • 它适合 以数据安全为首要目标 的场景。与经典队列相比,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。

  • 仲裁队列(Quorum Queue)的 Durability 只能设置为 Durable(true)。Auto delete 只能为 No(false)。

  • 参数说明(每个版本可能都有变化,具体以页面显示为准)

参数名称 配置参数名 数据类型 作用说明 备注 / 使用场景
Auto expire x-expires 整数(毫秒) 队列在指定时间内无人使用(无消费者、无发布、无访问)时自动删除 节省资源,队列空闲过期时间
Message TTL x-message-ttl 整数(毫秒) 队列中消息的生存时间,超过时间后消息会被丢弃或转入死信队列 控制消息时效性
Overflow behaviour x-overflow 字符串(drop-headreject-publish 当队列达到最大长度时的处理方式 drop-head:丢弃最早消息,reject-publish:拒绝新消息
Single active consumer x-single-active-consumer 布尔值(true/false) 是否启用单活消费者模式,一次只允许一个消费者消费队列 保证严格顺序消费
Dead letter exchange (DLX) x-dead-letter-exchange 字符串 指定队列的死信交换机,用于接收无法消费或过期的消息 与 DLX 配合使用处理失败消息
Dead letter routing key x-dead-letter-routing-key 字符串 消息转发到 DLX 时的路由键 灵活路由死信消息
Max length x-max-length 整数 队列中最大消息条数 超过时按 Overflow behaviour 处理
Max length bytes x-max-length-bytes 整数(字节) 队列消息总字节数上限 超过时按 Overflow behaviour 处理
Delivery limit x-delivery-limit 整数 消息允许投递的最大次数,超过后变为死信 控制消息重试次数
Initial cluster size x-quorum-initial-group-size 整数 队列在创建时需要的最小节点数 用于保证仲裁队列的高可用性
Target cluster size x-quorum-target-group-size 整数 队列运行时的目标节点数 当集群节点变化时,仲裁队列会尝试调整副本数量
Dead letter strategy x-dead-letter-strategy 字符串(at-most-onceat-least-once 设置仲裁队列的死信处理策略 仅适用于 Quorum Queue。
at-most-once(默认):消息最多投递一次,可能丢失。
at-least-once:确保消息至少投递一次,必须将 Overflow behaviour 设置为 reject-publish,否则回退到 at-most-once
Leader locator x-leader-locator 字符串(client-localbalanced 设置在集群节点上声明队列时,队列主节点(Leader)的选取规则 client-local:选择客户端所在节点作为 Leader
balanced:在节点间均衡 Leader 分布
  • Quorum Queues 和 Classic Queues 的功能对比如下:

Feature Classic queues Quorum queues 说明
Non-durable queues yes no Quorum queues 总是持久化,不支持非持久化
Message replication no yes Quorum queues 内置消息复制,Classic queues 需镜像策略
Exclusivity yes no Classic queues 支持独占队列,Quorum queues 不支持独占
Per message persistence per message always Quorum queues 消息总是持久化
Membership changes no semi-automatic Quorum queues 节点变化时半自动处理复制
Message TTL (Time-To-Live) yes yes 两者都支持消息过期时间
Queue TTL yes partially Quorum queues 的 lease 不会因重新声明而续期
Queue length limits yes yes Quorum queues 支持长度限制,但 x-overflow=reject-publish-dlx 不支持
Keeps messages in memory see Classic Queues never Quorum queues 消息总是写入磁盘,不保留在内存
Message priority yes yes 支持消息优先级
Single Active Consumer yes yes 支持单活消费者
Consumer exclusivity yes no Quorum queues 不支持独占消费者,需使用 Single Active Consumer
Consumer priority yes yes 支持消费者优先级
Dead letter exchanges yes yes 支持死信交换机
Adheres to policies yes yes 支持策略,但 Quorum queues 的部分策略行为不同
Poison message handling no yes Quorum queues 支持毒消息处理
Server-named queues yes no Quorum queues 不支持服务器自动命名队列

Stream(流)

  • Stream 是RabbitMQ自 3.9.0 版本开始引入的一种新的数据队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。

  • Stream 的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。

  • Stream 不支持死信交换机,不支持处理毒消息。

  • 实际上 Stream 不属于队列,流(Streams) 是一种 持久化、可复制的数据结构,功能上类似队列:从生产者缓冲消息供消费者读取。但它与队列有两个重要区别:

    • 存储模型 – 流是 追加日志(append-only log),消息可以 重复读取直到过期。
    • 消费模型 – 流提供 非破坏性消费语义(non-destructive consumer semantics),多个消费者可以多次读取同一条消息而不会删除它。
  • Stream 始终是持久化和复制的,保证数据安全。消费者可以通过 RabbitMQ 客户端库 或 专用二进制协议插件 读取流,其中插件方式可以 访问所有流特性 并提供 最佳性能。合理的客户端连接策略有助于提升 吞吐量和效率。

  • 参数说明(每个版本可能都有变化,具体以页面显示为准)

参数名称 配置参数名 数据类型 作用说明 备注 / 使用场景
Max length bytes x-max-length-bytes 整数(字节) 流中允许存储的 最大数据总字节数 超过时流将停止接收新消息,适合控制存储容量
Max time retention x-max-age 字符串(时间单位,例如 1h, 30m, 1d 设置流队列中消息的 最大保留时间,超过时间的消息会被删除 支持时间单位:Y=年, M=月, D=天, h=小时, m=分钟, s=秒。例如 "1h" 表示只保留最近 1 小时的消息,用于控制数据量和自动清理过期消息
Max segment size in bytes x-stream-max-segment-size 整数(字节) 流分段存储时的 每个段的最大字节数 控制单个文件段大小,有利于 I/O 性能和管理
Filter size (per chunk) in bytes x-stream-filter-size-bytes 整数(字节) 流内部 过滤索引每块的大小 用于加速消息定位和读取,影响内存使用和检索效率
Initial cluster size x-initial-cluster-size 整数 流在创建时的 最小节点数 保证流的复制和高可用性
Leader locator x-leader-locator 字符串(client-localbalanced 设置在集群节点上声明流时,主节点(Leader)的选取规则 client-local:客户端所在节点作为 Leader(默认)
balanced:在节点间均衡 Leader 分布,用于优化 HA
  • Classic Queue vs Stream Queue Feature Matrix

Feature Classic queues Stream queues 说明
Non-durable queues yes no Stream 队列总是持久化,不支持非持久化
Exclusivity yes no Classic 队列支持独占,Stream 队列不支持独占
Per message persistence per message always Stream 队列的消息总是持久化
Membership changes no manual Stream 队列节点变更需要手动管理
TTL yes no (but see Retention) Stream 队列没有消息 TTL,但可通过 Retention 控制过期
Queue length limits yes no (but see Retention) Stream 队列没有固定长度限制,通过 Retention 控制数据量
Keeps messages in memory see Classic Queues never Stream 队列消息不保存在内存中,只写入磁盘
Message priority yes no Stream 队列不支持消息优先级
Consumer priority yes no Stream 队列不支持消费者优先级
Dead letter exchanges yes no Stream 队列不支持死信交换机
Adheres to policies yes yes (see Retention) Stream 队列支持策略,但主要通过 Retention 控制行为
Reacts to memory alarms yes no (uses minimal RAM) Stream 队列使用最小内存,不触发内存告警
Poison message handling no no Stream 队列不支持毒消息处理

队列类型扩展

懒队列

  • 从3.6.x版本到3.12.x版本,RabbitMQ提供了一种针对Classic Queue的优化配置,lazy-mode懒对列。懒队列会尽可能早的将消息内容保存到硬盘当中,并且只有在用户请求到时,才临时从硬盘加载到RAM内存当中。

  • 默认情况下,RabbitMQ接收到消息时,会保存到内存以便使用,同时把消息写到硬盘。但是,消息写入硬盘的过程中,是会阻塞队列的。RabbitMQ虽然针对写入硬盘速度做了很多算法优化,但是在长队列中,依然表现不是很理想,所以就有了懒队列的出现。

  • 懒队列会尝试尽可能早的把消息写到硬盘中。这意味着在正常操作的大多数情况下,RAM中要保存的消息要少得多。当然,这是以增加磁盘IO为代价的。

  • 懒队列适合消息量大且长期有堆积的队列,可以减少内存使用,加快消费速度。但是这是以大量消耗集群的网络及磁盘IO为代价的。

  • 从3.12往后的版本中,RabbitMQ 不再支持“惰性”模式,因为 经典队列 当前的特性就类似于以前的 懒队列。

死信队列

  • 死信队列(Dead Letter Queue),新版中叫做 死信交换机(Dead Letter Exchange, DLX),是RabbitMQ对于未能正常消费的消息进行的一种补救机制,用于保存无法被正常处理的消息。当消息被消费者处理失败时,RabbitMQ会将消息发送到死信队列中,等待消费者处理。

  • 死信队列也是一个普通的队列,同样可以在队列上声明消费者,继续对消息进行消费处理。

  • 有以下几种情况,RabbitMQ会将一个正常消息转成死信

    • 消息被拒绝(Message rejection)
      • 由 AMQP 1.0 接收端使用 rejected 结果拒绝
      • 由 AMQP 0.9.1 消费者使用 basic.reject 或 basic.nack,并且参数 requeue=false
    • 消息过期(Message expiration)
      • 消息超过其配置的 TTL(生存时间) 后过期。
    • 队列长度超限(Queue length exceeded)
      • 队列中的消息数量或总字节数达到配置的最大限制后,被丢弃的消息会死信化。
    • 投递次数超限(仅适用于仲裁队列 Quorum Queue)
      • 消息的投递次数超过了仲裁队列中配置的 delivery-limit。
  • 使用场景

    • 你可以在队列上配置 死信交换机(DLX) 和 死信路由键(Dead Letter Routing Key)。
    • 当消息成为死信时,会被 重新发布到 DLX,这样你可以:
      • 做错误日志记录
      • 进行失败消息重试
      • 用于监控和告警
  • 死信交换机的配置方法(How Dead Lettering is Configured)

    • 在 RabbitMQ 中,任何队列都可以通过客户端或者 策略(policies) 来配置 死信交换机(DLX)。
    • 配置时主要涉及两个核心参数:
配置参数名 说明
dead-letter-exchange 指定用于接收死信消息的 死信交换机名称
dead-letter-routing-key 指定死信消息重新发布时使用的 路由键(Routing Key)

死信在转移到死信队列时,他的 routingkey 也会保存下来。但是如果配置了 x-dead-letter-routing-key 这个参数的话,routingkey 就会被替换为配置的这个值。

  • 在创建队列时,我们可以通过为队列添加 x-dead-letter-exchangex-dead-letter-routing-key 参数,来指定 死信交换机(DLX)和 死信路由键(Dead Letter Routing Key)。但是这样做很麻烦,每个队列都要单独配置,因此,我们可以使用 策略(policies) 来统一配置。

1
2
3
4
# 仅指定死信交换机,这里交换机的名称是 my-dlx,交换机要提前创建好
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues --priority 7
# 同时指定 死信交换机 和 路由键
rabbitmqctl set_policy DLX ".*" '{"dead-letter-exchange":"my-dlx", "dead-letter-routing-key":"my-routing-key"}' --apply-to queues --priority 7
  • 参数说明:

部分 含义
rabbitmqctl RabbitMQ 的命令行管理工具
set_policy 设置一个策略(Policy),用于动态配置交换机、队列或绑定的参数
DLX 策略的名称,用户自定义,例如这里叫 DLX
".*" 正则表达式,匹配对象的名称。.* 表示匹配 所有队列,也可以指定具体队列名,比如 ^my-queue$
{"dead-letter-exchange":"my-dlx", "dead-letter-routing-key":"my-routing-key"} 策略内容,这里设置了死信交换机名称和路由键:
- dead-letter-exchange: 设置死信交换机名称为 my-dlx
- dead-letter-routing-key: 设置路由键为 my-routing-key
--apply-to queues 指定策略作用对象为 队列(queues),而不是交换机(exchanges)或绑定(bindings)
--priority 7 策略的优先级,值越大优先级越高。多个策略作用在同一对象时,优先级高的会覆盖低的
  • 执行这条命令后:

    • 所有队列都会自动带上 x-dead-letter-exchange=my-dlxx-dead-letter-routing-key=my-routing-key 配置。
    • 队列中被拒绝、过期、超长或超过投递次数的消息会被重新发布到 my-dlx 交换机,并使用 my-routing-key 作为路由键。