RocketMQ Admin Tool
摘要
-
本文介绍 RocketMQ Admin Tool 的常用命令。
-
本文使用的 RocketMQ 版本为 5.3.2。
RocketMQ Admin Tool 简介
-
RocketMQ Admin Tool 是 RocketMQ 的一个命令行工具,用于管理 RocketMQ 的集群。
Topic 相关命令
-
创建或更新 Topic
| 参数 | 全称 | 说明 | 可选值 / 格式 | 是否必填 | 示例 |
|---|---|---|---|---|---|
-t |
--topic |
主题名称 | 字符串 | ✅ 必填 | -t MyTopic |
-b |
--brokerAddr |
指定创建 Topic 的 Broker 地址(与 -c 二选一) |
ip:port |
✅ 必填(与 -c 二选一) |
-b 192.168.1.10:10911 |
-c |
--clusterName |
指定创建 Topic 的集群名(与 -b 二选一) |
字符串 | ✅ 必填(与 -b 二选一) |
-c DefaultCluster |
-n |
--namesrvAddr |
NameServer 地址列表 | 多个地址用 ; 分隔 |
可选 | -n 192.168.1.1:9876;192.168.1.2:9876 |
-r |
--readQueueNums |
读队列数量,默认为8,始终保持 r == w | 整数 | 可选 | -r 4 |
-w |
--writeQueueNums |
写队列数量,默认为8,始终保持 r == w | 整数 | 可选 | -w 4 |
-p |
--perm |
Topic 权限,默认为6 | 2:写(W) 4:读(R) 6:读写(RW) |
可选 | -p 6 |
-o |
--order |
是否顺序 Topic,兼容4.x版本,5.x版本使用 -a “+message.type=FIFO” | true / false |
可选 | -o false |
-u |
--unit |
是否为单元(Unit)Topic(用于多租户隔离) | true / false |
可选 | -u false |
-s |
--hasUnitSub |
是否有 Unit 订阅 | true / false |
可选 | -s false |
-a |
--attributes |
额外属性设置,用 + 表示添加、- 表示删除 |
例:+a=b,+c=d,-e |
可选 | -a "+message.type=NORMAL" |
-h |
--help |
打印帮助信息 | 无 | 否 | -h |
-
示例
1 | # 创建 Topic,此时 Topic 类型为 UNSPECIFIED,集群下所有 Broker 都会创建该 Topic |
-
查看与删除 Topic
1 | # 查看所有 Topic,此时只打印 topic 列表 |
-
其它 Topic 命令
1 | # 查看 Topic 路由信息 |
集群相关命令
-
查看集群信息,集群、BrokerName、BrokerId、TPS等信息
1 | # 集群信息 |
-
查看集群统计信息
1 | sh bin/mqadmin clusterList -n 127.0.0.1:9876 -c DefaultCluster -m |
消息相关
发送消息
| 参数 | 全写 | 说明 | 是否必填 | 示例值 | 备注 |
|---|---|---|---|---|---|
-t |
--topic |
消息要发送的 Topic 名称 | ✅ | TestTopic |
必须指定目标 Topic |
-p |
--body |
消息体内容(UTF-8 字符串) | ✅ | "Hello RocketMQ" |
实际消息内容 |
-n |
--namesrvAddr |
NameServer 地址 | ❌ | 127.0.0.1:9876 |
不指定则用默认配置 |
-b |
--broker |
指定发送到哪个 broker | ❌ | broker-a |
一般用于测试 Broker 状态 |
-i |
--qid |
指定发送到的队列 ID | ❌ | 2 |
一般不需要设置,RocketMQ 会自动选择 |
-c |
--tags |
消息的标签(tag) | ❌ | testTag |
用于消息过滤 |
-k |
--key |
消息的业务键(key) | ❌ | order123 |
可用于追踪消息 |
-m |
--msgTraceEnable |
是否开启消息轨迹 | ❌ | true |
默认 false |
-h |
--help |
打印帮助信息 | ❌ | 无 | 显示命令参数说明 |
-
示例
1 | # 发送消息 |
消费消息
| 参数 | 全写 | 说明 | 是否必填 | 示例值 | 备注 |
|---|---|---|---|---|---|
-t |
--topic |
目标 Topic 名称 | ✅ | TestTopic |
必填 |
-n |
--namesrvAddr |
NameServer 地址 | ❌ | 127.0.0.1:9876 |
建议填写以避免默认配置不生效 |
-g |
--consumerGroup |
消费组名称 | ❌ | TestGroup |
可指定消费组(影响消费位点) |
-b |
--brokerName |
Broker 名称 | ❌ | broker-a |
指定从哪个 broker 拉取消息 |
-i |
--queueId |
队列 ID | ❌ | 0 |
默认从 0 号队列开始 |
-o |
--offset |
队列起始偏移量(offset) | ❌ | 0 |
指定从哪个位置开始消费 |
-c |
--MessageNumber |
消费消息数量 | ❌ | 10 |
默认通常为 1 |
-s |
--beginTimestamp |
起始时间 | ❌ | 2025-10-28#10:00:00:000 |
格式或时间戳均可 |
-e |
--endTimestamp |
结束时间 | ❌ | 2025-10-28#12:00:00:000 |
与 -s 一起使用 |
-h |
--help |
打印帮助信息 | ❌ | 无 | 显示命令参数说明 |
-
示例
1 | # 默认从队列 0 开始消费,拉取全部消息 |
消费结果
1 | Consume ok |
-
📘 字段解析表格
| 字段 | 示例值 | 说明 |
|---|---|---|
| MSGID | 0AFA00AFCF171EB44E468CC7D5EE0000 |
消息唯一标识(客户端生成) |
| brokerName | broker-b |
消息存储在哪个 Broker 上 |
| queueId | 2 |
存储的队列编号(TestTopic 有多个队列时的第 3 个) |
| queueOffset | 0 |
队列中的偏移量(从 0 开始) |
| storeSize | 228 |
消息在磁盘中的存储字节大小 |
| sysFlag | 0 |
消息系统标志位(内部用途) |
| bornTimestamp | 1761638706671 |
消息在生产者端创建的时间(毫秒) |
| bornHost | /10.250.0.175:41362 |
生产者客户端的 IP 和端口 |
| storeTimestamp | 1761638706691 |
消息被 Broker 存储的时间(毫秒) |
| storeHost | /10.250.0.188:11011 |
Broker 的存储节点地址 |
| msgId | 0AFA00BC00002B0300000000000CA1E9 |
消息在 Broker 存储系统生成的唯一 ID |
| commitLogOffset | 827881 |
消息在 commitLog 文件中的偏移量 |
| bodyCRC | 1774740973 |
消息体的 CRC 校验码(用于校验数据一致性) |
| reconsumeTimes | 0 |
被重新消费的次数(0 表示第一次消费) |
| preparedTransactionOffset | 0 |
如果是事务消息,这里会记录预提交偏移量;普通消息为 0 |
| topic | TestTopic |
消息所属主题 |
| properties | {MSG_REGION=DefaultRegion, UNIQ_KEY=..., ...} |
消息属性,包括系统属性与用户属性 |
| body | [72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81] |
消息体的字节数组 |
| BODY(解码后) | Hello RocketMQ |
实际消息内容(UTF-8 字符串) |
-
⚙️ 消息状态说明
| 输出信息 | 含义 |
|---|---|
Consume ok |
表示成功从 Broker 拉取消息 |
status=NO_NEW_MSG |
当前队列(queueId=2)中已经没有比 offset=1 更新的消息 |
offset=1 |
当前队列消费到 offset=1(下次消费从此开始) |
The older -1 message of the 2 queue will be provided |
这是一句提示语,意思是:队列中没有更早的消息(offset=-1 表示无历史消息) |
查询消息
根据消息 ID 查询消息(queryMsgById)
| 参数 | 全写 | 说明 | 是否必填 | 示例值 | 备注 |
|---|---|---|---|---|---|
-i |
--msgId |
要查询的消息 ID | ✅ | 0AFA00AFCF171EB44E468CC7D5EE0000 |
必填,用于精确定位消息 |
-t |
--topic |
目标 Topic 名称 | ✅ | TestTopic |
必填 |
-n |
--namesrvAddr |
NameServer 地址 | ❌ | 127.0.0.1:9876 |
建议明确指定 |
-c |
--cluster |
集群名称或 LMQ 父 Topic | ❌ | DefaultCluster |
在多集群场景下使用 |
-g |
--consumerGroup |
消费组名称 | ❌ | TestGroup |
当用于消费者关联查询时可指定 |
-d |
--clientId |
消费者客户端 ID | ❌ | 192.168.0.1@12345 |
辅助定位消费实例 |
-f |
--bodyFormat |
消息体输出格式 | ❌ | UTF-8 / HEX / BASE64 |
默认 UTF-8 |
-s |
--sendMessage |
是否重新发送消息 | ❌ | true |
调试时可使用 |
-u |
--unitName |
单元名(多单元部署时使用) | ❌ | unit01 |
一般场景可忽略 |
-h |
--help |
打印帮助信息 | ❌ | 无 | 显示命令说明 |
-
示例
1 | sh bin/mqadmin queryMsgById -n 127.0.0.1:9876 -t TestTopic -i 0AFA00AFCF171EB44E468CC7D5EE0000 -f UTF-8 |
-
输出字段详解
| 字段名 | 含义 | 示例 | 说明 |
|---|---|---|---|
| OffsetID | 消息在 CommitLog 中的偏移标识(内部定位使用) | 0AFA00BC00002B0300000000000CA1E9 |
可用于 broker 内部追踪定位消息 |
| Topic | 主题名称 | TestTopic |
消息所属的主题 |
| Tags | 消息标签 | [null] |
若生产消息时未设置 tag,则为 null |
| Keys | 消息键 | [null] |
通常可用于业务层索引查询 |
| Queue ID | 消息所在的队列编号 | 2 |
对应 topic 的第 3 个队列(从 0 开始) |
| Queue Offset | 队列偏移量 | 0 |
表示是该队列的第一条消息 |
| CommitLog Offset | 消息在 commitlog 文件中的偏移量 | 827881 |
broker 存储层位置 |
| Reconsume Times | 被重新消费的次数 | 0 |
表示未重试消费过 |
| Born Timestamp | 消息生成时间 | 2025-10-28 08:05:06,671 |
生产者发送消息的时间 |
| Store Timestamp | 消息存储时间 | 2025-10-28 08:05:06,691 |
broker 写入消息的时间(通常相差几毫秒) |
| Born Host | 生产者客户端 IP:端口 | 10.250.0.175:41362 |
生产者所在机器 |
| Store Host | broker 存储该消息的地址 | 10.250.0.188:11011 |
对应的 broker 服务端 |
| System Flag | 系统标志位 | 0 |
内部使用(标识压缩/事务等) |
| Properties | 消息属性 | {MSG_REGION=DefaultRegion, UNIQ_KEY=..., CLUSTER=DefaultCluster, WAIT=true, TRACE_ON=true} |
包含 RocketMQ 自动附加的元数据 |
| Message Body Path | 消息体在本地保存的文件路径 | /tmp/rocketmq/msgbodys/0AFA00AFCF171EB44E468CC7D5EE0000 |
RocketMQ CLI 将消息体内容(字节数组)写入文件以供查看 |
| Message Body | 消息内容 | Hello RocketMQ |
消息内容 |
根据消息 Key 查询消息(queryMsgByKey)
| 参数 | 必填 | 示例 | 说明 |
|---|---|---|---|
-t, --topic |
✅ | TestTopic |
要查询的主题名称 |
-k, --msgKey |
✅ | order_10001 |
发送消息时设置的业务 Key |
-n, --namesrvAddr |
❌ | 127.0.0.1:9876 |
NameServer 地址 |
-c, --cluster |
❌ | DefaultCluster |
指定集群名称(可选) |
-b, --beginTimestamp |
❌ | 1730083200000 |
查询起始时间戳(ms) |
-e, --endTimestamp |
❌ | 1730173200000 |
查询结束时间戳(ms) |
-m, --maxNum |
❌ | 64 |
返回的最大消息数,默认 64 |
-h, --help |
❌ | - | 打印帮助信息 |
-
示例
1 | sh bin/mqadmin queryMsgByKey -n 127.0.0.1:9876 -t TestTopic -k order123 |
消费者
创建或更新消费者订阅组(updateSubGroup)
-
消费者订阅组 就是 消费者组,其主要作用是调整消费者消费,例如:
- 消费顺序(顺序/并发)
- 广播模式
- 消费使能
- 消费重试策略
- 延迟消费队列等
| 参数 | 全写 | 说明 | 是否必填 | 示例值 | 备注 |
|---|---|---|---|---|---|
-g |
--groupName |
消费者组名称 | ✅ | TestConsumerGroup |
必填 |
-n |
--namesrvAddr |
NameServer 地址 | ❌ | 127.0.0.1:9876 |
建议明确指定 |
-b |
--brokerAddr |
指定 Broker 地址 | ❌ | 10.250.0.188:10911 |
仅对单 Broker 更新 |
-c |
--clusterName |
指定 Cluster 名称 | ❌ | DefaultCluster |
对整个集群更新 |
-d |
--consumeBroadcastEnable |
是否广播消费 | ❌ | true / false |
true 表示广播,false 表示集群模式 |
-o |
--consumeMessageOrderly |
是否顺序消费 | ❌ | true / false |
顺序消费只在同队列中生效 |
-i |
--brokerId |
从哪个 Broker 获取订阅信息 | ❌ | 0 |
内部用途,通常不用设置 |
-m |
--consumeFromMinEnable |
是否从最小 offset 消费 | ❌ | true / false |
新组首次消费时生效 |
-p |
--groupRetryPolicy |
消费组重试策略 JSON | ❌ | {"type":"EXPONENTIAL","exponentialRetryPolicy":{"initial":5000,"max":7200000,"multiplier":2}} |
可以自定义重试间隔 |
-q |
--retryQueueNums |
重试队列数量 | ❌ | 1 ~ 16 |
默认为 1 |
-r |
--retryMaxTimes |
最大重试次数 | ❌ | 16 |
默认 16 次 |
-s |
--consumeEnable |
是否使能消费 | ❌ | true / false |
false 表示暂停消费 |
-w |
--whichBrokerWhenConsumeSlowly |
慢消费选择 Broker ID | ❌ | 0 |
内部使用 |
-a |
--notifyConsumerIdsChanged |
通知 ConsumerId 改变 | ❌ | true / false |
可触发消费者刷新订阅信息 |
--attributes |
--attributes |
其他自定义属性 | ❌ | attr1=val1,attr2=val2 |
可设置自定义配置 |
-h |
--help |
打印帮助 | ❌ | - | 显示命令帮助 |
-
示例
1 | # 创建普通消费者组 |
删除消费者订阅组(deleteSubGroup)
1 | sh bin/mqadmin deleteSubGroup -n 127.0.0.1:9876 -g TestConsumerGroup -c DefaultCluster |