Redis 命令及数据类型 -- Stream

摘要

Stream 核心详解

  • Redis Stream 是 Redis 5.0 新增的有序、可持久化、支持多播的消息队列,底层用基数树+链表实现,兼顾了高效查询与有序写入,完美解决了 List 队列(无法多播、无持久化保障)、Pub/Sub(无持久化、丢消息)的痛点,是生产环境首选的 Redis 消息队列方案。

  • 建议生产环境还是使用传统的 MQ 方案,如果仅是内部系统使用的轻量MQ,已经有了redis,但是不想引入其它中间件,也可以尝试。

  • Stream 与传统的MQ 的对比

对比维度 Redis Stream RabbitMQ Kafka
数据模型 类似日志的有序 KV 消息流(ID → field/value) 队列(FIFO) 日志分区(Partitioned Append-Only Log)
消息持久化 可选持久化(AOF / RDB),默认内存优先 可持久化到磁盘 持久化到磁盘,顺序写入,效率高
消息确认 XACK 对单条消息确认,支持 Pending 消息管理 ACK / NACK Offset 控制,Consumer 自行提交
消费模式 支持 Consumer Group,多消费者共享 Pending 消息 Queue 绑定 Consumer,多消费者抢占 Consumer Group,多消费者平行消费
重复消费 默认可能重复,需要应用端幂等 可通过 ACK/NACK 控制 默认可能重复,Consumer 需幂等处理
消息顺序 按 Stream ID 顺序,可保证分组内顺序 队列顺序保证 Partition 内顺序保证
消息保留策略 可配置 maxlen / minid,按时间或长度裁剪 队列长度 / TTL 控制 基于时间或大小保留(Retention Policy)
延时/定时消费 原生不支持延时队列,需要应用端处理 支持插件或 TTL 原生不支持,需要应用端处理或 Kafka Streams
事务与原子操作 事务可用 MULTI/EXEC,XADD 支持 NOMKSTREAM 等选项 原生事务支持(事务 / confirm 模式) 不支持事务,依赖幂等生产者
性能 内存级高吞吐,持久化会有开销 中等,受磁盘和网络限制 高吞吐,顺序写入磁盘效率极高
典型使用场景 事件日志、轻量 MQ、内部异步流水线 企业级消息、任务调度、RPC 大数据管道、日志收集、流处理
多语言支持 客户端支持多种语言(Java、Python、Go 等) 客户端丰富 客户端丰富
易运维性 单节点即可使用,但持久化需关注内存 集群较复杂,需要 RabbitMQ 集群 集群复杂度高,需要 ZooKeeper 或 KRaft

底层核心实现

  1. 存储结构:核心是基数树(Radix Tree)+ 双向链表,基数树存「消息ID→消息内容」的映射,双向链表按消息ID有序串联所有消息,保证写入和按ID查询的高效性(O(logN))。

  2. 消息ID:默认自动生成,格式为时间戳-序列号(如1734567890000-0),时间戳是毫秒级,序列号解决同一毫秒多消息的有序问题;也支持手动指定,需满足严格递增,否则写入失败。

  3. 持久化:和 Redis 其他数据结构一致,依赖 RDB/AOF 持久化,消息写入后会落盘,重启后不丢失,这是 Pub/Sub 不具备的核心优势。

  4. 核心元数据:每个 Stream 会维护last-id(最新消息ID)、groups(消费组列表)、entries(消息实体)三类元数据,消费组的元数据独立存储,互不干扰。

Stream 核心基础操作(必用)

1. 生产消息(XADD):写入队列

• 核心命令:XADD key ID 字段1 值1 字段2 值2 ...,ID 写*表示自动生成(生产首选)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
XADD key
[NOMKSTREAM]
[MAXLEN | MINID [= | ~] threshold [LIMIT count]]
* | id
field value [field value ...]

# 参数解释
# key: Stream 名称
# NOMKSTREAM: 不自动创建 Stream,若 key 不存在 → 命令直接失败
# MAXLEN threshold —— 按长度裁剪
# MAXLEN 1000: Stream 最多保留 1000 条消息,超出部分会被删除(从最旧开始)
# MAXLEN = 1000: 精确裁剪,严格保证长度 ≤ 1000,每次写入都会检查并裁剪,性能开销较大
# MAXLEN ~ 1000(推荐): 近似裁剪,允许 Stream 长度 略微超过阈值,Redis 在内部批量裁剪,写入性能更高
# LIMIT count —— 每次最多裁剪多少条
# MAXLEN ~ 1000 LIMIT 100: 单次写入 最多删除 100 条旧消息,防止一次裁剪阻塞 Redis 主线程
# MINID threshold —— 按 ID 裁剪(Redis ≥ 6.2)
# MINID ~ 1670000000000-0: 删除 ID 小于 threshold 的消息,更适合 时间窗口型保留策略
# * | id —— 消息 ID
# *: 自动生成 ID(99% 场景),格式:<毫秒时间戳>-<序号>,单调递增,全局有序,消费者组依赖它进行 offset 管理
# id: 指定ID(不常用),ID 必须严格大于 Stream 中最大 ID,否则写入失败
# field value —— 消息体(Payload)
# 至少一对 field-value,field / value 都是 Binary Safe,本质类似 Hash,但不可修改

• 示例

1
2
3
4
5
6
7
# 向订单队列写入1条消息,自动生成消息ID
XADD order_stream * uid 1001 order_no ORD20251220 price 299
## 输出
"1766215406540-0" # 消息ID

# 保留某个时间点之后的日志
XADD log MINID ~ 1689900000000-0 * level INFO msg "startup"

2. 消费消息(2种核心模式)

(1) 独立消费(无消费组):一对一消费,适合简单场景

• XREAD:主动拉取消息,支持阻塞/非阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
XREAD
[COUNT count]
[BLOCK milliseconds]
STREAMS key [key ...]
id [id ...]
# 参数解释
# COUNT count —— 单次最多读取多少条,是“上限”,不是保证值
# BLOCK milliseconds —— 阻塞等待新消息,阻塞期间 不会占用 CPU,超时返回 nil
# BLOCK 0: 无限阻塞,直到有新消息
# STREAMS key [key ...] —— 指定读取的 Stream,key 与 id 一一对应
# id [id ...] —— 从哪个位置开始读,读取 ID 大于该值的消息,不包含 该 ID 本身
# 普通 ID(游标语义): XREAD STREAMS mystream 1689999999999-0
# $: 只关心“将来”的消息,从 当前 Stream 的末尾之后 开始读,历史消息全部忽略: XREAD BLOCK 0 STREAMS mystream $

• 示例1(非阻塞)

1
2
3
4
5
6
7
8
9
10
11
# 从开头拉5条消息
XREAD COUNT 5 STREAMS order_stream 0-0
# 输出
1) 1) "order_stream" # Stream名称
2) 1) 1) "1766215406540-0" # 消息ID
2) 1) "uid" # 消息体 键值对
2) "1001"
3) "order_no"
4) "ORD20251220"
5) "price"
6) "299"

• 示例2(阻塞)

1
2
# $表示从最新消息开始拉,阻塞3秒,有新消息立即返回,无则3秒后超时,是生产常用写法。
XREAD COUNT 5 BLOCK 3000 STREAMS order_stream $

(2) 消费组消费(XGROUP):一对多消费,核心生产模式

  • Stream 最核心的价值就是消费组,支持多消费者协同消费、消息确认、未消费消息追溯,解决了分布式场景下的消息分片与负载均衡问题。

    1. 先创建消费组XGROUP CREATE
1
2
3
4
5
6
7
8
9
10
11
12
13
14
XGROUP CREATE key group id|$
[MKSTREAM]
[ENTRIESREAD entries-read]
# 参数解释
# key:Stream 名称,为指定的Stream创建消费组
# group: 消费组名称,消费者组的唯一标识,一个 Stream 可以有 多个 consumer group
# id | $ —— 关键参数:消费起始位点(offset),$表示从当前最新消息开始消费,用0-0表示从队列开头消费。
# [MKSTREAM] —— 自动创建 Stream,如果指定的key不存在则自动创建,推荐在自动化部署中使用
# [ENTRIESREAD entries-read] —— 设置“已读取条数”(高级参数),一般业务不需要使用,主要用于手动恢复group、数据迁移等场景

# 示例:创建group1消费组,从最新订单消息开始消费
XGROUP CREATE order_stream group1 $ MKSTREAM
# 输出
OK
  1. 消费者拉取消息XREADGROUP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
XREADGROUP GROUP group consumer
[COUNT count]
[BLOCK milliseconds]
[NOACK]
STREAMS key [key ...]
id [id ...]
# 参数解释
# GROUP group consumer: 指定消费者组名:group,消费者名:consumer
# 同一个 group 下,不同 consumer 不会收到重复消息
# [COUNT count] —— 单次最多返回条数,是软限制,不是严格保证
# [BLOCK milliseconds] —— 阻塞等待新消息,最多阻塞 milliseconds 毫秒
# BLOCK 0 → 永久阻塞
# [NOACK] —— 不进入 Pending(⚠️ 谨慎)
# 消息 不会进入 Pending,不需要 XACK,消费后即认为完成
# 风险:消费者崩溃 → 消息直接丢失,不可重投递
# STREAMS key [key ...]: 指定要读取的 Stream(支持多个),key 顺序需与后续 id 顺序一致
# id [id ...] —— 决定“读什么”的关键
# 使用 > —— 读取“新消息”(✅ 生产环境 99% 使用这种方式),从未投递给任何 consumer 的新消息
# 使用 0/具体ID —— 重读 Pending(补偿),读取 已投递但未 ACK 的消息
# 多 Stream 场景: STREAMS stream1 stream2 > >,每个 stream 必须有一个对应 id

# 示例:consumerA 从group1拉3条未被消费的消息,阻塞5秒。
XREADGROUP GROUP group1 consumerA COUNT 3 BLOCK 5000 STREAMS order_stream >
## 此时在5秒内创建新的消息,就会有类似如下输出
1) 1) "order_stream"
2) 1) 1) "1766215642763-0"
2) 1) "uid"
2) "1001"
3) "order_no"
4) "ORD20251220"
5) "price"
6) "299"
(1.50s)

3. 消息确认(XACK)

  • 消费完成后必须确认,否则会被标记为「未确认消息」

1
2
3
4
5
# XACK key 消费组名 消息ID1 消息ID2 ...。
XACK key group id [id ...]

# 示例: 确认2条消息消费完成,Stream 会删除该消息的未确认标记
XACK order_stream group1 1734567890000-0 1734567890001-0

4. 消息重试

  • 未确认的消息,会被存入消费组的「PEL(Pending Entries List)」,可通过XPENDING key 消费组名查看,支持XCLAIMPEL中的消息转移给其他消费者处理,避免单点故障导致消息堆积。

  • XPENDING:查看未被确认的消息情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
XPENDING key group
[[IDLE min-idle-time]
start end count
[consumer]]
# 参数解释
# key: Stream 名称,必须存在
# group: 消费者组名称,必须存在
# [IDLE min-idle-time](Redis ≥ 6.2): 仅返回 空闲时间 ≥ min-idle-time(毫秒) 的 Pending 消息
# min-idle-time(空闲时间) = 从上次投递 / claim到现在
# start end —— ID 范围
# -: 最小ID
# +: 最大ID
# count —— 返回条数上限
# [consumer](可选): 只查看某一个 consumer 的 Pending,不指定则查看 group 内全部
# 只返回 元数据,不返回消息内容

# 示例1: 查看是否有消息积压
XPENDING order_stream group1
## 输出
1) (integer) 1 # 积压消息数,未确认消息数量
2) "1766215642763-0" # Pending 中最小 ID
3) "1766215642763-0" # Pending 中最大 ID
4) 1) 1) "consumerA" # 按 consumer 统计的 Pending 数量
2) "1"

# 示例2: 查找 idle 超过 1 分钟的消息,最多返回10条
XPENDING order_stream group1 IDLE 60000 - + 10
## 输出
1) 1) "1766215642763-0" # 消息 ID
2) "consumerA" # 当前持有该消息的 consumer
3) (integer) 255581 # idle 时间(毫秒)
4) (integer) 2 # delivery count(投递次数),该消息至少被投递过 2 次


# 示例3: 只查看 consumerA 的 Pending,最多返回20条
XPENDING order_stream group1 - + 20 consumerA
  • XCLAIM: 转移投递

将已经投递但未 ACK、且 idle 超过阈值的 Pending 消息,从原 consumer 手中“抢占”给新的 consumer,并重新投递。
一旦抢占成功,原 consumer 就不在拥有该消息的 Pending 记录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
XCLAIM key group consumer min-idle-time id [id ...]
[IDLE ms]
[TIME unix-time-milliseconds]
[RETRYCOUNT count]
[FORCE]
[JUSTID]
[LASTID lastid]
# 参数解释
# key: Stream Key,必须存在
# group: 消费者组名称,必须存在
# consumer: 新的消费者,抢占后的 Pending 消息将归属该 consumer
# min-idle-time: 空闲时间,只有 idle ≥ min-idle-time 的 Pending 消息才允许被 claim
# id [id ...]: 指定要 claim 的消息 ID,ID必须存在
# [IDLE ms]: 人为设置 idle 时间,覆盖 Redis 内部计算的 idle,比如强制制造“已超时”状态
# [TIME unix-time-milliseconds]: 手动指定“最后投递时间”,与 IDLE 二选一使用,极少见于业务代码
# [RETRYCOUNT count]: 手动设置 delivery count,实现“最多重试 N 次,超过进死信队列”
# [FORCE]: 强制 claim 不存在于 Pending 的消息,⚠️ 高风险
# [JUSTID]: 只返回 消息 ID,不返回消息体(field/value),减少网络开销
# [LASTID lastid](Redis ≥ 7.0): 更新 consumer group 的 last-delivered-id,影响后续 XREADGROUP > 的行为
# ⚠️ 高级特性,一般不建议业务代码使用

# 示例:
# 1️⃣ 抢占 idle 超过 60s 的消息
XCLAIM order_stream group1 consumerB 60000 1766215642763-0
# 输出
1) 1) "1766215642763-0"
2) 1) "uid"
2) "1001"
3) "order_no"
4) "ORD20251220"
5) "price"
6) "299"

# 2️⃣ 抢占并标记为第 3 次重试
XCLAIM orders order-group consumerB 60000 1766215642763-0 RETRYCOUNT 3
# 3️⃣ 只返回 ID(配合批处理)
XCLAIM orders order-group consumerB 60000 1766215642763-0 JUSTID

高级特性(生产必备)

  1. 消息回溯与遍历

1
2
3
4
5
6
7
# XRANGE key 起始ID 结束ID [COUNT 条数](正向遍历)
XRANGE key start end [COUNT count]
# XREVRANGE key 结束ID 起始ID [COUNT 条数](反向遍历)
XREVRANGE key end start [COUNT count]
# 示例: 0-0:最小的消息ID,等同于 -,+:表示最新消息,适合数据对账、历史消息查询。
XRANGE order_stream 0-0 + COUNT 10
XRANGE order_stream - + COUNT 10
  1. 队列信息查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 查消息总数
XLEN key
# 查Stream完整元数据(最新ID、消费组数量、消息总数等)

XINFO STREAM key [FULL [COUNT count]]
## FULL: 完整模式
# 👉 返回:
# Stream 元信息
# 所有 Consumer Group
# 每个 Group 的 Consumer
# Pending Entries List(PEL)
# 部分历史 entries
# ⚠️ 开销很大,慎用于生产环境。
## COUNT count(FULL 模式的限制参数)
# XINFO STREAM key FULL COUNT 10: 限制返回的entries 数量以及每个 group / consumer 的 PEL 记录数量

# 查所有消费组信息
XINFO GROUPS key
# 查该组下所有消费者
XINFO CONSUMERS key 消费组名
  1. 消费组管理

1
2
3
4
5
6
# 删除消费组
XGROUP DESTROY key 消费组名
# 删除消费者
XGROUP DELCONSUMER key 消费组名 消费者名
# 重置消费组起始ID
XGROUP SETID key 消费组名 新ID
  1. 惰性删除:Stream 不会主动删除已确认的消息,仅靠MAXLEN淘汰,若需主动清理历史消息,直接用XADD的MAXLEN参数即可,无需额外命令。

生产环境核心痛点与解决方案

  1. 消息丢失:3重保障

    • 开启 Redis AOF 持久化(设为everysec,兼顾性能与可靠性)
    • 生产者写入后确认返回值(确保写入成功)
    • 消费者消费后必须XACK确认。
  2. 消息堆积:2种处理

    • ① 写入时用MAXLEN设置上限,淘汰旧消息;
    • ② 消费端扩容消费者实例,消费组会自动将未消费消息分片给多个消费者,实现并行消费。
  3. 重复消费

    • 根源: 网络抖动(消费者确认消息前断开连接,消息重回PEL)
    • 解决方案: 消息幂等性(生产者给消息加唯一标识,消费者根据唯一标识去重)。
  4. 阻塞超时:消费端用BLOCK阻塞拉取,超时时间建议设为3-5秒,避免频繁空轮询占用CPU,同时保证新消息的响应速度。

典型应用场景

分布式业务解耦(订单-库存-支付-物流解耦)

核心思路:单 Stream 对应核心业务(订单),库存、支付、物流各创建独立消费组,各自消费互不干扰,实现业务解耦。

  1. 生产者(订单服务):写入订单完成消息

1
2
# 自动生成消息ID,写入订单核心信息,设置队列最大1万条消息(近似淘汰)
XADD order_core_stream * MAXLEN ~ 10000 order_no ORD20251220001 uid 1001 amount 299 status created create_time 1734567890
  1. 创建3个独立消费组(库存/支付/物流)

1
2
3
4
5
6
# 库存消费组:从最新消息开始消费,队列不存在则自动创建
XGROUP CREATE order_core_stream group_stock $ MKSTREAM
# 支付消费组
XGROUP CREATE order_core_stream group_pay $ MKSTREAM
# 物流消费组
XGROUP CREATE order_core_stream group_logistics $ MKSTREAM
  1. 各消费组消费者拉取+确认消息

1
2
3
4
5
6
7
8
9
10
11
12
# 库存服务消费者(consumer_stock1)拉取3条未消费消息,阻塞5秒
XREADGROUP GROUP group_stock consumer_stock1 COUNT 3 BLOCK 5000 STREAMS order_core_stream >
# 库存处理完成后确认消息(替换为实际拉取到的消息ID)
XACK order_core_stream group_stock 1734567890000-0 1734567890001-0

# 支付服务消费者(consumer_pay1)同理
XREADGROUP GROUP group_pay consumer_pay1 COUNT 3 BLOCK 5000 STREAMS order_core_stream >
XACK order_core_stream group_pay 1734567890000-0 1734567890001-0

# 物流服务消费者(consumer_log1)同理
XREADGROUP GROUP group_logistics consumer_log1 COUNT 3 BLOCK 5000 STREAMS order_core_stream >
XACK order_core_stream group_logistics 1734567890000-0 1734567890001-0

异步任务处理(用户注册-邮件/短信/积分异步执行)

核心思路:注册接口只负责写入 Stream 消息,无需等待后续任务完成,单消费组多消费者提升异步任务处理效率,核心是快速响应前端。

  1. 生产者(注册服务):用户注册成功后写入消息

1
2
# 写入用户注册信息,MAXLEN限制5000条,避免积压过多无效注册消息
XADD user_register_stream * MAXLEN ~ 5000 uid 1001 username zhangsan phone 13800138000 email zs@xxx.com reg_time 1734567900
  1. 创建单个消费组(统一处理注册后续任务)

1
XGROUP CREATE user_register_stream group_reg_task $ MKSTREAM
  1. 多消费者并行拉取(邮件/短信/积分各1个消费者,或多实例扩容)

1
2
3
4
5
6
7
8
9
10
11
# 短信发送消费者(consumer_sms)
XREADGROUP GROUP group_reg_task consumer_sms COUNT 5 BLOCK 3000 STREAMS user_register_stream >
XACK user_register_stream group_reg_task 消息ID1 消息ID2

# 邮件发送消费者(consumer_email)
XREADGROUP GROUP group_reg_task consumer_email COUNT 5 BLOCK 3000 STREAMS user_register_stream >
XACK user_register_stream group_reg_task 消息ID1 消息ID2

# 积分发放消费者(consumer_score)
XREADGROUP GROUP group_reg_task consumer_score COUNT 5 BLOCK 3000 STREAMS user_register_stream >
XACK user_register_stream group_reg_task 消息ID1 消息ID2

日志收集(系统实时日志-分析/告警)

核心思路:各业务系统实时写入日志到 Stream,多消费组分别做日志分析、实时告警,兼顾实时性与数据留存,支持历史日志回溯。

  1. 生产者(各业务系统):实时写入系统日志(按级别/业务分类,这里统一写入总日志流)

1
2
3
# 写入日志:包含业务模块、日志级别、内容、时间,无消息数量上限(按实际服务器内存调整MAXLEN)
XADD sys_log_stream * module order_service level ERROR content "库存扣减失败,订单号ORD20251220001" log_time 1734567910
XADD sys_log_stream * module pay_service level INFO content "支付成功,uid1001,金额299" log_time 1734567912
  1. 创建2个消费组(日志分析+实时告警)

1
2
3
4
# 日志分析消费组(用于离线统计、数据归档),从队列开头消费(0-0),兜底所有历史日志
XGROUP CREATE sys_log_stream group_log_analysis 0-0 MKSTREAM
# 实时告警消费组(用于实时捕获ERROR日志告警),从最新消息消费
XGROUP CREATE sys_log_stream group_log_alert $ MKSTREAM
  1. 对应消费者拉取处理

1
2
3
4
5
6
7
# 日志分析消费者(批量拉取,非阻塞,适合离线处理)
XREADGROUP GROUP group_log_analysis consumer_analysis COUNT 100 STREAMS sys_log_stream >
XACK sys_log_stream group_log_analysis 批量消息ID...

# 告警消费者(阻塞拉取,快速响应,只处理ERROR级别日志)
XREADGROUP GROUP group_log_alert consumer_alert BLOCK 0 STREAMS sys_log_stream > # BLOCK 0 永久阻塞,有消息立即返回
XACK sys_log_stream group_log_alert 告警消息ID

限流削峰(秒杀场景-请求削峰填谷)

核心思路:秒杀请求高峰时,先写入 Stream 做缓冲,消费端匀速拉取(控制每秒处理量),避免下游数据库/业务服务被压垮,核心是“慢消费、稳处理”。

  1. 生产者(秒杀入口服务):接收秒杀请求,直接写入 Stream,快速返回“排队中”

1
2
# 写入秒杀请求,设置MAXLEN 10000(限制最大排队数,超过则拒绝,避免OOM)
XADD seckill_stream * MAXLEN ~ 10000 seckill_id 101 uid 1001 request_time 1734568000
  1. 创建消费组(单消费组+多消费者,控制总处理速率)

1
XGROUP CREATE seckill_stream group_seckill $ MKSTREAM
  1. 消费端(匀速拉取,核心是控制COUNT和消费频率,比如每秒处理100条)

1
2
3
4
# 消费者(多实例部署,总处理量=单实例COUNT×实例数,这里单实例每次拉10条,每秒拉10次,单实例每秒处理100条)
XREADGROUP GROUP group_seckill consumer_seckill1 COUNT 10 BLOCK 100 STREAMS seckill_stream >
# 业务处理:扣库存、生成订单(核心是处理逻辑同步执行,控制速率)
XACK seckill_stream group_seckill 秒杀请求消息ID...

关键优化:消费端通过定时任务+固定COUNT拉取,而非无限拉取,精准控制处理速率,实现削峰填谷。

与其他 Redis 队列方案对比(核心优势)

• 对比 List
- List 是简单的先进先出,不支持多播(多个消费者会抢消息)、无消费组、无消息确认,仅适合简单一对一队列;
- Stream 支持多播+消费组+确认机制,适合复杂分布式场景。

对比项 Stream List
消费者组
ACK
重试
阻塞
顺序性

• 对比 Pub/Sub
- Pub/Sub 无持久化,Redis 重启或消费者离线会丢消息;
- Pub/Sub 无消费组,消息发完即丢,仅适合实时广播(如聊天室),不适合重要业务。

Stream 命令

  • SpringBoot 的 StringRedisTemplate.opsForStream() 中 Stream 数据类型 的操作方法与 Redis 原生命令的对应关系如下:

注意这里不一定要用 StringRedisTemplate 来操作 Stream,但是用 StringRedisTemplate 可以保证可读性。

  • 核心能力划分:

1
2
3
4
5
6
7
8
消息写入(XADD)
消息确认(XACK)
消息读取(XRANGE / XREAD / XREADGROUP)
Pending 消息管理(XPENDING / XCLAIM)
消费者组管理(XGROUP)
Stream 元信息(XINFO)
Stream 裁剪与删除(XTRIM / XDEL)
对象映射(MapRecord / ObjectRecord)

消息写入(XADD)

  • 1️⃣ 基础写入

方法功能 方法 opsForStream().xxx() Redis 原始命令 说明
写入 Map add(K key, Map<HK,HV>) XADD key * field value 自动生成 ID
写入 Record add(Record<K, ?> record) XADD 支持 ObjectRecord
写入 MapRecord add(MapRecord<K,HK,HV>) XADD Map 形式
  • 2️⃣ 带参数写入(推荐)

方法功能 方法 Redis 原始命令 备注
写入 + 选项 add(record, XAddOptions) XADD ... 支持 MAXLEN / NOMKSTREAM
Map + 选项 add(key, map, XAddOptions) XADD Redis ≥ 6

消息确认(XACK)

方法功能 方法 Redis 原始命令 说明
确认消息 acknowledge(key, group, recordIds…) XACK 标记已消费
Record 确认 acknowledge(group, record) XACK 常用

⚠️ 只对 Consumer Group 生效

消息读取(无消费者组)

  • 1️⃣ 按 Range 读取(历史数据)

方法功能 方法 Redis 原始命令
正序读取 range(key, range) XRANGE
限制条数 range(key, range, limit) XRANGE
反序读取 reverseRange(key, range) XREVRANGE
反序 + limit reverseRange(key, range, limit) XREVRANGE
  • 2️⃣ 实时读取(XREAD)

方法功能 方法 Redis 原始命令 说明
读取 read(StreamOffset…) XREAD 不支持 ACK
带参数 read(options, offsets…) XREAD BLOCK / COUNT
映射对象 read(Class<T>, …) XREAD 自动反序列化

消费者组读取(XREADGROUP)

方法功能 方法 Redis 原始命令 说明
组内读取 read(Consumer, offsets…) XREADGROUP MQ 核心
带参数 read(Consumer, options, offsets…) XREADGROUP BLOCK
映射对象 read(Class<T>, Consumer, …) XREADGROUP

Pending 消息管理(XPENDING / XCLAIM)

  • 1️⃣ Pending 查询

方法功能 方法 Redis 原始命令
Pending 汇总 pending(key, group) XPENDING
指定消费者 pending(key, consumer) XPENDING
范围查询 pending(key, group, range, count) XPENDING
  • 2️⃣ 消息重新分配(XCLAIM)

方法功能 方法 Redis 原始命令 说明
重新分配 claim(key, group, newOwner, minIdle, ids…) XCLAIM 超时接管
高级配置 claim(key, group, newOwner, XClaimOptions) XCLAIM force / retry

消费者组管理(XGROUP)

方法功能 方法 Redis 原始命令 说明
创建组 createGroup(key, group) XGROUP CREATE $ 开始
指定 offset createGroup(key, offset, group) XGROUP CREATE 常用 0-0
删除消费者 deleteConsumer(key, consumer) XGROUP DELCONSUMER
销毁组 destroyGroup(key, group) XGROUP DESTROY

Stream 元信息(XINFO)

方法功能 方法 Redis 原始命令
Stream 信息 info(key) XINFO STREAM
组信息 groups(key) XINFO GROUPS
消费者信息 consumers(key, group) XINFO CONSUMERS

Stream 删除 / 裁剪

  • 1️⃣ 删除消息

方法功能 方法 Redis 原始命令
删除消息 delete(key, recordIds…) XDEL
删除 Record delete(record) XDEL
  • 2️⃣ 裁剪 Stream(XTRIM)

方法功能 方法 Redis 原始命令 说明
精确裁剪 trim(key, count) XTRIM
近似裁剪 trim(key, count, true) XTRIM ~ 性能更好

对象映射能力(非常重要)

能力 方法 说明
Map → Object map(MapRecord, Class<T>) 自动反序列化
List 映射 map(List<MapRecord>, Class<T>)
HashMapper getHashMapper(Class<T>) 自定义映射
反序列化 deserializeRecord(ByteRecord) 底层能力