Redis Pub/Sub 使用教程:轻松掌握发布订阅功能 – wiki基地


Redis Pub/Sub 使用教程:轻松掌握发布订阅功能

在现代分布式系统中,服务间的通信是核心组成部分。除了传统的请求-响应模式(如 RESTful API)之外,异步消息通信模式在解耦、实时通知和构建事件驱动架构方面发挥着至关重要的作用。发布/订阅(Publish/Subscribe,简称 Pub/Sub)就是一种非常流行的异步消息模式。

Redis,作为一个高性能的内存数据结构存储系统,不仅提供了丰富的数据结构,还内置了对 Pub/Sub 功能的支持。Redis 的 Pub/Sub 功能以其简单、快速的特点,成为许多应用场景下的理想选择。

本文将带你深入了解 Redis 的发布订阅功能,从基本概念到实际应用,让你轻松掌握这一强大的特性。

1. 什么是发布/订阅模式 (Pub/Sub)?

在开始学习 Redis Pub/Sub 之前,我们先简单回顾一下发布/订阅模式的基本概念。

发布/订阅模式是一种消息传递模式,它与传统的客户端-服务器模式或请求-响应模式有着本质区别。在 Pub/Sub 模式中:

  • 发布者 (Publisher):消息的发送者,它们将消息发布到特定的“频道”(或称“主题”)。发布者不关心谁会接收到消息,也不直接将消息发送给特定的接收者。
  • 订阅者 (Subscriber):消息的接收者,它们对一个或多个特定的“频道”感兴趣。当有消息发布到它们订阅的频道时,订阅者就会收到消息。订阅者不关心是谁发布的这条消息。
  • 频道 (Channel):消息的中转站或者说分类标签。发布者将消息发送到频道,订阅者从频道接收消息。它解耦了发布者和订阅者,使它们无需知道彼此的存在。

Pub/Sub 模式的核心优势在于:

  • 解耦:发布者和订阅者之间无需直接连接或了解对方。它们只通过频道进行交互,大大降低了系统组件间的依赖。
  • 广播能力:一条消息可以被多个订阅者同时接收,非常适合实现一对多的通知和广播场景。
  • 异步性:发布者发送消息后不会等待订阅者的响应,提高了系统的吞吐量和响应速度。

2. Redis Pub/Sub 的工作原理

Redis 的 Pub/Sub 实现遵循了标准的发布/订阅模式。Redis 服务器充当消息代理的角色,负责接收发布者发布的消息,并将这些消息转发给所有当前订阅了该频道的客户端。

其工作流程大致如下:

  1. 一个或多个客户端通过执行 SUBSCRIBEPSUBSCRIBE 命令,向 Redis 服务器表达对特定频道或频道模式的兴趣,成为订阅者。这些客户端会进入一个特殊的“订阅模式”。
  2. 另一个客户端(发布者)通过执行 PUBLISH 命令,将消息发送到某个频道。
  3. Redis 服务器接收到 PUBLISH 命令后,不会将消息存储在任何数据结构中,而是立即查找当前所有处于订阅模式的客户端。
  4. 对于每一个订阅了该频道(或者订阅了能匹配该频道模式)的客户端,Redis 服务器会将消息发送给它。

重要特性和注意事项:

  • 无持久化:Redis Pub/Sub 的消息是“即发即失”的。如果一个客户端在消息发布时没有连接到 Redis 并订阅该频道,那么这条消息对它来说就永久丢失了。Redis 不会为离线订阅者存储消息。
  • 实时性:消息一旦发布,Redis 会立即将其发送给所有在线的订阅者,提供了非常高的实时性。
  • 客户端状态:一旦客户端进入订阅模式(通过 SUBSCRIBEPSUBSCRIBE),它就只能接收 Pub/Sub 相关的消息(订阅成功确认、收到消息、取消订阅确认)。除了 SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING, QUIT 等少数命令外,它不能执行其他常规的 Redis 命令(如 GET, SET, HGETALL 等)。如果需要执行其他命令,需要另外开启一个 Redis 连接。
  • 发送方不关心接收方PUBLISH 命令会返回接收到消息的订阅者数量,但这只是一个实时计数,发布者并不知道具体是哪些订阅者收到了消息。

3. Redis Pub/Sub 基本命令

Redis 提供了几个简单的命令来实现发布和订阅功能。

3.1 PUBLISH channel message

  • 功能:将一条消息发布到指定的频道。
  • 参数
    • channel: 消息要发送到的频道名称(字符串)。
    • message: 要发送的消息内容(字符串)。
  • 返回值:接收到这条消息的订阅者数量。这个数量包括通过 SUBSCRIBEPSUBSCRIBE 订阅的客户端。如果没有任何订阅者在线,返回 0。
  • 示例

    redis
    PUBLISH news.it "Hello IT subscribers!"

    如果当前有 3 个客户端订阅了 news.it 频道,这个命令将返回 (integer) 3

3.2 SUBSCRIBE channel [channel ...]

  • 功能:订阅一个或多个指定的频道。
  • 参数:一个或多个频道名称。
  • 返回值:客户端执行 SUBSCRIBE 命令后,会进入订阅模式。对于每个成功订阅的频道,Redis 会返回一个确认消息数组,格式为 ["subscribe", channel, count],其中 channel 是订阅成功的频道名称,count 是当前客户端总共订阅的频道数量。之后,当有消息发布到这些频道时,客户端会收到格式为 ["message", channel, data] 的消息数组,其中 channel 是收到消息的频道,data 是消息内容。
  • 示例(在 Redis CLI 中打开一个客户端 A)

    redis
    SUBSCRIBE channel1 channel2

    客户端 A 会收到如下响应(假设这是它首次订阅):

    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "channel1"
    3) (integer) 1
    1) "subscribe"
    2) "channel2"
    3) (integer) 2

    示例(在 Redis CLI 中打开另一个客户端 B,发布消息)

    redis
    PUBLISH channel1 "This is a message for channel1"

    客户端 A 会收到:

    1) "message"
    2) "channel1"
    3) "This is a message for channel1"

    redis
    PUBLISH channel3 "This message won't be received by A"

    客户端 A 不会收到这条消息,因为 A 没有订阅 channel3

3.3 UNSUBSCRIBE [channel [channel ...]]

  • 功能:取消订阅一个或多个指定的频道。如果未指定频道,则取消订阅所有当前客户端已订阅的频道。
  • 参数:可选,一个或多个要取消订阅的频道名称。
  • 返回值:对于每个取消订阅的频道,Redis 返回一个确认消息数组,格式为 ["unsubscribe", channel, count],其中 channel 是取消订阅的频道名称(如果取消所有频道,可能为 nil),count 是当前客户端取消订阅后剩余的频道数量。当 count 变为 0 时,客户端会退出订阅模式,可以再次执行常规 Redis 命令。
  • 示例(在客户端 A 中)

    redis
    UNSUBSCRIBE channel1

    客户端 A 会收到:

    1) "unsubscribe"
    2) "channel1"
    3) (integer) 1

    客户端 A 仍然订阅着 channel2

    redis
    UNSUBSCRIBE

    客户端 A 会取消订阅所有频道,收到:

    1) "unsubscribe"
    2) "channel2"
    3) (integer) 0

    此时客户端 A 退出订阅模式。

4. Redis Pub/Sub 模式匹配订阅

除了精确订阅特定频道,Redis 还支持按照模式订阅频道。这使得客户端可以方便地订阅符合特定命名规则的一组频道。

4.1 PSUBSCRIBE pattern [pattern ...]

  • 功能:订阅一个或多个符合指定模式的频道。
  • 参数:一个或多个频道模式。模式支持以下通配符:
    • *: 匹配任意长度(包括 0 长度)的任意字符序列。
    • ?: 匹配一个任意字符。
  • 返回值:客户端执行 PSUBSCRIBE 后也会进入订阅模式。对于每个成功订阅的模式,Redis 返回 ["psubscribe", pattern, count] 数组,其中 pattern 是订阅成功的模式,count 是当前客户端总共订阅的模式数量。之后,当有消息发布到 任何 匹配 任一 已订阅模式的频道时,客户端会收到格式为 ["pmessage", pattern, channel, data] 的消息数组,其中 pattern 是匹配到的模式,channel 是收到消息的实际频道名称,data 是消息内容。
  • 示例(在 Redis CLI 中打开一个客户端 C)

    redis
    PSUBSCRIBE news.* sports.*

    客户端 C 会收到:

    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "news.*"
    3) (integer) 1
    1) "psubscribe"
    2) "sports.*"
    3) (integer) 2

    示例(在客户端 B 中发布消息)

    redis
    PUBLISH news.it "Latest IT news"
    PUBLISH news.sports "Sports update"
    PUBLISH finance.market "Market news"

    客户端 C 会收到:

    “`
    1) “pmessage”
    2) “news.
    3) “news.it”
    4) “Latest IT news”
    1) “pmessage”
    2) “sports.

    3) “news.sports” # 注意:尽管频道名称是 news.sports,但它匹配了 sports.* 模式
    4) “Sports update”

    finance.market 的消息 C 不会收到,因为它不匹配 news. 或 sports.

    “`

    重要说明:一个频道如果匹配了多个已订阅的模式,客户端会为每个匹配的模式都收到一条 pmessage 消息。例如,如果客户端订阅了 news.*news.it 两个模式,当消息发布到 news.it 频道时,客户端会收到两条 pmessage 消息,一条对应 news.* 模式,一条对应 news.it 模式(因为 news.it 也算是一种模式)。

4.2 PUNSUBSCRIBE [pattern [pattern ...]]

  • 功能:取消订阅一个或多个指定的频道模式。如果未指定模式,则取消订阅所有当前客户端已订阅的模式。
  • 参数:可选,一个或多个要取消订阅的频道模式。
  • 返回值:对于每个取消订阅的模式,Redis 返回 ["punsubscribe", pattern, count] 数组,其中 pattern 是取消订阅的模式(如果取消所有模式,可能为 nil),count 是当前客户端取消订阅后剩余的模式数量。
  • 示例(在客户端 C 中)

    redis
    PUNSUBSCRIBE news.*

    客户端 C 会收到:

    1) "punsubscribe"
    2) "news.*"
    3) (integer) 1

    客户端 C 仍然订阅着 sports.* 模式。

    redis
    PUNSUBSCRIBE

    客户端 C 会取消订阅所有模式。

5. SUBSCRIBE 与 PSUBSCRIBE 的区别与结合

  • SUBSCRIBE 是精确匹配,只能收到发布到指定频道的消息。
  • PSUBSCRIBE 是模式匹配,可以收到发布到任何匹配模式的频道的消息。

一个客户端可以同时使用 SUBSCRIBEPSUBSCRIBE 订阅频道。在这种情况下:

  • 通过 SUBSCRIBE 订阅的频道收到消息时,消息类型是 message
  • 通过 PSUBSCRIBE 订阅的频道收到消息时,消息类型是 pmessage

如果一个频道同时被 SUBSCRIBE 精确订阅,并且也匹配了某个 PSUBSCRIBE 模式,那么当消息发布到该频道时,客户端会收到 两条 消息:一条是 message 类型(来自精确订阅),一条是 pmessage 类型(来自模式订阅)。

这通常不是期望的行为,所以在使用时需要注意避免这种重复接收。通常情况下,如果一个频道需要通过模式订阅来覆盖,就不需要再对其进行精确订阅了。

6. Redis Pub/Sub 的优缺点

优点:

  1. 简单易用:命令非常直观,容易理解和上手。
  2. 高性能:基于内存操作,消息的转发速度非常快,延迟极低。
  3. 实时性:适合需要即时通知和广播的场景。
  4. 解耦:发布者和订阅者完全解耦,降低了系统复杂性。

缺点:

  1. 无持久化:消息即发即失,对于需要保证消息不丢失的场景不适用。如果订阅者断线,会错过期间发布的所有消息。
  2. 无消息确认机制:发布者无法得知订阅者是否成功处理了消息。
  3. 订阅者必须在线:只有在线且处于订阅模式的客户端才能接收消息。
  4. 客户端连接限制:进入订阅模式的客户端连接是阻塞的,不能用于执行其他常规 Redis 命令。一个繁忙的 Pub/Sub 系统可能会消耗大量客户端连接。

7. 典型应用场景

鉴于其特点,Redis Pub/Sub 非常适合以下场景:

  1. 实时通知和广播:如用户上线/下线通知、系统事件广播、实时股票价格更新、聊天室消息分发等。
  2. 缓存失效通知:当某个数据在数据库中更新时,可以通过 Pub/Sub 通知缓存服务清理或更新相关缓存。
  3. 简易的事件总线:在微服务架构中,可以作为轻量级的事件发布平台,用于服务间的异步通知,例如用户注册成功后通知积分服务和邮件服务。
  4. 日志分发:中心化日志收集系统可以将不同服务的日志通过 Pub/Sub 分发给实时监控或分析服务。

8. 动手实践:使用 Python 客户端

理论学习了这么多,现在我们通过 Python 客户端来实际操作一下 Redis Pub/Sub。

前提条件:

  1. 已安装 Redis 服务器并在运行中。
  2. 已安装 Python。
  3. 安装 redis Python 库:pip install redis

我们将创建两个 Python 脚本:一个作为订阅者,一个作为发布者。

8.1 订阅者脚本 (subscriber.py)

“`python
import redis
import time

连接 Redis

host=’localhost’, port=6379 是默认值,如果你的Redis配置不同,请修改

r = redis.Redis(decode_responses=True) # decode_responses=True 会将收到的消息自动解码为字符串

创建一个 Pub/Sub 对象

ignore_subscribe_messages=True 可以忽略订阅成功时的确认消息

pubsub = r.pubsub(ignore_subscribe_messages=False)

def my_handler(message):
“””
处理收到的消息的回调函数
message 是一个字典,包含 ‘type’, ‘channel’, ‘data’ 等键
对于 Pub/Sub 消息:
– ‘type’: ‘subscribe’, ‘unsubscribe’, ‘message’, ‘psubscribe’, ‘punsubscribe’, ‘pmessage’
– ‘channel’: 频道名称 (对于message, unsubscribe, subscribe类型)
– ‘pattern’: 匹配到的模式 (对于 pmessage, punsubscribe, psubscribe 类型)
– ‘data’: 消息内容 (对于 message, pmessage 类型)
– ‘data’: 订阅/取消订阅成功时的计数 (对于 subscribe, unsubscribe, psubscribe, punsubscribe 类型)
“””
msg_type = message[‘type’]
channel = message.get(‘channel’) # 使用 get() 防止 KeyError
pattern = message.get(‘pattern’)
data = message.get(‘data’)

if msg_type == 'message':
    print(f"收到精确订阅消息 -> 频道: {channel}, 内容: {data}")
elif msg_type == 'pmessage':
    print(f"收到模式订阅消息 -> 模式: {pattern}, 频道: {channel}, 内容: {data}")
elif msg_type == 'subscribe':
    print(f"成功订阅频道: {channel}, 当前订阅数: {data}")
elif msg_type == 'unsubscribe':
    print(f"取消订阅频道: {channel}, 剩余订阅数: {data}")
elif msg_type == 'psubscribe':
    print(f"成功订阅模式: {pattern}, 当前订阅模式数: {data}")
elif msg_type == 'punsubscribe':
    print(f"取消订阅模式: {pattern}, 剩余订阅模式数: {data}")
else:
    print(f"收到未知类型消息: {message}")

print(“订阅者启动…”)

订阅频道 ‘mychannel’ 和模式 ‘news.*’

注意:这里将回调函数 my_handler 作为参数传递给 subscribe/psubscribe

这表示当收到这些频道/模式的消息时,会调用 my_handler 处理

如果不传递 handler,则需要手动在循环中处理 pubsub.listen() 或 pubsub.get_message() 的结果

pubsub.subscribe({‘mychannel’: my_handler, ‘channel2’: my_handler})
pubsub.psubscribe(
{‘news.‘: my_handler, ‘logs..error’: my_handler})

或者,不使用回调函数,而是在循环中处理

pubsub.subscribe(‘mychannel’, ‘channel2’)

pubsub.psubscribe(‘news.‘, ‘logs..error’)

try:
# 进入监听模式,这将阻塞当前线程,直到取消订阅所有频道/模式
# 也可以使用 pubsub.get_message() 在非阻塞模式下获取消息
# ignore_subscribe_messages=True 时,订阅确认消息不会触发handler或出现在get_message结果中
print(“开始监听消息…”)
# pubsub.run_in_thread(daemon=True) # 可以在线程中运行监听,不阻塞主线程
# time.sleep(1000) # 如果使用 run_in_thread, 需要保持主线程存活

# 使用 listen() 是最常见的方式,它会阻塞并持续监听
for message in pubsub.listen():
    # 如果使用了回调函数,listen() 内部会调用handler
    # 如果没有使用回调函数,message 就是一个字典,可以在这里直接处理
    # print(f"Received raw message: {message}") # 如果没有使用handler,可以在这里处理

    # 可以在这里添加逻辑来判断是否需要退出循环
    # 例如,收到特定消息时退出
    if message['type'] == 'message' and message['data'] == 'QUIT':
         print("收到退出命令,正在退出...")
         break

except redis.exceptions.ConnectionError as e:
print(f”Redis连接错误: {e}”)
except KeyboardInterrupt:
print(“用户中断,正在退出…”)
finally:
# 在退出前取消所有订阅
print(“取消所有订阅…”)
pubsub.unsubscribe()
pubsub.punsubscribe()
print(“订阅者退出。”)

“`

8.2 发布者脚本 (publisher.py)

“`python
import redis
import time

连接 Redis

r = redis.Redis(decode_responses=True)

print(“发布者启动…”)

try:
while True:
# 从控制台获取输入
input_str = input(“请输入要发布的频道和消息 (格式: channel message) 或输入 ‘QUIT’ 退出: “)
if input_str.upper() == ‘QUIT’:
# 发送一个退出消息通知订阅者 (可选,如果订阅者监听了特定退出消息)
try:
r.publish(‘mychannel’, ‘QUIT’) # 发送退出消息到mychannel
except Exception as e:
print(f”发送退出消息失败: {e}”)
break

    parts = input_str.split(maxsplit=1) # 最多分割一次,将频道和消息分开

    if len(parts) == 2:
        channel = parts[0].strip()
        message = parts[1].strip()

        if not channel or not message:
             print("频道和消息都不能为空。")
             continue

        try:
            # 发布消息
            subscriber_count = r.publish(channel, message)
            print(f"已发布消息到频道 '{channel}',内容 '{message}'。 {subscriber_count} 个订阅者收到。")
        except redis.exceptions.ConnectionError as e:
            print(f"Redis连接错误: {e}")
            time.sleep(1) # 等待一小会儿重试或退出
        except Exception as e:
            print(f"发布消息时发生错误: {e}")

    else:
        print("输入格式不正确,请使用 'channel message' 格式。")

except KeyboardInterrupt:
print(“用户中断,正在退出…”)
except Exception as e:
print(f”发生未知错误: {e}”)
finally:
print(“发布者退出。”)

“`

8.3 如何运行

  1. 确保 Redis 服务器正在运行。
  2. 打开一个终端窗口,运行订阅者脚本:
    bash
    python subscriber.py

    你应该会看到订阅者连接 Redis 并订阅频道/模式的输出。
  3. 打开另一个终端窗口,运行发布者脚本:
    bash
    python publisher.py
  4. 在发布者终端输入消息并发布:
    • 输入 mychannel hello from publisher 并回车。在订阅者终端会看到收到 mychannel 的精确订阅消息。
    • 输入 channel2 another message 并回车。在订阅者终端会看到收到 channel2 的精确订阅消息。
    • 输入 news.tech Latest tech news 并回车。在订阅者终端会看到收到 news.tech 的模式订阅消息(匹配 news.*)。
    • 输入 news.world World news update 并回车。在订阅者终端会看到收到 news.world 的模式订阅消息(匹配 news.*)。
    • 输入 logs.backend.error Something went wrong 并回车。在订阅者终端会看到收到 logs.backend.error 的模式订阅消息(匹配 logs.*.error)。
    • 输入 logs.frontend.info User logged in 并回车。在订阅者终端不会收到消息,因为它不匹配任何已订阅的模式。
  5. 在发布者终端输入 QUIT 退出发布者。
  6. 在订阅者终端输入 Ctrl+C 退出订阅者(或者等待发布者发送了 “QUIT” 消息到 “mychannel”)。

通过这个简单的例子,你就可以看到消息是如何从发布者经过 Redis 服务器,然后分发给所有相关的订阅者的。

9. 总结与展望

Redis 的 Pub/Sub 功能提供了一种简单、高性能的异步消息广播机制。它特别适合那些对消息实时性要求高,但对消息持久性要求不高的场景。其核心优势在于简洁和速度。

然而,对于需要保证消息不丢失、支持离线消息、需要复杂路由或工作队列等场景,Redis Pub/Sub 可能不是最佳选择。在这种情况下,你可能需要考虑更专业的独立消息队列系统(如 RabbitMQ, Kafka, ActiveMQ 等),或者 Redis 本身提供的其他高级功能,例如:

  • Redis Streams:Redis 5.0 引入的全新数据结构,提供了可持久化的消息队列、消费者组等功能,更适合构建可靠的消息处理系统。
  • 基于 List 的简单队列:利用 LPUSHBRPOP 命令可以实现一个简单的阻塞式工作队列,支持消息持久化(因为消息存储在 List 中)。

选择哪种消息机制取决于你的具体需求。对于需要快速、简单的实时通知和广播,Redis Pub/Sub 无疑是一个非常高效且易于集成的方案。

掌握了 Redis 的发布订阅功能,你就解锁了在应用中实现解耦和实时通信的一种强大方式。多加实践,结合你的具体业务场景,你会发现它的更多价值!

希望这篇教程对你有所帮助,让你轻松掌握 Redis Pub/Sub 的使用!


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部