如何在 FastAPI 中实现 SSE – wiki基地


解锁实时通信:在 FastAPI 中深度实现 Server-Sent Events (SSE)

随着 Web 应用变得越来越动态,实时通信的需求日益增长。从股票行情更新、社交媒体推送通知到长时间运行任务的进度指示,应用程序常常需要在服务器端发生事件时主动向客户端推送信息,而无需客户端频繁发起请求(轮询)。在众多实现实时通信的技术中,Server-Sent Events (SSE) 作为一种轻量级、基于 HTTP 的标准,提供了一种高效的单向(服务器到客户端)通信方式。

本文将带你深入了解 SSE 的原理、它与其它实时技术的区别,并详细阐述如何在高性能的 Python Web 框架 FastAPI 中实现 SSE,包括基础实现、高级用法以及应对大规模连接的策略。

1. 理解 Server-Sent Events (SSE)

1.1 什么是 SSE?

Server-Sent Events (SSE) 是一种 HTML5 技术,允许服务器通过 HTTP 连接自动向客户端(通常是 Web 浏览器)推送数据。它建立在一个长寿命的 HTTP 连接之上,服务器利用这个连接将一系列事件流发送给客户端。与传统的短连接请求-响应模式不同,SSE 连接在初始化后会一直保持开放,直到客户端或服务器明确关闭它。

1.2 SSE 的工作原理

SSE 的核心在于利用标准的 HTTP 协议。当客户端发起一个请求时,服务器不是立即返回完整的响应并关闭连接,而是设置响应的 Content-Typetext/event-stream,并保持连接开放。服务器随后可以随时向这个连接写入格式化的事件数据。

SSE 的数据格式非常简单,每条消息由一个或多个字段组成,每个字段一行,以 \n 结尾。消息的结束由一个空行(即 \n\n)标识。主要的字段包括:

  • data::事件携带的数据。如果数据包含多行,每行都应以 data: 开头。
  • event::可选的事件类型名称。客户端可以使用这个名称来监听特定类型的事件。
  • id::可选的事件标识符。客户端可以使用它来跟踪最后一个接收到的事件,并在连接中断后告知服务器从哪个 ID 开始重传。
  • retry::可选的建议重连间隔(毫秒)。如果连接中断,客户端的 EventSource 对象会等待这个时间间隔后再尝试重新连接。
  • ::用于注释,会被客户端忽略。

例如,一个简单的 SSE 消息可能如下所示:

“`text
data: 第一条消息

data: 第二条消息
data: 包含多行数据

event: system_alert
data: 服务器即将重启!

id: 123
data: 带有ID的消息

retry: 5000
“`

客户端通常使用浏览器内置的 EventSource API 来消费 SSE 流。EventSource 对象会自动处理连接管理、解析事件流以及在连接中断时按照 retry 字段的建议自动重连。

1.3 SSE 与 WebSockets 的区别

SSE 和 WebSockets 都可以用于实现服务器向客户端的实时推送,但它们在功能和复杂性上有所不同:

  • 通信方式: SSE 是单向的(服务器到客户端),而 WebSockets 是双向的(全双工)。如果你的应用只需要服务器推送数据而不需要客户端主动向服务器发送实时消息,SSE 通常更简单、更适合。
  • 协议: SSE 建立在标准 HTTP 协议之上,使用 text/event-stream MIME 类型。WebSocket 则引入了一个全新的 ws://wss:// 协议,需要在建立连接时进行 HTTP 握手升级。基于 HTTP 的特性使得 SSE 更容易通过现有的网络基础设施(如防火墙、代理服务器)工作,而 WebSocket 有时可能需要特定的代理配置。
  • 复杂性: SSE 协议和客户端 API (EventSource) 都比 WebSocket (WebSocket API) 简单得多。WebSocket 协议支持文本和二进制数据,需要手动处理消息解析、错误处理、心跳维持等,而 SSE 提供了内置的事件类型、ID、自动重连等特性。
  • 数据类型: SSE 仅支持文本数据,而 WebSocket 支持文本和二进制数据。
  • 使用场景: SSE 更适用于服务器持续推送数据给客户端的场景,如实时日志、股票报价、通知等,其中客户端不需要或很少需要向服务器发送实时数据。WebSocket 更适用于需要频繁双向通信的场景,如在线聊天、多人游戏、实时协作编辑器等。

选择哪种技术取决于你的具体需求。如果只需要服务器推送,SSE 是一个简单高效的选择。如果需要双向实时通信,WebSocket 则是必需的。

2. FastAPI 为何适合实现 SSE?

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建基于标准 Python 类型提示的 API。其异步能力(基于 Starlette 和 Pydantic)使其天然适合处理需要长时间运行连接或高并发 I/O 的任务,包括 SSE。

FastAPI 基于 Starlette,而 Starlette 提供了 StreamingResponse 类,这个类非常适合用于实现 SSE。StreamingResponse 可以接受一个生成器或异步生成器作为内容源,并随着生成器产生数据块而逐步将响应发送给客户端,这正是实现 SSE 所需的核心能力:保持连接并持续发送数据流。

此外,FastAPI 对异步操作的一流支持(使用 asyncio)使得在 SSE 流中执行异步任务(如等待消息、读取数据库、调用外部服务)变得非常自然和高效。

3. 在 FastAPI 中实现基础 SSE

实现基础 SSE 的核心是创建一个异步生成器函数,该函数按照 SSE 规范格式化数据,并使用 StreamingResponse 将其作为响应返回。

3.1 准备环境

首先,确保你安装了 FastAPI 和一个 ASGI 服务器,如 Uvicorn:

bash
pip install fastapi uvicorn

3.2 创建基础 SSE 端点

创建一个简单的 FastAPI 应用,包含一个 /stream 端点,它每秒向客户端推送一个时间戳。

“`python
import asyncio
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

异步生成器函数,用于生成 SSE 事件流

async def event_generator(request: Request):
“””
生成 SSE 事件的异步生成器。
它会持续运行,直到客户端断开连接。
“””
client_disconnected = False
while not client_disconnected:
# 检查客户端是否已断开连接
# 在 Starlette/FastAPI 中,当客户端断开连接时,相关的协程可能会被取消。
# 捕捉 asyncio.CancelledError 是检测客户端断开的一种常用方法。
try:
# 构建 SSE 格式的数据
# data: 后跟数据内容
# 后面是一个空行 \n\n 表示消息结束
now = datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)
yield f”data: 当前时间是: {now}\n\n”

        # 等待一秒钟再发送下一条消息
        await asyncio.sleep(1)

    except asyncio.CancelledError:
        # 客户端断开连接时会抛出 CancelledError
        print("客户端断开连接")
        client_disconnected = True
        break # 退出循环

    except Exception as e:
        # 处理其他可能的异常
        print(f"生成事件时发生错误: {e}")
        # 可以选择发送一个错误事件给客户端
        yield f"event: error\ndata: 服务器错误\n\n"
        # 或者直接断开连接
        client_disconnected = True
        break

@app.get(“/stream”)
async def stream_events(request: Request):
“””
SSE 端点,返回一个 StreamingResponse。
“””
return StreamingResponse(
event_generator(request),
media_type=”text/event-stream”
)

if name == “main“:
import uvicorn
uvicorn.run(app, host=”0.0.0.0”, port=8000)

“`

3.3 代码解释

  1. event_generator(request: Request) 异步生成器:
    • 这是一个 async def 函数,内部使用 yield 关键字,使其成为一个异步生成器。
    • 它接受一个 Request 对象,尽管在这个基础示例中未使用,但在更高级场景中(如获取客户端信息、检查请求头)很有用。
    • while not client_disconnected: 循环确保生成器持续运行。
    • try...except asyncio.CancelledError: 块是检测客户端断开连接的关键。当客户端关闭浏览器标签页或断开连接时,FastAPI/Starlette 会取消与该请求关联的任务,从而抛出 asyncio.CancelledError。捕获此异常并设置 client_disconnected = True 使循环终止。
    • yield f"data: ...\n\n":这是按照 SSE 格式生成数据的关键部分。data: 是必需的字段前缀,\n\n 是每条消息的结束标记。
    • await asyncio.sleep(1):在生成两条消息之间暂停,避免 CPU 占用过高,也控制消息发送频率。
  2. stream_events(request: Request) 路由函数:
    • 使用 @app.get("/stream") 定义一个 GET 端点。
    • 返回 StreamingResponse 对象。
    • 第一个参数是生成器函数(这里是 event_generator 的调用结果)。StreamingResponse 会迭代这个生成器,并将每次 yield 的内容作为数据块发送。
    • media_type="text/event-stream":这是至关重要的!它告诉客户端这是一个 SSE 流,浏览器会将其视为一个 EventSource 连接。

3.4 运行与测试

  1. 保存代码为 main.py
  2. 在终端中运行:uvicorn main:app --reload
  3. 打开浏览器,访问 http://127.0.0.1:8000/stream。你不会看到一个静态页面,但浏览器会尝试建立 SSE 连接。
  4. 打开浏览器的开发者工具(通常按 F12),切换到 “Network” 标签页。找到 /stream 这个请求,查看其响应。你会看到源源不断地收到格式化好的 SSE 消息。
  5. 或者,你可以创建一个简单的 HTML 页面来使用 EventSource API 消费这个流:

“`html




SSE Test

Server-Sent Events Test



``
保存为
index.html` 并在浏览器中打开(或者通过 FastAPI 的静态文件功能提供)。你将在页面上看到实时更新的时间。

4. 实现更丰富的 SSE 事件

基础示例只发送了带有 data 字段的默认事件。我们可以添加 eventid 字段来实现更灵活的事件推送。

修改 event_generator 函数:

“`python

…(之前的导入和 FastAPI 应用定义)

async def rich_event_generator(request: Request):
client_disconnected = False
message_id = 0
while not client_disconnected:
try:
message_id += 1
now = datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)

        # 模拟发送不同类型的事件
        if message_id % 3 == 0:
            yield f"event: heartbeat\nid: {message_id}\ndata: 心跳信号\n\n"
        else:
            yield f"event: time_update\nid: {message_id}\ndata: 当前时间是: {now}\n\n"

        # 模拟一个随机错误,触发重连
        # if message_id == 5:
        #     raise RuntimeError("模拟一个临时错误")

        await asyncio.sleep(1)

    except asyncio.CancelledError:
        print(f"客户端 {request.client.host}:{request.client.port} 断开连接")
        client_disconnected = True
        break
    except Exception as e:
        print(f"生成事件时发生错误: {e}")
        # 发送一个错误事件并设置 retry,然后断开当前连接
        yield f"event: error\ndata: 服务器内部错误\nretry: 3000\n\n" # 建议客户端3秒后重连
        client_disconnected = True # 在这里选择断开,因为模拟的是一个不可恢复的错误
        break # 退出循环

@app.get(“/rich-stream”)
async def rich_stream_events(request: Request):
return StreamingResponse(
rich_event_generator(request),
media_type=”text/event-stream”
)

…(可以保留原来的 /stream 端点,或者替换掉)

if name == “main“:
import uvicorn
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`

在客户端 HTML 中,你可以修改 JavaScript 来监听不同的事件类型:

“`html

``
现在,客户端可以根据
event:字段区分处理不同类型的消息。event.lastEventId会自动获取服务器发送的id:` 字段的值,用于支持客户端重连后的状态恢复。

5. 处理客户端断开连接

event_generator 中捕获 asyncio.CancelledError 是检测客户端断开连接的标准方法。确保在捕获到这个异常后,及时退出生成器循环,释放资源。

如果你的生成器在执行某个 await 操作时被取消(比如 await asyncio.sleep(1)),CancelledError 就会被抛出。然而,如果在执行同步代码块或长时间运行的同步 I/O 操作时客户端断开,可能不会立即抛出异常。在 FastAPI/Starlette 的 StreamingResponse 实现中,当你 yield 数据时,Starlette 尝试将数据写入到底层传输(例如 TCP socket)。如果连接已关闭,这个写入操作会失败,从而可能触发取消任务或抛出异常,最终还是可以通过捕获 CancelledError 来感知。

6. 实现广播 (Broadcasting)

基础示例是每个连接的客户端都运行一个独立的生成器。如果服务器上的某个事件需要通知所有连接的客户端,我们就需要一个广播机制。这通常涉及维护一个活动客户端列表,并在事件发生时向所有客户端发送消息。

6.1 内存中的简单广播

最简单的方法是在内存中维护一个数据结构,存储每个连接客户端的“消息队列”。当有消息需要广播时,将其放入所有客户端的队列中。客户端的生成器则从自己的队列中读取消息。

“`python
import asyncio
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from starlette.background import BackgroundTask # 可选,用于清理

app = FastAPI()

用于存储连接的客户端及其消息队列

{ client_id: asyncio.Queue }

connected_clients: dict[asyncio.Queue] = {}
client_counter = 0 # 用于生成唯一的客户端ID

广播消息的异步函数

async def broadcast_message(message: str):
“””
将消息广播给所有连接的客户端。
“””
# 需要迭代 items 的一个副本,因为在迭代过程中可能有客户端断开导致 items 变化
for client_id, queue in list(connected_clients.items()):
try:
# 将消息放入队列,非阻塞方式
# put_nowait 是线程安全的,但这里是协程,Queue 本身就是 asyncio 原生支持的
# 对于大量消息,put_nowait 可能更好,如果队列满则抛异常
# 如果队列可能积压,使用 await queue.put(message) 可能导致广播任务等待
# 这里简单起见使用 put_nowait
queue.put_nowait(message)
except asyncio.QueueFull:
# 队列满了,说明这个客户端消费太慢或者已经卡住,可以考虑断开它
print(f”客户端 {client_id} 队列满,考虑断开”)
# 在实际应用中,你可能需要一个更健壮的清理机制
pass
except Exception as e:
print(f”向客户端 {client_id} 放入消息时发生错误: {e}”)
# 同样,考虑清理这个客户端

客户端生成器函数

async def client_event_generator(client_id: int, request: Request, queue: asyncio.Queue):
“””
每个连接的客户端使用一个独立的生成器,从其专属队列读取消息。
“””
print(f”客户端 {client_id} 连接成功”)
# 在生成器开始时,向该客户端发送一个确认连接的消息
yield f”event: connected\ndata: Client ID {client_id} connected.\n\n”

try:
    while True:
        # 从队列中获取消息,等待直到有消息
        message = await queue.get()
        yield message # 直接yield格式化好的消息

        # Optional: Add a small sleep if needed, but reading from queue is blocking
        # await asyncio.sleep(0.01) # 避免在队列中消息过多时占用过多CPU

except asyncio.CancelledError:
    print(f"客户端 {client_id} 断开连接")
except Exception as e:
    print(f"客户端 {client_id} 生成事件时发生错误: {e}")
    yield f"event: error\ndata: 服务器内部错误\n\n" # 发送错误消息

finally:
    # 客户端断开或生成器退出时,从列表中移除
    print(f"客户端 {client_id} 清理资源")
    if client_id in connected_clients:
        del connected_clients[client_id]

@app.get(“/broadcast-stream”)
async def broadcast_stream_events(request: Request):
global client_counter
client_id = client_counter
client_counter += 1

# 为新连接的客户端创建一个消息队列
# queue_size 可根据预期流量调整,避免无限增长导致内存溢出
client_queue = asyncio.Queue(maxsize=100) # 设置一个最大队列大小
connected_clients[client_id] = client_queue

# 创建 StreamingResponse
# 在后台任务中进行清理是 Starlette 推荐的方式
# background=BackgroundTask(lambda: connected_clients.pop(client_id, None))
# 但由于我们在生成器的 finally 块中做了清理,这里可以省略 BackgroundTask
# 实际上,生成器退出时,StreamingResponse 会结束,底层的任务也会被取消,从而触发 finally 块

return StreamingResponse(
    client_event_generator(client_id, request, client_queue),
    media_type="text/event-stream"
)

示例:一个定时广播消息的后台任务

async def periodic_broadcaster():
while True:
await asyncio.sleep(5) # 每5秒广播一次
message = f”data: 定时广播消息: {datetime.now().strftime(‘%H:%M:%S’)}\n\n”
print(f”广播消息: {message.strip()}”)
await broadcast_message(message)

在应用启动时启动广播任务

@app.on_event(“startup”)
async def startup_event():
asyncio.create_task(periodic_broadcaster()) # 在后台创建一个任务运行广播函数

if name == “main“:
import uvicorn
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`

6.2 代码解释与考虑

  1. connected_clients 字典: 用作一个简单的客户端注册表,键是客户端 ID,值是每个客户端专属的 asyncio.Queue
  2. broadcast_message(message: str) 函数: 迭代 connected_clients 字典,将消息放入每个客户端的队列。注意这里迭代的是 list(connected_clients.items()) 的副本,以避免在迭代过程中修改字典引发错误。使用 put_nowait 可以避免广播任务被单个慢速客户端阻塞,但可能导致队列满时消息丢失。
  3. client_event_generator(...) 这是每个客户端连接独有的生成器。它从传入的 asyncio.Queue 中读取消息 (await queue.get())。queue.get() 是一个阻塞操作(相对于协程),它会暂停当前协程直到队列中有消息。
  4. broadcast_stream_events(...) 在处理新连接时,生成一个唯一的客户端 ID,创建一个新的 asyncio.Queue,将其添加到 connected_clients 字典,然后返回一个 StreamingResponse,其内容源是针对这个客户端的 client_event_generator
  5. finally 块:client_event_generatorfinally 块中,无论生成器如何退出(正常完成、异常或取消),都会执行清理操作,将客户端从 connected_clients 字典中移除。
  6. periodic_broadcaster()startup_event() 这是一个示例,展示如何在后台启动一个独立的任务,该任务会定期调用 broadcast_message 来向所有客户端发送消息。@app.on_event("startup") 装饰器确保 periodic_broadcaster 协程在 FastAPI 应用启动时被调度运行。
  7. 队列大小: 设置 asyncio.Queue(maxsize=...) 非常重要,可以防止单个慢速客户端的队列无限增长,最终耗尽服务器内存。当队列满时,put_nowait 会抛出 asyncio.QueueFull 异常。你可能需要在 broadcast_message 中处理这个异常,例如记录日志或直接断开该客户端。

6.3 局限性

这种基于内存的广播方法简单易实现,但存在以下局限性:

  • 单进程限制: connected_clients 字典只存在于当前的 Python 进程中。如果你运行多个 FastAPI 工作进程(例如,使用 Uvicorn 的 --workers 参数或部署在 Kubernetes Pods 的多个副本中),这些进程之间的客户端列表是独立的。一个进程中发生的事件只能广播给连接到该进程的客户端,无法广播给连接到其他进程的客户端。
  • 内存消耗: 随着连接客户端数量的增加,每个客户端维护一个队列会消耗更多的内存。
  • 可靠性: 如果 FastAPI 进程崩溃,所有连接信息和未发送的消息都会丢失。

6.4 更可扩展的广播 (使用消息队列)

为了克服单进程限制并提高可伸缩性和可靠性,通常会使用外部消息队列系统来实现广播,例如 Redis Pub/Sub、Kafka、RabbitMQ 等。

其基本思路是:

  1. 每个 FastAPI 工作进程都连接到消息队列系统。
  2. 每个工作进程都订阅一个或多个相关的消息通道(Topic)。
  3. 当需要广播一个事件时,不论事件源在哪里(可能是任何一个 FastAPI 进程,甚至是另一个独立的服务),都将事件发布到消息队列的相应通道。
  4. 所有订阅了该通道的 FastAPI 工作进程都会收到这个事件。
  5. 收到事件的进程再将事件推送给连接到该进程的客户端队列中。

这种架构将客户端连接管理与事件发布解耦,使得你可以独立地扩展 FastAPI 工作进程和事件发布源,并且可以在多个进程/服务器之间实现真正的广播。

实现这种方案需要引入对应的客户端库(如 redis-py)。每个 FastAPI 进程需要一个后台任务来监听消息队列。这里提供一个基于 Redis Pub/Sub 的概念示例(代码会更复杂,仅作说明):

“`python

概念代码,非完整可运行示例

import asyncio
import redis.asyncio as redis # 使用支持 async/await 的 redis 库

… 其他导入

… (connected_clients, client_event_generator 函数与之前类似)

REDIS_URL = “redis://localhost”
PUBSUB_CHANNEL = “sse_events”

async def redis_listener():
“””后台任务:连接 Redis Pub/Sub 并监听事件”””
try:
r = redis.from_url(REDIS_URL)
pubsub = r.pubsub()
await pubsub.subscribe(PUBSUB_CHANNEL)
print(f”已订阅 Redis 频道: {PUBSUB_CHANNEL}”)

    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True)
        if message and message['type'] == 'message':
            data = message['data'].decode('utf-8')
            print(f"收到 Redis 消息: {data}")
            # 将收到的消息广播给本地连接的客户端
            # 这里需要确保 data 已经是 SSE 格式,或者在广播函数中格式化
            await broadcast_message(data)

        # 避免紧密循环消耗过多CPU,尤其在没有消息时
        await asyncio.sleep(0.01)

except Exception as e:
    print(f"Redis 监听任务出错: {e}")
    # 错误处理和重连机制在生产环境需要更健壮

修改 broadcast_message 函数,使其发布到 Redis

async def publish_event_to_redis(sse_message: str):
“””将 SSE 格式的消息发布到 Redis”””
try:
r = redis.from_url(REDIS_URL)
await r.publish(PUBSUB_CHANNEL, sse_message)
print(f”已发布消息到 Redis: {sse_message.strip()}”)
except Exception as e:
print(f”发布消息到 Redis 失败: {e}”)
# 错误处理

在需要广播事件的地方调用 publish_event_to_redis

例如,在 periodic_broadcaster 中

async def periodic_broadcaster_redis():
while True:
await asyncio.sleep(5)
# 这里的 message 必须是完整的 SSE 格式
message = f”data: 定时广播消息 via Redis: {datetime.now().strftime(‘%H:%M:%S’)}\n\n”
await publish_event_to_redis(message)

@app.on_event(“startup”)
async def startup_event_redis():
# 启动 Redis 监听任务
asyncio.create_task(redis_listener())
# 启动定时发布任务(如果需要)
asyncio.create_task(periodic_broadcaster_redis())

… broadcast_stream_events 路由函数和 client_event_generator 保持不变

client_event_generator 仍然从本地队列读取,只是消息来源变成了 Redis 监听任务

“`

在这种 Redis Pub/Sub 方案中,redis_listener 任务负责从 Redis 接收外部事件,然后调用本地的 broadcast_message 函数(需要修改该函数,使其只向 connected_clients 中的队列放入消息,而不是自己决定消息内容)。事件的 产生(发布到 Redis)与事件的 分发(从 Redis 收到并推送到客户端)是分开的。

7. 客户端 EventSource API 详解

重申一下客户端使用 EventSource 的关键点:

  • new EventSource(url):创建连接。URL 必须返回 text/event-stream
  • eventSource.onopen:连接成功建立时触发。
  • eventSource.onmessage:收到没有 event: 字段的消息时触发。事件对象 eventdatalastEventId 属性。
  • eventSource.addEventListener(eventType, handler):收到带有 event: eventType 字段的消息时触发指定的 handler。handler 函数接收的事件对象同样有 datalastEventId 属性。
  • eventSource.onerror:连接发生错误或中断时触发。event.eventPhase 可以是 EventSource.CONNECTING (0)、EventSource.OPEN (1)、EventSource.CLOSED (2)。浏览器会根据 retry 字段自动尝试重连。
  • eventSource.close():手动关闭连接。

EventSource 自动处理连接的建立、维护、解析、错误处理和重连,大大简化了客户端代码。重连时,浏览器会自动在请求头中包含 Last-Event-ID 字段,其值为上次成功接收到的 id: 字段的值,服务器可以利用这个信息来决定从哪里开始发送丢失的事件。

8. 性能与扩展性考虑

  • 连接数: 每个 SSE 连接都会消耗服务器的资源(文件描述符、内存、少量 CPU)。现代服务器和操作系统可以处理数万甚至数十万并发连接,但这仍然是有限的。设计时需要考虑预期的最大并发连接数。
  • 消息频率与大小: 频繁发送小消息或发送大消息都会增加网络和服务器的负担。合理控制消息的频率和大小。
  • CPU 使用: 生成器中的 await asyncio.sleep() 是让出 CPU 的关键。如果你的生成器内部有耗时的同步计算,会导致整个工作进程阻塞。将耗时操作放在独立的异步任务中,或者使用线程池/进程池。
  • 内存广播的瓶颈: 前面提到的内存广播方案在单个进程内性能尚可,但在需要多进程/多服务器时无法扩展。
  • 消息队列的优势: 使用 Redis Pub/Sub 等消息队列可以将事件分发的工作转移给专业的服务,并且容易实现多进程/多服务器间的协调和扩展。它也提供了更强的消息持久性和可靠性(取决于具体的配置和系统)。
  • 代理与负载均衡: 如果使用反向代理(如 Nginx, Traefik)或负载均衡器,需要确保它们配置正确以支持长连接。一些代理的默认超时设置可能会过早地关闭 SSE 连接。需要调整 proxy_read_timeout, proxy_send_timeout 等参数。同时,负载均衡器需要支持 “sticky sessions” 或基于 IP hash 的路由,以确保特定客户端的重连请求能够回到同一个工作进程,从而能够利用 Last-Event-ID 恢复状态(尽管在无状态的 Redis 广播模式下,客户端连接到哪个进程并不重要)。
  • 心跳机制: 即使没有业务数据,服务器也可以定期发送心跳消息(例如,只包含 :\n\n 的注释行或带有 event: heartbeat 的消息)。这有助于保持连接活跃,防止中间代理或防火墙因超时而关闭连接,并帮助客户端检测连接是否仍然有效。

9. 总结

Server-Sent Events (SSE) 是一个强大而简单的技术,用于实现服务器到客户端的单向实时推送。它利用标准的 HTTP 协议,具有易于理解的事件流格式和浏览器内置的 EventSource API,特别适合于通知、实时数据流和任务进度更新等场景。

FastAPI 凭借其优秀的异步支持和 StreamingResponse 类,成为实现 SSE 的绝佳选择。通过构建异步生成器来产生符合 SSE 格式的数据流,我们可以轻松地在 FastAPI 中创建实时的服务器推送端点。

对于需要将事件广播给多个客户端的应用,基于内存的队列方案适合简单场景和单进程部署,而结合外部消息队列系统(如 Redis Pub/Sub)则是实现高可用、可扩展广播的推荐方法。

理解 SSE 的原理、掌握 FastAPI 的 StreamingResponse 和异步编程技巧,并考虑客户端断开、错误处理以及广播机制,你就可以在 FastAPI 应用中高效地构建稳定可靠的实时推送功能了。

选择 SSE 还是 WebSocket 取决于你的具体需求:如果仅需服务器推送,SSE 以其简洁性胜出;如果需要双向实时通信,WebSocket 则是必需的。在确定 SSE 是合适的技术后,FastAPI 将助你快速且高效地实现它。

希望本文为你提供了在 FastAPI 中实现 SSE 的全面指导!


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部