Redis Streams 入门介绍 – wiki基地


Redis Streams 入门介绍:构建高效、有序、持久的消息系统

随着现代分布式系统的发展,服务间的通信、事件的传递以及实时数据处理变得越来越重要。消息队列、事件流等概念应运而生,成为了构建解耦、弹性系统的基石。在众多的消息中间件中,Redis 作为一款高性能的内存数据库,不仅仅提供了丰富的数据结构(String, List, Set, Hash, Sorted Set),还在 Redis 5.0 版本中引入了一种新的、专为消息场景设计的数据类型——Streams (流)

Redis Streams 提供了一种持久化、只追加(append-only)的日志式数据结构,非常适合构建高性能的消息队列、事件溯源系统、实时数据管道等。它结合了传统消息队列的优点(如发布/订阅、消费者组)与 Redis 本身的高性能和易用性。

本文将带您深入了解 Redis Streams,从基本概念到核心命令,再到消费者组的高级特性,帮助您全面掌握其使用方法和应用场景。

1. Redis Streams 是什么?为什么选择它?

1.1 Streams 的本质

从最基本的层面来看,Redis Streams 是一种 只追加(append-only)的日志式数据结构。您可以向 Stream 中追加新的消息(或称“条目”,Entry),每个消息都有一个唯一的、自动生成的 ID。这个 ID 通常是一个带有时间戳和序列号的组合(例如 1518280254515-0),保证了消息在 Stream 中的全局唯一性和严格有序性。

可以将 Stream 类比为一个仅允许在末尾写入的文件,但读取可以发生在文件的任何位置。或者,将其视为一个时间序列数据的日志,每个事件都有一个精确的时间戳和顺序。

1.2 为什么选择 Redis Streams?

在 Redis 引入 Streams 之前,我们也可以使用 List (LPUSH/BRPOP) 或 Pub/Sub 来实现简单的消息队列或发布/订阅功能。那么 Streams 带来了哪些新的优势,以至于我们需要学习和使用它呢?

  1. 持久化和历史回溯能力: Lists 和 Pub/Sub 都无法轻松地实现消息的持久化和历史回溯。Pub/Sub 是典型的“发布即忘”,订阅者只能接收到订阅之后发布的消息。List 虽然可以存储,但一旦被 LPOPBRPOP 取出,消息就从 List 中消失了,无法被多个独立的消费者重复消费,也难以回溯历史消息。Streams 则天然支持持久化(取决于 Redis 的持久化配置),并且允许消费者根据 ID 从 Stream 的任意位置开始读取,轻松实现历史消息的回溯。
  2. 多消费者独立消费: 使用 Lists 实现队列时,多个消费者从同一个 List 中 BRPOP,会竞争获取消息,每条消息只能被一个消费者处理。Streams 通过 消费者组(Consumer Groups) 的机制,允许多个消费者(甚至不同的应用程序)以分组的形式消费同一个 Stream,并且在组内,每条消息只会被组内的一个消费者处理,而在组外,不同的组可以独立地消费 Stream 的全部消息。这提供了强大的并行处理能力和灵活的消费模式。
  3. 消息的原子性: Streams 中的每个消息都是一个包含多个键值对的结构,类似一个小型 Hash。发送消息时,多个字段作为一个整体原子性地写入 Stream。
  4. 内置的 ACK 机制与可观察性: Streams 的消费者组提供了消息确认(ACK)机制。消费者处理完消息后需要发送 ACK,告知 Stream 这条消息已被成功处理。如果消费者崩溃或处理失败,未被 ACK 的消息会保留在 Stream 中,可以通过 XPENDING 命令查看待处理(Pending)的消息,并支持消息的转移(XCLAIM)或重试,提供了更好的可靠性。
  5. 轻量级且集成于 Redis: 相比于独立的重量级消息中间件(如 Kafka, RabbitMQ),Redis Streams 集成在 Redis 内部,无需额外部署和维护复杂系统,搭建和使用更为简便。对于已经使用 Redis 的应用来说,可以零额外成本地引入消息队列能力。
  6. 高性能: Redis 本身就是高性能的内存数据库,Streams 继承了这一优点,读写性能优秀,适合高吞吐量的场景。

当然,Streams 并不是要取代 Kafka 或 RabbitMQ。对于超大规模、需要复杂路由、事务保证或多种队列模型的企业级场景,专门的消息中间件可能更为适合。但对于许多中小型应用、微服务间的轻量级通信、实时日志处理等场景,Redis Streams 是一个非常强大且易于使用的选择。

2. Stream 的核心概念与结构

在使用 Streams 之前,理解其内部结构和核心概念至关重要:

  1. Stream (流): 这是 Redis 的一种新的数据类型,可以把它想象成一个不断增长的有序日志文件。
  2. Entry ID (消息 ID): Stream 中的每条消息都有一个唯一的 ID。Redis 自动生成的 ID 格式是 millisecondsTimestamp-sequenceNumber,例如 1518280254515-0
    • millisecondsTimestamp: 是生成 ID 时所在 Redis 实例的本地时间戳,单位是毫秒。
    • sequenceNumber: 是一个在该毫秒内递增的序列号。如果同一毫秒内产生了多条消息,序列号会从 0 开始递增。
    • 这种 ID 设计保证了:
      • ID 是单调递增的,因此 Stream 中的消息是严格按照 ID 排序的。
      • 即使在分布式环境下(不同 Redis 实例同步数据),只要时间同步相对准确,ID 也能很好地保持顺序。
      • 在同一毫秒内处理大量消息时,序列号确保了 ID 的唯一性。
    • 您也可以在添加消息时指定 ID,但需要确保指定的 ID 大于 Stream 中已存在的最大 ID,否则会报错。通常推荐使用 * 让 Redis 自动生成 ID。
  3. Message / Entry (消息 / 条目): Stream 中的一个基本单元。它包含一个唯一的 Entry ID 和一个或多个字段-值对(field-value pairs),类似于一个小型 Hash。例如,一个消息可以是 ID: 1518280254515-0, Fields: {"sensor_id": "1001", "temperature": "25.5"}
  4. Producer (生产者): 负责向 Stream 中添加新的消息。
  5. Consumer (消费者): 负责从 Stream 中读取消息并进行处理。消费者可以独立地读取 Stream(使用 XREAD),也可以作为某个消费者组的一部分来协同消费(使用 XREADGROUP)。
  6. Consumer Group (消费者组): 允许多个消费者协同处理同一个 Stream 中的消息。Stream 会记住每个消费者组的消费进度,并且将新消息公平地分发给组内的不同消费者。在组内,一条消息默认只会被组内的一个消费者处理。
  7. Pending Entries List (PEL): 对于使用消费者组消费的消息,当消息被分发给组内某个消费者后,但在该消费者发送 ACK 确认处理完成之前,这条消息会进入该消费者组的 Pending Entries List 中。PEL 记录了每个被分发但尚未确认处理的消息及其对应的消费者和分发时间。这对于跟踪处理进度、处理消费者崩溃和实现消息重试非常重要。
  8. Last Delivered ID (最后投递 ID): 对于每个消费者组,Stream 会记录该组最后一次成功读取(或尝试读取)的消息 ID。XREADGROUP 命令通常会从此 ID 之后开始读取新消息。

理解了这些概念后,我们就可以开始学习如何使用 Redis Streams 的命令了。

3. Redis Streams 核心命令详解

3.1 生产者命令:XADD (添加消息)

XADD 是向 Stream 添加消息的唯一命令。

XADD key ID field value [field value ...]

  • key: Stream 的名称。如果 Stream 不存在,XADD 会自动创建它。
  • ID: 消息的 ID。
    • 使用 * 让 Redis 自动生成 ID。这是最常见和推荐的方式。Redis 会使用 millisecondsTimestamp-sequenceNumber 格式生成一个大于 Stream 中现有最大 ID 的 ID。
    • 可以指定一个具体的 ID,但必须大于 Stream 中已有的最大 ID。指定为 0-1 可以保证 ID 大于任何自动生成的 ID (因为它远超毫秒时间戳)。指定为 0-0 通常用于创建一个空的 Stream 或作为 ID 的起始点(但无法添加消息,因为 0-0 是最小 ID)。
  • field value [field value ...]: 组成消息体的键值对列表。一个消息可以包含一个或多个字段。

示例:

  1. 向名为 mystream 的 Stream 中添加一条消息,包含 sensor_idtemperature 字段,让 Redis 自动生成 ID。

    redis
    XADD mystream * sensor_id 1001 temperature 25.5

    返回值是自动生成的 ID,例如 1678881234567-0

  2. mystream 中添加另一条消息,包含 humidity 字段,自动生成 ID。

    redis
    XADD mystream * sensor_id 1002 humidity 60

    返回值是另一个自动生成的 ID,例如 1678881234568-0。注意时间戳增加了。

  3. mystream 中添加一条消息并指定 ID (不推荐新手使用,除非您非常清楚 ID 的生成规则和目的)。假设当前最大 ID 是 1678881234568-0

    redis
    XADD mystream 1678881234569-0 event "system startup"

    如果指定的 ID 小于等于当前最大 ID,该命令会报错。

XADD 的一些附加选项:

  • MAXLEN count: 限制 Stream 的最大长度。当 Stream 长度超过 count 时,最旧的条目会被删除。可以用于控制内存使用。
  • MAXLEN ~ count: 类似 MAXLEN count,但是使用近似修整策略。Redis 不会严格保证 Stream 的长度 恰好count,而是在 count 附近进行修整,这通常能提高效率。推荐使用此方式。
  • MINID id: 限制 Stream 的最小 ID。ID 小于等于 id 的条目会被删除。
  • MINID ~ id: 类似 MINID id,使用近似修整策略。

示例:

添加消息时,同时限制 Stream 的最大长度为 1000 条(近似)。

redis
XADD mystream MAXLEN ~ 1000 * sensor_id 1003 value 99.9

3.2 消费者命令 (独立模式):XRANGE, XREVRANGE, XREAD

3.2.1 XRANGE (按 ID 范围读取)

XRANGE 命令允许您根据 ID 范围读取 Stream 中的消息。

XRANGE key start end [COUNT count]

  • key: Stream 的名称。
  • start: 开始 ID (包含)。可以使用特殊值 - 表示 Stream 的最小 ID。
  • end: 结束 ID (包含)。可以使用特殊值 + 表示 Stream 的最大 ID。
  • COUNT count (可选): 限制返回消息的数量。

示例:

  1. 读取 mystream 中 ID 在 1678881234567-01678881234569-0 之间的所有消息。

    redis
    XRANGE mystream 1678881234567-0 1678881234569-0

  2. 读取 mystream 中的所有消息(从头到尾)。

    redis
    XRANGE mystream - +

  3. 读取 mystream 中最新的 5 条消息。这通常需要结合 XREVRANGE

3.2.2 XREVRANGE (按 ID 范围逆序读取)

XREVRANGEXRANGE 类似,但按照 ID 逆序返回消息,适合获取最新的消息。

XREVRANGE key end start [COUNT count]

  • key: Stream 的名称。
  • end: 结束 ID (包含)。可以使用 + 表示最大 ID。
  • start: 开始 ID (包含)。可以使用 - 表示最小 ID。
  • 注意:startend 的顺序与 XRANGE 相反,但值仍然代表范围的起始和结束 ID。逆序读取时,end 通常会大于 start
  • COUNT count (可选): 限制返回消息的数量。

示例:

  1. 读取 mystream 中最新的 5 条消息。

    redis
    XREVRANGE mystream + - COUNT 5

  2. 逆序读取 ID 在 1678881234569-01678881234567-0 之间的消息。

    redis
    XREVRANGE mystream 1678881234569-0 1678881234567-0

3.2.3 XREAD (读取一个或多个 Stream 的新消息,支持阻塞)

XREAD 是独立消费者读取 Stream 的主要命令,它可以从指定 ID 之后开始读取,并且支持同时读取多个 Stream。最重要的是,它支持阻塞模式,使得消费者可以等待新消息而无需频繁轮询。

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N

  • COUNT count (可选): 限制从 每个 Stream 返回的消息数量。
  • BLOCK milliseconds (可选): 阻塞读。如果指定了 BLOCK,当所有指定的 Stream 中都没有新消息时,命令会阻塞最多 milliseconds 毫秒。设置为 0 表示永远阻塞直到有新消息。
  • STREAMS: 必须紧跟此关键字,后面是要读取的 Stream 的 key 列表,然后是对应每个 Stream 的起始 ID 列表。
  • key_1 ... key_N: 要读取的 Stream 的名称列表。
  • ID_1 ... ID_N: 对应每个 Stream 的起始 ID。命令会返回 ID 大于该 ID 的消息。
    • 使用 $: 表示从该 Stream 的最新消息开始读取(即最后一条消息的 ID)。通常结合 BLOCK 使用,等待未来的新消息。

示例:

  1. 非阻塞读取 mystream 中 ID 大于 1678881234568-0 的最多 10 条消息。

    redis
    XREAD COUNT 10 STREAMS mystream 1678881234568-0

  2. 阻塞读取 mystream 的新消息,最多等待 5 秒 (5000毫秒)。如果当前没有新消息,就等待。一旦有新消息,就返回,最多返回 10 条。

    redis
    XREAD COUNT 10 BLOCK 5000 STREAMS mystream $

    这里的 $ 意味着从 mystream 当前的最后一条消息之后开始读取。

  3. 同时阻塞读取 stream1stream2 的新消息,最多等待 10 秒。

    redis
    XREAD BLOCK 10000 STREAMS stream1 stream2 $ $

XREAD 返回的结果是一个列表的列表。外层列表代表不同的 Stream,内层列表代表从该 Stream 读取到的消息,每个消息又是一个包含 ID 和字段-值对的列表。

3.3 管理命令:XLEN, XDEL, XTRIM

3.3.1 XLEN (获取 Stream 长度)

XLEN key

返回 Stream 中的消息数量。

示例:

redis
XLEN mystream

3.3.2 XDEL (删除消息)

XDEL 命令可以根据 ID 删除 Stream 中的一条或多条消息。注意,这是一种 逻辑删除。被删除的消息并不会立即从内存中移除,只是被标记为已删除。Stream 的长度(XLEN)也不会改变。物理删除发生在 Stream 被修剪(XTRIM)时。

XDEL key id [id ...]

示例:

删除 ID 为 1678881234567-0 的消息。

redis
XDEL mystream 1678881234567-0

3.3.3 XTRIM (修剪 Stream)

XTRIM 命令用于根据设定的规则(长度或 ID)删除 Stream 中旧的消息,以控制内存使用。这是实现物理删除的主要方式。

XTRIM key strategy argument [LIMIT count]

  • key: Stream 的名称。
  • strategy: 修剪策略。目前主要有两种:
    • MAXLEN: 根据最大长度进行修剪。
    • MINID: 根据最小 ID 进行修剪。
  • argument: 策略的参数。
    • 对于 MAXLEN,参数是保留的最大长度 count
    • 对于 MINID,参数是保留的最小 ID id
  • 可以使用 ~ 修饰符来启用近似修剪,例如 MAXLEN ~ countMINID ~ id。这通常能提高修剪效率。
  • LIMIT count (可选): 限制一次修剪操作删除的最大条目数。这对于避免长时间阻塞服务器非常有用,尤其是在修剪大型 Stream 时。然而,使用 LIMIT 可能意味着您需要多次执行 XTRIM 才能达到期望的长度或 ID。

示例:

  1. mystream 修剪到只保留最新的 1000 条消息(近似)。

    redis
    XTRIM mystream MAXLEN ~ 1000

  2. mystream 修剪到只保留 ID 大于等于 1678900000000-0 的消息。

    redis
    XTRIM mystream MINID 1678900000000-0

4. 消费者组 (Consumer Groups)

消费者组是 Redis Streams 中最重要的特性之一,它解决了多个消费者协同处理同一个 Stream 的问题。

4.1 为什么需要消费者组?

考虑一个场景:您有一个 Stream 接收传感器数据,有多个工作进程需要处理这些数据(例如,一个进程写入数据库,另一个进程进行实时分析)。

  • 如果使用 XREAD,每个工作进程独立地从 Stream 读取,那么每条消息都会被所有工作进程读取和处理一次。这适用于发布/订阅模式,但不适用于需要消息被 独占处理 的任务队列。
  • 如果使用传统的 List 模拟队列,虽然每条消息只会被一个进程处理(通过 BRPOP),但无法回溯历史消息,也难以管理消费进度和处理失败的情况。

消费者组提供了一种机制,允许多个消费者组成一个逻辑组,共同消费一个 Stream。在这个组内:

  • Stream 会记住该组的消费进度(最后投递的 ID)。
  • 新消息会被轮流(通常是公平地)投递给组内的不同消费者。
  • 一条消息被投递给组内某个消费者后,在被该消费者明确确认(ACK)之前,会被标记为 待处理(Pending)
  • 如果消费者崩溃或处理失败,其 Pending 中的消息不会丢失,可以被其他消费者组成员或重启后的原消费者通过 XCLAIM 命令接管处理,或者通过重新读取 Pending List 来重试。

不同的消费者组可以独立地消费同一个 Stream,互不影响。这非常适合构建微服务架构,每个微服务都可以是一个消费者组,独立地处理感兴趣的 Stream 数据。

4.2 消费者组的核心命令

4.2.1 XGROUP CREATE (创建消费者组)

在使用消费者组之前,必须先创建它。

XGROUP CREATE key groupname id [MKSTREAM]

  • key: 要关联的 Stream 的名称。
  • groupname: 要创建的消费者组的名称。
  • id: 该消费者组的起始 ID。 Stream 会记住这个 ID,组内的消费者首次读取时会从这个 ID 之后开始。
    • 0: 表示从 Stream 的第一条消息开始读取。
    • $: 表示从 Stream 的最新消息开始读取(即最后一条消息的 ID),通常用于只关心未来的新消息。
    • 也可以指定一个具体的 ID。
  • MKSTREAM (可选): 如果 Stream 不存在,使用此选项会同时创建 Stream。

示例:

  1. mystream 创建一个名为 mygroup 的消费者组,从 Stream 的头部开始消费。

    redis
    XGROUP CREATE mystream mygroup 0

  2. mystream 创建一个名为 anothergroup 的消费者组,只消费创建组之后到达的新消息。

    redis
    XGROUP CREATE mystream anothergroup $

  3. 创建一个名为 mygroup 的组,如果 nonexistentstream 不存在则同时创建它,并从最新位置开始消费。

    redis
    XGROUP CREATE nonexistentstream mygroup $ MKSTREAM

4.2.2 XREADGROUP (使用消费者组读取消息)

这是消费者组内的消费者读取消息的主要命令。

XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N

  • GROUP groupname consumername: 指定要使用的消费者组名称和当前消费者的名称。consumername 是一个任意字符串,用于唯一标识组内的某个消费者实例(例如,可以是进程 ID 或实例名称)。
  • COUNT count (可选): 限制从 每个 Stream 返回的消息数量。
  • BLOCK milliseconds (可选): 阻塞读。与 XREAD 类似,如果 Stream 中没有新消息或没有待处理的消息可读,会阻塞最多 milliseconds 毫秒。
  • NOACK (可选): 不将读取到的消息添加到 Pending Entries List 中。这意味着消息不会进入待确认状态,无需后续 XACK。适用于可以接受消息丢失或乱序的场景。
  • STREAMS key_1 ... key_N ID_1 ... ID_N: 指定要读取的 Stream 及其对应的起始 ID。
    • 对于消费者组读取,起始 ID 通常使用特殊值 >。它表示“读取 Stream 中下一个从未被当前消费者组内的任何消费者成功读取过的消息”。这是最常见的用于获取新消息的方式。
    • 如果指定具体的 ID (例如 0),则表示从该 ID 之后开始读取该消费者 自己 Pending List 中的消息,用于重试处理。

示例:

  1. 消费者 consumer-1mygroup 组内读取 mystream 中的下一条新消息,最多读取 1 条,阻塞 10 秒等待。

    redis
    XREADGROUP GROUP mygroup consumer-1 COUNT 1 BLOCK 10000 STREAMS mystream >

    当这条命令返回消息 id-A 时,id-A 就进入了 mygroup 组内消费者 consumer-1 的 Pending Entries List 中。

  2. 消费者 consumer-2mygroup 组内读取 mystream 中的下一条新消息。因为消息 id-A 已经投递给了 consumer-1 并进入了其 Pending List,consumer-2 如果执行相同的命令,将收到 Stream 中的下一条未被投递过的消息(假设为 id-B)。

    redis
    XREADGROUP GROUP mygroup consumer-2 COUNT 1 BLOCK 10000 STREAMS mystream >

  3. 消费者 consumer-1 重启后,想要读取自己 Pending List 中未处理的消息(例如,因为它之前崩溃了)。它会指定一个 ID 来读取,例如 0 表示从头开始读取自己的 Pending List。

    redis
    XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0

    这里的 0 指的是从该消费者在 Pending List 中的最早消息开始读取。读取到的消息仍然会保留在 Pending List 中,直到 XACK

4.2.3 XACK (确认消息处理完成)

当消费者使用 XREADGROUP 读取并成功处理一条消息后,需要调用 XACK 命令来告知 Stream 该消息已处理完成。被 ACK 的消息会从该消费者组的 Pending Entries List 中移除。

XACK key groupname id [id ...]

  • key: Stream 的名称。
  • groupname: 消费者组的名称。
  • id [id ...]: 一个或多个已成功处理并需要确认的消息 ID。

示例:

消费者 consumer-1 成功处理了消息 1678881234567-0,现在进行确认。

redis
XACK mystream mygroup 1678881234567-0

返回值是被成功 ACK 的消息数量。

4.2.4 XPENDING (查看待处理消息)

XPENDING 命令用于检查消费者组中当前处于待处理(Pending)状态的消息列表。这对于监控消费者组的健康状况、识别处理缓慢或失败的消费者非常有用。

XPENDING key groupname [start end count] [consumername] [IDLE min-idle-time]

  • key: Stream 的名称。
  • groupname: 消费者组的名称。
  • start, end, count (可选): 分页查看 Pending List。startend 是 ID 范围(可以使用 -+),count 是数量。
  • consumername (可选): 只查看特定消费者的 Pending List。
  • IDLE min-idle-time (可选): 只显示空闲时间(自上次被投递或 CLAIM 以来)超过 min-idle-time 毫秒的消息。这对于找到长时间未被处理的“僵尸”消息非常有用。

示例:

  1. 查看 mygroup 组内 mystream Stream 的 Pending 摘要信息。

    redis
    XPENDING mystream mygroup

    返回结果包含 Pending 消息总数、最小 ID、最大 ID 以及每个消费者 Pending 消息的数量。

  2. 详细列出 mygroup 组内 mystream Stream 的所有 Pending 消息。

    redis
    XPENDING mystream mygroup - + 1000 # 读取前1000条Pending消息

    返回结果是 Pending 消息的详细列表,每项包含:消息 ID, 拥有该消息的消费者名称, 该消息已空闲时间(毫秒), 该消息被投递的次数。

  3. 查看消费者 consumer-1mygroup 组内的 Pending 消息。

    redis
    XPENDING mystream mygroup - + 100 consumer-1

  4. 查看 mygroup 组内,空闲时间超过 60 秒(60000毫秒)的 Pending 消息。

    redis
    XPENDING mystream mygroup - + 1000 IDLE 60000

4.2.5 XCLAIM (转移待处理消息)

XCLAIM 命令允许将指定消息从其当前所有者转移到另一个消费者。这通常用于处理消费者崩溃后,其 Pending 消息需要被其他健康消费者接管的情况。

XCLAIM key groupname newconsumername min-idle-time id [id ...] [options]

  • key: Stream 的名称。
  • groupname: 消费者组的名称。
  • newconsumername: 消息的新所有者的消费者名称。
  • min-idle-time: 只有当消息的空闲时间(自上次被投递或 CLAIM 以来)超过 min-idle-time 毫秒时,才能被 CLAIM。这防止了正在正常处理的消息被意外 CLAIM。
  • id [id ...]: 一个或多个要 CLAIM 的消息 ID。这些 ID 通常是从 XPENDING 结果中获取的。
  • options (可选):
    • IDLE ms: 设置被 CLAIM 消息的新空闲时间(毫秒)。
    • TIME ms-unix-time: 设置被 CLAIM 消息的上次投递时间(Unix 毫秒时间戳)。
    • RETRYCOUNT count: 设置被 CLAIM 消息的重试次数。
    • FORCE: 强制 CLAIM 消息,即使其空闲时间小于 min-idle-time
    • JUSTID: 只返回被成功 CLAIM 的消息 ID,而不是完整的消息内容。

示例:

假设 XPENDING 显示消息 1678881234500-0 已经被 consumer-1 拥有并且空闲时间超过了 5 分钟 (300000毫秒)。现在消费者 consumer-2 想要 CLAIM 它:

redis
XCLAIM mystream mygroup consumer-2 300000 1678881234500-0

如果 CLAIM 成功,消息 1678881234500-0 的所有者就变成了 consumer-2,并且会从 consumer-1 的 Pending List 中移除(添加到 consumer-2 的 Pending List)。XCLAIM 成功后会返回被 CLAIM 消息的内容(除非使用 JUSTID)。

4.2.6 XGROUP SETID (设置消费者组的最后投递 ID)

修改消费者组的最后投递 ID。通常用于纠正错误或跳过某些消息。

XGROUP SETID key groupname id

  • key: Stream 的名称。
  • groupname: 消费者组的名称。
  • id: 要设置的新的最后投递 ID。下次使用 XREADGROUP 命令并使用 > 时,将从此 ID 之后开始读取。

示例:

mygroup 组的消费进度跳到 Stream 的最新位置。

redis
XGROUP SETID mystream mygroup $

4.2.7 XGROUP DELCONSUMER (删除消费者)

删除消费者组中的一个消费者。这不会影响该消费者的 Pending 消息(它们仍然保留在 Pending List 中),除非使用了特定选项。

XGROUP DELCONSUMER key groupname consumername

示例:

删除 mygroup 组中的 consumer-1

redis
XGROUP DELCONSUMER mystream mygroup consumer-1

4.2.8 XGROUP DESTROY (删除消费者组)

完全删除一个消费者组及其所有相关的状态(包括 Pending List)。

XGROUP DESTROY key groupname

示例:

删除 mygroup 组。

redis
XGROUP DESTROY mystream mygroup

4.3 消费者组状态查看命令:XINFO

XINFO 命令用于查看 Stream、消费者组和消费者的详细信息。

XINFO STREAM key [FULL [COUNT count]]
XINFO GROUPS key
XINFO CONSUMERS key groupname

  • XINFO STREAM key: 查看 Stream 的元信息,如长度、第一个/最后一个 ID、消费者组数量等。FULL 选项可以显示更多信息,包括所有消息、Pending List、消费者组详情等。
  • XINFO GROUPS key: 列出 Stream 的所有消费者组及其详情。
  • XINFO CONSUMERS key groupname: 列出某个消费者组内的所有消费者及其详情。

示例:

redis
XINFO STREAM mystream
XINFO GROUPS mystream
XINFO CONSUMERS mystream mygroup

5. Streams 的应用场景

Redis Streams 的特性使其非常适合多种应用场景:

  1. 消息队列 (Message Queue): 最直接的应用。生产者使用 XADD 发送消息,消费者(独立或组)使用 XREADXREADGROUP 接收消息。消费者组提供了公平分发、并行处理和可靠性保障。
  2. 事件溯源 (Event Sourcing): 将系统中发生的每个事件(状态变更)作为一条消息追加到 Stream 中。Stream 就成为了一个完整的事件日志。通过重放 Stream 中的事件,可以重建系统的状态。Streams 的有序性和持久性非常适合此场景。
  3. 实时数据管道 (Real-time Data Pipelines): 从一个源头(如传感器、日志文件)收集数据,将每条数据作为一个消息写入 Stream。下游的多个消费者组可以独立地从 Stream 中读取数据进行不同的处理(例如,一个组写入时序数据库,一个组进行实时分析,一个组进行告警)。
  4. 活动流 (Activity Streams): 记录用户在应用中的活动(如“用户 A 点赞了文章 B”,“用户 C 关注了用户 D”)。将每个活动作为一个消息写入 Stream。其他用户或服务可以订阅这个 Stream 来获取相关的活动通知或构建个性化 Feed。
  5. 任务队列 (Task Queue): 将需要异步执行的任务作为消息放入 Stream。一组工作进程作为消费者组来竞争获取并执行任务。结合 ACK 机制,可以确保任务至少被处理一次。
  6. IoT 数据采集: 大量 IoT 设备可以将采集到的数据源源不断地发送到 Redis Streams。Streams 可以处理高并发写入,并由下游的消费者组进行数据的存储、处理和分析。

6. 与其他 Redis 数据类型和 MQ 的比较

  • vs Redis Lists (用作队列):
    • Lists 简单易用,但不支持多消费者组共享消息、历史回溯、ACK 机制和 Pending 消息管理。消息一旦被 LPOP/BRPOP 就会丢失。
    • Streams 支持所有这些 Lists 缺乏的关键特性,更适合复杂的消息场景。
  • vs Redis Pub/Sub:
    • Pub/Sub 是非持久的,消息发布后订阅者才能接收,且不支持历史回溯。是典型的“发布即忘”模式。
    • Pub/Sub 没有消费者组的概念,每条消息会被所有订阅者接收(广播模式)。
    • Streams 是持久的,支持历史回溯和消费者组的独占消费模式。
    • Pub/Sub 适合广播通知,Streams 适合需要可靠投递、多消费者协作和历史记录的场景。
  • vs Kafka/RabbitMQ 等专业 MQ:
    • 优点: Redis Streams 更轻量级,易于部署和维护,性能优秀,与 Redis 其他数据结构集成方便。对于中小型场景或对现有 Redis 基础设施的扩展非常友好。
    • 缺点: Streams 相比 Kafka/RabbitMQ 等功能更为简单,缺少一些高级特性,例如:复杂的消息路由、事务支持、丰富的插件生态、针对超大规模分布式集群的优化、多租户隔离、更成熟的监控和管理工具等。
    • 选择: 对于需要简单、高性能、易于集成的消息功能,并且对内存占用有一定容忍度(Streams 数据在内存中),Redis Streams 是一个很好的选择。对于超大规模、复杂需求、需要严格事务保证或持久化到磁盘的场景,专业 MQ 可能更合适。Redis Streams 可以作为轻量级 MQ 的替代品,或作为进入事件流领域的起点。

7. 总结与展望

Redis Streams 是 Redis 在消息处理领域的一个重要进展。它提供了一种强大、灵活、持久且高性能的日志式数据结构,非常适合构建现代分布式系统中的消息队列、事件流和实时数据管道。

通过本文的介绍,您应该对 Streams 的基本概念、核心命令(XADD, XREAD, XRANGE, XTRIM 等)以及强大的消费者组功能(XGROUP, XREADGROUP, XACK, XPENDING, XCLAIM 等)有了初步的了解。

入门只是第一步,要熟练掌握 Redis Streams,还需要更多的实践。尝试在您的项目中引入 Streams,构建一个简单的消息发布/订阅或任务队列,体验生产者和消费者组的工作流程,并利用 XPENDINGXCLAIM 来处理消费者故障。

随着您对 Redis Streams 的深入使用,您会发现它在简化系统架构、提高系统弹性和解耦服务方面带来的巨大价值。

希望这篇详细的入门介绍能够帮助您迈出使用 Redis Streams 的第一步!祝您使用愉快!


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部