Redis 命令及数据类型 -- Stream
摘要
- 本文介绍 Redis Stream 数据类型
- 本文基于
redis-7.4.7,springboot-3.5.8 - Redis官网:https://redis.io/
- Redis 命令文档:https://redis.io/docs/latest/commands/
Stream 核心详解
-
Redis Stream 是 Redis 5.0 新增的有序、可持久化、支持多播的
消息队列,底层用基数树+链表实现,兼顾了高效查询与有序写入,完美解决了 List 队列(无法多播、无持久化保障)、Pub/Sub(无持久化、丢消息)的痛点,是生产环境首选的 Redis 消息队列方案。 -
建议生产环境还是使用传统的 MQ 方案,如果仅是内部系统使用的轻量MQ,已经有了redis,但是不想引入其它中间件,也可以尝试。
-
Stream 与传统的MQ 的对比
| 对比维度 | Redis Stream | RabbitMQ | Kafka |
|---|---|---|---|
| 数据模型 | 类似日志的有序 KV 消息流(ID → field/value) | 队列(FIFO) | 日志分区(Partitioned Append-Only Log) |
| 消息持久化 | 可选持久化(AOF / RDB),默认内存优先 | 可持久化到磁盘 | 持久化到磁盘,顺序写入,效率高 |
| 消息确认 | XACK 对单条消息确认,支持 Pending 消息管理 | ACK / NACK | Offset 控制,Consumer 自行提交 |
| 消费模式 | 支持 Consumer Group,多消费者共享 Pending 消息 | Queue 绑定 Consumer,多消费者抢占 | Consumer Group,多消费者平行消费 |
| 重复消费 | 默认可能重复,需要应用端幂等 | 可通过 ACK/NACK 控制 | 默认可能重复,Consumer 需幂等处理 |
| 消息顺序 | 按 Stream ID 顺序,可保证分组内顺序 | 队列顺序保证 | Partition 内顺序保证 |
| 消息保留策略 | 可配置 maxlen / minid,按时间或长度裁剪 | 队列长度 / TTL 控制 | 基于时间或大小保留(Retention Policy) |
| 延时/定时消费 | 原生不支持延时队列,需要应用端处理 | 支持插件或 TTL | 原生不支持,需要应用端处理或 Kafka Streams |
| 事务与原子操作 | 事务可用 MULTI/EXEC,XADD 支持 NOMKSTREAM 等选项 | 原生事务支持(事务 / confirm 模式) | 不支持事务,依赖幂等生产者 |
| 性能 | 内存级高吞吐,持久化会有开销 | 中等,受磁盘和网络限制 | 高吞吐,顺序写入磁盘效率极高 |
| 典型使用场景 | 事件日志、轻量 MQ、内部异步流水线 | 企业级消息、任务调度、RPC | 大数据管道、日志收集、流处理 |
| 多语言支持 | 客户端支持多种语言(Java、Python、Go 等) | 客户端丰富 | 客户端丰富 |
| 易运维性 | 单节点即可使用,但持久化需关注内存 | 集群较复杂,需要 RabbitMQ 集群 | 集群复杂度高,需要 ZooKeeper 或 KRaft |
底层核心实现
-
存储结构:核心是基数树(Radix Tree)+ 双向链表,基数树存「消息ID→消息内容」的映射,双向链表按消息ID有序串联所有消息,保证写入和按ID查询的高效性(O(logN))。
-
消息ID:默认自动生成,格式为时间戳-序列号(如1734567890000-0),时间戳是毫秒级,序列号解决同一毫秒多消息的有序问题;也支持手动指定,需满足严格递增,否则写入失败。
-
持久化:和 Redis 其他数据结构一致,依赖 RDB/AOF 持久化,消息写入后会落盘,重启后不丢失,这是 Pub/Sub 不具备的核心优势。
-
核心元数据:每个 Stream 会维护last-id(最新消息ID)、groups(消费组列表)、entries(消息实体)三类元数据,消费组的元数据独立存储,互不干扰。
Stream 核心基础操作(必用)
1. 生产消息(XADD):写入队列
• 核心命令:XADD key ID 字段1 值1 字段2 值2 ...,ID 写*表示自动生成(生产首选)
1 | XADD key |
• 示例
1 | # 向订单队列写入1条消息,自动生成消息ID |
2. 消费消息(2种核心模式)
(1) 独立消费(无消费组):一对一消费,适合简单场景
• XREAD:主动拉取消息,支持阻塞/非阻塞
1 | XREAD |
• 示例1(非阻塞)
1 | # 从开头拉5条消息 |
• 示例2(阻塞)
1 | # $表示从最新消息开始拉,阻塞3秒,有新消息立即返回,无则3秒后超时,是生产常用写法。 |
(2) 消费组消费(XGROUP):一对多消费,核心生产模式
-
Stream 最核心的价值就是消费组,支持多消费者协同消费、消息确认、未消费消息追溯,解决了分布式场景下的消息分片与负载均衡问题。
-
- 先创建消费组
XGROUP CREATE
- 先创建消费组
1 | XGROUP CREATE key group id|$ |
-
消费者拉取消息
XREADGROUP
1 | XREADGROUP GROUP group consumer |
3. 消息确认(XACK)
-
消费完成后必须确认,否则会被标记为「未确认消息」
1 | # XACK key 消费组名 消息ID1 消息ID2 ...。 |
4. 消息重试
-
未确认的消息,会被存入消费组的
「PEL(Pending Entries List)」,可通过XPENDING key 消费组名查看,支持XCLAIM将PEL中的消息转移给其他消费者处理,避免单点故障导致消息堆积。 -
XPENDING:查看未被确认的消息情况
1 | XPENDING key group |
-
XCLAIM: 转移投递
将已经投递但未 ACK、且 idle 超过阈值的 Pending 消息,从原 consumer 手中“抢占”给新的 consumer,并重新投递。
一旦抢占成功,原 consumer 就不在拥有该消息的 Pending 记录
1 | XCLAIM key group consumer min-idle-time id [id ...] |
高级特性(生产必备)
-
消息回溯与遍历
1 | # XRANGE key 起始ID 结束ID [COUNT 条数](正向遍历) |
-
队列信息查询
1 | # 查消息总数 |
-
消费组管理
1 | # 删除消费组 |
-
惰性删除:Stream 不会主动删除已确认的消息,仅靠MAXLEN淘汰,若需主动清理历史消息,直接用XADD的MAXLEN参数即可,无需额外命令。
生产环境核心痛点与解决方案
-
消息丢失:3重保障
- 开启 Redis AOF 持久化(设为everysec,兼顾性能与可靠性)
- 生产者写入后确认返回值(确保写入成功)
- 消费者消费后必须
XACK确认。
-
消息堆积:2种处理
- ① 写入时用
MAXLEN设置上限,淘汰旧消息; - ② 消费端扩容消费者实例,消费组会自动将未消费消息分片给多个消费者,实现并行消费。
- ① 写入时用
-
重复消费
- 根源: 网络抖动(消费者确认消息前断开连接,消息重回PEL)
- 解决方案: 消息幂等性(生产者给消息加唯一标识,消费者根据唯一标识去重)。
-
阻塞超时:消费端用BLOCK阻塞拉取,超时时间建议设为3-5秒,避免频繁空轮询占用CPU,同时保证新消息的响应速度。
典型应用场景
分布式业务解耦(订单-库存-支付-物流解耦)
核心思路:单 Stream 对应核心业务(订单),库存、支付、物流各创建独立消费组,各自消费互不干扰,实现业务解耦。
-
生产者(订单服务):写入订单完成消息
1 | # 自动生成消息ID,写入订单核心信息,设置队列最大1万条消息(近似淘汰) |
-
创建3个独立消费组(库存/支付/物流)
1 | # 库存消费组:从最新消息开始消费,队列不存在则自动创建 |
-
各消费组消费者拉取+确认消息
1 | # 库存服务消费者(consumer_stock1)拉取3条未消费消息,阻塞5秒 |
异步任务处理(用户注册-邮件/短信/积分异步执行)
核心思路:注册接口只负责写入 Stream 消息,无需等待后续任务完成,单消费组多消费者提升异步任务处理效率,核心是快速响应前端。
-
生产者(注册服务):用户注册成功后写入消息
1 | # 写入用户注册信息,MAXLEN限制5000条,避免积压过多无效注册消息 |
-
创建单个消费组(统一处理注册后续任务)
1 | XGROUP CREATE user_register_stream group_reg_task $ MKSTREAM |
-
多消费者并行拉取(邮件/短信/积分各1个消费者,或多实例扩容)
1 | # 短信发送消费者(consumer_sms) |
日志收集(系统实时日志-分析/告警)
核心思路:各业务系统实时写入日志到 Stream,多消费组分别做日志分析、实时告警,兼顾实时性与数据留存,支持历史日志回溯。
-
生产者(各业务系统):实时写入系统日志(按级别/业务分类,这里统一写入总日志流)
1 | # 写入日志:包含业务模块、日志级别、内容、时间,无消息数量上限(按实际服务器内存调整MAXLEN) |
-
创建2个消费组(日志分析+实时告警)
1 | # 日志分析消费组(用于离线统计、数据归档),从队列开头消费(0-0),兜底所有历史日志 |
-
对应消费者拉取处理
1 | # 日志分析消费者(批量拉取,非阻塞,适合离线处理) |
限流削峰(秒杀场景-请求削峰填谷)
核心思路:秒杀请求高峰时,先写入 Stream 做缓冲,消费端匀速拉取(控制每秒处理量),避免下游数据库/业务服务被压垮,核心是“慢消费、稳处理”。
-
生产者(秒杀入口服务):接收秒杀请求,直接写入 Stream,快速返回“排队中”
1 | # 写入秒杀请求,设置MAXLEN 10000(限制最大排队数,超过则拒绝,避免OOM) |
-
创建消费组(单消费组+多消费者,控制总处理速率)
1 | XGROUP CREATE seckill_stream group_seckill $ MKSTREAM |
-
消费端(匀速拉取,核心是控制COUNT和消费频率,比如每秒处理100条)
1 | # 消费者(多实例部署,总处理量=单实例COUNT×实例数,这里单实例每次拉10条,每秒拉10次,单实例每秒处理100条) |
关键优化:消费端通过定时任务+固定COUNT拉取,而非无限拉取,精准控制处理速率,实现削峰填谷。
与其他 Redis 队列方案对比(核心优势)
• 对比 List
- List 是简单的先进先出,不支持多播(多个消费者会抢消息)、无消费组、无消息确认,仅适合简单一对一队列;
- Stream 支持多播+消费组+确认机制,适合复杂分布式场景。
| 对比项 | Stream | List |
|---|---|---|
| 消费者组 | ✅ | ❌ |
| ACK | ✅ | ❌ |
| 重试 | ✅ | ❌ |
| 阻塞 | ✅ | ✅ |
| 顺序性 | 强 | 强 |
• 对比 Pub/Sub
- Pub/Sub 无持久化,Redis 重启或消费者离线会丢消息;
- Pub/Sub 无消费组,消息发完即丢,仅适合实时广播(如聊天室),不适合重要业务。
Stream 命令
-
SpringBoot 的
StringRedisTemplate.opsForStream()中 Stream 数据类型 的操作方法与 Redis 原生命令的对应关系如下:
注意这里不一定要用
StringRedisTemplate来操作 Stream,但是用StringRedisTemplate可以保证可读性。
-
核心能力划分:
1 | 消息写入(XADD) |
消息写入(XADD)
-
1️⃣ 基础写入
| 方法功能 | 方法 opsForStream().xxx() |
Redis 原始命令 | 说明 |
|---|---|---|---|
| 写入 Map | add(K key, Map<HK,HV>) |
XADD key * field value |
自动生成 ID |
| 写入 Record | add(Record<K, ?> record) |
XADD |
支持 ObjectRecord |
| 写入 MapRecord | add(MapRecord<K,HK,HV>) |
XADD |
Map 形式 |
-
2️⃣ 带参数写入(推荐)
| 方法功能 | 方法 | Redis 原始命令 | 备注 |
|---|---|---|---|
| 写入 + 选项 | add(record, XAddOptions) |
XADD ... |
支持 MAXLEN / NOMKSTREAM |
| Map + 选项 | add(key, map, XAddOptions) |
XADD |
Redis ≥ 6 |
消息确认(XACK)
| 方法功能 | 方法 | Redis 原始命令 | 说明 |
|---|---|---|---|
| 确认消息 | acknowledge(key, group, recordIds…) |
XACK |
标记已消费 |
| Record 确认 | acknowledge(group, record) |
XACK |
常用 |
⚠️ 只对 Consumer Group 生效
消息读取(无消费者组)
-
1️⃣ 按 Range 读取(历史数据)
| 方法功能 | 方法 | Redis 原始命令 |
|---|---|---|
| 正序读取 | range(key, range) |
XRANGE |
| 限制条数 | range(key, range, limit) |
XRANGE |
| 反序读取 | reverseRange(key, range) |
XREVRANGE |
| 反序 + limit | reverseRange(key, range, limit) |
XREVRANGE |
-
2️⃣ 实时读取(XREAD)
| 方法功能 | 方法 | Redis 原始命令 | 说明 |
|---|---|---|---|
| 读取 | read(StreamOffset…) |
XREAD |
不支持 ACK |
| 带参数 | read(options, offsets…) |
XREAD |
BLOCK / COUNT |
| 映射对象 | read(Class<T>, …) |
XREAD |
自动反序列化 |
消费者组读取(XREADGROUP)
| 方法功能 | 方法 | Redis 原始命令 | 说明 |
|---|---|---|---|
| 组内读取 | read(Consumer, offsets…) |
XREADGROUP |
MQ 核心 |
| 带参数 | read(Consumer, options, offsets…) |
XREADGROUP |
BLOCK |
| 映射对象 | read(Class<T>, Consumer, …) |
XREADGROUP |
— |
Pending 消息管理(XPENDING / XCLAIM)
-
1️⃣ Pending 查询
| 方法功能 | 方法 | Redis 原始命令 |
|---|---|---|
| Pending 汇总 | pending(key, group) |
XPENDING |
| 指定消费者 | pending(key, consumer) |
XPENDING |
| 范围查询 | pending(key, group, range, count) |
XPENDING |
-
2️⃣ 消息重新分配(XCLAIM)
| 方法功能 | 方法 | Redis 原始命令 | 说明 |
|---|---|---|---|
| 重新分配 | claim(key, group, newOwner, minIdle, ids…) |
XCLAIM |
超时接管 |
| 高级配置 | claim(key, group, newOwner, XClaimOptions) |
XCLAIM |
force / retry |
消费者组管理(XGROUP)
| 方法功能 | 方法 | Redis 原始命令 | 说明 |
|---|---|---|---|
| 创建组 | createGroup(key, group) |
XGROUP CREATE |
从 $ 开始 |
| 指定 offset | createGroup(key, offset, group) |
XGROUP CREATE |
常用 0-0 |
| 删除消费者 | deleteConsumer(key, consumer) |
XGROUP DELCONSUMER |
|
| 销毁组 | destroyGroup(key, group) |
XGROUP DESTROY |
Stream 元信息(XINFO)
| 方法功能 | 方法 | Redis 原始命令 |
|---|---|---|
| Stream 信息 | info(key) |
XINFO STREAM |
| 组信息 | groups(key) |
XINFO GROUPS |
| 消费者信息 | consumers(key, group) |
XINFO CONSUMERS |
Stream 删除 / 裁剪
-
1️⃣ 删除消息
| 方法功能 | 方法 | Redis 原始命令 |
|---|---|---|
| 删除消息 | delete(key, recordIds…) |
XDEL |
| 删除 Record | delete(record) |
XDEL |
-
2️⃣ 裁剪 Stream(XTRIM)
| 方法功能 | 方法 | Redis 原始命令 | 说明 |
|---|---|---|---|
| 精确裁剪 | trim(key, count) |
XTRIM |
|
| 近似裁剪 | trim(key, count, true) |
XTRIM ~ |
性能更好 |
对象映射能力(非常重要)
| 能力 | 方法 | 说明 |
|---|---|---|
| Map → Object | map(MapRecord, Class<T>) |
自动反序列化 |
| List 映射 | map(List<MapRecord>, Class<T>) |
|
| HashMapper | getHashMapper(Class<T>) |
自定义映射 |
| 反序列化 | deserializeRecord(ByteRecord) |
底层能力 |