Redis Stream 教程:从入门到精通 – wiki基地


Redis Stream 教程:从入门到精通

1. 什么是 Redis Stream?

Redis Stream 是 Redis 5.0 引入的一种新型数据结构,它是一个只追加(append-only)的数据结构,用于实现高性能、低延迟的发布/订阅(Pub/Sub)功能,同时提供了消息持久化、消费者组(Consumer Groups)等高级特性。它非常适合构建事件驱动的架构、实时数据管道、消息队列和日志系统。

与传统的 Redis Pub/Sub 相比,Stream 的主要优势在于:
* 持久化:消息会被存储在 Stream 中,即使消费者离线,上线后也能获取到未处理的消息。
* 消息历史:可以回溯(replay)历史消息。
* 消费者组:允许多个消费者共享一个 Stream,共同处理消息,实现负载均衡和故障恢复。
* 消息确认:消费者可以对处理完的消息进行确认,Stream 会跟踪已处理和未处理的消息。

2. 核心概念

在深入学习 Stream 之前,了解其核心概念至关重要:

  • Stream (流):一个有序的、只追加的条目(Entry)序列。每个条目都有一个唯一的 ID 和一组键值对数据。
  • Entry ID (条目 ID):由两部分组成:timestamp-sequence(例如 1580000000000-0)。timestamp 是毫秒级 Unix 时间戳,sequence 是同一毫秒内产生的消息的递增序列号。Redis 会自动生成 ID,也可以手动指定。
  • Message (消息/条目):Stream 中的每一个数据记录。
  • Consumer (消费者):一个独立的客户端,从 Stream 中读取和处理消息。
  • Consumer Group (消费者组):由一个或多个消费者组成的逻辑组。组内的消费者共享对 Stream 的处理,每条消息只会发送给组内的一个消费者。消费者组会记住其已消费的 Stream 位置。
  • Pending Entries List (PEL):对于消费者组,当一个消息被消费者读取但尚未确认(ACK)时,它会进入该消费者的 PEL。这允许在消费者故障后,其他消费者(或同一消费者重启后)接管处理这些“待处理”消息。
  • Message Acknowledgment (消息确认):消费者在成功处理完一条消息后,向 Stream 发送确认(ACK),将其从 PEL 中移除。

3. 基本命令

让我们从 Stream 的基本操作开始。

3.1. XADD:添加消息到 Stream

XADD key ID field value [field value ...]

  • key:Stream 的名称。
  • ID:消息的 ID。通常使用 * 让 Redis 自动生成唯一的 ID。也可以手动指定,但必须比当前 Stream 中最大的 ID 大。
  • field value:消息的键值对数据。

示例:

“`redis

自动生成ID,添加一条消息

XADD mystream * sensor_id 1 temperature 25.5

Output: “1678886400000-0” (示例ID)

自动生成ID,添加另一条消息

XADD mystream * sensor_id 2 temperature 26.1

Output: “1678886400001-0” (示例ID)

“`

3.2. XRANGE / XREVRANGE:获取范围内的消息

XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]

  • key:Stream 的名称。
  • start / end:ID 范围。可以使用 - 表示最小 ID,+ 表示最大 ID。也可以使用具体 ID,或 timestamp-0 格式进行日期范围查询。
  • COUNT count:可选,限制返回的消息数量。

示例:

“`redis

获取 mystream 中的所有消息

XRANGE mystream – +

Output:

1) 1) “1678886400000-0”

2) 1) “sensor_id”

2) “1”

3) “temperature”

4) “25.5”

2) 1) “1678886400001-0”

2) 1) “sensor_id”

2) “2”

3) “temperature”

4) “26.1”

获取 ID 大于某个值的最新消息

XRANGE mystream 1678886400000-0 + COUNT 1

Output:

1) 1) “1678886400001-0”

2) 1) “sensor_id”

2) “2”

3) “temperature”

4) “26.1”

“`

3.3. XREAD:从一个或多个 Stream 读取消息

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

  • COUNT count:可选,限制每个 Stream 返回的消息数量。
  • BLOCK milliseconds:可选,阻塞读取。如果没有新消息,客户端将阻塞指定毫秒数(0 表示永远阻塞)直到有新消息或超时。
  • STREAMS key [key ...] ID [ID ...]:指定要读取的 Stream 及其起始 ID。
    • ID:从哪个 ID 之后开始读取。可以使用 $ 表示 Stream 中最新的 ID(仅读取新消息)。

示例:

“`redis

读取 mystream 中 ID 大于 1678886400000-0 的消息

XREAD STREAMS mystream 1678886400000-0

Output:

1) 1) “mystream”

2) 1) 1) “1678886400001-0”

2) 1) “sensor_id”

2) “2”

3) “temperature”

4) “26.1”

阻塞读取 mystream 的新消息 (读取最新消息之后的新消息)

在另一个客户端执行 XADD mystream * sensor_id 3 temperature 27.0 后,此命令会返回

XREAD BLOCK 0 STREAMS mystream $
“`

4. 消费者组 (Consumer Groups)

消费者组是 Redis Stream 最强大的特性之一,它允许在多个消费者之间分配消息处理任务。

4.1. XGROUP CREATE:创建消费者组

XGROUP CREATE key groupname ID | $ [MKSTREAM]

  • key:Stream 的名称。
  • groupname:消费者组的名称。
  • ID:消费者组的起始 ID。组内的消费者将从这个 ID 之后开始消费。
    • 0:从 Stream 的最开始(第一个消息)消费。
    • $:只消费创建组之后的新消息。
  • MKSTREAM:可选,如果 Stream 不存在,则自动创建。

示例:

“`redis

创建一个名为 mygroup 的消费者组,从 mystream 的最新消息之后开始消费

XGROUP CREATE mystream mygroup $

Output: “OK”

“`

4.2. XREADGROUP:从消费者组读取消息

XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

  • groupname:消费者组的名称。
  • consumername:当前消费者的名称(组内唯一)。
  • COUNT count:可选,限制返回的消息数量。
  • BLOCK milliseconds:可选,阻塞读取。
  • STREAMS key [key ...] ID [ID ...]:指定要读取的 Stream。
    • ID:对于消费者组,通常使用 > 来表示“尚未传递给当前消费者组的任何消费者”的消息。这确保了每条新消息只被组内一个消费者读取。也可以使用具体 ID,从 PEL 中读取消息。

示例:

“`redis

消费者 consumer1 从 mygroup 组读取 mystream 的新消息

XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >

Output (假设有新消息):

1) 1) “mystream”

2) 1) 1) “1678886400002-0”

2) 1) “sensor_id”

2) “3”

3) “temperature”

4) “27.0”

``
consumer1读取到1678886400002-0这条消息时,这条消息会被标记为“待处理”状态,并进入consumer1` 的 PEL。

4.3. XACK:确认消息

XACK key groupname ID [ID ...]

  • key:Stream 的名称。
  • groupname:消费者组的名称。
  • ID:要确认的消息 ID。确认后,消息将从对应消费者的 PEL 中移除。

示例:

“`redis

消费者 consumer1 确认已处理 ID 为 1678886400002-0 的消息

XACK mystream mygroup 1678886400002-0

Output: (integer) 1 (表示确认了1条消息)

“`

4.4. XPENDING:查看待处理消息

XPENDING key groupname [start end count] [consumername]

  • key:Stream 的名称。
  • groupname:消费者组的名称。
  • start / end / count:可选,用于分页查看待处理消息列表。
  • consumername:可选,指定查看某个消费者的待处理消息。

示例:

“`redis

查看 mygroup 组中所有的待处理消息

XPENDING mystream mygroup – + 10

Output (如果 consumer1 没有 ACK 消息):

1) 1) “1678886400002-0” # 消息ID

2) “consumer1” # 拥有该消息的消费者

3) (integer) 12345 # 消息的空闲时间(自消费者获取后经过的毫秒数)

4) (integer) 1 # 消息的递送尝试次数

“`

4.5. XCLAIM:转移待处理消息所有权

XCLAIM key groupname consumername min-idle-time ID [ID ...]

当一个消费者故障或处理缓慢时,其 PEL 中的消息可能长时间未被处理。XCLAIM 允许另一个消费者(或重新启动的消费者)“认领”这些消息,将其所有权转移给自己,并继续处理。

  • key:Stream 的名称。
  • groupname:消费者组的名称。
  • consumername:新的消费者名称。
  • min-idle-time:一个毫秒值,只有当消息的空闲时间超过此值时才能被认领。

示例:

“`redis

假设 consumer1 离线,ID 为 1678886400002-0 的消息长时间未处理

consumer2 认领此消息,要求空闲时间超过 10000 毫秒 (10秒)

XCLAIM mystream mygroup consumer2 10000 1678886400002-0

Output:

1) 1) “1678886400002-0”

2) 1) “sensor_id”

2) “3”

3) “temperature”

4) “27.0”

``
认领后,
1678886400002-0消息的所有权从consumer1转移到consumer2`。

4.6. XGROUP SETID:设置消费者组的最后传递 ID

XGROUP SETID key groupname ID

此命令用于手动更改消费者组的“last delivered ID”。例如,在消费者组创建后,如果需要重新处理历史消息,可以将其设置为一个较旧的 ID。

4.7. XGROUP DELCONSUMER:删除消费者

XGROUP DELCONSUMER key groupname consumername

从消费者组中删除一个特定的消费者。如果该消费者有待处理消息,它们会保留在 PEL 中,直到被 XCLAIM 或组的 ID 被重新设置。

4.8. XGROUP DESTROY:删除消费者组

XGROUP DESTROY key groupname

删除整个消费者组。

5. 高级特性和使用场景

5.1. Stream 长度限制 (XADD MAXLEN)

Stream 默认会无限增长,这可能消耗大量内存。XADD 命令可以通过 MAXLEN 选项限制 Stream 的长度。

XADD mystream MAXLEN ~ 1000 * field value ...

  • MAXLEN count:将 Stream 长度限制在 count 条消息以内。当超出限制时,最旧的消息会被删除。
  • ~:是一个近似值,Redis 可能会稍微超出这个长度以提高效率,但会保证内存使用在一个可控范围内。如果没有 ~,则会精确地保持长度,但性能可能略低。

示例:

“`redis

限制 mystream 最多包含 1000 条消息

XADD mystream MAXLEN ~ 1000 * sensor_id 4 temperature 28.1
“`

5.2. 消息剪裁 (XTRIM)

你可以使用 XTRIM 命令手动剪裁 Stream,移除旧消息。

XTRIM key MAXLEN ~ count
XTRIM key MINID ID (移除 ID 小于指定 ID 的消息)

示例:

“`redis

将 mystream 剪裁到最多 1000 条消息

XTRIM mystream MAXLEN ~ 1000
“`

5.3. 消息删除 (XDEL)

XDEL key ID [ID ...]

可以删除 Stream 中的特定消息。注意:删除消息后,消息 ID 会保留,但消息内容会被移除。对于消费者组,如果删除的消息位于 PEL 中,它将自动从 PEL 中移除。

5.4. 消息交付保证

Redis Stream 提供“至少一次”(at-least-once)的消息交付保证。这意味着每条消息至少会被成功处理一次。在消费者故障的情况下,未确认的消息会保留在 PEL 中,等待其他消费者 XCLAIM 或同一消费者重启后继续处理。

5.5. 应用场景

  • 实时数据管道:收集传感器数据、用户行为日志等,进行实时处理和分析。
  • 事件溯源 (Event Sourcing):将所有业务操作记录为一系列事件,Stream 作为事件日志存储。
  • 消息队列:替代传统的消息队列,实现高吞吐、可持久化的消息传递。
  • 通知系统:向用户推送实时通知。
  • 任务队列:通过消费者组实现分布式任务处理。

6. 使用 Go 语言操作 Redis Stream 示例

这里使用 Go 语言和 go-redis/redis 库来演示 Redis Stream 的基本操作。

首先,确保你的 Go 项目中安装了 go-redis/redis 库:
go get github.com/go-redis/redis/v8

main.go 示例:

“`go
package main

import (
“context”
“fmt”
“log”
“strconv”
“time”

"github.com/go-redis/redis/v8"

)

var ctx = context.Background()

func main() {
// 初始化 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: “localhost:6379”, // Redis 服务器地址
Password: “”, // 密码,如果没有设置则为空
DB: 0, // 默认数据库
})

// 测试连接
_, err := rdb.Ping(ctx).Result()
if err != nil {
    log.Fatalf("无法连接到 Redis: %v", err)
}
fmt.Println("成功连接到 Redis!")

streamKey := "my_go_stream"
groupName := "go_group"
consumerName1 := "go_consumer_1"
consumerName2 := "go_consumer_2"

// 1. 清理之前的 Stream 和消费者组 (可选)
rdb.Del(ctx, streamKey)
rdb.XGroupDestroy(ctx, streamKey, groupName)
fmt.Printf("清理 Stream '%s' 和消费者组 '%s' (如果存在)\n", streamKey, groupName)

// 2. 创建消费者组,从最新消息开始消费
_, err = rdb.XGroupCreateConsumer(ctx, streamKey, groupName, "0").Result()
if err != nil && err != redis.Nil { // redis.Nil 表示组已存在,不是真正的错误
    fmt.Printf("创建消费者组失败: %v\n", err)
} else {
    fmt.Printf("消费者组 '%s' 创建成功或已存在\n", groupName)
}

// 3. 生产者:添加消息到 Stream
go func() {
    for i := 0; i < 10; i++ {
        message := map[string]interface{}{
            "event":   "sensor_data",
            "value":   strconv.Itoa(i * 10),
            "timestamp": time.Now().Format(time.RFC3339),
        }
        id, err := rdb.XAdd(ctx, &redis.XAddArgs{
            Stream: streamKey,
            MaxLen: 100, // 限制 Stream 长度
            Approx: true,
            ID:     "*", // 自动生成 ID
            Values: message,
        }).Result()
        if err != nil {
            log.Printf("添加消息失败: %v", err)
        } else {
            fmt.Printf("生产者添加消息: ID=%s, Data=%v\n", id, message)
        }
        time.Sleep(500 * time.Millisecond) // 每500ms发送一条
    }
    fmt.Println("生产者发送完毕所有消息。")
}()

// 4. 消费者 1:从消费者组读取并处理消息
go func(consumer string) {
    for {
        // 从消费者组读取待处理消息或新消息
        // ">" 表示读取从未传递给任何消费者的消息,或者该消费者之前从未处理过的消息
        streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumer,
            Streams:  []string{streamKey, ">"}, // 读取 StreamKey 的新消息
            Count:    1,                        // 一次读取一条
            Block:    0,                        // 阻塞读取,0表示永不超时
        }).Result()

        if err != nil {
            if err == redis.Nil {
                // fmt.Printf("消费者 %s: 暂无新消息,等待...\n", consumer)
                time.Sleep(1 * time.Second) // 没有消息时等待一秒
                continue
            }
            log.Printf("消费者 %s 读取消息失败: %v", consumer, err)
            time.Sleep(1 * time.Second)
            continue
        }

        for _, stream := range streams {
            for _, msg := range stream.Messages {
                fmt.Printf("消费者 %s 接收到消息: ID=%s, Data=%v\n", consumer, msg.ID, msg.Values)
                // 模拟处理消息
                time.Sleep(50 * time.Millisecond)
                // 确认消息
                _, err := rdb.XAck(ctx, streamKey, groupName, msg.ID).Result()
                if err != nil {
                    log.Printf("消费者 %s 确认消息 %s 失败: %v", consumer, msg.ID, err)
                } else {
                    fmt.Printf("消费者 %s 确认消息: ID=%s\n", consumer, msg.ID)
                }
            }
        }
    }
}(consumerName1)

// 5. 消费者 2:从消费者组读取并处理消息 (模拟另一个消费者共享任务)
go func(consumer string) {
    for {
        streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumer,
            Streams:  []string{streamKey, ">"},
            Count:    1,
            Block:    0,
        }).Result()

        if err != nil {
            if err == redis.Nil {
                time.Sleep(1 * time.Second)
                continue
            }
            log.Printf("消费者 %s 读取消息失败: %v", consumer, err)
            time.Sleep(1 * time.Second)
            continue
        }

        for _, stream := range streams {
            for _, msg := range stream.Messages {
                fmt.Printf("消费者 %s 接收到消息: ID=%s, Data=%v\n", consumer, msg.ID, msg.Values)
                time.Sleep(100 * time.Millisecond) // 模拟处理时间稍长
                _, err := rdb.XAck(ctx, streamKey, groupName, msg.ID).Result()
                if err != nil {
                    log.Printf("消费者 %s 确认消息 %s 失败: %v", consumer, msg.ID, err)
                } else {
                    fmt.Printf("消费者 %s 确认消息: ID=%s\n", consumer, msg.ID)
                }
            }
        }
    }
}(consumerName2)

// 保持主 goroutine 运行,以便生产者和消费者可以工作
select {}

}
“`

7. 总结

Redis Stream 是一个强大而灵活的消息队列和事件流解决方案,它结合了传统消息队列的持久性、可靠性与 Redis 的高性能。通过对核心概念、基本命令和消费者组的理解,你可以利用 Stream 构建出高效、可伸缩的实时数据处理系统。从简单的日志收集到复杂的事件驱动微服务架构,Redis Stream 都能提供坚实的基础。

滚动至顶部