RabbitMQ 之 Queue
摘要
-
本文介绍 RabbitMQ 的 Queue 的基本概念和用法。
-
本文使用的 RabbitMQ 版本为 4.1.4。
Classic Queue(经典队列)
-
RabbitMQ 经典队列(原始队列类型)是一种通用队列类型。实际上它是在 3.8.x 版本之前唯一的队列类型。
-
经典队列适用于数据安全不是优先事项的用例,因为存储在经典队列中的数据不会被复制。 经典队列使用非复制的 FIFO 队列实现。
-
经典队列不适合积累太多的消息,如果队列中积累的消息太多了,会严重影响客户端生产消息以及消费消息的性能。因此,经典队列主要用在数据量比较小,并且生产消息和消费消息的速度比较稳定的业务场景。比如内部系统之间的服务调用。
-
RabbitMQ 4.0 删除了对经典队列版本 1 的支持。
-
参数说明(每个版本可能都有变化,具体以页面显示为准)
参数名称 | 配置参数名 | 数据类型 | 作用说明 | 备注 / 使用场景 |
---|---|---|---|---|
Auto expire | x-expires |
整数(毫秒) | 队列在 指定时间内无人使用(无消费者、无发布、无访问) 时自动删除 | 类似“队列空闲过期时间”,节省资源 |
Message TTL | x-message-ttl |
整数(毫秒) | 消息的 存活时间,超过时间后消息会被丢弃或发送到死信队列 | 用于限制消息时效性,如延迟消息或短期缓存 |
Overflow behaviour | x-overflow |
字符串(drop-head 或 reject-publish ) |
当队列达到 最大长度 或 最大字节数 时的行为 | - drop-head :丢弃最早的消息 - reject-publish :拒绝新的消息 |
Single active consumer | x-single-active-consumer |
布尔值(true/false) | 是否启用 单活消费者模式,一次只允许一个消费者消费队列 | 用于严格顺序消费,保证某个消息不会被多个消费者同时处理 |
Dead letter exchange (DLX) | x-dead-letter-exchange |
字符串 | 指定队列的 死信交换机,用于接收无法消费或过期的消息 | 常用于失败重试、消息补偿场景 |
Dead letter routing key | x-dead-letter-routing-key |
字符串 | 消息转发到 DLX 时的 路由键 | 可以灵活转发到不同队列 |
Max length | x-max-length |
整数 | 队列中 最大消息条数 | 超过时按照 Overflow behaviour 处理 |
Max length bytes | x-max-length-bytes |
整数(字节) | 队列中 消息总字节数上限 | 超过时按照 Overflow behaviour 处理,适合大消息场景 |
Maximum priority | x-max-priority |
整数 | 启用优先级队列时,队列可设置的 最大优先级值 | 消息优先级范围是 0 到这个值,优先级高的消息先被消费 |
Leader locator | x-leader-locator |
字符串(client-local 、balanced ) |
设置在集群节点上声明队列时,队列主节点(Leader)的选取规则 | client-local (默认):选择客户端所在节点作为Leader balanced :在节点间均衡Leader分布,用于 HA 队列优化 |
Quorum Queue(仲裁队列)
-
仲裁队列(Quorum Queue) 是 RabbitMQ 从3.8.0版本之后引入的一种现代队列类型,也是目前官方比较推荐的一种对列类型。
-
其基于 Raft 共识算法 实现 持久化、复制和高可用。它保证 数据安全性、可靠的主节点选举,即使在升级或集群波动期间也能保持高可用性。
-
仲裁队列支持 毒消息处理、至少一次死信投递 以及 AMQP 修改(AMQP.modified)的处理结果。
-
它适合 以数据安全为首要目标 的场景。与经典队列相比,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。
-
仲裁队列(Quorum Queue)的
Durability
只能设置为 Durable(true)。Auto delete
只能为 No(false)。 -
参数说明(每个版本可能都有变化,具体以页面显示为准)
参数名称 | 配置参数名 | 数据类型 | 作用说明 | 备注 / 使用场景 |
---|---|---|---|---|
Auto expire | x-expires |
整数(毫秒) | 队列在指定时间内无人使用(无消费者、无发布、无访问)时自动删除 | 节省资源,队列空闲过期时间 |
Message TTL | x-message-ttl |
整数(毫秒) | 队列中消息的生存时间,超过时间后消息会被丢弃或转入死信队列 | 控制消息时效性 |
Overflow behaviour | x-overflow |
字符串(drop-head 或 reject-publish ) |
当队列达到最大长度时的处理方式 | drop-head :丢弃最早消息,reject-publish :拒绝新消息 |
Single active consumer | x-single-active-consumer |
布尔值(true/false) | 是否启用单活消费者模式,一次只允许一个消费者消费队列 | 保证严格顺序消费 |
Dead letter exchange (DLX) | x-dead-letter-exchange |
字符串 | 指定队列的死信交换机,用于接收无法消费或过期的消息 | 与 DLX 配合使用处理失败消息 |
Dead letter routing key | x-dead-letter-routing-key |
字符串 | 消息转发到 DLX 时的路由键 | 灵活路由死信消息 |
Max length | x-max-length |
整数 | 队列中最大消息条数 | 超过时按 Overflow behaviour 处理 |
Max length bytes | x-max-length-bytes |
整数(字节) | 队列消息总字节数上限 | 超过时按 Overflow behaviour 处理 |
Delivery limit | x-delivery-limit |
整数 | 消息允许投递的最大次数,超过后变为死信 | 控制消息重试次数 |
Initial cluster size | x-quorum-initial-group-size |
整数 | 队列在创建时需要的最小节点数 | 用于保证仲裁队列的高可用性 |
Target cluster size | x-quorum-target-group-size |
整数 | 队列运行时的目标节点数 | 当集群节点变化时,仲裁队列会尝试调整副本数量 |
Dead letter strategy | x-dead-letter-strategy |
字符串(at-most-once 、at-least-once ) |
设置仲裁队列的死信处理策略 | 仅适用于 Quorum Queue。at-most-once (默认):消息最多投递一次,可能丢失。at-least-once :确保消息至少投递一次,必须将 Overflow behaviour 设置为 reject-publish ,否则回退到 at-most-once 。 |
Leader locator | x-leader-locator |
字符串(client-local 、balanced ) |
设置在集群节点上声明队列时,队列主节点(Leader)的选取规则 | client-local :选择客户端所在节点作为 Leader balanced :在节点间均衡 Leader 分布 |
-
Quorum Queues 和 Classic Queues 的功能对比如下:
Feature | Classic queues | Quorum queues | 说明 |
---|---|---|---|
Non-durable queues | yes | no | Quorum queues 总是持久化,不支持非持久化 |
Message replication | no | yes | Quorum queues 内置消息复制,Classic queues 需镜像策略 |
Exclusivity | yes | no | Classic queues 支持独占队列,Quorum queues 不支持独占 |
Per message persistence | per message | always | Quorum queues 消息总是持久化 |
Membership changes | no | semi-automatic | Quorum queues 节点变化时半自动处理复制 |
Message TTL (Time-To-Live) | yes | yes | 两者都支持消息过期时间 |
Queue TTL | yes | partially | Quorum queues 的 lease 不会因重新声明而续期 |
Queue length limits | yes | yes | Quorum queues 支持长度限制,但 x-overflow=reject-publish-dlx 不支持 |
Keeps messages in memory | see Classic Queues | never | Quorum queues 消息总是写入磁盘,不保留在内存 |
Message priority | yes | yes | 支持消息优先级 |
Single Active Consumer | yes | yes | 支持单活消费者 |
Consumer exclusivity | yes | no | Quorum queues 不支持独占消费者,需使用 Single Active Consumer |
Consumer priority | yes | yes | 支持消费者优先级 |
Dead letter exchanges | yes | yes | 支持死信交换机 |
Adheres to policies | yes | yes | 支持策略,但 Quorum queues 的部分策略行为不同 |
Poison message handling | no | yes | Quorum queues 支持毒消息处理 |
Server-named queues | yes | no | Quorum queues 不支持服务器自动命名队列 |
Stream(流)
-
Stream 是RabbitMQ自 3.9.0 版本开始引入的一种新的数据队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。
-
Stream 的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。
-
Stream 不支持死信交换机,不支持处理毒消息。
-
实际上 Stream 不属于队列,流(Streams) 是一种 持久化、可复制的数据结构,功能上类似队列:从生产者缓冲消息供消费者读取。但它与队列有两个重要区别:
- 存储模型 – 流是 追加日志(append-only log),消息可以 重复读取直到过期。
- 消费模型 – 流提供 非破坏性消费语义(non-destructive consumer semantics),多个消费者可以多次读取同一条消息而不会删除它。
-
Stream 始终是持久化和复制的,保证数据安全。消费者可以通过 RabbitMQ 客户端库 或 专用二进制协议插件 读取流,其中插件方式可以 访问所有流特性 并提供 最佳性能。合理的客户端连接策略有助于提升 吞吐量和效率。
-
参数说明(每个版本可能都有变化,具体以页面显示为准)
参数名称 | 配置参数名 | 数据类型 | 作用说明 | 备注 / 使用场景 |
---|---|---|---|---|
Max length bytes | x-max-length-bytes |
整数(字节) | 流中允许存储的 最大数据总字节数 | 超过时流将停止接收新消息,适合控制存储容量 |
Max time retention | x-max-age |
字符串(时间单位,例如 1h , 30m , 1d ) |
设置流队列中消息的 最大保留时间,超过时间的消息会被删除 | 支持时间单位:Y=年, M=月, D=天, h=小时, m=分钟, s=秒。例如 "1h" 表示只保留最近 1 小时的消息,用于控制数据量和自动清理过期消息 |
Max segment size in bytes | x-stream-max-segment-size |
整数(字节) | 流分段存储时的 每个段的最大字节数 | 控制单个文件段大小,有利于 I/O 性能和管理 |
Filter size (per chunk) in bytes | x-stream-filter-size-bytes |
整数(字节) | 流内部 过滤索引每块的大小 | 用于加速消息定位和读取,影响内存使用和检索效率 |
Initial cluster size | x-initial-cluster-size |
整数 | 流在创建时的 最小节点数 | 保证流的复制和高可用性 |
Leader locator | x-leader-locator |
字符串(client-local 、balanced ) |
设置在集群节点上声明流时,主节点(Leader)的选取规则 | client-local :客户端所在节点作为 Leader(默认)balanced :在节点间均衡 Leader 分布,用于优化 HA |
-
Classic Queue vs Stream Queue Feature Matrix
Feature | Classic queues | Stream queues | 说明 |
---|---|---|---|
Non-durable queues | yes | no | Stream 队列总是持久化,不支持非持久化 |
Exclusivity | yes | no | Classic 队列支持独占,Stream 队列不支持独占 |
Per message persistence | per message | always | Stream 队列的消息总是持久化 |
Membership changes | no | manual | Stream 队列节点变更需要手动管理 |
TTL | yes | no (but see Retention) | Stream 队列没有消息 TTL,但可通过 Retention 控制过期 |
Queue length limits | yes | no (but see Retention) | Stream 队列没有固定长度限制,通过 Retention 控制数据量 |
Keeps messages in memory | see Classic Queues | never | Stream 队列消息不保存在内存中,只写入磁盘 |
Message priority | yes | no | Stream 队列不支持消息优先级 |
Consumer priority | yes | no | Stream 队列不支持消费者优先级 |
Dead letter exchanges | yes | no | Stream 队列不支持死信交换机 |
Adheres to policies | yes | yes (see Retention) | Stream 队列支持策略,但主要通过 Retention 控制行为 |
Reacts to memory alarms | yes | no (uses minimal RAM) | Stream 队列使用最小内存,不触发内存告警 |
Poison message handling | no | no | Stream 队列不支持毒消息处理 |
队列类型扩展
懒队列
-
从3.6.x版本到3.12.x版本,RabbitMQ提供了一种针对Classic Queue的优化配置,
lazy-mode
,懒对列。懒队列会尽可能早的将消息内容保存到硬盘当中,并且只有在用户请求到时,才临时从硬盘加载到RAM内存当中。 -
默认情况下,RabbitMQ接收到消息时,会保存到内存以便使用,同时把消息写到硬盘。但是,消息写入硬盘的过程中,是会阻塞队列的。RabbitMQ虽然针对写入硬盘速度做了很多算法优化,但是在长队列中,依然表现不是很理想,所以就有了懒队列的出现。
-
懒队列会尝试尽可能早的把消息写到硬盘中。这意味着在正常操作的大多数情况下,RAM中要保存的消息要少得多。当然,这是以增加磁盘IO为代价的。
-
懒队列适合消息量大且长期有堆积的队列,可以减少内存使用,加快消费速度。但是这是以大量消耗集群的网络及磁盘IO为代价的。
-
从3.12往后的版本中,RabbitMQ 不再支持“惰性”模式,因为 经典队列 当前的特性就类似于以前的 懒队列。
死信队列
-
死信队列(Dead Letter Queue),新版中叫做 死信交换机(Dead Letter Exchange, DLX),是RabbitMQ对于未能正常消费的消息进行的一种补救机制,用于保存无法被正常处理的消息。当消息被消费者处理失败时,RabbitMQ会将消息发送到死信队列中,等待消费者处理。
-
死信队列也是一个普通的队列,同样可以在队列上声明消费者,继续对消息进行消费处理。
-
有以下几种情况,RabbitMQ会将一个正常消息转成死信
- 消息被拒绝(Message rejection)
- 由 AMQP 1.0 接收端使用 rejected 结果拒绝
- 由 AMQP 0.9.1 消费者使用 basic.reject 或 basic.nack,并且参数 requeue=false
- 消息过期(Message expiration)
- 消息超过其配置的 TTL(生存时间) 后过期。
- 队列长度超限(Queue length exceeded)
- 队列中的消息数量或总字节数达到配置的最大限制后,被丢弃的消息会死信化。
- 投递次数超限(仅适用于仲裁队列 Quorum Queue)
- 消息的投递次数超过了仲裁队列中配置的 delivery-limit。
- 消息被拒绝(Message rejection)
-
使用场景
- 你可以在队列上配置 死信交换机(DLX) 和 死信路由键(Dead Letter Routing Key)。
- 当消息成为死信时,会被 重新发布到 DLX,这样你可以:
- 做错误日志记录
- 进行失败消息重试
- 用于监控和告警
-
死信交换机的配置方法(How Dead Lettering is Configured)
- 在 RabbitMQ 中,任何队列都可以通过客户端或者 策略(policies) 来配置 死信交换机(DLX)。
- 配置时主要涉及两个核心参数:
配置参数名 | 说明 |
---|---|
dead-letter-exchange | 指定用于接收死信消息的 死信交换机名称 |
dead-letter-routing-key | 指定死信消息重新发布时使用的 路由键(Routing Key) |
死信在转移到死信队列时,他的 routingkey 也会保存下来。但是如果配置了
x-dead-letter-routing-key
这个参数的话,routingkey 就会被替换为配置的这个值。
-
在创建队列时,我们可以通过为队列添加
x-dead-letter-exchange
和x-dead-letter-routing-key
参数,来指定 死信交换机(DLX)和 死信路由键(Dead Letter Routing Key)。但是这样做很麻烦,每个队列都要单独配置,因此,我们可以使用 策略(policies) 来统一配置。
1 | # 仅指定死信交换机,这里交换机的名称是 my-dlx,交换机要提前创建好 |
-
参数说明:
部分 | 含义 |
---|---|
rabbitmqctl |
RabbitMQ 的命令行管理工具 |
set_policy |
设置一个策略(Policy),用于动态配置交换机、队列或绑定的参数 |
DLX |
策略的名称,用户自定义,例如这里叫 DLX |
".*" |
正则表达式,匹配对象的名称。.* 表示匹配 所有队列,也可以指定具体队列名,比如 ^my-queue$ |
{"dead-letter-exchange":"my-dlx", "dead-letter-routing-key":"my-routing-key"} |
策略内容,这里设置了死信交换机名称和路由键: - dead-letter-exchange : 设置死信交换机名称为 my-dlx - dead-letter-routing-key : 设置路由键为 my-routing-key |
--apply-to queues |
指定策略作用对象为 队列(queues),而不是交换机(exchanges)或绑定(bindings) |
--priority 7 |
策略的优先级,值越大优先级越高。多个策略作用在同一对象时,优先级高的会覆盖低的 |
-
执行这条命令后:
- 所有队列都会自动带上
x-dead-letter-exchange=my-dlx
和x-dead-letter-routing-key=my-routing-key
配置。 - 队列中被拒绝、过期、超长或超过投递次数的消息会被重新发布到
my-dlx
交换机,并使用my-routing-key
作为路由键。
- 所有队列都会自动带上