Redis Stream 入门指南:构建高性能、可靠的消息队列与事件流
引言
在现代分布式系统中,消息队列和事件流已成为不可或缺的组件。它们是实现异步通信、解耦服务、构建可扩展系统以及处理实时数据流的关键。市面上有许多优秀的消息中间件,如 Kafka、RabbitMQ、ActiveMQ 等。然而,对于许多应用场景,尤其是那些已经广泛使用 Redis 的场景,我们可能会希望找到一个更轻量级、更易于集成、且性能表现不俗的解决方案。
Redis 自 5.0 版本引入的 Stream 数据结构,正是为了满足这一需求而生。Stream 并非简单地模仿传统消息队列,它结合了日志文件和消息代理的优点,提供了一种强大而灵活的方式来处理时间序列数据、构建消息队列和实现事件驱动架构。
本文将带你深入了解 Redis Stream,从核心概念到常用命令,再到高级特性如消费者组,帮助你快速掌握这一强大的工具,并在实际项目中加以应用。
1. Redis Stream 核心概念
要理解 Redis Stream,首先需要掌握几个核心概念:
- Stream (流): 这是 Redis 中的一种数据结构,类似于一个只能追加(append-only)的日志文件。所有数据都以追加的方式写入流的末尾。每个 Stream 都有一个唯一的 Key。
- Entry (条目/消息): Stream 中的最小单位。每个 Entry 都包含一个唯一的 ID 和一组键值对(Field-Value Pair)。
- Entry ID (条目 ID): 每个 Entry 在 Stream 中的唯一标识符。它是一个由两部分组成的字符串,通常是
timestampInMillis-sequenceNumber
的格式,例如1591234560000-0
。时间戳部分精确到毫秒,序列号用于处理在同一毫秒内产生的多个 Entry。Redis 会自动生成这个 ID,保证其单调递增。这种 ID 结构使得 Stream 天然地按时间顺序排序。 - Consumer (消费者): 从 Stream 中读取数据的客户端。多个消费者可以同时从同一个 Stream 中读取数据。
- Consumer Group (消费者组): 这是 Stream 最强大的特性之一,用于实现分布式消费。一个消费者组关联到一个特定的 Stream。组内的多个消费者共同消费 Stream 中的消息,每个消息在一个组内只会被一个消费者处理。消费者组会记录其在 Stream 中的消费进度。
- Pending Entry List (PEL – 待处理列表): 对于消费者组,当一个消息被投递给组内某个消费者后,它并不会立即从 Stream 中消失,而是会被加入到该消费者的 PEL 中。只有当消费者明确地向 Stream 所在的消费者组发送确认(ACK)指令后,该消息才会从 PEL 中移除,表示已被成功处理。PEL 的存在是实现消息可靠性的关键。
2. 为什么使用 Redis Stream?与 Pub/Sub、List 的比较
在 Redis 中,除了 Stream,还有 Pub/Sub 和 List 也可以用于构建消息系统。理解它们之间的差异有助于选择合适的工具:
- Pub/Sub (发布/订阅):
- 特性: 基于频道(Channel)的发布/订阅模式。消息发布到频道后,所有订阅该频道的客户端都会收到消息。
- 优点: 简单,实时性高。
- 缺点: 无持久化(客户端离线期间的消息会丢失),无历史消息(新订阅者无法获取之前发布的消息),无消费确认(消息发送即焚,无法保证消费者是否收到或处理),无法实现分布式消费(每个订阅者都收到所有消息)。
- List (列表):
- 特性: 可以用作队列(Queue),例如通过
LPUSH
写入,RPOP
或BRPOP
读取。 - 优点: 简单易用,支持阻塞读取(
BRPOP
)。 - 缺点: 难以实现多个消费者独立消费(一个消息被一个
RPOP
读走后,其他消费者就看不到了),无法实现消费者组(难以协调多个消费者共同处理一个队列),无消费确认机制(读走即认为处理),数据结构不具备时间序列特性(仅是简单列表)。
- 特性: 可以用作队列(Queue),例如通过
- Stream (流):
- 特性: 具备不可变、只能追加的日志特性,每个 Entry 有唯一 ID。支持独立的消费者读取历史消息,支持强大的消费者组实现分布式、可靠消费。
- 优点: 持久化(数据存储在 Redis 中),支持历史消息回溯,基于 ID 的有序性,强大的消费者组(实现分布式、可靠、负载均衡的消费),支持消费确认,支持阻塞读取。
- 缺点: 相较于 Pub/Sub 和 List,概念和命令稍复杂一些。
总结:
- 如果只需要简单的“广播”,不关心谁收到、是否收到、历史消息,使用 Pub/Sub。
- 如果只需要简单的“点对点”队列(一个消息只被一个消费者处理,通常是单个消费者或通过外部协调),使用 List。
- 如果需要持久化、消息顺序、历史回溯、可靠传输、分布式消费(消费者组)等特性,那么 Stream 是 Redis 中最合适的选择。
3. Stream 基本操作:数据写入 (XADD)
XADD
命令用于向 Stream 中添加新的 Entry。
语法:
bash
XADD key ID field value [field value ...]
key
: Stream 的名称。ID
: 新 Entry 的 ID。通常使用*
让 Redis 自动生成一个唯一的 ID(timestampInMillis-sequenceNumber
)。你也可以显式指定 ID,但必须确保它比 Stream 中现有的任何 ID 都要大,否则会报错。使用*
是最常见和推荐的方式。field value
: 要添加到 Entry 中的一个或多个键值对。
示例:
-
向名为
mystream
的 Stream 添加一个 Entry,包含name
和age
字段,让 Redis 自动生成 ID:bash
XADD mystream * name Alice age 30
返回结果类似1591234567890-0
,这就是新 Entry 的 ID。 -
再添加一个 Entry,包含
city
和zip
字段:bash
XADD mystream * city NewYork zip 10001
返回结果类似1591234567900-0
。注意 ID 会随着时间推移而增加。
4. Stream 基本操作:数据读取 (XRANGE, XREAD)
有两种主要的方式从 Stream 中读取数据:按范围读取 (XRANGE
) 和按 ID 顺序读取 (XREAD
)。
4.1. 按范围读取 (XRANGE)
XRANGE
用于获取一个 ID 范围内的 Entry 列表。这类似于对 Stream 进行切片查询。
语法:
bash
XRANGE key start end [COUNT count]
key
: Stream 的名称。start
: 起始 Entry ID。end
: 结束 Entry ID。[COUNT count]
: 可选参数,限制返回的 Entry 数量。
ID 特殊值:
-
: 表示 Stream 中最小可能的 ID。+
: 表示 Stream 中最大可能的 ID。- 可以使用精确的 ID (
timestamp-sequence
)。 - 可以使用只有时间戳部分的 ID (
timestamp
),Redis 会自动补齐 sequence number(例如1591234560000
会被视为1591234560000-0
)。 - 可以使用不完整的 ID 作为前缀,Redis 会查找匹配前缀的 ID。
示例:
-
获取 Stream
mystream
中的所有 Entry:bash
XRANGE mystream - +
返回结果可能类似:
1) 1) "1591234567890-0"
2) 1) "name"
2) "Alice"
3) "age"
4) "30"
2) 1) "1591234567900-0"
2) 1) "city"
2) "NewYork"
3) "zip"
4) "10001" -
获取 ID 在
1591234567890-0
到1591234567900-0
之间的 Entry (包含两端):bash
XRANGE mystream 1591234567890-0 1591234567900-0 -
获取从开头开始的最多 1 个 Entry:
bash
XRANGE mystream - + COUNT 1
4.2. 按 ID 顺序读取 (XREAD)
XREAD
用于按顺序从一个或多个 Stream 中读取数据,通常用于获取 Stream 中某个特定 ID 之后的新 Entry。它支持阻塞读取。
语法:
bash
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key1 key2 ... ID1 ID2 ...
[COUNT count]
: 可选参数,限制每个 Stream 返回的 Entry 数量。[BLOCK milliseconds]
: 可选参数,启用阻塞模式。如果指定的 Stream 当前没有新数据,命令会阻塞最多指定的毫秒数,直到有新数据到达或超时。BLOCK 0
表示永远阻塞。STREAMS key1 key2 ... ID1 ID2 ...
: 指定要读取的 Stream 和从哪个 ID 开始读取。key
和ID
是一一对应的。
ID 特殊值:
0
: 表示从 Stream 的第一个 Entry 开始读取。$
: 表示从 Stream 的最后一个 Entry 开始读取(即只读取命令执行之后新加入的 Entry)。这是实现类似 Pub/Sub 功能的常用 ID。- 可以使用具体的 ID,表示从该 ID 之后 的 Entry 开始读取。
示例:
-
从 Stream
mystream
中读取 ID 在1591234567890-0
之后的所有 Entry:bash
XREAD STREAMS mystream 1591234567890-0 -
从 Stream
mystream
中读取最新的 1 个 Entry:bash
XREAD COUNT 1 STREAMS mystream $ -
从 Stream
mystream
的末尾开始读取,如果没有新数据,阻塞 5 秒:bash
XREAD BLOCK 5000 STREAMS mystream $
如果 5 秒内有新数据,它会立即返回;否则,返回空列表。 -
同时从多个 Stream 读取,从
mystream
的 ID0
之后开始,从anotherstream
的末尾开始:bash
XREAD STREAMS mystream anotherstream 0 $
XREAD
的一个典型应用是构建简单的消费者,每个消费者独立地记录自己消费到的最后一个 ID,下次读取时从该 ID 之后继续。但这会带来一个问题:多个这样的消费者会重复处理消息。为了解决这个问题,就需要使用消费者组。
5. Stream 消费者组:分布式与可靠消费
消费者组是 Redis Stream 最重要的特性,它允许多个消费者协同处理同一个 Stream,实现负载均衡和故障恢复。
5.1. 消费者组的工作原理
在一个消费者组中:
- 每个消费者都属于一个特定的组,并有一个唯一的消费者名称。
- 组会记录它在 Stream 中已“看到”的最新消息 ID。
- 当组内的消费者使用
XREADGROUP
命令读取消息时,Stream 会将消息“分发”给组内的一个消费者。同一个消息在一个组内只会被一个消费者收到。 - 被分发的消息会被放入该消费者的 PEL (待处理列表) 中。
- 消费者处理完消息后,必须使用
XACK
命令进行确认。确认后,消息才会从该消费者的 PEL 中移除。 - 如果消费者在处理消息过程中崩溃或退出而未确认,该消息会一直留在其 PEL 中,其他消费者(或同一个消费者在恢复后)可以通过特定命令 (
XPENDING
,XCLAIM
) 来发现并接管这些待处理消息,实现故障恢复和至少一次(at-least-once)的消息投递保证。
5.2. 创建消费者组 (XGROUP CREATE)
在使用消费者组之前,需要先创建它。
语法:
bash
XGROUP CREATE key groupname id [MKSTREAM]
key
: 关联的 Stream 名称。groupname
: 要创建的消费者组名称。id
: 消费者组的起始读取 ID。指定从 Stream 的哪个位置开始向组内消费者分发消息。0
: 从 Stream 的开头开始读取。$
: 从 Stream 的当前最新消息之后开始读取。- 一个具体的 Entry ID:从该 ID 之后开始读取。
[MKSTREAM]
: 可选参数。如果 Stream 不存在,使用此选项会同时创建 Stream。
示例:
-
为
mystream
创建一个名为mygroup
的消费者组,从 Stream 的开头开始读取:bash
XGROUP CREATE mystream mygroup 0 -
为
anotherstream
创建一个名为anothergroup
的消费者组,从 Stream 的末尾开始读取(只消费后续的新消息),如果anotherstream
不存在则创建它:bash
XGROUP CREATE anotherstream anothergroup $ MKSTREAM
5.3. 使用消费者组读取数据 (XREADGROUP)
组内的消费者使用 XREADGROUP
命令读取数据。
语法:
bash
XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key1 key2 ... ID1 ID2 ...
GROUP groupname consumername
: 指定所属的消费者组名称和当前消费者名称。消费者名称在组内必须唯一。[COUNT count]
: 可选参数,限制每次读取的 Entry 数量。[BLOCK milliseconds]
: 可选参数,启用阻塞模式,与XREAD
类似。[NOACK]
: 可选参数。如果使用此选项,读取的消息将不会被添加到 PEL,也无需显式使用XACK
确认。这牺牲了可靠性,提高了吞吐量,类似于 Pub/Sub 的行为,但在消费者组上下文中使用较少。STREAMS key1 key2 ... ID1 ID2 ...
: 指定要读取的 Stream 和从哪个 ID 开始读取。与XREAD
不同的是,对于消费者组,这个 ID 通常有特殊含义。
XREADGROUP
中的 ID 特殊值:
>
: 这是在使用消费者组时最常用的 ID。它表示从消费者组的当前消费进度之后开始读取新消息。Stream 会将这些新消息分发给当前消费者,并添加到其 PEL。- 一个具体的 ID 或
0
: 表示从指定位置开始读取待处理消息。如果指定 ID 是0
,则表示读取当前消费者 PEL 中的所有待处理消息(从最早的开始)。这通常用于消费者启动时,检查并重新处理上次运行时未能完成的消息。
示例:
-
消费者
consumer1
在mygroup
组中,从组的当前进度开始读取最多 10 条新消息:bash
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream >
如果成功读取到消息,这些消息将被添加到consumer1
的 PEL 中。 -
消费者
consumer2
在mygroup
组中,阻塞读取新消息,最多等待 5 秒:bash
XREADGROUP GROUP mygroup consumer2 BLOCK 5000 STREAMS mystream > -
消费者
consumer1
启动时,检查并读取其在mygroup
中所有未处理的消息:bash
XREADGROUP GROUP mygroup consumer1 STREAMS mystream 0
这里的0
表示读取 PEL 中所有 ID 大于等于 0 的消息。
5.4. 确认消息处理 (XACK)
消费者处理完通过 XREADGROUP
收到的消息后,必须使用 XACK
命令向消费者组确认消息已成功处理。
语法:
bash
XACK key groupname id [id ...]
key
: Stream 的名称。groupname
: 消费者组的名称。id [id ...]
: 要确认的一个或多个消息 ID。这些 ID 必须当前存在于该 Stream 和该组关联的某个消费者的 PEL 中。
示例:
假设消费者 consumer1
通过 XREADGROUP
收到了 ID 为 1591234567890-0
和 1591234567900-0
的两条消息,并成功处理了它们:
bash
XACK mystream mygroup 1591234567890-0 1591234567900-0
成功确认会返回成功确认的 ID 数量。这些 ID 将从 consumer1
的 PEL 中移除。
5.5. 待处理消息与故障恢复 (XPENDING, XCLAIM)
消息可靠性的关键在于 PEL 和 XACK
。如果消费者在处理消息后崩溃,或在处理过程中未发送 XACK
,这些消息将一直留在其 PEL 中。我们可以使用 XPENDING
查看待处理消息,并使用 XCLAIM
将这些消息转移给其他健康的消费者处理。
XPENDING (查看待处理消息)
XPENDING
用于查看某个消费者组中待处理的消息列表。
语法:
bash
XPENDING key groupname [start end count] [consumername]
key
: Stream 的名称。groupname
: 消费者组的名称。[start end count]
: 可选参数,用于分页查看 PEL 中的消息。与XRANGE
类似,start
和end
是 ID 范围(-
,+
),count
限制数量。[consumername]
: 可选参数,如果指定,只查看特定消费者的 PEL;如果不指定,查看整个组的概览信息。
示例:
-
查看
mygroup
组的待处理消息概览:bash
XPENDING mystream mygroup
返回结果包含待处理消息总数、最小/最大 ID、以及每个消费者拥有的待处理消息数量等信息。 -
查看
mygroup
组中 ID 在某个范围内的待处理消息详情,最多 10 条:bash
XPENDING mystream mygroup - + 10
返回结果列表,每项包含消息 ID、拥有该消息的消费者名称、空闲时间(自从被投递给该消费者后经过的时间)、被投递的次数。 -
查看消费者
consumer1
在mygroup
中所有待处理消息:bash
XPENDING mystream mygroup - + 1000 consumer1
XCLAIM (转移待处理消息)
XCLAIM
用于将某个消费者的 PEL 中的消息转移给另一个消费者。这通常用于处理因消费者崩溃而遗留的待处理消息。
语法:
bash
XCLAIM key groupname consumername min-idle-time id [id ...] [IDLE ms] [TIME ms-us] [RETRYCOUNT count] [FORCE] [JUSTID]
key
: Stream 的名称。groupname
: 消费者组的名称。consumername
: 要将消息转移给的目标消费者名称。min-idle-time
: 最低空闲时间(毫秒)。只有当消息在原消费者那里待处理的时间超过这个值时,才会被转移。这可以避免“抢占”正在被健康消费者处理的消息。id [id ...]
: 要转移的一个或多个消息 ID。这些 ID 必须当前存在于某个消费者的 PEL 中。[IDLE ms]
: 可选参数,设置转移后消息的空闲时间。[TIME ms-us]
: 可选参数,设置转移后消息的内部时间(Entry ID 的时间部分)。[RETRYCOUNT count]
: 可选参数,设置转移后消息的投递尝试次数。[FORCE]
: 可选参数,强制转移,即使消息的空闲时间未达到min-idle-time
或指定的 ID 不在任何 PEL 中。[JUSTID]
: 可选参数,只返回被成功转移的消息 ID,而不返回消息内容。
示例:
假设 XPENDING
发现消费者 consumer1
有一些空闲时间超过 30 秒的待处理消息,我们希望将它们转移给 consumer2
处理:
- 首先使用
XPENDING
找到空闲时间长的消息 ID,例如1591234567000-0
。 -
将该消息转移给
consumer2
,要求空闲时间至少 30000 毫秒:bash
XCLAIM mystream mygroup consumer2 30000 1591234567000-0
如果成功转移,该消息将从consumer1
的 PEL 移到consumer2
的 PEL,并返回消息内容。
恢复策略概览:
常见的故障恢复策略是:
- 消费者启动时,先使用
XREADGROUP
从 ID0
开始读取(即读取自己的 PEL)。 - 处理完 PEL 中的消息并确认。
- 然后切换到从 ID
>
开始读取新消息。 - 一个独立的监控进程或某个消费者定期使用
XPENDING
检查是否有空闲时间过长的消息。 - 如果发现空闲时间过长的消息,使用
XCLAIM
将这些消息转移给其他健康消费者处理。
5.6. 销毁消费者组 (XGROUP DESTROY)
当不再需要某个消费者组时,可以使用 XGROUP DESTROY
删除它。
语法:
bash
XGROUP DESTROY key groupname
示例:
bash
XGROUP DESTROY mystream mygroup
6. Stream 容量管理 (XTRIM)
Stream 是只能追加的,长时间运行可能会占用大量内存。XTRIM
命令用于裁剪 Stream,移除旧的 Entry。
语法:
bash
XTRIM key MAXLEN|MINID approx_count
key
: Stream 的名称。MAXLEN count
: 按最大 Entry 数量进行裁剪。Stream 将只保留最新的count
个 Entry。MINID id
: 按最小 Entry ID 进行裁剪。Stream 将移除所有 ID 小于id
的 Entry。approx_count
: 可选参数,用于MAXLEN
。如果在count
前加上~
,表示使用近似裁剪。Redis 不会精确地保留count
个 Entry,而是在接近count
时进行裁剪,这可以提高性能,避免精确计数带来的开销。对于大多数使用场景,近似裁剪是足够的。
示例:
-
将
mystream
裁剪到最多保留 1000 个 Entry:bash
XTRIM mystream MAXLEN 1000 -
将
mystream
近似裁剪到最多保留 10000 个 Entry:bash
XTRIM mystream MAXLEN ~ 10000 -
移除
mystream
中所有 ID 小于1591234567000-0
的 Entry:bash
XTRIM mystream MINID 1591234567000-0
裁剪是 Stream 维护的重要操作,通常需要在应用层面或通过定时任务来执行,以控制内存使用。
7. Stream 监控与信息 (XINFO)
XINFO
命令用于获取 Stream、消费者组和消费者的详细信息,对于监控和调试非常有用。
语法:
bash
XINFO STREAM key
XINFO GROUPS key
XINFO CONSUMERS key groupname
示例:
-
查看
mystream
的 Stream 信息(长度、第一个/最后一个 Entry ID、消费者组数量等):bash
XINFO STREAM mystream -
查看
mystream
关联的所有消费者组信息(组名、PEL 数量、最后消费 ID 等):bash
XINFO GROUPS mystream -
查看
mygroup
组中的所有消费者信息(消费者名、PEL 数量、空闲时间等):bash
XINFO CONSUMERS mystream mygroup
8. Stream 的实际应用场景
Redis Stream 可以应用于多种场景:
- 消息队列: 作为轻量级的消息中间件,实现服务间的异步通信、任务分发。消费者组提供了可靠性和可伸缩性。
- 事件源 (Event Sourcing): 记录系统中发生的所有状态变更事件,Stream 的追加日志特性非常适合此场景。
- 任务队列: 将待处理的任务作为 Entry 加入 Stream,工作进程作为消费者从组中读取并处理任务。
- 实时数据管道: 收集和分发实时数据流,例如日志、监控数据等。
- 排行榜或活动流: 记录用户行为或系统事件,并按时间顺序展示。
9. 使用 Redis Stream 的一些注意事项和最佳实践
- Entry ID: 除非有特殊需求,总是使用
*
让 Redis 自动生成 ID。手动指定 ID 容易出错,且可能破坏顺序性(虽然 Redis 会拒绝乱序 ID)。 - 消费者组与消费者名称: 在消费者组中,每个消费者必须有唯一的名称。这个名称应该具有标识性,例如结合 hostname 和 process ID。
- 消息确认 (XACK): 使用消费者组时,务必在成功处理消息后调用
XACK
。忘记确认会导致 PEL 不断增长,最终耗尽内存。 - 故障恢复: 结合
XPENDING
和XCLAIM
实现故障恢复逻辑是构建高可靠消费者的关键。消费者启动时先检查自己的 PEL,或者通过监控定期扫描长期空闲的消息。 - 裁剪 (XTRIM): 定期或在
XADD
时使用XTRIM
(MAXLEN
) 控制 Stream 的大小,防止内存无限增长。对于高写入量的 Stream,使用近似裁剪 (MAXLEN ~
) 性能更佳。 - 阻塞读取 (BLOCK): 在消费者中使用
BLOCK
可以减少 CPU 占用,避免空轮询。设置一个合理的超时时间,以便在超时后可以执行一些维护任务(如检查关闭信号)。 - 多 Stream vs 单 Stream: 根据业务需求决定是将不同类型的消息放入同一个 Stream 还是不同的 Stream。同一个 Stream 的所有 Entry 共享 ID 空间,按时间全局排序。不同的 Stream 更易于隔离和管理。
- Entry 内容: Entry 中的键值对内容没有严格限制,但建议结构化,例如使用 JSON 或其他序列化格式放入一个字段中,方便消费者解析。
- 监控: 利用
XINFO
命令结合监控系统,实时了解 Stream 长度、消费者组状态、PEL 大小等关键指标。
结论
Redis Stream 是 Redis 在消息处理和事件流领域的一个重要扩展,它以其简洁的设计、强大的功能和与 Redis 生态的无缝集成,为许多应用场景提供了极具吸引力的解决方案。与传统的 Redis List 和 Pub/Sub 相比,Stream 在持久化、历史回溯、有序性以及最重要的分布式可靠消费方面具有显著优势。
通过本文的详细介绍,你应该对 Redis Stream 的核心概念、基本命令(XADD
, XRANGE
, XREAD
),以及关键的消费者组特性(XGROUP CREATE
, XREADGROUP
, XACK
, XPENDING
, XCLAIM
, XGROUP DESTROY
)有了全面的了解。同时,也掌握了如何进行 Stream 裁剪(XTRIM
)和获取信息(XINFO
)。
虽然 Redis Stream 不像 Kafka 或 RabbitMQ 那样是专门的消息中间件,拥有它们全部的高级特性和极致的规模伸缩能力,但对于许多中小型项目、对现有 Redis 依赖强的系统、或者需要一个简单易用且可靠的消息解决方案的场景,Redis Stream 绝对是值得深入探索和应用的强大工具。
现在,是时候动手实践,在你的项目中尝试使用 Redis Stream,体验它带来的便利和强大了!