RocketMQ 的安装及使用
摘要
-
本文介绍 CentOS9 中 RocketMQ 的安装与使用。
-
本文使用的 RocketMQ 版本为 5.3.2。
Apache RocketMQ 简介
一、RocketMQ 是什么?
RocketMQ 是一个分布式、队列模型的消息中间件。它由阿里巴巴在2012年开源,并于2017年正式成为 Apache 基金会的顶级项目。
你可以把它想象成一个在分布式系统中负责可靠传递消息的“邮局”或“快递系统”。当系统A需要发送数据给系统B,但它们之间不直接通信时,就可以通过 RocketMQ 来中转,确保消息不丢失、不重复,并且能按顺序送达。
RocketMQ 是一个高性能、高可靠、高实时的分布式消息中间件。它就像分布式系统的“中枢神经系统”,负责在各个服务之间可靠、高效地传递数据,是现代互联网架构中不可或缺的基础组件之一。
RocketMQ 5.x 通过引入 Proxy 模式,极大地提升了架构的灵活性、多语言支持能力和云原生亲和力,是其在消息中间件领域持续演进的重要里程碑。
它与 Kafka、RabbitMQ 等都是业界顶级的消息队列,但各有侧重。RocketMQ 在事务消息、顺序消息和对在线业务的稳定性支持方面表现尤为出色。
二、核心特点与优势
| 序号 | 特性 | 典型场景 | 主要作用 |
|---|---|---|---|
| 1 | 削峰填谷 | 电商秒杀、大促活动时大量下单请求瞬间涌入 | 将突发请求先缓存为消息,后端系统按自身能力平稳消费,避免系统过载崩溃 |
| 2 | 异步解耦 | 用户注册后触发多系统任务(邮件、优惠券、积分) | 主流程只负责发送消息,其他系统独立异步处理,降低系统间耦合、提高扩展性 |
| 3 | 顺序消息 | 订单状态变更(创建 → 付款 → 发货 → 收货) | 同一业务键(如订单ID)的消息按顺序发送和消费,保证业务逻辑正确性 |
| 4 | 持久化与高可靠性 | 关键业务消息必须不丢失(交易、支付、日志) | 所有消息写入磁盘并支持主从复制,即使服务器重启也能恢复,保证高可用 |
| 5 | 消息回溯 | 消费逻辑出错、数据重算、补偿任务 | 支持重置消费位点,重新消费历史消息,实现业务补偿与追溯 |
| 6 | 海量消息堆积能力 | 大规模异步日志收集、IoT 数据汇聚、埋点分析 | 支持万亿级消息堆积,性能稳定不衰减,适用于大规模数据场景 |
三、核心架构与概念
要理解 RocketMQ,需要知道几个关键角色:

经典核心组件
| 序号 | 组件名称 | 主要作用 | 说明 / 特点 |
|---|---|---|---|
| 1 | Producer(生产者) | 发送消息的客户端 | 负责将业务系统的消息发送到指定的 Topic,支持同步、异步、单向三种发送方式 |
| 2 | Consumer(消费者) | 接收并消费消息的客户端 | 从 Broker 拉取消息并进行业务处理,可分为 Push 和 Pull 两种消费模式 |
| 3 | Consumer Group(消费者组) | 实现负载均衡与高可用消费 | 多个消费者订阅同一 Topic 时组成消费者组,一个分区只会被组内一个消费者消费 |
| 4 | Broker(消息服务器) | 存储和转发消息 | RocketMQ 的核心组件,负责消息的持久化、转发、主从复制和高可用 |
| 5 | Topic(主题) | 消息的分类与路由单元 | Producer 发送消息到指定 Topic,Consumer 订阅 Topic 消费消息;一个 Topic 可包含多个消息队列(分区) |
| 6 | Name Server(名字服务) | 管理 Broker 地址信息 | 类似轻量级注册中心,维护 Broker 元数据,帮助 Producer 和 Consumer 定位消息存储位置 |
| 7 | Controller(控制器) | 主从自动切换与高可用控制 | RocketMQ 5.x 引入,基于 Raft(DLedger)协议实现 Broker 自动选主和元数据管理 |
| 8 | Proxy(代理层) | 客户端访问入口与协议转换 | RocketMQ 5.x 新组件,无状态,可横向扩展;统一接入层,支持多协议(如 HTTP、gRPC),隔离客户端与 Broker |
引入 Proxy 模式的优势:
1 | **架构解耦与语言无关**:Proxy 作为通用代理,将复杂的 Broker 协议封装成更简单的接口(如 gRPC),使得用不同编程语言(如 Go, Python, C++ 等)开发的客户端更容易接入,而无需实现复杂的原生协议。 |
消息中间件功能对比表(ActiveMQ vs Kafka vs RabbitMQ vs RocketMQ)
| 功能项 | ActiveMQ | Kafka | RabbitMQ | RocketMQ |
|---|---|---|---|---|
| 客户端 SDK | Java、.NET、C++ 等 | Java、Scala 等 | Java、.NET、Go、Python、C 等 | Java、C++、Go |
| 通信协议与规范 | 推送模型(Push),支持 OpenWire、STOMP、AMQP、MQTT、JMS | 拉取模型(Pull),支持 TCP | 推送模型(Push),支持 AMQP、MQTT、STOMP、HTTP、WebSocket | 拉取模型(Pull),支持 TCP、JMS、OpenMessaging |
| 消息有序性 | 通过独占消费者(Exclusive Consumer)或独占队列(Exclusive Queues)保证顺序 | 保证分区内消息顺序 | 单队列内消息天然有序 | 严格顺序消息,可平滑扩展 |
| 定时/延迟消息 | 支持 | 不支持 | 支持(使用延迟插件) | 支持 |
| 批量消息 | 不支持 | 支持(异步生产者) | 支持(Publisher Confirms 模式下) | 支持(同步模式可避免消息丢失) |
| 广播消息 | 支持 | 不支持 | 支持(Fanout 交换机) | 支持 |
| 消息过滤 | 支持 | 支持(可用 Kafka Streams 实现) | 支持(基于 Exchange 的路由键或 Header) | 支持(基于 SQL92 属性过滤) |
| 服务器端触发重投递 | 不支持 | 不支持 | 支持(Nack 或 TTL+DLX) | 支持 |
| 消息存储 | 支持高性能持久化(JDBC + LevelDB/KahaDB) | 高性能文件存储 | 内存+磁盘混合存储(Mnesia/基于 Erlang 的日志) | 高性能、低延迟文件存储 |
| 消息回溯(历史消息查询) | 支持 | 支持(通过 offset) | 不支持(消息被消费后无法回溯) | 支持(时间戳与 offset) |
| 消息优先级 | 支持 | 不支持 | 支持(优先级队列) | 不支持 |
| 高可用与故障转移 | 依赖存储,如 LevelDB 需 ZooKeeper | 需要 ZooKeeper | 支持镜像队列(Classic / Quorum 模式) | 支持主从模式(无需额外组件) |
| 消息轨迹(Message Track) | 不支持 | 不支持 | 不支持(可通过插件扩展) | 支持 |
| 配置复杂度 | 默认配置较低,需手动优化 | 配置为键值对,可文件或代码提供 | 开箱即用,配置灵活但选项较多 | 开箱即用,仅需关注少量配置 |
| 管理与运维工具 | 支持 | 支持(命令行监控) | 支持(Web 管理控制台、CLI) | 支持(丰富的 Web 与命令行工具) |
RocketMQ 的安装
-
RocketMQ 5.x 依赖 JDK 1.8+。
单机安装
-
下载RocketMQ
1 | mkdir -p /usr/local/soft/rocketmq/ |
小贴士
- 默认脚本中,NameServer需要4G内存,Broker 需要8G内存,如果内存不够,可以进入bin目录,对其中的
runserver.sh和runbroker.sh两个脚本进行一下修改
1 | # 使用vi runserver.sh指令,编辑这个脚本,找到下面的一行配置,调整Java进程的内存大小。 |
-
启动 NameServer
安装完RocketMQ包后,我们启动NameServer
1 | ### 启动namesrv |
namesrv.conf 示例
1 | # The port of nameserver |
-
启动 Broker+Proxy
NameServer成功启动后,我们启动Broker和Proxy。这里我们使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。
1 | ### 先启动broker |
-
关闭服务器
1 | # 先停止 Broker |
集群安装:多节点(集群)多副本模式-异步复制
-
官网文档 对集群安装的方式介绍了多种,本文仅实战一种:
多节点(集群)多副本模式-异步复制 -
每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
-
该模式下,Master 节点和 Slave 节点之间是异步复制的,Master 节点挂掉后,Slave 节点不会自动切换为 Master 节点。
-
集群规划
1 | # NameServer 3 台 |
部署 NameServer
-
在三台服务器上分别启动RocketMQ NameServer
1 | cd /usr/local/soft/rocketmq/rocketmq5 |
部署Broker
-
broker-a.properties
1 | brokerClusterName=DefaultCluster # 集群名称必须一致 |
-
broker-a-s.properties
1 | brokerClusterName=DefaultCluster # 集群名称必须一致 |
-
broker-b.properties
1 | brokerClusterName=DefaultCluster # 集群名称必须一致 |
-
broker-b-s.properties
1 | brokerClusterName=DefaultCluster # 集群名称必须一致 |
-
在 Broker1 10.250.0.188 上启动 broker-a 和 broker-b-s
1 | # 启动 broker-a |
-
在 Broker2 10.250.0.31 上启动 broker-b 和 broker-a-s
1 | # 启动 broker-b |
-
启动成功后,可以通过如下命令检查机器状态
1 | # 确认 Broker 是否已经成功注册到 Nameserver,执行以下命令(在任意一台机器上) |
配置 Proxy
-
在三台服务器上分别启动RocketMQ NameServer
1 | nohup sh bin/mqproxy -n "10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876" & |
rmq-proxy.json 示例
1 | { |
-
停止Proxy
1 | # 停止 Proxy |
集群安装:主备自动切换模式部署
-
RocketMQ 5.x 提供了一种新的部署方式
Controller,可以在主从模式下实现主备自动切换,当主节点挂掉时,自动切换到从节点上运行。 -
Controller 组件提供选主能力,若需要保证 Controller 具备容错能力,Controller 部署需要三副本及以上(遵循 Raft 的多数派协议)。
-
本文在上文“集群安装:多节点(集群)多副本模式-异步复制”的基础上进行修改
-
Controller 部署有两种方式。一种是嵌入于 NameServer 进行部署,另一种是独立部署,本文采用独立部署 Controller 组件的方式。
-
集群规划
1 | # Controller 3 台 |
-
分别在每台机器上创建
controller.conf配置文件,内容如下(注意修改节点Id)
1 | # controller.conf |
-
分别启动每台机器上的 Controller
1 | nohup sh bin/mqcontroller -n "10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876" -c conf/controller.conf & |
-
修改 broker 配置文件,以
broker-a.properties为例
1 | # 去掉如下配置,Controller 模式下 会自动分配 |
RocketMQ 5 Broker Controller 模式配置参数表
| 参数名 | 说明 | 默认值 | 备注 / 建议 |
|---|---|---|---|
| enableControllerMode | 是否启用 Controller 模式(自动主从切换总开关) | false |
必须设为 true 才能启用自动主从切换 |
| controllerAddr | Controller 集群地址列表(以分号分隔) | 无 | 所有 Broker 配置应一致,例如 10.250.0.175:9877;10.250.0.188:9877;10.250.0.31:9877 |
| syncBrokerMetadataPeriod | 向 Controller 同步 Broker 副本信息的时间间隔(毫秒) | 5000 (5s) |
保持默认即可;用于上报心跳与元数据 |
| checkSyncStateSetPeriod | 检查同步状态集(SyncStateSet)的时间间隔(毫秒) | 5000 (5s) |
Controller 会定期剔除落后副本 |
| syncControllerMetadataPeriod | 同步 Controller 元数据的时间间隔(毫秒) | 10000 (10s) |
Broker 定期从集群获取当前活跃 Controller 地址 |
| haMaxTimeSlaveNotCatchup | Slave 未跟上 Master 的最大时间间隔(毫秒) | 15000 (15s) |
超过该时间将 Slave 移出 SyncStateSet |
| storePathEpochFile | Epoch 文件存储路径 | store/epochFile |
非常重要!不要删除;存储主从任期、epoch 等元信息 |
| allAckInSyncStateSet | 是否要求所有同步副本都 ACK 后才返回成功 | false |
true 可保证强一致但性能下降;建议保持默认 |
| syncFromLastFile | Slave 是否从最后一个文件开始复制(空盘启动时) | false |
若历史日志很大且 Slave 新建,可设为 true |
| asyncLearner | 是否为异步 learner 副本(不参与选主) | false |
用于远程灾备副本,不会被选举为 Master |
| inSyncReplicas | 需保持同步的副本组数量 | 1 |
若 allAckInSyncStateSet=true,该参数无效 |
| minInSyncReplicas | 最小同步副本数量,低于该值则拒绝写入 | 1 |
防止写入过多未同步副本导致数据丢失风险 |
-
重新启动 Broker,为保证主从数据一致性在重启时不被破坏,启动顺序应为先重新原Master,再重启原Slave
-
启动成功后,可以通过如下命令检查机器状态,可以看到集群内部自动分配了主从
1 | # 确认 Broker 是否已经成功注册到 Nameserver,执行以下命令(在任意一台机器上) |
-
验证主备自动切换,此时关闭
broker-b的 Master,并查看集群状态
1 | sh bin/mqadmin clusterList -n 10.250.0.175:9876 |
-
重新启动刚才关闭的
broker-b,节点会自动加入集群,角色为 Slave -
停止 Controller
1 | # 停止 PrControlleroxy |
端口说明
| 端口号 | 协议 | 组件/服务 | 作用说明 |
|---|---|---|---|
| 9876 | TCP | NameServer | RocketMQ 集群的 NameServer 服务端口。 用于 Broker 注册、客户端路由发现。 Producer/Consumer 连接此端口以获取 Broker 地址。 |
| 8080 | TCP | Proxy (gRPC / HTTP) | RocketMQ 5 引入的 Proxy 服务 默认端口之一。 用于 HTTP/gRPC 客户端接入,例如 RocketMQ Proxy REST API、异步消息接口等。 |
| 8081 | TCP | Proxy Admin / Dashboard / gRPC Alt | 通常是 Proxy 的 管理接口 或 gRPC 辅助端口(依配置而定)。 也可能是控制面接口,用于与 Console 或控制工具通信。 |
| 10909 | TCP | Broker HA (High Availability) | Broker 主从同步端口(Master ↔ Slave 之间的数据复制)。 用于消息数据与元数据的同步。 |
| 10911 | TCP | Broker 服务端口 | Broker 的 主通信端口,客户端连接发送消息、消费消息、心跳等。 Producer 和 Consumer 通过 NameServer 获取该端口地址后进行通信。 |
| 10912 | TCP | Broker HA 客户端端口 | Broker 主从复制中的 Slave 连接 Master 时使用的 客户端监听端口。 通常与 10909 配合使用,一主多从模式中 Slave 主动连接 Master。 |
日志及数据存储路径
-
RocketMQ 5 主要有三类服务组件需要关注它们的存储目录
| 组件 | 功能 | 默认存储内容 | 默认路径(Linux 环境) |
|---|---|---|---|
| NameServer | 路由服务(注册中心) | 各个组件的的注册 | 日志文件:~/logs/rocketmqlogs/namesrv.log日志配置: ${ROCKETMQ_HOME}/conf/rmq.namesrv.logback.xml 配置文件: ${ROCKETMQ_HOME}/conf/namesrv.conf(可选) |
| Broker | 核心消息存储与转发服务 | 消息数据(CommitLog、ConsumeQueue、Index、Config) 目录结构: ├── commitlog/ → 消息物理文件├── consumequeue/ → 消费队列索引├── index/ → 消息索引├── config/ → topic、offset、subscription 信息├── checkpoint → 存储校验点├── abort → 异常退出标志 |
数据目录:~/store日志文件: ~/logs/rocketmqlogs/broker.log日志配置: ${ROCKETMQ_HOME}/conf/rmq.broker.logback.xml配置文件: ${ROCKETMQ_HOME}/conf/broker.conf |
| Proxy | 客户端访问入口层(无状态代理) (5.x 新引入组件) |
转发日志、访问日志 | 日志配置:${ROCKETMQ_HOME}/conf/rmq.proxy.logback.xml日志文件: ~/logs/rocketmqlogs/proxy.log配置文件: ${ROCKETMQ_HOME}/conf/rmq-proxy.json |
| Controller | Broker 主从协调与高可用管理 (5.x 新引入组件) |
- 集群主从元数据(主从关系、broker注册信息) - Controller 自身运行状态与选举元数据 |
数据目录:~/store/controller日志文件: ~/logs/rocketmqlogs/controller.log日志配置: ${ROCKETMQ_HOME}/conf/rmq.controller.logback.xml配置文件: ${ROCKETMQ_HOME}/conf/controller.conf |
-
NameServer 和 Proxy 都是无状态(stateless)组件,不会持久化业务数据。
-
Broker 数据路径说明
| 配置项 | 默认路径 | 说明 | 主要作用 |
|---|---|---|---|
| storePathRootDir | /home/rocketmq/store(默认) |
消息存储的根目录 | 作为所有存储文件的父级目录,其他路径若未单独配置,则在此目录下创建 |
| storePathCommitLog | ${storePathRootDir}/commitlog |
CommitLog 文件存放路径 | 存储消息主体内容,是最核心的数据文件(顺序写入) |
| storePathConsumeQueue | ${storePathRootDir}/consumequeue |
消费队列文件存放路径 | 存储消息在队列中的索引(逻辑队列),指向 CommitLog 的物理位置 |
| storePathIndex | ${storePathRootDir}/index |
索引文件存放路径 | 提供按 Key 查询消息的索引结构,便于通过 Message Key 快速检索消息 |
| storePathConfig | ${storePathRootDir}/config |
Broker 运行时配置存储路径 | 存储运行时生成的配置文件,如 topic、consumerOffset、subscriptionGroup 等 |
| storeCheckpoint | ${storePathRootDir}/checkpoint |
Checkpoint 文件路径 | 记录 CommitLog、ConsumeQueue、Index 三者的刷盘进度,用于崩溃恢复 |
| abortFile | ${storePathRootDir}/abort |
异常退出标志文件路径 | 用于标识 Broker 是否异常退出,启动时据此判断是否执行恢复流程 |
安装过程中遇到的问题
1.启动 Proxy 失败
-
无论是
Broker+Proxy启动,还是 单独启动Proxy,都报如下错误:
1 | # 错误会在 nohup.out 中输出 |
-
原因分析
-
- Netty-tcnative 的编译依赖:RocketMQ 使用的 Netty 的 tcnative 模块是在较旧的环境中编译的,而动态链接的版本锁定:编译时链接的是 libcrypt.so.1,运行时必须找到相同主版本号的库
-
- 而我当前使用的系统为 Amazon Linux 2023,基于更新的 glibc,其加密功能已经迁移到 libcrypt.so.2。(Amazon Linux 2:基于较旧的 glibc 版本,libcrypt.so.1 是主要的加密库)
-
1 | # 检查 libcrypt 是否存在 |
-
解决办法
1 | # 安装兼容性包 |
2.写入消息失败,并报如下错误
1 | Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.95 CQ: 0.95 INDEX: -1.00], messages are put to the slave, message store has been shut down, etc. BROKER: 10.250.0.175:10911 |
-
错误原因
- RocketMQ 返回的 CODE: 14 表示:Broker 当前 不接受消息写入(服务暂不可用)。
- the broker’s disk is full [CL: 0.95 CQ: 0.95 INDEX: -1.00]: Broker 的磁盘已满
1
2
3CL: 0.95 → CommitLog 95% 已使用
CQ: 0.95 → ConsumeQueue 95% 已使用
INDEX: -1.00 → 索引异常或未采集
| 配置项 | 含义 | 默认值 |
|---|---|---|
diskMaxUsedSpaceRatio |
Broker 磁盘最大可用比例(超过后禁止写入) | 75% |
storePathCommitLog |
消息存储路径(CommitLog) | ~/store/commitlog |
storePathConsumeQueue |
消费队列路径(ConsumeQueue) | ~/store/consumequeue |
storePathIndex |
索引路径 | ~/store/index |
-
总结:可以确认是 磁盘使用率过高 导致 Broker 自动进入 “写保护” 模式。
-
解决方法
-
- 清理磁盘:确认磁盘使用率过高,并清理磁盘空间,既降低磁盘使用率
-
- 磁盘扩容:如果清理磁盘空间后,磁盘使用率依然过高,则需要扩容磁盘
-
- 配置调整:调整 Broker 配置(
broker.conf),将diskMaxUsedSpaceRatio配置适当提高,如 96%(diskMaxUsedSpaceRatio=96),调整后重启 Broker。仅建议在紧急情况下临时解决。
- 配置调整:调整 Broker 配置(
-