RocketMQ 的安装及使用

摘要

  • 本文介绍 CentOS9 中 RocketMQ 的安装与使用。

  • RocketMQ官网

  • 本文使用的 RocketMQ 版本为 5.3.2。

Apache RocketMQ 简介

一、RocketMQ 是什么?

RocketMQ 是一个分布式、队列模型的消息中间件。它由阿里巴巴在2012年开源,并于2017年正式成为 Apache 基金会的顶级项目。

你可以把它想象成一个在分布式系统中负责可靠传递消息的“邮局”或“快递系统”。当系统A需要发送数据给系统B,但它们之间不直接通信时,就可以通过 RocketMQ 来中转,确保消息不丢失、不重复,并且能按顺序送达。

RocketMQ 是一个高性能、高可靠、高实时的分布式消息中间件。它就像分布式系统的“中枢神经系统”,负责在各个服务之间可靠、高效地传递数据,是现代互联网架构中不可或缺的基础组件之一。

RocketMQ 5.x 通过引入 Proxy 模式,极大地提升了架构的灵活性、多语言支持能力和云原生亲和力,是其在消息中间件领域持续演进的重要里程碑。

它与 Kafka、RabbitMQ 等都是业界顶级的消息队列,但各有侧重。RocketMQ 在事务消息、顺序消息和对在线业务的稳定性支持方面表现尤为出色。


二、核心特点与优势

序号 特性 典型场景 主要作用
1 削峰填谷 电商秒杀、大促活动时大量下单请求瞬间涌入 将突发请求先缓存为消息,后端系统按自身能力平稳消费,避免系统过载崩溃
2 异步解耦 用户注册后触发多系统任务(邮件、优惠券、积分) 主流程只负责发送消息,其他系统独立异步处理,降低系统间耦合、提高扩展性
3 顺序消息 订单状态变更(创建 → 付款 → 发货 → 收货) 同一业务键(如订单ID)的消息按顺序发送和消费,保证业务逻辑正确性
4 持久化与高可靠性 关键业务消息必须不丢失(交易、支付、日志) 所有消息写入磁盘并支持主从复制,即使服务器重启也能恢复,保证高可用
5 消息回溯 消费逻辑出错、数据重算、补偿任务 支持重置消费位点,重新消费历史消息,实现业务补偿与追溯
6 海量消息堆积能力 大规模异步日志收集、IoT 数据汇聚、埋点分析 支持万亿级消息堆积,性能稳定不衰减,适用于大规模数据场景

三、核心架构与概念

要理解 RocketMQ,需要知道几个关键角色:

经典核心组件

序号 组件名称 主要作用 说明 / 特点
1 Producer(生产者) 发送消息的客户端 负责将业务系统的消息发送到指定的 Topic,支持同步、异步、单向三种发送方式
2 Consumer(消费者) 接收并消费消息的客户端 从 Broker 拉取消息并进行业务处理,可分为 PushPull 两种消费模式
3 Consumer Group(消费者组) 实现负载均衡与高可用消费 多个消费者订阅同一 Topic 时组成消费者组,一个分区只会被组内一个消费者消费
4 Broker(消息服务器) 存储和转发消息 RocketMQ 的核心组件,负责消息的持久化、转发、主从复制和高可用
5 Topic(主题) 消息的分类与路由单元 Producer 发送消息到指定 Topic,Consumer 订阅 Topic 消费消息;一个 Topic 可包含多个消息队列(分区)
6 Name Server(名字服务) 管理 Broker 地址信息 类似轻量级注册中心,维护 Broker 元数据,帮助 Producer 和 Consumer 定位消息存储位置
7 Controller(控制器) 主从自动切换与高可用控制 RocketMQ 5.x 引入,基于 Raft(DLedger)协议实现 Broker 自动选主和元数据管理
8 Proxy(代理层) 客户端访问入口与协议转换 RocketMQ 5.x 新组件,无状态,可横向扩展;统一接入层,支持多协议(如 HTTP、gRPC),隔离客户端与 Broker

引入 Proxy 模式的优势:

1
2
3
4
**架构解耦与语言无关**:Proxy 作为通用代理,将复杂的 Broker 协议封装成更简单的接口(如 gRPC),使得用不同编程语言(如 Go, Python, C++ 等)开发的客户端更容易接入,而无需实现复杂的原生协议。
**简化客户端**:客户端不再需要感知 Name Server 和 Broker 的地址变化,只需连接固定的 Proxy 地址即可,大大降低了客户端的复杂度。
**增强安全性**:可以在 Proxy 层统一实现安全认证、限流、审计等策略,作为Broker集群的安全屏障。
**云原生友好**:无状态的 Proxy 非常适合在 Kubernetes 等容器化环境中进行部署和弹性伸缩。

消息中间件功能对比表(ActiveMQ vs Kafka vs RabbitMQ vs RocketMQ)

功能项 ActiveMQ Kafka RabbitMQ RocketMQ
客户端 SDK Java、.NET、C++ 等 Java、Scala 等 Java、.NET、Go、Python、C 等 Java、C++、Go
通信协议与规范 推送模型(Push),支持 OpenWire、STOMP、AMQP、MQTT、JMS 拉取模型(Pull),支持 TCP 推送模型(Push),支持 AMQP、MQTT、STOMP、HTTP、WebSocket 拉取模型(Pull),支持 TCP、JMS、OpenMessaging
消息有序性 通过独占消费者(Exclusive Consumer)或独占队列(Exclusive Queues)保证顺序 保证分区内消息顺序 单队列内消息天然有序 严格顺序消息,可平滑扩展
定时/延迟消息 支持 不支持 支持(使用延迟插件) 支持
批量消息 不支持 支持(异步生产者) 支持(Publisher Confirms 模式下) 支持(同步模式可避免消息丢失)
广播消息 支持 不支持 支持(Fanout 交换机) 支持
消息过滤 支持 支持(可用 Kafka Streams 实现) 支持(基于 Exchange 的路由键或 Header) 支持(基于 SQL92 属性过滤)
服务器端触发重投递 不支持 不支持 支持(Nack 或 TTL+DLX) 支持
消息存储 支持高性能持久化(JDBC + LevelDB/KahaDB) 高性能文件存储 内存+磁盘混合存储(Mnesia/基于 Erlang 的日志) 高性能、低延迟文件存储
消息回溯(历史消息查询) 支持 支持(通过 offset) 不支持(消息被消费后无法回溯) 支持(时间戳与 offset)
消息优先级 支持 不支持 支持(优先级队列) 不支持
高可用与故障转移 依赖存储,如 LevelDB 需 ZooKeeper 需要 ZooKeeper 支持镜像队列(Classic / Quorum 模式) 支持主从模式(无需额外组件)
消息轨迹(Message Track) 不支持 不支持 不支持(可通过插件扩展) 支持
配置复杂度 默认配置较低,需手动优化 配置为键值对,可文件或代码提供 开箱即用,配置灵活但选项较多 开箱即用,仅需关注少量配置
管理与运维工具 支持 支持(命令行监控) 支持(Web 管理控制台、CLI) 支持(丰富的 Web 与命令行工具)

RocketMQ 的安装

  • RocketMQ 5.x 依赖 JDK 1.8+。

单机安装

1
2
3
4
5
mkdir -p /usr/local/soft/rocketmq/
wget https://dist.apache.org/repos/dist/release/rocketmq/5.3.2/rocketmq-all-5.3.2-bin-release.zip
unzip rocketmq-all-5.3.2-bin-release.zip
ln -s rocketmq-all-5.3.2-bin-release rocketmq5
cd rocketmq5

小贴士

  • 默认脚本中,NameServer需要4G内存,Broker 需要8G内存,如果内存不够,可以进入bin目录,对其中的runserver.shrunbroker.sh两个脚本进行一下修改
1
2
3
4
5
6
7
8
9
10
# 使用vi runserver.sh指令,编辑这个脚本,找到下面的一行配置,调整Java进程的内存大小。
# NameServer,Controller,Proxy 都使用这个脚本启动
JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2G -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改为:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

# 接下来,同样调整runbroker.sh中的内存大小。Broker 使用这个脚本启动
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
修改为:
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g"
  • 启动 NameServer

安装完RocketMQ包后,我们启动NameServer

1
2
3
4
5
6
7
8
9
### 启动namesrv
$ nohup sh bin/mqnamesrv &
## 指定配置文件
$ nohup sh bin/mqnamesrv -c namesrv.conf &

### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
# 我们可以在namesrv.log 中看到 'The Name Server boot success..', 表示NameServer 已成功启动。
The Name Server boot success. serializeType=JSON, address 0.0.0.0:9876

namesrv.conf 示例

1
2
# The port of nameserver
listenPort = 9876
  • 启动 Broker+Proxy

NameServer成功启动后,我们启动Broker和Proxy。这里我们使用 Local 模式部署,即 Broker 和 Proxy 同进程部署。5.x 版本也支持 Broker 和 Proxy 分离部署以实现更灵活的集群能力。

1
2
3
4
5
6
7
8
9
### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &
# 指定配置文件, 默认就是 conf/broker.conf
$ nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf --enable-proxy &

### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/proxy.log
# 我们可以在 proxy.log 中看到“The broker[brokerName,ip:port] boot success..”,这表明 broker 已成功启动。
The broker[broker-a, 10.250.0.175:10911] boot success. serializeType=JSON and name server is localhost:9876
  • 关闭服务器

1
2
3
4
# 先停止 Broker
$ sh bin/mqshutdown broker
# 停止 NameServer
$ sh bin/mqshutdown namesrv

集群安装:多节点(集群)多副本模式-异步复制

  • 官网文档 对集群安装的方式介绍了多种,本文仅实战一种:多节点(集群)多副本模式-异步复制

  • 每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

    • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
    • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
  • 该模式下,Master 节点和 Slave 节点之间是异步复制的,Master 节点挂掉后,Slave 节点不会自动切换为 Master 节点。

  • 集群规划

1
2
3
4
5
6
7
8
9
10
11
12
13
# NameServer 3 台
NameServer1 10.250.0.175
NameServer2 10.250.0.188
NameServer3 10.250.0.31

# Broker 2 Master 2 Replicas
Broker1 10.250.0.188 broker-a,broker-b-s
Broker2 10.250.0.31 broker-b,broker-a-s

# Proxy 3 台
Proxy1 10.250.0.175
Proxy2 10.250.0.188
Proxy3 10.250.0.31

部署 NameServer

  • 在三台服务器上分别启动RocketMQ NameServer

1
2
cd /usr/local/soft/rocketmq/rocketmq5
nohup sh bin/mqnamesrv &

部署Broker

  • broker-a.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
brokerClusterName=DefaultCluster # 集群名称必须一致
brokerName=broker-a # broker 名称,master 和 slave 的 brokerName 必须一致
brokerId=0 # brokerId 必须唯一 ,且 master 的 brokerId 必须为 0
deleteWhen=04 # 表示凌晨 4 点清理
fileReservedTime=48 # 表示保存 48 小时的数据
brokerRole=ASYNC_MASTER # 角色,表示异步复制的主节点
flushDiskType=ASYNC_FLUSH # 表示异步刷盘

# 因为同一台服务器上启动多个 Broker,所以需要指定不同的存储路径和端口
# 存储数据路径,后面会介绍
storePathRootDir=/usr/local/soft/rocketmq/data/store-a
storePathCommitLog=/usr/local/soft/rocketmq/data/store-a/commitlog
storePathConsumeQueue=/usr/local/soft/rocketmq/data/store-a/consumequeue
storePathIndex=/usr/local/soft/rocketmq/data/store-a/index
storePathConfig=/usr/local/soft/rocketmq/data/store-a/config
storeCheckpoint=/usr/local/soft/rocketmq/data/store-a/checkpoint
abortFile=/usr/local/soft/rocketmq/data/store-a/abort
#Broker 对外服务的监听端口,同一台机器上启动多个Broker,需要指定不同的端口
listenPort=10911
  • broker-a-s.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
brokerClusterName=DefaultCluster # 集群名称必须一致
brokerName=broker-a # broker 名称,master 和 slave 的 brokerName 必须一致
brokerId=1 # brokerId 必须唯一 ,且 slave 的 brokerId 必须大于 0
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE # 角色,表示异步复制的从节点
flushDiskType=ASYNC_FLUSH

# 因为同一台服务器上启动多个 Broker,所以需要指定不同的存储路径和端口
# 存储数据路径
storePathRootDir=/usr/local/soft/rocketmq/data/store-a
storePathCommitLog=/usr/local/soft/rocketmq/data/store-a/commitlog
storePathConsumeQueue=/usr/local/soft/rocketmq/data/store-a/consumequeue
storePathIndex=/usr/local/soft/rocketmq/data/store-a/index
storePathConfig=/usr/local/soft/rocketmq/data/store-a/config
storeCheckpoint=/usr/local/soft/rocketmq/data/store-a/checkpoint
abortFile=/usr/local/soft/rocketmq/data/store-a/abort
#Broker 对外服务的监听端口,同一台机器上启动多个Broker,需要指定不同的端口
listenPort=11011
  • broker-b.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
brokerClusterName=DefaultCluster # 集群名称必须一致
brokerName=broker-b # broker 名称,master 和 slave 的 brokerName 必须一致
brokerId=0 # brokerId 必须唯一 ,且 master 的 brokerId 必须为 0
deleteWhen=04 # 表示凌晨 4 点清理
fileReservedTime=48 # 表示保存 48 小时的数据
brokerRole=ASYNC_MASTER # 角色,表示异步复制的主节点
flushDiskType=ASYNC_FLUSH # 表示异步刷盘

# 因为同一台服务器上启动多个 Broker,所以需要指定不同的存储路径和端口
# 存储数据路径
storePathRootDir=/usr/local/soft/rocketmq/data/store-b
storePathCommitLog=/usr/local/soft/rocketmq/data/store-b/commitlog
storePathConsumeQueue=/usr/local/soft/rocketmq/data/store-b/consumequeue
storePathIndex=/usr/local/soft/rocketmq/data/store-b/index
storePathConfig=/usr/local/soft/rocketmq/data/store-b/config
storeCheckpoint=/usr/local/soft/rocketmq/data/store-b/checkpoint
abortFile=/usr/local/soft/rocketmq/data/store-b/abort

#Broker 对外服务的监听端口,同一台机器上启动多个Broker,需要指定不同的端口
listenPort=10911
  • broker-b-s.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
brokerClusterName=DefaultCluster # 集群名称必须一致
brokerName=broker-b # broker 名称,master 和 slave 的 brokerName 必须一致
brokerId=1 # brokerId 必须唯一 ,且 slave 的 brokerId 必须大于 0
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE # 角色,表示异步复制的从节点
flushDiskType=ASYNC_FLUSH

# 因为同一台服务器上启动多个 Broker,所以需要指定不同的存储路径和端口
# 存储数据路径
storePathRootDir=/usr/local/soft/rocketmq/data/store-b
storePathCommitLog=/usr/local/soft/rocketmq/data/store-b/commitlog
storePathConsumeQueue=/usr/local/soft/rocketmq/data/store-b/consumequeue
storePathIndex=/usr/local/soft/rocketmq/data/store-b/index
storePathConfig=/usr/local/soft/rocketmq/data/store-b/config
storeCheckpoint=/usr/local/soft/rocketmq/data/store-b/checkpoint
abortFile=/usr/local/soft/rocketmq/data/store-b/abort

#Broker 对外服务的监听端口,同一台机器上启动多个Broker,需要指定不同的端口
listenPort=11011
  • 在 Broker1 10.250.0.188 上启动 broker-a 和 broker-b-s

1
2
3
4
5
6
7
# 启动 broker-a
nohup sh bin/mqbroker -n "10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876" -c conf/2m-2s-async/broker-a.properties &
# 启动 broker-b-s
nohup sh bin/mqbroker -n "10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876" -c conf/2m-2s-async/broker-b-s.properties &

## nohup.out 中的输出类似与下面这种就表示启动成功
The broker[broker-a, 10.250.0.31:11011] boot success. serializeType=JSON and name server is 10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876
  • 在 Broker2 10.250.0.31 上启动 broker-b 和 broker-a-s

1
2
3
4
# 启动 broker-b
nohup sh bin/mqbroker -n "10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876" -c conf/2m-2s-async/broker-b.properties &
# 启动 broker-a-s
nohup sh bin/mqbroker -n "10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876" -c conf/2m-2s-async/broker-a-s.properties &
  • 启动成功后,可以通过如下命令检查机器状态

1
2
3
4
5
6
7
8
# 确认 Broker 是否已经成功注册到 Nameserver,执行以下命令(在任意一台机器上)
sh bin/mqadmin clusterList -n 10.250.0.175:9876
## 输出类似如下
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #Timer(Progress) #PCWait(ms) #Hour #SPACE #ACTIVATED
DefaultCluster broker-a 0 10.250.0.188:10911 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 0-0(0.0w, 0.0, 0.0) 0 489250.72 0.2900 true
DefaultCluster broker-a 1 10.250.0.31:11011 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 3-0(0.0w, 0.0, 0.0) 0 489250.72 0.2600 false
DefaultCluster broker-b 0 10.250.0.31:10911 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 0-0(0.0w, 0.0, 0.0) 0 489250.72 0.2600 true
DefaultCluster broker-b 1 10.250.0.188:11011 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 3-0(0.0w, 0.0, 0.0) 0 489250.72 0.2900 false

配置 Proxy

  • 在三台服务器上分别启动RocketMQ NameServer

1
2
3
4
5
6
7
8
nohup sh bin/mqproxy -n "10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876" &

## 指定配置文件,这里要注意,集群的名称要与 conf/rmq-proxy.json 中配置的集群名称必须一致,默认是 DefaultCluster
## 默认的配置文件就是 conf/rmq-proxy.json
nohup sh bin/mqproxy -n "10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876" -pc conf/rmq-proxy.json &

## 查看日志,输出如下内容就表示启动成功,tail -f nohup.out
rocketmq-proxy startup successfully

rmq-proxy.json 示例

1
2
3
4
5
{
"rocketMQClusterName": "DefaultCluster", # 集群名称
"remotingListenPort": 8080, # 监听端口,默认 8080
"grpcServerPort": 8081 # grpc 监听端口,默认 8081
}
  • 停止Proxy

1
2
# 停止 Proxy
sh bin/mqshutdown proxy

集群安装:主备自动切换模式部署

  • RocketMQ 5.x 提供了一种新的部署方式 Controller,可以在主从模式下实现主备自动切换,当主节点挂掉时,自动切换到从节点上运行。

  • 官方文档:主备自动切换模式部署

  • Controller 组件提供选主能力,若需要保证 Controller 具备容错能力,Controller 部署需要三副本及以上(遵循 Raft 的多数派协议)。

  • 本文在上文“集群安装:多节点(集群)多副本模式-异步复制”的基础上进行修改

  • Controller 部署有两种方式。一种是嵌入于 NameServer 进行部署,另一种是独立部署,本文采用独立部署 Controller 组件的方式。

  • 集群规划

1
2
3
4
# Controller 3 台
Controller1 10.250.0.175
Controller2 10.250.0.188
Controller3 10.250.0.31
  • 分别在每台机器上创建controller.conf配置文件,内容如下(注意修改节点Id)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# controller.conf
# ---------------------------------------------------------
# DLedger Raft Group 的名字,同一集群保持一致
controllerDLegerGroup = group1
# 集群中三个节点的成员定义,每个节点都必须一致
controllerDLegerPeers = n0-10.250.0.175:9877;n1-10.250.0.188:9877;n2-10.250.0.31:9877
# 节点 id,必须属于 controllerDLegerPeers 中的一个;同 Group 内各个节点要唯一
controllerDLegerSelfId = n0
# Controller 数据存储路径(非常关键!不要删除)
controllerStorePath = /usr/local/soft/rocketmq/data/DledgerController
# 是否允许从 SyncStateSet 外选举 Master
# true 会加快选举但可能丢消息,建议生产保持 false
enableElectUncleanMaster = false
# 当 Broker 副本角色变化时是否主动通知(建议开启)
notifyBrokerRoleChanged = true
# 启动端口,端口不能与 NameServer、Broker、Proxy 端口冲突
listenPort = 9877
  • 分别启动每台机器上的 Controller

1
2
3
4
nohup sh bin/mqcontroller -n "10.250.0.175:9876;10.250.0.188:9876;10.250.0.31:9876" -c conf/controller.conf &
## 启动成功后,查看 nohup.out 文件,输出如下内容就表示启动成功
load config properties file OK, conf/controller.conf
The Controller Server boot success. serializeType=JSON
  • 修改 broker 配置文件,以 broker-a.properties 为例

1
2
3
4
5
6
7
8
9
# 去掉如下配置,Controller 模式下 会自动分配
# brokerId=1 # brokerId 必须唯一 ,且 slave 的 brokerId 必须大于 0
# brokerRole=ASYNC_MASTER # 角色,表示异步复制的主节点

# 添加如下配置
# 启用 Controller 模式(自动主从切换模式的总开关)
enableControllerMode = true
# Controller 集群地址列表(建议与 Controller 集群保持一致)
controllerAddr = 10.250.0.175:9877;10.250.0.188:9877;10.250.0.31:9877

RocketMQ 5 Broker Controller 模式配置参数表

参数名 说明 默认值 备注 / 建议
enableControllerMode 是否启用 Controller 模式(自动主从切换总开关) false 必须设为 true 才能启用自动主从切换
controllerAddr Controller 集群地址列表(以分号分隔) 所有 Broker 配置应一致,例如 10.250.0.175:9877;10.250.0.188:9877;10.250.0.31:9877
syncBrokerMetadataPeriod 向 Controller 同步 Broker 副本信息的时间间隔(毫秒) 5000 (5s) 保持默认即可;用于上报心跳与元数据
checkSyncStateSetPeriod 检查同步状态集(SyncStateSet)的时间间隔(毫秒) 5000 (5s) Controller 会定期剔除落后副本
syncControllerMetadataPeriod 同步 Controller 元数据的时间间隔(毫秒) 10000 (10s) Broker 定期从集群获取当前活跃 Controller 地址
haMaxTimeSlaveNotCatchup Slave 未跟上 Master 的最大时间间隔(毫秒) 15000 (15s) 超过该时间将 Slave 移出 SyncStateSet
storePathEpochFile Epoch 文件存储路径 store/epochFile 非常重要!不要删除;存储主从任期、epoch 等元信息
allAckInSyncStateSet 是否要求所有同步副本都 ACK 后才返回成功 false true 可保证强一致但性能下降;建议保持默认
syncFromLastFile Slave 是否从最后一个文件开始复制(空盘启动时) false 若历史日志很大且 Slave 新建,可设为 true
asyncLearner 是否为异步 learner 副本(不参与选主) false 用于远程灾备副本,不会被选举为 Master
inSyncReplicas 需保持同步的副本组数量 1 allAckInSyncStateSet=true,该参数无效
minInSyncReplicas 最小同步副本数量,低于该值则拒绝写入 1 防止写入过多未同步副本导致数据丢失风险
  • 重新启动 Broker,为保证主从数据一致性在重启时不被破坏,启动顺序应为先重新原Master,再重启原Slave

  • 启动成功后,可以通过如下命令检查机器状态,可以看到集群内部自动分配了主从

1
2
3
4
5
6
7
8
# 确认 Broker 是否已经成功注册到 Nameserver,执行以下命令(在任意一台机器上)
sh bin/mqadmin clusterList -n 10.250.0.175:9876
## 输出类似如下
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #Timer(Progress) #PCWait(ms) #Hour #SPACE #ACTIVATED
DefaultCluster broker-a 0 10.250.0.188:10911 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 0-0(0.0w, 0.0, 0.0) 0 489268.48 0.2900 true
DefaultCluster broker-a 2 10.250.0.31:11011 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 2-0(0.0w, 0.0, 0.0) 0 489268.48 0.2700 false
DefaultCluster broker-b 0 10.250.0.31:10911 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 0-0(0.0w, 0.0, 0.0) 0 489268.48 0.2700 true
DefaultCluster broker-b 2 10.250.0.188:11011 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 3-0(0.0w, 0.0, 0.0) 0 489268.48 0.2900 false
  • 验证主备自动切换,此时关闭 broker-b 的 Master,并查看集群状态

1
2
3
4
5
6
sh bin/mqadmin clusterList -n 10.250.0.175:9876
## 输出类似如下,可以看到`broker-b`原来的 Slave 被切换为 Master
#Cluster Name #Broker Name #BID #Addr #Version #InTPS(LOAD) #OutTPS(LOAD) #Timer(Progress) #PCWait(ms) #Hour #SPACE #ACTIVATED
DefaultCluster broker-a 0 10.250.0.188:10911 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 0-0(0.0w, 0.0, 0.0) 0 489268.58 0.2900 true
DefaultCluster broker-a 2 10.250.0.31:11011 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 1-0(0.0w, 0.0, 0.0) 0 489268.58 0.2700 false
DefaultCluster broker-b 0 10.250.0.188:11011 V5_3_2 0.00(0,0ms) 0.00(0,0ms|0,0ms) 0-0(0.0w, 0.0, 0.0) 0 489268.58 0.2900 true
  • 重新启动刚才关闭的 broker-b ,节点会自动加入集群,角色为 Slave

  • 停止 Controller

1
2
# 停止 PrControlleroxy
sh bin/mqshutdown controller

端口说明

端口号 协议 组件/服务 作用说明
9876 TCP NameServer RocketMQ 集群的 NameServer 服务端口。
用于 Broker 注册、客户端路由发现。
Producer/Consumer 连接此端口以获取 Broker 地址。
8080 TCP Proxy (gRPC / HTTP) RocketMQ 5 引入的 Proxy 服务 默认端口之一。
用于 HTTP/gRPC 客户端接入,例如 RocketMQ Proxy REST API、异步消息接口等。
8081 TCP Proxy Admin / Dashboard / gRPC Alt 通常是 Proxy 的 管理接口gRPC 辅助端口(依配置而定)。
也可能是控制面接口,用于与 Console 或控制工具通信。
10909 TCP Broker HA (High Availability) Broker 主从同步端口(Master ↔ Slave 之间的数据复制)。
用于消息数据与元数据的同步。
10911 TCP Broker 服务端口 Broker 的 主通信端口,客户端连接发送消息、消费消息、心跳等。
Producer 和 Consumer 通过 NameServer 获取该端口地址后进行通信。
10912 TCP Broker HA 客户端端口 Broker 主从复制中的 Slave 连接 Master 时使用的 客户端监听端口
通常与 10909 配合使用,一主多从模式中 Slave 主动连接 Master。

日志及数据存储路径

  • RocketMQ 5 主要有三类服务组件需要关注它们的存储目录

组件 功能 默认存储内容 默认路径(Linux 环境)
NameServer 路由服务(注册中心) 各个组件的的注册 日志文件:~/logs/rocketmqlogs/namesrv.log
日志配置:${ROCKETMQ_HOME}/conf/rmq.namesrv.logback.xml
配置文件:${ROCKETMQ_HOME}/conf/namesrv.conf(可选)
Broker 核心消息存储与转发服务 消息数据(CommitLog、ConsumeQueue、Index、Config)
目录结构:
├── commitlog/ → 消息物理文件
├── consumequeue/ → 消费队列索引
├── index/ → 消息索引
├── config/ → topic、offset、subscription 信息
├── checkpoint → 存储校验点
├── abort → 异常退出标志
数据目录~/store
日志文件:~/logs/rocketmqlogs/broker.log
日志配置:${ROCKETMQ_HOME}/conf/rmq.broker.logback.xml
配置文件:${ROCKETMQ_HOME}/conf/broker.conf
Proxy 客户端访问入口层(无状态代理)
(5.x 新引入组件)
转发日志、访问日志 日志配置:${ROCKETMQ_HOME}/conf/rmq.proxy.logback.xml
日志文件:~/logs/rocketmqlogs/proxy.log
配置文件:${ROCKETMQ_HOME}/conf/rmq-proxy.json
Controller Broker 主从协调与高可用管理
(5.x 新引入组件)
- 集群主从元数据(主从关系、broker注册信息)
- Controller 自身运行状态与选举元数据
数据目录~/store/controller
日志文件:~/logs/rocketmqlogs/controller.log
日志配置:${ROCKETMQ_HOME}/conf/rmq.controller.logback.xml
配置文件:${ROCKETMQ_HOME}/conf/controller.conf
  • NameServer 和 Proxy 都是无状态(stateless)组件,不会持久化业务数据。

  • Broker 数据路径说明

配置项 默认路径 说明 主要作用
storePathRootDir /home/rocketmq/store
(默认)
消息存储的根目录 作为所有存储文件的父级目录,其他路径若未单独配置,则在此目录下创建
storePathCommitLog ${storePathRootDir}/commitlog CommitLog 文件存放路径 存储消息主体内容,是最核心的数据文件(顺序写入)
storePathConsumeQueue ${storePathRootDir}/consumequeue 消费队列文件存放路径 存储消息在队列中的索引(逻辑队列),指向 CommitLog 的物理位置
storePathIndex ${storePathRootDir}/index 索引文件存放路径 提供按 Key 查询消息的索引结构,便于通过 Message Key 快速检索消息
storePathConfig ${storePathRootDir}/config Broker 运行时配置存储路径 存储运行时生成的配置文件,如 topic、consumerOffset、subscriptionGroup 等
storeCheckpoint ${storePathRootDir}/checkpoint Checkpoint 文件路径 记录 CommitLog、ConsumeQueue、Index 三者的刷盘进度,用于崩溃恢复
abortFile ${storePathRootDir}/abort 异常退出标志文件路径 用于标识 Broker 是否异常退出,启动时据此判断是否执行恢复流程

安装过程中遇到的问题

1.启动 Proxy 失败

  • 无论是 Broker+Proxy 启动,还是 单独启动 Proxy,都报如下错误:

1
2
3
4
5
6
# 错误会在 nohup.out 中输出
Exception in thread "main" java.lang.UnsatisfiedLinkError: failed to load the required native library

Caused by: java.lang.IllegalArgumentException: Failed to load any of the given libraries: [netty_tcnative_linux_x86_64_fedora, netty_tcnative_linux_x86_64, netty_tcnative_x86_64, netty_tcnative]

Suppressed: java.lang.UnsatisfiedLinkError: /tmp/libnetty_tcnative_linux_x86_642308675901892111861.so: libcrypt.so.1: cannot open shared object file: No such file or directory
  • 原因分析

      1. Netty-tcnative 的编译依赖:RocketMQ 使用的 Netty 的 tcnative 模块是在较旧的环境中编译的,而动态链接的版本锁定:编译时链接的是 libcrypt.so.1,运行时必须找到相同主版本号的库
      1. 而我当前使用的系统为 Amazon Linux 2023,基于更新的 glibc,其加密功能已经迁移到 libcrypt.so.2。(Amazon Linux 2:基于较旧的 glibc 版本,libcrypt.so.1 是主要的加密库)
1
2
3
4
5
6
7
8
# 检查 libcrypt 是否存在
$ ldconfig -p | grep libcrypt
## 输出
libcryptsetup.so.12 (libc6,x86-64) => /lib64/libcryptsetup.so.12
libcrypto.so.3 (libc6,x86-64) => /lib64/libcrypto.so.3
libcrypto.so (libc6,x86-64) => /lib64/libcrypto.so
libcrypt.so.2 (libc6,x86-64) => /lib64/libcrypt.so.2
libcrypt.so (libc6,x86-64) => /lib64/libcrypt.so
  • 解决办法

1
2
3
4
5
6
7
8
9
10
11
# 安装兼容性包
sudo yum install libxcrypt-compat
# 检查 libcrypt 是否存在
$ ldconfig -p | grep libcrypt
## 输出
libcryptsetup.so.12 (libc6,x86-64) => /lib64/libcryptsetup.so.12
libcrypto.so.3 (libc6,x86-64) => /lib64/libcrypto.so.3
libcrypto.so (libc6,x86-64) => /lib64/libcrypto.so
libcrypt.so.2 (libc6,x86-64) => /lib64/libcrypt.so.2
libcrypt.so.1 (libc6,x86-64) => /lib64/libcrypt.so.1
libcrypt.so (libc6,x86-64) => /lib64/libcrypt.so

2.写入消息失败,并报如下错误

1
Caused by: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL: 0.95 CQ: 0.95 INDEX: -1.00], messages are put to the slave, message store has been shut down, etc. BROKER: 10.250.0.175:10911
  • 错误原因

    • RocketMQ 返回的 CODE: 14 表示:Broker 当前 不接受消息写入(服务暂不可用)。
    • the broker’s disk is full [CL: 0.95 CQ: 0.95 INDEX: -1.00]: Broker 的磁盘已满
    1
    2
    3
    CL: 0.95 → CommitLog 95% 已使用
    CQ: 0.95 → ConsumeQueue 95% 已使用
    INDEX: -1.00 → 索引异常或未采集
配置项 含义 默认值
diskMaxUsedSpaceRatio Broker 磁盘最大可用比例(超过后禁止写入) 75%
storePathCommitLog 消息存储路径(CommitLog) ~/store/commitlog
storePathConsumeQueue 消费队列路径(ConsumeQueue) ~/store/consumequeue
storePathIndex 索引路径 ~/store/index
  • 总结:可以确认是 磁盘使用率过高 导致 Broker 自动进入 “写保护” 模式。

  • 解决方法

      1. 清理磁盘:确认磁盘使用率过高,并清理磁盘空间,既降低磁盘使用率
      1. 磁盘扩容:如果清理磁盘空间后,磁盘使用率依然过高,则需要扩容磁盘
      1. 配置调整:调整 Broker 配置(broker.conf),将 diskMaxUsedSpaceRatio 配置适当提高,如 96%(diskMaxUsedSpaceRatio=96),调整后重启 Broker。仅建议在紧急情况下临时解决。