掌握 Redis 发布订阅:从概念到实战
在构建现代分布式系统时,不同服务或组件之间的通信是一个核心问题。传统的请求-响应模式适用于许多场景,但在需要实时更新、事件驱动或解耦生产者与消费者时,这种模式往往力不从心。发布/订阅(Publish/Subscribe,简称 Pub/Sub)模式应运而生,提供了一种灵活高效的消息传递机制。
Redis,作为一款高性能的内存键值数据库,不仅提供了丰富的数据结构和强大的缓存能力,还内置了对 Pub/Sub 模式的支持。Redis 的 Pub/Sub 功能以其简洁、快速的特性,在轻量级消息传递场景中表现出色。
本文将深入探讨 Redis 的发布订阅机制,从基本概念出发,详细讲解其工作原理、核心命令、编程实践,并通过实际案例展示其应用,帮助您全面掌握 Redis Pub/Sub。
第一部分:概念篇 – 理解发布订阅模式
在深入 Redis 实现之前,我们先理解通用的 Pub/Sub 模式。
什么是发布订阅模式?
发布订阅模式是一种消息范式,其中发送者(称为发布者,Publisher)不会将消息直接发送给特定的接收者。相反,发布者将消息分到不同的类别或主题(称为频道,Channel)。订阅者(Subscriber)可以向自己感兴趣的频道订阅消息。任何发送到频道的消息都会被转发给所有订阅该频道的订阅者。
这种模式的核心优点在于解耦:
- 时间解耦: 发布者和订阅者无需同时在线。发布者发布消息时,订阅者可能不在线;订阅者上线后,如果消息系统支持持久化,可以接收到离线期间的消息(Redis Pub/Sub 不支持这一点,消息是非持久的)。
- 空间解耦: 发布者和订阅者无需知道彼此的存在。发布者只知道将消息发送到哪个频道,而订阅者只知道订阅哪个频道。它们不需要知道对方的网络地址、数量等信息。
- 关系解耦: 发布者和订阅者之间的关系是通过频道而非直接连接建立的,这使得系统结构更加灵活,易于扩展和修改。
发布订阅模式中的角色
- 发布者 (Publisher): 消息的生产者,将消息发送到特定频道。
- 订阅者 (Subscriber): 消息的消费者,监听特定频道,接收发送到该频道的消息。
- 频道 (Channel): 消息的载体或分类,发布者将消息发送到频道,订阅者从频道接收消息。
Redis Pub/Sub 的特点
Redis 的 Pub/Sub 实现是基于内存的,因此具有极高的效率和低延迟。但它也有一些关键特点需要注意:
- 非持久化: Redis 不会存储 Pub/Sub 消息。如果一个订阅者在消息发布时没有连接到 Redis 或者没有订阅该频道,它将永远接收不到这条消息。消息是“即时”发送的。
- “火后即忘” (Fire-and-Forget): 发布者发送消息后,不会确认是否有订阅者接收到,也不会关心消息是否被成功处理。
- 广播性质: 发送到一个频道的消息会被推送给所有当前订阅该频道的客户端。
这些特点使得 Redis Pub/Sub 非常适合需要低延迟、高吞吐、不要求消息可靠性或持久化的场景,例如实时通知、广播消息、简单的事件总线等。
第二部分:基础篇 – Redis Pub/Sub 命令详解
Redis Pub/Sub 功能的核心是通过几个简单的命令来实现的。
核心命令
-
PUBLISH channel message
- 功能: 将
message
发送(发布)到指定的channel
。 - 参数:
channel
: 频道名称,字符串类型。message
: 要发布的消息内容,字符串类型。
- 返回值: 接收到这条消息的订阅者数量。如果没有任何订阅者(或者只有模式订阅者,下面会讲到),返回 0。
- 示例:
redis
PUBLISH news "Hello, world!"
如果有一个客户端订阅了news
频道,这个命令将返回1
。
- 功能: 将
-
SUBSCRIBE channel [channel ...]
- 功能: 订阅一个或多个频道。
- 参数:
channel
: 一个或多个频道名称。
- 返回值: 这是一个阻塞命令。一旦客户端执行
SUBSCRIBE
命令,它就会进入订阅模式,不再接受除了SUBSCRIBE
,PSUBSCRIBE
,UNSUBSCRIBE
,PUNSUBSCRIBE
,READONLY
,CLIENT
之外的常规命令。客户端会持续接收来自已订阅频道的消息,直到执行UNSUBSCRIBE
命令退出订阅模式。 - 订阅成功响应: 当成功订阅一个频道时,客户端会收到一个多批量回复 (Multi-Bulk Reply),通常包含三个元素:
"subscribed"
: 表示订阅成功。channel-name
: 成功订阅的频道名称。number-of-channels
: 当前连接总共订阅的频道数量(包括通过SUBSCRIBE
和PSUBSCRIBE
订阅的)。
- 收到消息响应: 当收到一个频道的消息时,客户端会收到一个多批量回复,通常包含三个元素:
"message"
: 表示这是一条普通消息。channel-name
: 消息来自的频道名称。message-content
: 消息内容。
- 示例:
redis
SUBSCRIBE chat news
客户端订阅chat
和news
两个频道。成功后会收到类似这样的回复:
1) "subscribed"
2) "chat"
3) (integer) 1
1) "subscribed"
2) "news"
3) (integer) 2
当PUBLISH news "New article published!"
被执行时,该客户端会收到:
1) "message"
2) "news"
3) "New article published!"
-
UNSUBSCRIBE [channel [channel ...]]
- 功能: 退订指定的频道。如果在没有参数的情况下调用,则退订当前客户端订阅的所有频道。
- 参数:
channel
: 要退订的一个或多个频道名称(可选)。
- 返回值:
"unsubscribed"
: 表示退订成功。channel-name
: 成功退订的频道名称。number-of-channels
: 当前连接总共剩余订阅的频道数量。当这个数字降到 0 时,客户端会退出订阅模式,可以开始执行其他常规 Redis 命令。
- 示例:
redis
UNSUBSCRIBE chat
退订chat
频道。
redis
UNSUBSCRIBE
退订所有频道。
模式匹配订阅 (Pattern Matching)
Redis Pub/Sub 还支持按模式订阅频道,这使得订阅者可以接收来自符合特定模式的所有频道的消息,而无需单独订阅每个频道。
-
PSUBSCRIBE pattern [pattern ...]
- 功能: 订阅一个或多个按模式匹配的频道。
- 参数:
pattern
: 一个或多个模式字符串。模式支持 glob 风格的通配符:*
: 匹配任意数量(包括零个)的字符。?
: 匹配一个任意字符。[abc]
: 匹配方括号内的任意一个字符。[^abc]
: 匹配不在方括号内的任意一个字符。
- 返回值: 同样是阻塞命令。客户端进入订阅模式。
- 模式订阅成功响应:
"psubscribed"
: 表示模式订阅成功。pattern-name
: 成功订阅的模式字符串。number-of-channels
: 当前连接总共订阅的频道和模式数量。
- 收到模式匹配消息响应:
"pmessage"
: 表示这是一条模式匹配到的消息。pattern-that-matched
: 匹配到此消息的模式字符串。channel-name
: 消息实际来自的频道名称。message-content
: 消息内容。
- 示例:
redis
PSUBSCRIBE news.* user.123.*
客户端订阅所有以news.
开头的频道,以及所有以user.123.
开头的频道。
如果此时执行PUBLISH news.sports "World Cup updates"
,客户端会收到:
1) "pmessage"
2) "news.*" # 匹配到的模式
3) "news.sports" # 实际频道
4) "World Cup updates"
如果执行PUBLISH news.weather "Sunny today"
,客户端也会收到类似的消息,模式仍是"news.*"
。
如果执行PUBLISH user.123.profile "Updated profile"
,客户端会收到匹配到"user.123.*"
模式的消息。
-
PUNSUBSCRIBE [pattern [pattern ...]]
- 功能: 退订指定的模式。如果在没有参数的情况下调用,则退订当前客户端订阅的所有模式。
- 参数:
pattern
: 要退订的一个或多个模式字符串(可选)。
- 返回值:
"punsubscribed"
: 表示模式退订成功。pattern-name
: 成功退订的模式字符串。number-of-channels
: 当前连接总共剩余订阅的频道和模式数量。
- 示例:
redis
PUNSUBSCRIBE news.*
退订news.*
模式。
redis
PUNSUBSCRIBE
退订所有模式。
PUBSUB 监控命令
Redis 还提供了一些 PUBSUB
子命令来监控 Pub/Sub 系统:
PUBSUB CHANNELS [pattern]
- 功能: 列出当前活跃的频道(至少有一个订阅者)的名称。可以根据可选的
pattern
进行过滤。
- 功能: 列出当前活跃的频道(至少有一个订阅者)的名称。可以根据可选的
PUBSUB NUMSUB [channel ...]
- 功能: 返回指定频道名称的当前订阅者数量(不包括模式订阅者)。
PUBSUB NUMPAT
- 功能: 返回当前模式订阅(
PSUBSCRIBE
)的数量。注意,这返回的是客户端连接订阅的模式数量,而不是订阅这些模式的总客户端数量。一个客户端可能订阅多个模式,算作多个模式订阅。
- 功能: 返回当前模式订阅(
这些监控命令可以在另一个未处于 Pub/Sub 模式的连接上执行。
重要注意点:
- 一旦一个客户端连接进入
SUBSCRIBE
或PSUBSCRIBE
模式,它就不能执行任何其他常规 Redis 命令(除了少数例外,如上面提到的UNSUBSCRIBE
,PUNSUBSCRIBE
等)。如果要执行其他命令,需要使用另一个独立的 Redis 连接。 PUBLISH
命令会同时将消息发送给直接订阅该频道的客户端和通过模式订阅匹配到该频道的客户端。同一个消息,如果一个客户端既直接订阅了频道,又通过模式订阅匹配到了同一个频道,它会收到两次消息(一次是message
类型,一次是pmessage
类型)。
第三部分:实战篇 – 编程实践与案例
在实际应用中,我们通常会使用 Redis 客户端库来与 Redis 进行交互。几乎所有的主流编程语言都有成熟的 Redis 客户端库,它们通常会封装 Pub/Sub 的复杂性,例如处理连接的阻塞状态,提供回调函数或事件监听器来处理接收到的消息。
以 Python 语言为例,使用 redis-py
库:
示例:简单的发布者和订阅者
首先,确保安装 redis-py
:
bash
pip install redis
发布者 (publisher.py):
“`python
import redis
import time
连接 Redis 服务器
decode_responses=True 会自动将 bytes 解码为 string
r = redis.Redis(host=’localhost’, port=6379, db=0, decode_responses=True)
print(“Publisher started. Publishing messages to ‘mychannel’…”)
try:
while True:
message = f”Hello from publisher! Current time: {time.strftime(‘%H:%M:%S’)}”
# PUBLISH 命令返回接收到消息的订阅者数量
subscribers_count = r.publish(‘mychannel’, message)
print(f”Published: ‘{message}’ to ‘mychannel’. Received by {subscribers_count} subscribers.”)
time.sleep(1) # 每秒发布一条消息
except KeyboardInterrupt:
print(“Publisher stopped.”)
“`
订阅者 (subscriber.py):
“`python
import redis
import time
连接 Redis 服务器
r = redis.Redis(host=’localhost’, port=6379, db=0, decode_responses=True)
创建一个 Pub/Sub 对象
ignore_subscribe_messages=True 可以过滤掉 SUBSCRIBE/PSUBSCRIBE 成功时的通知消息
如果设置为 False,listen() 会先收到订阅成功通知,再收到消息
p = r.pubsub(ignore_subscribe_messages=True)
订阅频道
p.subscribe(‘mychannel’)
print(“Subscriber started. Listening on ‘mychannel’…”)
进入监听循环
listen() 是一个生成器,会阻塞直到接收到消息或事件
for message in p.listen():
print(f”Received message: {message}”)
# message 字典结构通常是:
# {‘type’: ‘message’, ‘pattern’: None, ‘channel’: ‘mychannel’, ‘data’: ‘…’}
# 或 {‘type’: ‘pmessage’, ‘pattern’: ‘my.*’, ‘channel’: ‘mychannel’, ‘data’: ‘…’}
# 可以根据 type 处理不同类型的消息
if message['type'] == 'message':
print(f" Channel: {message['channel']}, Data: {message['data']}")
elif message['type'] == 'pmessage':
print(f" Pattern: {message['pattern']}, Channel: {message['channel']}, Data: {message['data']}")
# 其他类型如 'subscribe', 'unsubscribe', 'psubscribe', 'punsubscribe' 等
# 可以在这里添加逻辑,例如处理特定消息后退出
# if message['data'] == 'exit':
# print("Exit command received. Stopping subscriber.")
# p.unsubscribe() # 取消订阅
# break # 退出循环
print(“Subscriber stopped.”)
“`
运行:
- 先运行一个或多个
subscriber.py
实例。 - 再运行
publisher.py
实例。 - 您将看到发布者每秒发送一条消息,所有运行中的订阅者几乎同时接收到这些消息。
- 尝试在发布者运行时启动新的订阅者,新的订阅者会立即接收到后续的消息。
- 尝试在发布者运行时关闭一个订阅者,发布者会继续发布,其他订阅者不受影响,关闭的订阅者不会接收到离线期间的消息。
示例:模式匹配订阅 (PSUBSCRIBE)
修改 subscriber.py
使用模式订阅:
“`python
import redis
import time
r = redis.Redis(host=’localhost’, port=6379, db=0, decode_responses=True)
p = r.pubsub(ignore_subscribe_messages=True)
订阅模式
p.psubscribe(‘news.‘, ‘user.‘) # 订阅所有以 news. 开头的频道,以及所有以 user. 开头的频道
print(“Pattern Subscriber started. Listening on patterns ‘news.‘ and ‘user.‘…”)
for message in p.listen():
print(f”Received raw message: {message}”)
if message[‘type’] == ‘pmessage’:
print(f” Pattern: {message[‘pattern’]}, Channel: {message[‘channel’]}, Data: {message[‘data’]}”)
print(“Pattern Subscriber stopped.”)
“`
现在,运行修改后的订阅者,并使用发布者向不同频道发布消息:
“`bash
在 publisher.py 中发布消息到不同频道
这条消息会被订阅者接收到 (匹配 news.*)
PUBLISH news.sports “Football score updated: 3-1”
这条消息也会被订阅者接收到 (匹配 news.*)
PUBLISH news.weather “It’s raining cats and dogs”
这条消息会被订阅者接收到 (匹配 user.*)
PUBLISH user.alice.status “Alice is online”
这条消息不会被订阅者接收到 (不匹配任何模式)
PUBLISH alerts “System warning!”
“`
您会看到订阅者只打印出匹配 news.*
和 user.*
模式的频道消息。
实际应用场景
- 实时通知系统: 用户关注某个事件或对象,当事件发生或对象状态改变时,后台服务发布消息到特定频道(例如
user.123.feed
,order.456.status
),订阅了这些频道的客户端(网页、移动应用)实时接收通知并更新界面。 - 聊天应用: 简单的聊天室可以通过 Pub/Sub 实现。每个聊天室对应一个频道,用户进入聊天室即订阅该频道,发送消息即发布到该频道。但对于大型、复杂的聊天系统,需要考虑消息历史、离线消息、用户状态等,可能需要更健壮的消息队列或 Redis Streams。
- 广播系统: 向所有连接的客户端广播系统公告、活动信息等。
- 缓存失效通知: 分布式应用中,当某个缓存数据更新时,可以通过 Pub/Sub 发布一个失效通知到特定频道(例如
cache.invalidate.user.123
),所有监听该频道的应用实例收到通知后,将各自的本地缓存失效。 - 简单的事件总线: 不同微服务之间需要进行轻量级的事件通知,例如订单服务创建新订单后发布事件,库存服务和支付服务订阅该事件并执行后续操作。
第四部分:进阶与考量
尽管 Redis Pub/Sub 功能强大且易用,但在生产环境中部署和使用时,还需要考虑一些进阶话题和潜在问题。
客户端库的重要性
如前所述,Redis 客户端库极大地简化了 Pub/Sub 的使用。它们通常负责:
- 管理与 Redis 的连接。
- 处理阻塞模式和接收消息的循环。
- 提供友好的 API 来订阅、退订和发布。
- 在后台线程或协程中处理消息接收,避免阻塞主应用线程。
选择一个成熟、社区活跃的客户端库至关重要。
性能与扩展性
Redis Pub/Sub 性能很高,但它是基于单线程模型。虽然 I/O 多路复用使得它可以处理大量并发连接,但所有的 Pub/Sub 消息处理(将消息推送到订阅者)都在主线程中完成。
- 消息吞吐量: 如果消息发布非常频繁,或者消息体很大,可能会占用 Redis 主线程大量的 CPU 时间,影响其他命令的执行。
- 订阅者数量: 当一个频道有巨量的订阅者时,Redis 需要将每条消息复制并发送给每一个订阅者,这会消耗带宽和 Redis 的 CPU 资源。极端情况下可能导致性能瓶颈。
- 横向扩展: Redis Pub/Sub 本身不像 Kafka 或 RabbitMQ 那样容易进行水平扩展以分担消息处理负载。Redis 集群虽然可以分片存储数据,但 Pub/Sub 消息是广播到集群中的每个节点的。如果一个节点收到 Pub/Sub 消息,它会将其转发给所有其他节点,其他节点再将其推送给本地订阅者。这在集群规模较大时可能导致网络带宽的巨大消耗和效率低下。
因此,Redis Pub/Sub 更适合于中小型规模、对消息丢失不敏感、需要低延迟广播的场景。对于需要高可靠性、消息持久化、巨大吞吐量、复杂路由或易于水平扩展的场景,应考虑使用更专业的分布式消息队列系统,如 Kafka、RabbitMQ、Redis Streams 等。
错误处理与可靠性
Redis Pub/Sub 的“火后即忘”特性意味着消息是不可靠的。
- 订阅者掉线: 如果订阅者在消息发布时掉线,它会错过消息。重新连接后,不会收到离线期间的消息。
- Redis 服务器宕机: 如果 Redis 服务器宕机,所有正在进行中的 Pub/Sub 活动都会中断,所有连接会断开。恢复后,宕机期间的消息会丢失。
- 消息处理失败: 如果订阅者接收到消息后处理失败(例如,业务逻辑错误、数据库连接中断),消息不会被重新投递。需要应用层自行实现重试或死信队列机制(这超出了 Redis Pub/Sub 本身的能力)。
如果您的应用需要消息可靠性,即确保消息至少被处理一次(At Least Once)或恰好被处理一次(Exactly Once),或者需要消息历史记录,那么 Redis Streams 或其他消息队列系统可能是更合适的选择。
订阅模式的开销
虽然模式订阅非常灵活,但也存在一些开销:
- Redis 在接收到消息时,需要遍历所有模式订阅,检查哪些模式与发布消息的频道匹配。模式订阅数量庞大时,这可能增加一些 CPU 开销。
- 如前所述,如果一个客户端既订阅了频道,又订阅了匹配的模式,它会收到重复的消息(不同类型)。应用代码需要妥善处理这种情况。
监控
利用 PUBSUB CHANNELS
, PUBSUB NUMSUB
, PUBSUB NUMPAT
等命令可以帮助您了解当前系统的 Pub/Sub 状态,例如有多少活跃频道,每个频道有多少订阅者,有多少模式订阅等,这对于诊断问题和容量规划非常有帮助。
第五部分:总结
Redis 的发布订阅功能提供了一种简单、高效、基于内存的消息广播机制。它非常适合需要低延迟、高吞吐、不需要消息持久性和可靠性的场景,例如实时通知、缓存失效、简单的事件分发等。
通过 PUBLISH
、SUBSCRIBE
、PSUBSCRIBE
等核心命令,我们可以轻松地构建发布者和订阅者。模式匹配订阅 (PSUBSCRIBE
) 增加了灵活性。借助成熟的客户端库,在各种编程语言中实现 Redis Pub/Sub 应用也非常方便。
然而,在使用 Redis Pub/Sub 时,必须清楚地认识到它的局限性:消息非持久化、无可靠投递保证、订阅者连接的阻塞特性以及在面对海量订阅者或消息时的单点扩展限制。对于需要消息持久化、可靠性保证或更高级消息路由的场景,应该优先考虑 Redis Streams 或其他专业的分布式消息队列系统。
掌握 Redis Pub/Sub,意味着理解其设计哲学、核心命令及其背后的权衡。在合适的场景中,它将是构建响应式、解耦系统的强大工具。通过本文的介绍,希望您能从概念到实战,熟练运用 Redis Pub/Sub,为您的应用架构添砖加瓦。