Redis Stream 入门:快速掌握
在构建现代应用程序时,处理实时数据流、构建消息队列、实现事件驱动架构或记录事件历史是常见的需求。传统上,我们可能会依赖专门的消息中间件,如 Kafka、RabbitMQ 或 ActiveMQ。然而,对于许多应用场景,特别是当你已经在使用 Redis 并需要一个轻量级、高性能且易于集成的流数据解决方案时,Redis Stream 提供了极具吸引力的选择。
Redis Stream 是 Redis 5.0 版本引入的一种新的数据结构,它设计用于实现高性能、持久化的消息队列和消费者组。与 Redis 中已有的 Pub/Sub 或 List 相比,Stream 提供了更强大的功能,包括消息持久化、唯一 ID、消费者组管理、消息确认以及消费历史查询等。
本文将带你深入了解 Redis Stream,从基本概念到核心操作,再到高级特性如消费者组,帮助你快速掌握这一强大的工具。
1. Redis Stream 的基本概念
要理解 Redis Stream,我们需要先认识几个核心概念:
- Stream(流): Stream 是一个只允许追加(append-only)的日志型数据结构。你可以想象它是一条时间线,新的消息总是被添加到末尾。每条添加到 Stream 中的消息都会被赋予一个唯一的 ID。
- Entry(条目/消息): Stream 中的每个 Entry 都是一条消息。它由一个唯一的 Message ID 和一组键值对组成(类似一个小的 Hash 结构)。例如,一条传感器数据消息可能包含
sensor_id: 123
和temperature: 25.5
。 - Message ID(消息 ID): Stream 中最重要的概念之一。每个 Entry 都有一个唯一的 Message ID。默认情况下,Redis 使用
timestamp-sequence
的格式生成 ID。timestamp
是生成 ID 的毫秒级 Unix 时间戳,sequence
是在该毫秒内生成的序列号。这种格式保证了 ID 的单调递增性和唯一性,并且可以用于按时间顺序检索消息。例如,1526985768059-0
表示在特定毫秒内生成的第一个消息 ID。你可以选择让 Redis 自动生成 ID(使用*
),或者手动指定 ID(但通常不推荐,除非你知道自己在做什么)。 - Consumer(消费者): 从 Stream 中读取消息的客户端应用程序。
- Consumer Group(消费者组): 这是 Stream 提供的一个高级特性,用于实现多个消费者共同消费同一个 Stream 中的消息,并且保证组内的每条消息只会被 一个 消费者处理。这对于构建分布式、可伸缩的消费系统至关重要。消费者组会跟踪组内每个消费者已处理(已确认)的消息。
2. 为什么选择 Redis Stream?与其他 Redis 数据结构的对比
Redis Stream 并非 Redis 中唯一的“消息”处理方式。让我们快速对比一下它与 Redis Pub/Sub 和 List 的异同:
-
Redis Pub/Sub:
- 特点: 发布/订阅模式,消息不持久化,客户端订阅频道后只能接收到订阅 之后 发布的消息,无法获取历史消息。没有消息确认机制。
- 适用场景: 实时性要求高,不关心消息是否丢失,不需要消息持久化或历史记录的场景(例如,广播通知)。
- 与 Stream 对比: Stream 消息持久化,可获取历史消息,有消息 ID,支持消费者组和消息确认,功能更强大,适用于构建可靠的消息队列。
-
Redis List (LPUSH/BRPOP):
- 特点: 可以作为简单的消息队列(生产者 LPUSH 到列表头部,消费者 BRPOP 从列表尾部阻塞弹出)。消息在被消费者弹出后就从列表中移除。没有消息 ID,无法轻松实现多消费者共享队列(除非通过客户端协调),难以获取历史消息。
- 适用场景: 简单的任务队列,单个消费者或少量消费者且不关心历史消息的场景。
- 与 Stream 对比: Stream 提供了消息 ID、持久化、多消费者共享队列(通过消费者组)以及消息确认等功能,更适合复杂的分布式消息处理场景。List 的消息在消费后即被移除,而 Stream 的消息默认保留,需要显式修剪。
总结:
Redis Stream 结合了持久化、消息 ID、灵活的读取方式和消费者组,弥补了 Pub/Sub 的无状态和 List 的简单性,为在 Redis 中构建可靠、可扩展的消息处理系统提供了强大的原生支持。
3. Stream 的核心操作:增、读、查
我们从最基本的 Stream 操作开始学习。
3.1 添加消息:XADD
XADD
命令用于向 Stream 中添加新的消息。
语法:
bash
XADD key ID field value [field value ...] [MAXLEN count | MINID ID] [~] [NOMKSTREAM]
key
: Stream 的名称。如果 Stream 不存在,执行XADD
会自动创建它(除非指定NOMKSTREAM
)。ID
: 消息的 ID。- 使用
*
让 Redis 自动生成 ID(推荐)。格式为timestamp-sequence
。 - 手动指定 ID。这要求手动指定的 ID 必须 大于 Stream 中最后一个消息的 ID。
- 使用
field value [field value ...]
: 组成消息体的键值对,可以是一个或多个。MAXLEN count
或MINID ID
: 可选参数,用于对 Stream 进行修剪(trim),限制 Stream 的长度或只保留 ID 大于某个值的消息,防止 Stream 无限增长。~
是一个可选参数,表示进行近似修剪,通常能提高效率。NOMKSTREAM
: 可选参数,如果 Stream 不存在则不创建,返回错误。
示例:
-
向名为
mystream
的 Stream 添加一条消息,内容是name: Alice
,age: 30
,让 Redis 自动生成 ID:bash
XADD mystream * name Alice age 30返回结果通常是自动生成的 ID,例如:
1678881234567-0
-
再添加一条消息,内容是
name: Bob
,city: New York
:bash
XADD mystream * name Bob city "New York"返回另一个 ID,例如:
1678881234568-0
-
添加消息并限制 Stream 的最大长度为 2:
bash
XADD mystream MAXLEN 2 * message "this is the third message"执行此命令后,如果 Stream 的消息数量超过 2,最旧的消息会被删除。
使用 *
让 Redis 自动生成 ID 是最常见和推荐的做法,因为它确保了 ID 的唯一性和单调递增性,符合 Stream 的日志特性。
3.2 读取消息:XREAD
XREAD
命令用于从一个或多个 Stream 中读取消息。这是一个基础的读取命令,适合单消费者或多个消费者独立读取所有消息的场景。
语法:
bash
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key ID [key ID ...]
COUNT count
: 可选参数,指定每次最多读取多少条消息。BLOCK milliseconds
: 可选参数,以毫秒为单位设置阻塞时间。如果 Stream 中没有新消息,命令会阻塞直到有新消息到来或阻塞时间超时。BLOCK 0
表示永远阻塞。STREAMS
: 必需关键字,标志着后面跟着要读取的 Stream 名称和对应的起始 ID。key ID [key ID ...]
: 要读取的 Stream 名称及其对应的起始消息 ID。ID
: 指定从哪个 ID 之后 开始读取消息。0
: 从 Stream 的第一个消息开始读取。- 特定的消息 ID: 从该 ID 之后 的消息开始读取。
$
: 从 Stream 的最后一个消息 之后 开始读取,即只读取新加入的消息。
示例:
-
从
mystream
的开头读取最多 10 条消息:bash
XREAD COUNT 10 STREAMS mystream 0返回结果是一个嵌套列表,包含 Stream 名称和读取到的消息列表,每个消息是一个 ID 和一个键值对列表的组合:
1) 1) "mystream"
2) 1) 1) "1678881234567-0"
2) 1) "name"
2) "Alice"
3) "age"
4) "30"
2) 1) "1678881234568-0"
2) 1) "name"
2) "Bob"
3) "city"
4) "New York"
... (more messages) -
从 ID
1678881234567-0
之后开始读取:bash
XREAD STREAMS mystream 1678881234567-0 -
阻塞式地读取
mystream
的新消息,最多等待 5 秒:bash
XREAD BLOCK 5000 STREAMS mystream $如果在 5 秒内有新消息加入,会立即返回;否则,在 5 秒后返回空结果。
-
同时从多个 Stream 读取新消息:
bash
XREAD STREAMS stream1 $ stream2 $
XREAD
是一个灵活的读取命令,可以用于按 ID 范围查询历史消息(虽然 XRANGE
更适合)、阻塞等待新消息等。但它不适合多消费者协同工作并避免重复消费的场景,这正是消费者组的作用。
3.3 查询消息范围:XRANGE
和 XREVRANGE
XRANGE
和 XREVRANGE
命令用于按 ID 范围查询 Stream 中的消息,非常适合查看历史记录。
语法:
bash
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
key
: Stream 的名称。start
/end
: 消息 ID 范围。- ID 可以是完整的
timestamp-sequence
格式。 - 可以使用
timestamp
或timestamp-*
作为范围的一部分(例如1678881234567
或1678881234567-*
)。 +
: 表示 Stream 中最大的 ID。-
: 表示 Stream 中最小的 ID。
- ID 可以是完整的
COUNT count
: 可选参数,限制返回的消息数量。
XRANGE
按 ID 升序返回消息,XREVRANGE
按 ID 降序返回消息。
示例:
-
获取
mystream
中所有消息:bash
XRANGE mystream - + -
获取 ID 在
1678881234560-0
和1678881234580-9
之间的消息,最多 5 条:bash
XRANGE mystream 1678881234560-0 1678881234580-9 COUNT 5 -
获取最新的 5 条消息(使用
XREVRANGE
):bash
XREVRANGE mystream + - COUNT 5
3.4 获取 Stream 长度:XLEN
XLEN
命令用于获取 Stream 中当前的消息数量。
语法:
bash
XLEN key
示例:
bash
XLEN mystream
返回 mystream
中的消息数量。
3.5 删除消息:XDEL
XDEL
命令用于从 Stream 中删除指定 ID 的消息。
语法:
bash
XDEL key ID [ID ...]
示例:
删除 ID 为 1678881234567-0
和 1678881234568-0
的消息:
bash
XDEL mystream 1678881234567-0 1678881234568-0
注意: XDEL
只是将消息标记为删除,并不会立即释放内存。内存的释放主要依赖于 Stream 的修剪(XADD MAXLEN
或 XTRIM
)。
4. Redis Stream 的高级特性:消费者组 (Consumer Groups)
对于分布式系统中常见的“多个消费者共享一个消息队列,每条消息只被一个消费者处理”的需求,Redis Stream 提供了消费者组功能。这是一个非常重要的特性,它使得 Redis Stream 能够与传统的重量级消息队列相媲美,适用于构建可伸缩、可靠的消息处理系统。
4.1 为什么需要消费者组?
考虑一个场景:你有多个 worker 进程需要处理同一个 Stream 中的任务。如果它们都使用 XREAD
阻塞读取新消息,那么当一条新消息到来时,所有阻塞的 worker 都会被唤醒,并且都可能读取并尝试处理这条消息,导致重复处理。此外,如果一个 worker 崩溃,它正在处理或已经读取但尚未处理的消息可能会丢失。
消费者组解决了这些问题:
- 负载均衡: 将 Stream 中的消息均匀地分发给组内的不同消费者。
- 状态跟踪: 消费者组会记录每个消费者在 Stream 中已处理到的位置(最后一次读取的消息 ID)。
- 消息确认: 消费者需要显式地确认(ACK)已成功处理的消息,消费者组会跟踪哪些消息已被确认。
- 未确认消息处理: 消费者组会记录未被确认的消息列表(Pending Entry List – PEL),这允许我们在消费者失败后,将未处理的消息重新分配给其他消费者。
4.2 创建消费者组:XGROUP CREATE
在使用消费者组读取消息之前,必须先创建它。
语法:
bash
XGROUP CREATE stream_key group_name ID [MKSTREAM] [ENTRIESREAD entriesread]
stream_key
: 要关联的 Stream 的名称。group_name
: 要创建的消费者组的名称。ID
: 消费者组的初始读取位置。0
: 表示从 Stream 的第一个消息开始(ID 为0-0
)。$
: 表示从 Stream 中最后一个消息之后开始,即只消费新消息。- 特定的 ID: 从该 ID 之后 开始消费。
MKSTREAM
: 可选参数。如果指定的stream_key
不存在,使用此选项会自动创建 Stream。ENTRIESREAD entriesread
: 可选参数。设置消费者组的entries_read
计数器的初始值。通常不需要设置。
示例:
-
为
mystream
创建一个名为mygroup
的消费者组,从 Stream 的开头开始消费:bash
XGROUP CREATE mystream mygroup 0 -
为
anotherstream
创建一个名为anothergroup
的消费者组,只消费创建组之后的新消息:bash
XGROUP CREATE anotherstream anothergroup $ MKSTREAM使用
MKSTREAM
可以在 Stream 不存在时直接创建并关联消费者组。
4.3 从消费者组读取消息:XREADGROUP
XREADGROUP
命令用于消费者组中的消费者读取消息。
语法:
bash
XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key ID [key ID ...]
GROUP group_name consumer_name
: 必需参数,指定所属的消费者组名称和当前消费者的名称。consumer_name
是一个任意字符串,用于标识组内唯一的消费者实例(例如,可以是应用程序实例 ID)。COUNT count
: 可选参数,每次最多读取多少条消息。BLOCK milliseconds
: 可选参数,阻塞等待新消息的时间。NOACK
: 可选参数。如果指定,读取的消息不会进入 PEL (Pending Entry List),即不需要显式使用XACK
确认。这会牺牲一定的可靠性换取简化操作。通常不推荐在需要可靠性的场景使用。STREAMS key ID [key ID ...]
: 要读取的 Stream 名称及其对应的起始 ID。ID
: 在消费者组模式下,这个 ID 的含义与XREAD
不同。最常用的 ID 是>
。>
: 表示读取对当前消费者组来说是 新 的消息(即从未被组内任何消费者读取过的消息)。这是消费者组中最常见的读取方式。- 特定的 ID: 读取 PEL 中 ID 大于 指定 ID 的消息。这用于手动处理 PEL 中的特定范围消息,或者在消费者重启后从上次处理失败的位置恢复(尽管通常通过
XPENDING
和XCLAIM
更自动化)。
示例:
-
消费者
consumer1
从mygroup
组中读取mystream
的新消息,最多 5 条,阻塞等待 2 秒:bash
XREADGROUP GROUP mygroup consumer1 COUNT 5 BLOCK 2000 STREAMS mystream >首次执行此命令,
consumer1
会从消费者组创建时指定的起始位置开始读取。之后每次执行,它都会从上次读取的位置继续。消息读取后会进入consumer1
的 PEL。 -
消费者
consumer2
从同一个mygroup
组中读取mystream
的新消息:bash
XREADGROUP GROUP mygroup consumer2 BLOCK 0 STREAMS mystream >当有多条新消息到来时,Redis 会将这些消息分发给
consumer1
和consumer2
,每条消息只会被其中一个消费者收到(前提是它们都使用>
作为 ID)。
4.4 确认消息:XACK
当消费者成功处理一条消息后,必须使用 XACK
命令向消费者组确认该消息。确认后,消息将从该消费者的 PEL 中移除。
语法:
bash
XACK stream_key group_name ID [ID ...]
stream_key
: Stream 的名称。group_name
: 消费者组的名称。ID [ID ...]
: 要确认的消息 ID。
示例:
消费者 consumer1
成功处理了 ID 为 1678881234567-0
和 1678881234568-0
的消息后进行确认:
bash
XACK mystream mygroup 1678881234567-0 1678881234568-0
重要: 未确认的消息会一直留在 PEL 中。如果消费者崩溃,这些未确认的消息可以通过 XPENDING
和 XCLAIM
被其他健康的消费者接管处理。
4.5 查看待处理消息:XPENDING
XPENDING
命令用于查看消费者组中待处理(已读取但未确认)的消息列表。这对于监控、调试以及处理失败的消息非常有用。
语法:
bash
XPENDING stream_key group_name [start end count] [consumer_name]
stream_key
: Stream 的名称。group_name
: 消费者组的名称。start end count
: 可选参数,用于分页查看 PEL。类似于XRANGE
的 ID 范围和数量。使用-
表示最小 ID,+
表示最大 ID。consumer_name
: 可选参数。如果指定,只查看特定消费者的 PEL;否则,查看整个组的 PEL 概况。
示例:
-
查看
mygroup
组在mystream
中的 PEL 概况:bash
XPENDING mystream mygroup返回结果包括待处理消息总数、最小/最大 ID、以及每个消费者待处理的消息数量。
-
查看
mygroup
组在mystream
中所有待处理消息的详细列表:bash
XPENDING mystream mygroup - + 10这会返回 PEL 中前 10 条消息的详细信息,包括消息 ID、所属消费者、空闲时间(Idle time)和已尝试投递次数。
-
查看
consumer1
在mygroup
组中的待处理消息列表:bash
XPENDING mystream mygroup - + 10 consumer1
4.6 转移待处理消息:XCLAIM
如果一个消费者长时间未确认 PEL 中的消息(例如,因为消费者进程崩溃或卡住),其他健康的消费者可以通过 XCLAIM
命令“认领”这些消息进行处理。
语法:
bash
XCLAIM stream_key group_name consumer_name min_idle_time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [FORCE] [JUSTID]
stream_key
: Stream 的名称。group_name
: 消费者组的名称。consumer_name
: 要将消息转移给的 新 消费者名称。min_idle_time
: 必需参数,表示消息必须至少空闲(Idle)多长时间(毫秒),才允许被认领。这用于避免认领那些正在被活动的消费者处理的消息。ID [ID ...]
: 要认领的消息 ID。通常这些 ID 是从XPENDING
查出来的。IDLE ms
: 可选参数,设置认领后消息的空闲时间。TIME ms-unix-time
: 可选参数,设置认领后消息的 Idle time 为特定的 Unix 时间戳。RETRYCOUNT count
: 可选参数,设置认领后消息的重试次数。FORCE
: 可选参数,如果指定,即使消息不存在于 PEL 中或者所属的消费者不存在,也会创建或更新 PEL 条目。通常用于故障恢复。JUSTID
: 可选参数,如果指定,只返回认领成功的消息 ID,而不是消息内容。
示例:
假设通过 XPENDING
发现 ID 为 1678881234600-0
的消息已经被 old_consumer
读取并进入 PEL,且空闲时间超过了 60 秒。现在 new_consumer
可以认领它:
bash
XCLAIM mystream mygroup new_consumer 60000 1678881234600-0
如果认领成功,1678881234600-0
这条消息在 mygroup
组的 PEL 中就会从属于 old_consumer
变为属于 new_consumer
。然后 new_consumer
就可以读取(通过 XREADGROUP
读取特定 ID 或通过 XCLAIM
返回内容)并处理它,最后进行 XACK
。
XCLAIM
是实现消费者组成员故障转移和处理“孤儿”消息(Orphaned messages)的关键。通常,你会有一个监控进程定期检查 XPENDING
,找出空闲时间过长的消息,然后调用 XCLAIM
将它们分配给健康的消费者。
5. Stream 的维护:修剪 (XTRIM
)
随着时间的推移,Stream 中的消息会不断增长,消耗内存。你需要定期或在添加消息时对 Stream 进行修剪,移除旧消息。
XTRIM
命令用于显式地修剪 Stream。
语法:
bash
XTRIM key strategy argument [LIMIT count] [~]
key
: Stream 的名称。strategy argument
: 修剪策略和对应的参数。MAXLEN count
: 按最大长度修剪。Stream 将保留最新的count
条消息。MINID ID
: 按最小 ID 修剪。Stream 将只保留 ID 大于等于指定ID
的消息。
LIMIT count
: 可选参数,限制修剪操作检查的条目数量。这不会影响最终保留的条目数量,但可以减少执行修剪时的 CPU 开销,尤其是在 Stream 非常长时。~
: 可选参数,与MAXLEN
配合使用,表示进行近似修剪。Redis 会保留大约count
条消息,而不是精确的count
条。这通常更快,且足以满足大多数场景。
示例:
-
将
mystream
修剪到只保留最新的 1000 条消息:bash
XTRIM mystream MAXLEN 1000 -
近似地将
mystream
修剪到只保留大约 1000 条消息:bash
XTRIM mystream MAXLEN ~ 1000 -
删除 ID 小于
1678881235000-0
的所有消息:bash
XTRIM mystream MINID 1678881235000-0
在 XADD
命令中使用 MAXLEN
参数是在添加消息时进行自动修剪的简便方法。显式使用 XTRIM
更适合于定期维护任务。
6. Stream 的其他管理命令
XGROUP SETID stream_key group_name ID
: 修改消费者组的下一个待读取消息 ID。用于重置消费者组的读取位置。XGROUP DELCONSUMER stream_key group_name consumer_name
: 从消费者组中删除一个消费者。如果该消费者有待处理消息,需要先处理或认领它们。XGROUP DESTROY stream_key group_name
: 删除整个消费者组。
7. Redis Stream 的应用场景
Redis Stream 因其特性,非常适合以下场景:
- 消息队列 (Message Queue): 构建高性能、持久化的消息队列,支持多种消费者模型(独立消费者或消费者组)。
- 事件溯源 (Event Sourcing): 将应用程序的状态变更记录为一系列不可变的事件序列,Stream 的 append-only 特性非常契合。
- 实时数据处理管道: 从 Stream 中读取实时数据(如 IoT 传感器数据、日志、点击流),进行处理和分析。
- 活动流 (Activity Stream): 记录用户在应用中的活动历史(如关注、点赞、评论),可用于构建 Feed 流。
- 分布式日志系统: 收集和分发来自不同服务的日志。
8. 实践中的注意事项和最佳实践
- 选择合适的 Message ID: 绝大多数情况下,使用
*
让 Redis 自动生成 ID 是最好的选择。手动指定 ID 需要确保其单调递增且大于前一个 ID,这增加了复杂性。 - 合理使用消费者组: 对于需要多个实例协作处理同一个 Stream 的场景,务必使用消费者组。组内的每个消费者应该有唯一的名称。
- 务必进行消息确认 (XACK): 除非你明确知道后果且不关心消息丢失,否则在消费者成功处理消息后一定要使用
XACK
进行确认。这保证了消息不会被重复处理,并在消费者失败时能够被其他消费者接管。 - 监控 PEL: 定期检查消费者组的 PEL (
XPENDING
) 是非常重要的监控手段。高数量的待处理消息可能表明消费者处理能力不足或存在处理失败的问题。 - 实现故障恢复逻辑: 利用
XPENDING
和XCLAIM
构建消费者故障恢复机制。当消费者崩溃后重启时,它可以首先检查自己的 PEL,处理上次失败的消息,或者让监控进程代为认领。 - Stream 修剪策略: 务必使用
MAXLEN
或MINID
对 Stream 进行修剪,否则 Stream 会无限增长,最终耗尽 Redis 内存。根据你的保留需求选择合适的策略和长度。在XADD
中使用MAXLEN ~
是一个简单有效的方法。 - 消息体设计: Stream 的消息体是键值对。设计合理的字段可以方便消息的处理。
- 批量读取: 使用
COUNT
参数可以批量读取消息,减少网络往返,提高效率。但批量大小需要权衡,过大可能导致单个消息处理失败时重试成本高。 - 阻塞读取 (
BLOCK
): 在消费者端使用阻塞读取可以有效降低 CPU 占用,避免空轮询。
9. 总结
Redis Stream 是一种功能丰富、性能优异的 Redis 数据结构,为构建各种流数据处理和消息队列场景提供了原生支持。通过掌握 XADD
进行消息生产,XREAD
进行基础读取,以及 XGROUP CREATE
, XREADGROUP
, XACK
, XPENDING
, XCLAIM
等命令进行消费者组管理,你可以轻松地在 Redis 生态系统中搭建起可靠的消息处理系统。
相较于独立的重量级消息中间件,Redis Stream 的优势在于其简单性、易于部署和运维,以及与 Redis 其他数据结构的天然集成。虽然它可能不如 Kafka 那样具有海量数据吞吐和复杂分布式特性,但在许多中小型应用或需要轻量级解决方案的场景下,Redis Stream 是一个极具吸引力的选择。
现在,你已经对 Redis Stream 有了全面的了解,包括其核心概念、基本操作、强大的消费者组功能以及重要的维护和实践建议。是时候动手实践,在你的项目中运用 Redis Stream 来解决实际问题了!