Redis Pub/Sub 使用教程:轻松掌握发布订阅功能
在现代分布式系统中,服务间的通信是核心组成部分。除了传统的请求-响应模式(如 RESTful API)之外,异步消息通信模式在解耦、实时通知和构建事件驱动架构方面发挥着至关重要的作用。发布/订阅(Publish/Subscribe,简称 Pub/Sub)就是一种非常流行的异步消息模式。
Redis,作为一个高性能的内存数据结构存储系统,不仅提供了丰富的数据结构,还内置了对 Pub/Sub 功能的支持。Redis 的 Pub/Sub 功能以其简单、快速的特点,成为许多应用场景下的理想选择。
本文将带你深入了解 Redis 的发布订阅功能,从基本概念到实际应用,让你轻松掌握这一强大的特性。
1. 什么是发布/订阅模式 (Pub/Sub)?
在开始学习 Redis Pub/Sub 之前,我们先简单回顾一下发布/订阅模式的基本概念。
发布/订阅模式是一种消息传递模式,它与传统的客户端-服务器模式或请求-响应模式有着本质区别。在 Pub/Sub 模式中:
- 发布者 (Publisher):消息的发送者,它们将消息发布到特定的“频道”(或称“主题”)。发布者不关心谁会接收到消息,也不直接将消息发送给特定的接收者。
- 订阅者 (Subscriber):消息的接收者,它们对一个或多个特定的“频道”感兴趣。当有消息发布到它们订阅的频道时,订阅者就会收到消息。订阅者不关心是谁发布的这条消息。
- 频道 (Channel):消息的中转站或者说分类标签。发布者将消息发送到频道,订阅者从频道接收消息。它解耦了发布者和订阅者,使它们无需知道彼此的存在。
Pub/Sub 模式的核心优势在于:
- 解耦:发布者和订阅者之间无需直接连接或了解对方。它们只通过频道进行交互,大大降低了系统组件间的依赖。
- 广播能力:一条消息可以被多个订阅者同时接收,非常适合实现一对多的通知和广播场景。
- 异步性:发布者发送消息后不会等待订阅者的响应,提高了系统的吞吐量和响应速度。
2. Redis Pub/Sub 的工作原理
Redis 的 Pub/Sub 实现遵循了标准的发布/订阅模式。Redis 服务器充当消息代理的角色,负责接收发布者发布的消息,并将这些消息转发给所有当前订阅了该频道的客户端。
其工作流程大致如下:
- 一个或多个客户端通过执行
SUBSCRIBE
或PSUBSCRIBE
命令,向 Redis 服务器表达对特定频道或频道模式的兴趣,成为订阅者。这些客户端会进入一个特殊的“订阅模式”。 - 另一个客户端(发布者)通过执行
PUBLISH
命令,将消息发送到某个频道。 - Redis 服务器接收到
PUBLISH
命令后,不会将消息存储在任何数据结构中,而是立即查找当前所有处于订阅模式的客户端。 - 对于每一个订阅了该频道(或者订阅了能匹配该频道模式)的客户端,Redis 服务器会将消息发送给它。
重要特性和注意事项:
- 无持久化:Redis Pub/Sub 的消息是“即发即失”的。如果一个客户端在消息发布时没有连接到 Redis 并订阅该频道,那么这条消息对它来说就永久丢失了。Redis 不会为离线订阅者存储消息。
- 实时性:消息一旦发布,Redis 会立即将其发送给所有在线的订阅者,提供了非常高的实时性。
- 客户端状态:一旦客户端进入订阅模式(通过
SUBSCRIBE
或PSUBSCRIBE
),它就只能接收 Pub/Sub 相关的消息(订阅成功确认、收到消息、取消订阅确认)。除了SUBSCRIBE
,PSUBSCRIBE
,UNSUBSCRIBE
,PUNSUBSCRIBE
,PING
,QUIT
等少数命令外,它不能执行其他常规的 Redis 命令(如GET
,SET
,HGETALL
等)。如果需要执行其他命令,需要另外开启一个 Redis 连接。 - 发送方不关心接收方:
PUBLISH
命令会返回接收到消息的订阅者数量,但这只是一个实时计数,发布者并不知道具体是哪些订阅者收到了消息。
3. Redis Pub/Sub 基本命令
Redis 提供了几个简单的命令来实现发布和订阅功能。
3.1 PUBLISH channel message
- 功能:将一条消息发布到指定的频道。
- 参数:
channel
: 消息要发送到的频道名称(字符串)。message
: 要发送的消息内容(字符串)。
- 返回值:接收到这条消息的订阅者数量。这个数量包括通过
SUBSCRIBE
和PSUBSCRIBE
订阅的客户端。如果没有任何订阅者在线,返回 0。 -
示例:
redis
PUBLISH news.it "Hello IT subscribers!"如果当前有 3 个客户端订阅了
news.it
频道,这个命令将返回(integer) 3
。
3.2 SUBSCRIBE channel [channel ...]
- 功能:订阅一个或多个指定的频道。
- 参数:一个或多个频道名称。
- 返回值:客户端执行
SUBSCRIBE
命令后,会进入订阅模式。对于每个成功订阅的频道,Redis 会返回一个确认消息数组,格式为["subscribe", channel, count]
,其中channel
是订阅成功的频道名称,count
是当前客户端总共订阅的频道数量。之后,当有消息发布到这些频道时,客户端会收到格式为["message", channel, data]
的消息数组,其中channel
是收到消息的频道,data
是消息内容。 -
示例(在 Redis CLI 中打开一个客户端 A):
redis
SUBSCRIBE channel1 channel2客户端 A 会收到如下响应(假设这是它首次订阅):
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel1"
3) (integer) 1
1) "subscribe"
2) "channel2"
3) (integer) 2示例(在 Redis CLI 中打开另一个客户端 B,发布消息):
redis
PUBLISH channel1 "This is a message for channel1"客户端 A 会收到:
1) "message"
2) "channel1"
3) "This is a message for channel1"redis
PUBLISH channel3 "This message won't be received by A"客户端 A 不会收到这条消息,因为 A 没有订阅
channel3
。
3.3 UNSUBSCRIBE [channel [channel ...]]
- 功能:取消订阅一个或多个指定的频道。如果未指定频道,则取消订阅所有当前客户端已订阅的频道。
- 参数:可选,一个或多个要取消订阅的频道名称。
- 返回值:对于每个取消订阅的频道,Redis 返回一个确认消息数组,格式为
["unsubscribe", channel, count]
,其中channel
是取消订阅的频道名称(如果取消所有频道,可能为nil
),count
是当前客户端取消订阅后剩余的频道数量。当count
变为 0 时,客户端会退出订阅模式,可以再次执行常规 Redis 命令。 -
示例(在客户端 A 中):
redis
UNSUBSCRIBE channel1客户端 A 会收到:
1) "unsubscribe"
2) "channel1"
3) (integer) 1客户端 A 仍然订阅着
channel2
。redis
UNSUBSCRIBE客户端 A 会取消订阅所有频道,收到:
1) "unsubscribe"
2) "channel2"
3) (integer) 0此时客户端 A 退出订阅模式。
4. Redis Pub/Sub 模式匹配订阅
除了精确订阅特定频道,Redis 还支持按照模式订阅频道。这使得客户端可以方便地订阅符合特定命名规则的一组频道。
4.1 PSUBSCRIBE pattern [pattern ...]
- 功能:订阅一个或多个符合指定模式的频道。
- 参数:一个或多个频道模式。模式支持以下通配符:
*
: 匹配任意长度(包括 0 长度)的任意字符序列。?
: 匹配一个任意字符。
- 返回值:客户端执行
PSUBSCRIBE
后也会进入订阅模式。对于每个成功订阅的模式,Redis 返回["psubscribe", pattern, count]
数组,其中pattern
是订阅成功的模式,count
是当前客户端总共订阅的模式数量。之后,当有消息发布到 任何 匹配 任一 已订阅模式的频道时,客户端会收到格式为["pmessage", pattern, channel, data]
的消息数组,其中pattern
是匹配到的模式,channel
是收到消息的实际频道名称,data
是消息内容。 -
示例(在 Redis CLI 中打开一个客户端 C):
redis
PSUBSCRIBE news.* sports.*客户端 C 会收到:
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
1) "psubscribe"
2) "sports.*"
3) (integer) 2示例(在客户端 B 中发布消息):
redis
PUBLISH news.it "Latest IT news"
PUBLISH news.sports "Sports update"
PUBLISH finance.market "Market news"客户端 C 会收到:
“`
1) “pmessage”
2) “news.”
3) “news.it”
4) “Latest IT news”
1) “pmessage”
2) “sports.”
3) “news.sports” # 注意:尽管频道名称是 news.sports,但它匹配了 sports.* 模式
4) “Sports update”finance.market 的消息 C 不会收到,因为它不匹配 news. 或 sports.
“`
重要说明:一个频道如果匹配了多个已订阅的模式,客户端会为每个匹配的模式都收到一条
pmessage
消息。例如,如果客户端订阅了news.*
和news.it
两个模式,当消息发布到news.it
频道时,客户端会收到两条pmessage
消息,一条对应news.*
模式,一条对应news.it
模式(因为news.it
也算是一种模式)。
4.2 PUNSUBSCRIBE [pattern [pattern ...]]
- 功能:取消订阅一个或多个指定的频道模式。如果未指定模式,则取消订阅所有当前客户端已订阅的模式。
- 参数:可选,一个或多个要取消订阅的频道模式。
- 返回值:对于每个取消订阅的模式,Redis 返回
["punsubscribe", pattern, count]
数组,其中pattern
是取消订阅的模式(如果取消所有模式,可能为nil
),count
是当前客户端取消订阅后剩余的模式数量。 -
示例(在客户端 C 中):
redis
PUNSUBSCRIBE news.*客户端 C 会收到:
1) "punsubscribe"
2) "news.*"
3) (integer) 1客户端 C 仍然订阅着
sports.*
模式。redis
PUNSUBSCRIBE客户端 C 会取消订阅所有模式。
5. SUBSCRIBE 与 PSUBSCRIBE 的区别与结合
SUBSCRIBE
是精确匹配,只能收到发布到指定频道的消息。PSUBSCRIBE
是模式匹配,可以收到发布到任何匹配模式的频道的消息。
一个客户端可以同时使用 SUBSCRIBE
和 PSUBSCRIBE
订阅频道。在这种情况下:
- 通过
SUBSCRIBE
订阅的频道收到消息时,消息类型是message
。 - 通过
PSUBSCRIBE
订阅的频道收到消息时,消息类型是pmessage
。
如果一个频道同时被 SUBSCRIBE
精确订阅,并且也匹配了某个 PSUBSCRIBE
模式,那么当消息发布到该频道时,客户端会收到 两条 消息:一条是 message
类型(来自精确订阅),一条是 pmessage
类型(来自模式订阅)。
这通常不是期望的行为,所以在使用时需要注意避免这种重复接收。通常情况下,如果一个频道需要通过模式订阅来覆盖,就不需要再对其进行精确订阅了。
6. Redis Pub/Sub 的优缺点
优点:
- 简单易用:命令非常直观,容易理解和上手。
- 高性能:基于内存操作,消息的转发速度非常快,延迟极低。
- 实时性:适合需要即时通知和广播的场景。
- 解耦:发布者和订阅者完全解耦,降低了系统复杂性。
缺点:
- 无持久化:消息即发即失,对于需要保证消息不丢失的场景不适用。如果订阅者断线,会错过期间发布的所有消息。
- 无消息确认机制:发布者无法得知订阅者是否成功处理了消息。
- 订阅者必须在线:只有在线且处于订阅模式的客户端才能接收消息。
- 客户端连接限制:进入订阅模式的客户端连接是阻塞的,不能用于执行其他常规 Redis 命令。一个繁忙的 Pub/Sub 系统可能会消耗大量客户端连接。
7. 典型应用场景
鉴于其特点,Redis Pub/Sub 非常适合以下场景:
- 实时通知和广播:如用户上线/下线通知、系统事件广播、实时股票价格更新、聊天室消息分发等。
- 缓存失效通知:当某个数据在数据库中更新时,可以通过 Pub/Sub 通知缓存服务清理或更新相关缓存。
- 简易的事件总线:在微服务架构中,可以作为轻量级的事件发布平台,用于服务间的异步通知,例如用户注册成功后通知积分服务和邮件服务。
- 日志分发:中心化日志收集系统可以将不同服务的日志通过 Pub/Sub 分发给实时监控或分析服务。
8. 动手实践:使用 Python 客户端
理论学习了这么多,现在我们通过 Python 客户端来实际操作一下 Redis Pub/Sub。
前提条件:
- 已安装 Redis 服务器并在运行中。
- 已安装 Python。
- 安装
redis
Python 库:pip install redis
我们将创建两个 Python 脚本:一个作为订阅者,一个作为发布者。
8.1 订阅者脚本 (subscriber.py
)
“`python
import redis
import time
连接 Redis
host=’localhost’, port=6379 是默认值,如果你的Redis配置不同,请修改
r = redis.Redis(decode_responses=True) # decode_responses=True 会将收到的消息自动解码为字符串
创建一个 Pub/Sub 对象
ignore_subscribe_messages=True 可以忽略订阅成功时的确认消息
pubsub = r.pubsub(ignore_subscribe_messages=False)
def my_handler(message):
“””
处理收到的消息的回调函数
message 是一个字典,包含 ‘type’, ‘channel’, ‘data’ 等键
对于 Pub/Sub 消息:
– ‘type’: ‘subscribe’, ‘unsubscribe’, ‘message’, ‘psubscribe’, ‘punsubscribe’, ‘pmessage’
– ‘channel’: 频道名称 (对于message, unsubscribe, subscribe类型)
– ‘pattern’: 匹配到的模式 (对于 pmessage, punsubscribe, psubscribe 类型)
– ‘data’: 消息内容 (对于 message, pmessage 类型)
– ‘data’: 订阅/取消订阅成功时的计数 (对于 subscribe, unsubscribe, psubscribe, punsubscribe 类型)
“””
msg_type = message[‘type’]
channel = message.get(‘channel’) # 使用 get() 防止 KeyError
pattern = message.get(‘pattern’)
data = message.get(‘data’)
if msg_type == 'message':
print(f"收到精确订阅消息 -> 频道: {channel}, 内容: {data}")
elif msg_type == 'pmessage':
print(f"收到模式订阅消息 -> 模式: {pattern}, 频道: {channel}, 内容: {data}")
elif msg_type == 'subscribe':
print(f"成功订阅频道: {channel}, 当前订阅数: {data}")
elif msg_type == 'unsubscribe':
print(f"取消订阅频道: {channel}, 剩余订阅数: {data}")
elif msg_type == 'psubscribe':
print(f"成功订阅模式: {pattern}, 当前订阅模式数: {data}")
elif msg_type == 'punsubscribe':
print(f"取消订阅模式: {pattern}, 剩余订阅模式数: {data}")
else:
print(f"收到未知类型消息: {message}")
print(“订阅者启动…”)
订阅频道 ‘mychannel’ 和模式 ‘news.*’
注意:这里将回调函数 my_handler 作为参数传递给 subscribe/psubscribe
这表示当收到这些频道/模式的消息时,会调用 my_handler 处理
如果不传递 handler,则需要手动在循环中处理 pubsub.listen() 或 pubsub.get_message() 的结果
pubsub.subscribe({‘mychannel’: my_handler, ‘channel2’: my_handler})
pubsub.psubscribe({‘news.‘: my_handler, ‘logs..error’: my_handler})
或者,不使用回调函数,而是在循环中处理
pubsub.subscribe(‘mychannel’, ‘channel2’)
pubsub.psubscribe(‘news.‘, ‘logs..error’)
try:
# 进入监听模式,这将阻塞当前线程,直到取消订阅所有频道/模式
# 也可以使用 pubsub.get_message() 在非阻塞模式下获取消息
# ignore_subscribe_messages=True 时,订阅确认消息不会触发handler或出现在get_message结果中
print(“开始监听消息…”)
# pubsub.run_in_thread(daemon=True) # 可以在线程中运行监听,不阻塞主线程
# time.sleep(1000) # 如果使用 run_in_thread, 需要保持主线程存活
# 使用 listen() 是最常见的方式,它会阻塞并持续监听
for message in pubsub.listen():
# 如果使用了回调函数,listen() 内部会调用handler
# 如果没有使用回调函数,message 就是一个字典,可以在这里直接处理
# print(f"Received raw message: {message}") # 如果没有使用handler,可以在这里处理
# 可以在这里添加逻辑来判断是否需要退出循环
# 例如,收到特定消息时退出
if message['type'] == 'message' and message['data'] == 'QUIT':
print("收到退出命令,正在退出...")
break
except redis.exceptions.ConnectionError as e:
print(f”Redis连接错误: {e}”)
except KeyboardInterrupt:
print(“用户中断,正在退出…”)
finally:
# 在退出前取消所有订阅
print(“取消所有订阅…”)
pubsub.unsubscribe()
pubsub.punsubscribe()
print(“订阅者退出。”)
“`
8.2 发布者脚本 (publisher.py
)
“`python
import redis
import time
连接 Redis
r = redis.Redis(decode_responses=True)
print(“发布者启动…”)
try:
while True:
# 从控制台获取输入
input_str = input(“请输入要发布的频道和消息 (格式: channel message) 或输入 ‘QUIT’ 退出: “)
if input_str.upper() == ‘QUIT’:
# 发送一个退出消息通知订阅者 (可选,如果订阅者监听了特定退出消息)
try:
r.publish(‘mychannel’, ‘QUIT’) # 发送退出消息到mychannel
except Exception as e:
print(f”发送退出消息失败: {e}”)
break
parts = input_str.split(maxsplit=1) # 最多分割一次,将频道和消息分开
if len(parts) == 2:
channel = parts[0].strip()
message = parts[1].strip()
if not channel or not message:
print("频道和消息都不能为空。")
continue
try:
# 发布消息
subscriber_count = r.publish(channel, message)
print(f"已发布消息到频道 '{channel}',内容 '{message}'。 {subscriber_count} 个订阅者收到。")
except redis.exceptions.ConnectionError as e:
print(f"Redis连接错误: {e}")
time.sleep(1) # 等待一小会儿重试或退出
except Exception as e:
print(f"发布消息时发生错误: {e}")
else:
print("输入格式不正确,请使用 'channel message' 格式。")
except KeyboardInterrupt:
print(“用户中断,正在退出…”)
except Exception as e:
print(f”发生未知错误: {e}”)
finally:
print(“发布者退出。”)
“`
8.3 如何运行
- 确保 Redis 服务器正在运行。
- 打开一个终端窗口,运行订阅者脚本:
bash
python subscriber.py
你应该会看到订阅者连接 Redis 并订阅频道/模式的输出。 - 打开另一个终端窗口,运行发布者脚本:
bash
python publisher.py - 在发布者终端输入消息并发布:
- 输入
mychannel hello from publisher
并回车。在订阅者终端会看到收到mychannel
的精确订阅消息。 - 输入
channel2 another message
并回车。在订阅者终端会看到收到channel2
的精确订阅消息。 - 输入
news.tech Latest tech news
并回车。在订阅者终端会看到收到news.tech
的模式订阅消息(匹配news.*
)。 - 输入
news.world World news update
并回车。在订阅者终端会看到收到news.world
的模式订阅消息(匹配news.*
)。 - 输入
logs.backend.error Something went wrong
并回车。在订阅者终端会看到收到logs.backend.error
的模式订阅消息(匹配logs.*.error
)。 - 输入
logs.frontend.info User logged in
并回车。在订阅者终端不会收到消息,因为它不匹配任何已订阅的模式。
- 输入
- 在发布者终端输入
QUIT
退出发布者。 - 在订阅者终端输入
Ctrl+C
退出订阅者(或者等待发布者发送了 “QUIT” 消息到 “mychannel”)。
通过这个简单的例子,你就可以看到消息是如何从发布者经过 Redis 服务器,然后分发给所有相关的订阅者的。
9. 总结与展望
Redis 的 Pub/Sub 功能提供了一种简单、高性能的异步消息广播机制。它特别适合那些对消息实时性要求高,但对消息持久性要求不高的场景。其核心优势在于简洁和速度。
然而,对于需要保证消息不丢失、支持离线消息、需要复杂路由或工作队列等场景,Redis Pub/Sub 可能不是最佳选择。在这种情况下,你可能需要考虑更专业的独立消息队列系统(如 RabbitMQ, Kafka, ActiveMQ 等),或者 Redis 本身提供的其他高级功能,例如:
- Redis Streams:Redis 5.0 引入的全新数据结构,提供了可持久化的消息队列、消费者组等功能,更适合构建可靠的消息处理系统。
- 基于 List 的简单队列:利用
LPUSH
和BRPOP
命令可以实现一个简单的阻塞式工作队列,支持消息持久化(因为消息存储在 List 中)。
选择哪种消息机制取决于你的具体需求。对于需要快速、简单的实时通知和广播,Redis Pub/Sub 无疑是一个非常高效且易于集成的方案。
掌握了 Redis 的发布订阅功能,你就解锁了在应用中实现解耦和实时通信的一种强大方式。多加实践,结合你的具体业务场景,你会发现它的更多价值!
希望这篇教程对你有所帮助,让你轻松掌握 Redis Pub/Sub 的使用!