Redis Stream 教程:从入门到精通
1. 什么是 Redis Stream?
Redis Stream 是 Redis 5.0 引入的一种新型数据结构,它是一个只追加(append-only)的数据结构,用于实现高性能、低延迟的发布/订阅(Pub/Sub)功能,同时提供了消息持久化、消费者组(Consumer Groups)等高级特性。它非常适合构建事件驱动的架构、实时数据管道、消息队列和日志系统。
与传统的 Redis Pub/Sub 相比,Stream 的主要优势在于:
* 持久化:消息会被存储在 Stream 中,即使消费者离线,上线后也能获取到未处理的消息。
* 消息历史:可以回溯(replay)历史消息。
* 消费者组:允许多个消费者共享一个 Stream,共同处理消息,实现负载均衡和故障恢复。
* 消息确认:消费者可以对处理完的消息进行确认,Stream 会跟踪已处理和未处理的消息。
2. 核心概念
在深入学习 Stream 之前,了解其核心概念至关重要:
- Stream (流):一个有序的、只追加的条目(Entry)序列。每个条目都有一个唯一的 ID 和一组键值对数据。
- Entry ID (条目 ID):由两部分组成:
timestamp-sequence(例如1580000000000-0)。timestamp是毫秒级 Unix 时间戳,sequence是同一毫秒内产生的消息的递增序列号。Redis 会自动生成 ID,也可以手动指定。 - Message (消息/条目):Stream 中的每一个数据记录。
- Consumer (消费者):一个独立的客户端,从 Stream 中读取和处理消息。
- Consumer Group (消费者组):由一个或多个消费者组成的逻辑组。组内的消费者共享对 Stream 的处理,每条消息只会发送给组内的一个消费者。消费者组会记住其已消费的 Stream 位置。
- Pending Entries List (PEL):对于消费者组,当一个消息被消费者读取但尚未确认(ACK)时,它会进入该消费者的 PEL。这允许在消费者故障后,其他消费者(或同一消费者重启后)接管处理这些“待处理”消息。
- Message Acknowledgment (消息确认):消费者在成功处理完一条消息后,向 Stream 发送确认(ACK),将其从 PEL 中移除。
3. 基本命令
让我们从 Stream 的基本操作开始。
3.1. XADD:添加消息到 Stream
XADD key ID field value [field value ...]
key:Stream 的名称。ID:消息的 ID。通常使用*让 Redis 自动生成唯一的 ID。也可以手动指定,但必须比当前 Stream 中最大的 ID 大。field value:消息的键值对数据。
示例:
“`redis
自动生成ID,添加一条消息
XADD mystream * sensor_id 1 temperature 25.5
Output: “1678886400000-0” (示例ID)
自动生成ID,添加另一条消息
XADD mystream * sensor_id 2 temperature 26.1
Output: “1678886400001-0” (示例ID)
“`
3.2. XRANGE / XREVRANGE:获取范围内的消息
XRANGE key start end [COUNT count]
XREVRANGE key end start [COUNT count]
key:Stream 的名称。start/end:ID 范围。可以使用-表示最小 ID,+表示最大 ID。也可以使用具体 ID,或timestamp-0格式进行日期范围查询。COUNT count:可选,限制返回的消息数量。
示例:
“`redis
获取 mystream 中的所有消息
XRANGE mystream – +
Output:
1) 1) “1678886400000-0”
2) 1) “sensor_id”
2) “1”
3) “temperature”
4) “25.5”
2) 1) “1678886400001-0”
2) 1) “sensor_id”
2) “2”
3) “temperature”
4) “26.1”
获取 ID 大于某个值的最新消息
XRANGE mystream 1678886400000-0 + COUNT 1
Output:
1) 1) “1678886400001-0”
2) 1) “sensor_id”
2) “2”
3) “temperature”
4) “26.1”
“`
3.3. XREAD:从一个或多个 Stream 读取消息
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
COUNT count:可选,限制每个 Stream 返回的消息数量。BLOCK milliseconds:可选,阻塞读取。如果没有新消息,客户端将阻塞指定毫秒数(0 表示永远阻塞)直到有新消息或超时。STREAMS key [key ...] ID [ID ...]:指定要读取的 Stream 及其起始 ID。ID:从哪个 ID 之后开始读取。可以使用$表示 Stream 中最新的 ID(仅读取新消息)。
示例:
“`redis
读取 mystream 中 ID 大于 1678886400000-0 的消息
XREAD STREAMS mystream 1678886400000-0
Output:
1) 1) “mystream”
2) 1) 1) “1678886400001-0”
2) 1) “sensor_id”
2) “2”
3) “temperature”
4) “26.1”
阻塞读取 mystream 的新消息 (读取最新消息之后的新消息)
在另一个客户端执行 XADD mystream * sensor_id 3 temperature 27.0 后,此命令会返回
XREAD BLOCK 0 STREAMS mystream $
“`
4. 消费者组 (Consumer Groups)
消费者组是 Redis Stream 最强大的特性之一,它允许在多个消费者之间分配消息处理任务。
4.1. XGROUP CREATE:创建消费者组
XGROUP CREATE key groupname ID | $ [MKSTREAM]
key:Stream 的名称。groupname:消费者组的名称。ID:消费者组的起始 ID。组内的消费者将从这个 ID 之后开始消费。0:从 Stream 的最开始(第一个消息)消费。$:只消费创建组之后的新消息。
MKSTREAM:可选,如果 Stream 不存在,则自动创建。
示例:
“`redis
创建一个名为 mygroup 的消费者组,从 mystream 的最新消息之后开始消费
XGROUP CREATE mystream mygroup $
Output: “OK”
“`
4.2. XREADGROUP:从消费者组读取消息
XREADGROUP GROUP groupname consumername [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
groupname:消费者组的名称。consumername:当前消费者的名称(组内唯一)。COUNT count:可选,限制返回的消息数量。BLOCK milliseconds:可选,阻塞读取。STREAMS key [key ...] ID [ID ...]:指定要读取的 Stream。ID:对于消费者组,通常使用>来表示“尚未传递给当前消费者组的任何消费者”的消息。这确保了每条新消息只被组内一个消费者读取。也可以使用具体 ID,从 PEL 中读取消息。
示例:
“`redis
消费者 consumer1 从 mygroup 组读取 mystream 的新消息
XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream >
Output (假设有新消息):
1) 1) “mystream”
2) 1) 1) “1678886400002-0”
2) 1) “sensor_id”
2) “3”
3) “temperature”
4) “27.0”
``consumer1
当读取到1678886400002-0这条消息时,这条消息会被标记为“待处理”状态,并进入consumer1` 的 PEL。
4.3. XACK:确认消息
XACK key groupname ID [ID ...]
key:Stream 的名称。groupname:消费者组的名称。ID:要确认的消息 ID。确认后,消息将从对应消费者的 PEL 中移除。
示例:
“`redis
消费者 consumer1 确认已处理 ID 为 1678886400002-0 的消息
XACK mystream mygroup 1678886400002-0
Output: (integer) 1 (表示确认了1条消息)
“`
4.4. XPENDING:查看待处理消息
XPENDING key groupname [start end count] [consumername]
key:Stream 的名称。groupname:消费者组的名称。start/end/count:可选,用于分页查看待处理消息列表。consumername:可选,指定查看某个消费者的待处理消息。
示例:
“`redis
查看 mygroup 组中所有的待处理消息
XPENDING mystream mygroup – + 10
Output (如果 consumer1 没有 ACK 消息):
1) 1) “1678886400002-0” # 消息ID
2) “consumer1” # 拥有该消息的消费者
3) (integer) 12345 # 消息的空闲时间(自消费者获取后经过的毫秒数)
4) (integer) 1 # 消息的递送尝试次数
“`
4.5. XCLAIM:转移待处理消息所有权
XCLAIM key groupname consumername min-idle-time ID [ID ...]
当一个消费者故障或处理缓慢时,其 PEL 中的消息可能长时间未被处理。XCLAIM 允许另一个消费者(或重新启动的消费者)“认领”这些消息,将其所有权转移给自己,并继续处理。
key:Stream 的名称。groupname:消费者组的名称。consumername:新的消费者名称。min-idle-time:一个毫秒值,只有当消息的空闲时间超过此值时才能被认领。
示例:
“`redis
假设 consumer1 离线,ID 为 1678886400002-0 的消息长时间未处理
consumer2 认领此消息,要求空闲时间超过 10000 毫秒 (10秒)
XCLAIM mystream mygroup consumer2 10000 1678886400002-0
Output:
1) 1) “1678886400002-0”
2) 1) “sensor_id”
2) “3”
3) “temperature”
4) “27.0”
``1678886400002-0
认领后,消息的所有权从consumer1转移到consumer2`。
4.6. XGROUP SETID:设置消费者组的最后传递 ID
XGROUP SETID key groupname ID
此命令用于手动更改消费者组的“last delivered ID”。例如,在消费者组创建后,如果需要重新处理历史消息,可以将其设置为一个较旧的 ID。
4.7. XGROUP DELCONSUMER:删除消费者
XGROUP DELCONSUMER key groupname consumername
从消费者组中删除一个特定的消费者。如果该消费者有待处理消息,它们会保留在 PEL 中,直到被 XCLAIM 或组的 ID 被重新设置。
4.8. XGROUP DESTROY:删除消费者组
XGROUP DESTROY key groupname
删除整个消费者组。
5. 高级特性和使用场景
5.1. Stream 长度限制 (XADD MAXLEN)
Stream 默认会无限增长,这可能消耗大量内存。XADD 命令可以通过 MAXLEN 选项限制 Stream 的长度。
XADD mystream MAXLEN ~ 1000 * field value ...
MAXLEN count:将 Stream 长度限制在count条消息以内。当超出限制时,最旧的消息会被删除。~:是一个近似值,Redis 可能会稍微超出这个长度以提高效率,但会保证内存使用在一个可控范围内。如果没有~,则会精确地保持长度,但性能可能略低。
示例:
“`redis
限制 mystream 最多包含 1000 条消息
XADD mystream MAXLEN ~ 1000 * sensor_id 4 temperature 28.1
“`
5.2. 消息剪裁 (XTRIM)
你可以使用 XTRIM 命令手动剪裁 Stream,移除旧消息。
XTRIM key MAXLEN ~ count
XTRIM key MINID ID (移除 ID 小于指定 ID 的消息)
示例:
“`redis
将 mystream 剪裁到最多 1000 条消息
XTRIM mystream MAXLEN ~ 1000
“`
5.3. 消息删除 (XDEL)
XDEL key ID [ID ...]
可以删除 Stream 中的特定消息。注意:删除消息后,消息 ID 会保留,但消息内容会被移除。对于消费者组,如果删除的消息位于 PEL 中,它将自动从 PEL 中移除。
5.4. 消息交付保证
Redis Stream 提供“至少一次”(at-least-once)的消息交付保证。这意味着每条消息至少会被成功处理一次。在消费者故障的情况下,未确认的消息会保留在 PEL 中,等待其他消费者 XCLAIM 或同一消费者重启后继续处理。
5.5. 应用场景
- 实时数据管道:收集传感器数据、用户行为日志等,进行实时处理和分析。
- 事件溯源 (Event Sourcing):将所有业务操作记录为一系列事件,Stream 作为事件日志存储。
- 消息队列:替代传统的消息队列,实现高吞吐、可持久化的消息传递。
- 通知系统:向用户推送实时通知。
- 任务队列:通过消费者组实现分布式任务处理。
6. 使用 Go 语言操作 Redis Stream 示例
这里使用 Go 语言和 go-redis/redis 库来演示 Redis Stream 的基本操作。
首先,确保你的 Go 项目中安装了 go-redis/redis 库:
go get github.com/go-redis/redis/v8
main.go 示例:
“`go
package main
import (
“context”
“fmt”
“log”
“strconv”
“time”
"github.com/go-redis/redis/v8"
)
var ctx = context.Background()
func main() {
// 初始化 Redis 客户端
rdb := redis.NewClient(&redis.Options{
Addr: “localhost:6379”, // Redis 服务器地址
Password: “”, // 密码,如果没有设置则为空
DB: 0, // 默认数据库
})
// 测试连接
_, err := rdb.Ping(ctx).Result()
if err != nil {
log.Fatalf("无法连接到 Redis: %v", err)
}
fmt.Println("成功连接到 Redis!")
streamKey := "my_go_stream"
groupName := "go_group"
consumerName1 := "go_consumer_1"
consumerName2 := "go_consumer_2"
// 1. 清理之前的 Stream 和消费者组 (可选)
rdb.Del(ctx, streamKey)
rdb.XGroupDestroy(ctx, streamKey, groupName)
fmt.Printf("清理 Stream '%s' 和消费者组 '%s' (如果存在)\n", streamKey, groupName)
// 2. 创建消费者组,从最新消息开始消费
_, err = rdb.XGroupCreateConsumer(ctx, streamKey, groupName, "0").Result()
if err != nil && err != redis.Nil { // redis.Nil 表示组已存在,不是真正的错误
fmt.Printf("创建消费者组失败: %v\n", err)
} else {
fmt.Printf("消费者组 '%s' 创建成功或已存在\n", groupName)
}
// 3. 生产者:添加消息到 Stream
go func() {
for i := 0; i < 10; i++ {
message := map[string]interface{}{
"event": "sensor_data",
"value": strconv.Itoa(i * 10),
"timestamp": time.Now().Format(time.RFC3339),
}
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamKey,
MaxLen: 100, // 限制 Stream 长度
Approx: true,
ID: "*", // 自动生成 ID
Values: message,
}).Result()
if err != nil {
log.Printf("添加消息失败: %v", err)
} else {
fmt.Printf("生产者添加消息: ID=%s, Data=%v\n", id, message)
}
time.Sleep(500 * time.Millisecond) // 每500ms发送一条
}
fmt.Println("生产者发送完毕所有消息。")
}()
// 4. 消费者 1:从消费者组读取并处理消息
go func(consumer string) {
for {
// 从消费者组读取待处理消息或新消息
// ">" 表示读取从未传递给任何消费者的消息,或者该消费者之前从未处理过的消息
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumer,
Streams: []string{streamKey, ">"}, // 读取 StreamKey 的新消息
Count: 1, // 一次读取一条
Block: 0, // 阻塞读取,0表示永不超时
}).Result()
if err != nil {
if err == redis.Nil {
// fmt.Printf("消费者 %s: 暂无新消息,等待...\n", consumer)
time.Sleep(1 * time.Second) // 没有消息时等待一秒
continue
}
log.Printf("消费者 %s 读取消息失败: %v", consumer, err)
time.Sleep(1 * time.Second)
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
fmt.Printf("消费者 %s 接收到消息: ID=%s, Data=%v\n", consumer, msg.ID, msg.Values)
// 模拟处理消息
time.Sleep(50 * time.Millisecond)
// 确认消息
_, err := rdb.XAck(ctx, streamKey, groupName, msg.ID).Result()
if err != nil {
log.Printf("消费者 %s 确认消息 %s 失败: %v", consumer, msg.ID, err)
} else {
fmt.Printf("消费者 %s 确认消息: ID=%s\n", consumer, msg.ID)
}
}
}
}
}(consumerName1)
// 5. 消费者 2:从消费者组读取并处理消息 (模拟另一个消费者共享任务)
go func(consumer string) {
for {
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumer,
Streams: []string{streamKey, ">"},
Count: 1,
Block: 0,
}).Result()
if err != nil {
if err == redis.Nil {
time.Sleep(1 * time.Second)
continue
}
log.Printf("消费者 %s 读取消息失败: %v", consumer, err)
time.Sleep(1 * time.Second)
continue
}
for _, stream := range streams {
for _, msg := range stream.Messages {
fmt.Printf("消费者 %s 接收到消息: ID=%s, Data=%v\n", consumer, msg.ID, msg.Values)
time.Sleep(100 * time.Millisecond) // 模拟处理时间稍长
_, err := rdb.XAck(ctx, streamKey, groupName, msg.ID).Result()
if err != nil {
log.Printf("消费者 %s 确认消息 %s 失败: %v", consumer, msg.ID, err)
} else {
fmt.Printf("消费者 %s 确认消息: ID=%s\n", consumer, msg.ID)
}
}
}
}
}(consumerName2)
// 保持主 goroutine 运行,以便生产者和消费者可以工作
select {}
}
“`
7. 总结
Redis Stream 是一个强大而灵活的消息队列和事件流解决方案,它结合了传统消息队列的持久性、可靠性与 Redis 的高性能。通过对核心概念、基本命令和消费者组的理解,你可以利用 Stream 构建出高效、可伸缩的实时数据处理系统。从简单的日志收集到复杂的事件驱动微服务架构,Redis Stream 都能提供坚实的基础。