FastAPI SSE 开发指南:构建实时服务器推送应用
引言
在现代 Web 应用中,实时性变得越来越重要。用户期望能够即时获取最新的数据、通知或状态更新,而无需手动刷新页面。传统的客户端轮询(Polling)机制虽然简单,但效率低下,会给服务器和客户端带来不必要的负担,尤其是在更新不频繁的情况下。
为了解决这一问题,双向通信技术如 WebSockets 应运而生。然而,在许多场景下,我们只需要实现服务器向客户端的单向数据推送,而客户端并不需要频繁地向服务器发送数据。在这种情况下,使用功能更强大、更复杂的 WebSockets 可能显得“杀鸡用牛刀”了。
这时,服务器发送事件(Server-Sent Events,简称 SSE)提供了一种更轻量、更简单的解决方案。SSE 允许服务器通过一个持久的 HTTP 连接向客户端推送数据。它基于标准的 HTTP 协议,并且浏览器原生支持,使用起来比 WebSockets 更简单。
FastAPI 是一个现代、快速(高性能)的 Web 框架,基于标准的 Python 类型提示构建,非常适合构建 API,包括需要实时能力的 API。FastAPI 对异步编程的良好支持,使其与 SSE 的异步推送特性天然契合。
本文将详细介绍如何在 FastAPI 中实现 Server-Sent Events,包括 SSE 的基本原理、在 FastAPI 中的具体实现方法、如何生成事件、管理客户端以及一些高级话题和最佳实践。
什么是 Server-Sent Events (SSE)?
SSE 是一种基于 HTTP 的服务器向客户端推送数据的技术标准。它的核心思想是:
- 客户端发起一个标准的 HTTP 请求(通常是 GET 请求)。
- 服务器保持连接开放,不立即关闭。
- 服务器可以随时通过这个开放的连接向客户端发送格式化的数据流。
- 客户端接收到数据后,可以解析并处理。
与 WebSockets 的双向全双工通信不同,SSE 是单向的,数据流只能从服务器流向客户端。这使得它特别适合那些只需要接收服务器更新的场景,例如:
- 实时股票报价
- 新闻/社交媒体动态更新
- 后台任务进度通知
- 日志流输出
SSE 数据格式
SSE 使用 text/event-stream
作为 MIME 类型。服务器发送的每个事件都由一系列以换行符分隔的字段组成,每个字段以字段名开头,后跟一个冒号 :
,然后是字段值。一个事件的结束由一个空行(即连续两个换行符 \n\n
)标记。
常见的 SSE 字段包括:
data
: 事件的数据。如果数据很长,可以分多行发送,每行都以data:
开头。event
: 事件类型。客户端可以通过指定event
类型来监听不同类型的事件。id
: 事件 ID。客户端可以使用这个 ID 来跟踪最后接收到的事件,如果连接中断,可以在重新连接时告知服务器从哪个 ID 开始发送。retry
: 客户端在连接中断后应该等待多少毫秒再尝试重新连接。
一个简单的 SSE 数据流示例:
“`
data: This is the first message.
data: This is the second message.
data: It spans multiple lines.
event: user_joined
data: {“username”: “Alice”}
id: 123
event: message
data: {“sender”: “Bob”, “text”: “Hello!”}
id: 124
“`
SSE 的优势与劣势
优势:
- 简单: 基于 HTTP,使用标准的
EventSource
客户端 API,比 WebSockets 更容易实现。 - 原生支持: 浏览器内置
EventSource
API,无需额外的库(除了旧版本 IE)。 - 自动重连:
EventSource
客户端 API 内置了自动重连机制,并且可以使用retry
字段控制重连间隔。 - 基于 HTTP: 可以利用现有的 HTTP 基础设施(如代理、负载均衡),但需要注意配置,避免它们干扰长连接。
劣势:
- 单向: 客户端不能方便地向服务器发送数据(需要额外的 HTTP 请求)。
- 限制: 某些旧的浏览器或代理可能存在连接数限制(通常是每个源 6 个连接)。
- 二进制数据: 主要设计用于传输文本数据,传输二进制数据不如 WebSockets 方便。
在 FastAPI 中实现 SSE
FastAPI 构建在 Starlette 之上,Starlette 对 ASGI 协议提供了很好的支持,包括流式响应。实现 SSE 在 FastAPI 中非常直观,核心就是使用 StreamingResponse
返回一个生成器或异步生成器,该生成器以正确的 SSE 格式 yield
数据块,并将 media_type
设置为 text/event-stream
。
基本实现步骤
- 导入必要的模块:
FastAPI
,StreamingResponse
,asyncio
等。 - 创建一个
FastAPI
应用实例。 - 定义一个异步路由处理函数。
- 在处理函数中定义一个异步生成器函数。
- 生成器函数负责按一定频率或在特定事件发生时生成 SSE 格式的字符串(如
data: message\n\n
)。 - 使用
StreamingResponse
包装这个生成器,并设置media_type="text/event-stream"
。
示例:一个简单的计时器
这个示例将创建一个 SSE 端点,每秒向客户端发送当前时间。
“`python
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import datetime
app = FastAPI()
异步生成器函数,用于生成SSE事件
async def event_generator(request: Request):
client_disconnected = False
try:
while True:
# 检查客户端是否已断开连接
# Starlette/FastAPI 内部会通过监听连接状态来处理断开
# 虽然 request.is_disconnected() 可以用,但对于StreamingResponse
# 更可靠的方式是在yield或await操作后,如果连接断开会抛出异常
# yield 语句在连接断开时会触发 ClientDisconnect 异常
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# SSE 数据格式:data: <数据>\n\n
# 确保以双换行符 \n\n 结束每个事件
sse_message = f"data: Current time is {now}\n\n"
yield sse_message
# 等待一秒,模拟数据更新间隔
await asyncio.sleep(1)
except asyncio.CancelledError:
# 当客户端断开连接时,uvicorn/Starlette 会取消这个任务
print("Client disconnected via CancelledError")
client_disconnected = True
except Exception as e:
# 捕获其他可能的异常
print(f"An error occurred: {e}")
client_disconnected = True
finally:
# 可以在这里进行一些清理工作,如果需要的话
if client_disconnected:
print("Generator cleanup completed.")
定义SSE端点
@app.get(“/stream”)
async def sse_endpoint(request: Request):
# 返回StreamingResponse,使用异步生成器和正确的媒体类型
return StreamingResponse(event_generator(request), media_type=”text/event-stream”)
运行应用 (使用 uvicorn)
uvicorn main:app –reload
“`
代码解释:
import asyncio
: 导入asyncio
用于异步操作,特别是asyncio.sleep
。import datetime
: 用于生成实时时间戳。from fastapi import FastAPI, Request
: 导入 FastAPI 和 Request 对象(Request 用于获取请求信息,虽然在这个简单示例中主要用于演示如何获取并在生成器中使用)。from fastapi.responses import StreamingResponse
: 导入StreamingResponse
,这是 FastAPI/Starlette 用于返回流式响应的类。async def event_generator(request: Request)
: 定义一个异步生成器函数。异步生成器是使用async def
定义并包含yield
关键字的函数。它能够与await
一起使用。while True:
: 创建一个无限循环,使生成器持续产生事件。now = ...
: 获取当前时间。sse_message = f"data: Current time is {now}\n\n"
: 构建 SSE 格式的字符串。data:
是必需的字段前缀,\n\n
标志着一个事件的结束。yield sse_message
: 将构建好的 SSE 字符串发送给客户端。yield
关键字使得函数成为一个生成器。await asyncio.sleep(1)
: 暂停一秒。await
允许在等待时释放事件循环,处理其他请求。try...except asyncio.CancelledError...finally:
: 这是处理客户端断开连接的关键部分。当客户端关闭连接时, ASGI 服务器(如 Uvicorn)会取消正在执行的与该连接相关的协程。捕获asyncio.CancelledError
是检测这一情况的标准方法。finally
块用于执行清理任务。@app.get("/stream")
: 定义一个 GET 请求的路由/stream
。async def sse_endpoint(request: Request):
: 路由处理函数,标记为async
。return StreamingResponse(...)
: 创建并返回StreamingResponse
实例,将event_generator
作为内容,并将media_type
设置为text/event-stream
。
如何测试:
- 保存代码为
main.py
。 - 安装 uvicorn:
pip install uvicorn fastapi
。 - 运行应用:
uvicorn main:app --reload
。 - 在浏览器中访问
http://127.0.0.1:8000/stream
。大多数现代浏览器(Chrome, Firefox, Edge, Safari)都原生支持 SSE,它们会尝试连接并显示接收到的数据流。或者使用curl
命令:curl -N http://127.0.0.1:8000/stream
(-N
选项禁用缓冲)。
你会在浏览器页面(或 curl 输出)中看到每秒更新的时间。关闭浏览器标签页或中断 curl 进程,会在服务器控制台看到 “Client disconnected…” 的输出。
发送不同类型的事件
你可以通过在 data:
字段前添加 event:
字段来发送不同类型的事件:
“`python
在 event_generator 中修改 yield 部分
async def event_generator(request: Request):
count = 0
try:
while True:
count += 1
if count % 5 == 0:
# 每5秒发送一个特殊事件
special_message = f”event: five_second_marker\ndata: It’s been {count} seconds!\n\n”
yield special_message
else:
# 发送常规数据事件
now = datetime.datetime.now().strftime(“%H:%M:%S”)
regular_message = f”data: Ping at {now}\n\n”
yield regular_message
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Client disconnected.")
except Exception as e:
print(f"An error occurred: {e}")
finally:
print("Generator cleanup completed.")
“`
客户端 JavaScript EventSource
API 可以通过 addEventListener()
监听特定的事件类型。
添加事件 ID 和重连间隔
可以在 SSE 消息中加入 id:
和 retry:
字段:
“`python
在 event_generator 中修改 yield 部分
async def event_generator(request: Request):
message_id = 0
try:
# 可以设置一个初始的重连间隔,或者在消息中发送
# initial_retry = 2000 # 2秒
# yield f”retry: {initial_retry}\n\n” # 如果需要在开始时指定
while True:
message_id += 1
now = datetime.datetime.now().strftime("%H:%M:%S")
# 包含 id 字段
sse_message = f"id: {message_id}\ndata: Message {message_id} at {now}\n\n"
# 可以根据需要动态调整 retry 间隔,例如在检测到问题时增加
# if some_condition:
# sse_message = f"{sse_message}retry: 5000\n\n" # 尝试5秒后重连
yield sse_message
await asyncio.sleep(1)
except asyncio.CancelledError:
print(f"Client disconnected. Last ID sent: {message_id}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
print("Generator cleanup completed.")
“`
客户端 EventSource
会自动处理 id
和 retry
字段。当连接中断并自动重连时,浏览器会在新的连接请求头中包含一个 Last-Event-ID
字段,其值是客户端接收到的最后一个事件的 ID。服务器端可以检查这个头,从而从上次中断的地方继续发送事件。在 FastAPI 中,你可以通过 request.headers.get('Last-Event-ID')
获取这个值。
生成事件的策略
上面的示例中,事件生成器本身就是事件的来源(它自己每秒生成一个时间戳)。在更实际的应用中,事件通常来源于外部:
- 后台任务: 一个独立的后台协程或线程正在执行某个任务,并在任务有更新时通知 SSE 生成器。
- 数据库变化: 监听数据库的变化(例如使用触发器、CDC 等),并在数据更新时推送。
- 消息队列: 从消息队列(如 Redis Pub/Sub, Kafka, RabbitMQ)订阅消息,并在接收到消息时推送给客户端。
为了将外部事件传递给 SSE 生成器,我们需要一个中间机制。
使用 asyncio.Queue
(适用于单进程/单机)
对于简单的应用或单进程部署,可以使用 asyncio.Queue
在不同的协程之间传递事件。
“`python
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import datetime
import uvicorn
app = FastAPI()
创建一个异步队列来存放待发送的事件
注意:在多进程/多副本部署时,这需要被一个共享的消息队列替换
event_queue = asyncio.Queue()
后台任务:模拟一个每秒产生数据的源
async def generate_data_source():
i = 0
while True:
i += 1
message = f”Background data {i} at {datetime.datetime.now().strftime(‘%H:%M:%S’)}”
# 将数据放入队列
await event_queue.put(message)
print(f”Put data into queue: {message}”)
await asyncio.sleep(1)
异步生成器函数:从队列中获取数据并格式化为 SSE
async def sse_event_publisher(request: Request):
print(“Client connected”)
client_disconnected = False
try:
while True:
# 从队列中获取事件,如果队列为空则等待
# await event_queue.get() 是一个阻塞操作,但不会阻塞整个事件循环
data = await event_queue.get()
# 格式化为 SSE 消息
sse_message = f"data: {data}\n\n"
# 将消息发送给客户端
yield sse_message
# 通知队列该项已处理完毕 (如果使用 Queue.task_done)
# event_queue.task_done()
except asyncio.CancelledError:
print("Client disconnected.")
client_disconnected = True
except Exception as e:
print(f"An error occurred: {e}")
client_disconnected = True
finally:
print("Generator cleanup completed.")
# 清理工作 (例如,如果需要从某个订阅列表中移除客户端)
定义SSE端点
@app.get(“/stream”)
async def sse_endpoint(request: Request):
# 返回StreamingResponse,使用异步生成器和正确的媒体类型
# 注意:每个连接都会启动一个新的 sse_event_publisher 生成器
# 这个简单的 Queue 示例无法将同一个事件广播给所有客户端
# 下面会讨论广播问题
return StreamingResponse(sse_event_publisher(request), media_type=”text/event-stream”)
启动后台任务
@app.on_event(“startup”)
async def startup_event():
print(“Starting background data generator…”)
asyncio.create_task(generate_data_source())
运行应用 (使用 uvicorn)
uvicorn main:app –reload
“`
注意: 上面的 asyncio.Queue
示例只是演示了如何将后台任务产生的数据传递给 SSE 生成器。然而,它存在一个严重的问题:event_queue
是共享的,但 sse_event_publisher
是每个客户端连接创建一个新的。当 generate_data_source
将一个消息放入队列时,只有一个正在等待的 sse_event_publisher
会收到它。这个模式无法实现将同一个事件广播给所有连接的客户端。
广播事件给多个客户端
要将事件广播给所有连接的客户端,你需要一个机制来管理所有活动的 SSE 连接,并在事件发生时遍历这些连接,将事件发送给每一个客户端。
一种常见模式是:
- 维护一个活动客户端连接(或更准确地说,它们的事件队列或写入通道)的列表或集合。
- 当新的 SSE 连接建立时,将其添加到列表中。
- 当连接断开时,将其从列表中移除。
- 当事件发生时,遍历列表,将事件发送给每个客户端。
这需要更复杂的事件管理类。为了实现高效广播和处理客户端断开,通常会使用每个客户端独立的 asyncio.Queue
或类似的结构,以及一个中心化的广播逻辑。
高级示例:一个简单的广播器 (单机)
“`python
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import datetime
import uuid # 用于给客户端分配唯一ID
app = FastAPI()
存储所有活动客户端的字典 {client_id: asyncio.Queue}
队列用于存储待发送给该客户端的事件
active_clients: dict[str, asyncio.Queue] = {}
后台任务:模拟数据源并将数据广播给所有客户端
async def broadcast_events():
i = 0
while True:
i += 1
message_data = f”Broadcast message {i} at {datetime.datetime.now().strftime(‘%H:%M:%S’)}”
print(f”Broadcasting: {message_data}”)
# 构建 SSE 消息
sse_message = f"data: {message_data}\n\n"
# 遍历所有活动客户端,将消息放入它们的队列
disconnected_clients = []
for client_id, queue in active_clients.items():
try:
# 使用 put_nowait 以避免广播任务阻塞
# 如果队列满了,表示客户端处理慢,可以忽略或使用 put 等待
# 这里简单处理:如果队列满则跳过该客户端
queue.put_nowait(sse_message)
except asyncio.QueueFull:
print(f"Client {client_id} queue is full. Skipping.")
# 可选:认为客户端不活跃,标记为断开
# disconnected_clients.append(client_id)
except Exception as e:
print(f"Error broadcasting to client {client_id}: {e}")
disconnected_clients.append(client_id)
# 移除已断开或处理异常的客户端
for client_id in disconnected_clients:
print(f"Removing disconnected client: {client_id}")
del active_clients[client_id]
await asyncio.sleep(1) # 模拟广播间隔
SSE 生成器:管理单个客户端连接,从其专属队列中读取事件
async def client_event_generator(client_id: str, queue: asyncio.Queue, request: Request):
print(f”Client {client_id} connected.”)
try:
while True:
# 从客户端专属队列中获取事件,如果队列为空则等待
# get() 会等待直到队列中有项目
sse_message = await queue.get()
# 检查连接是否仍然活跃 (yield 会处理断开异常)
yield sse_message
# 通知队列该项已处理完毕
queue.task_done()
except asyncio.CancelledError:
print(f"Client {client_id} disconnected.")
except Exception as e:
print(f"An error occurred for client {client_id}: {e}")
finally:
# 客户端断开时,将其从活动客户端列表中移除
if client_id in active_clients:
print(f"Removing client {client_id} from active list.")
del active_clients[client_id]
SSE 端点:创建客户端队列并启动生成器
@app.get(“/stream”)
async def sse_endpoint(request: Request):
# 为新连接生成唯一ID和队列
client_id = str(uuid.uuid4())
client_queue = asyncio.Queue() # 每个客户端一个队列
active_clients[client_id] = client_queue
print(f”New client connected: {client_id}. Total clients: {len(active_clients)}”)
# 返回 StreamingResponse,使用为该客户端创建的生成器
return StreamingResponse(client_event_generator(client_id, client_queue, request), media_type="text/event-stream")
启动后台广播任务
@app.on_event(“startup”)
async def startup_event():
print(“Starting background broadcast task…”)
asyncio.create_task(broadcast_events())
运行应用 (使用 uvicorn)
uvicorn main:app –reload
“`
解释:
active_clients
: 一个字典,键是客户端的唯一 ID,值是专门用于该客户端的asyncio.Queue
。broadcast_events()
: 这是一个独立的异步任务,负责生成(或从外部获取)事件。它遍历active_clients
字典,并将事件放入每个客户端的队列中。client_event_generator()
: 这是每个客户端连接的 SSE 生成器。它不再自己生成数据,而是从传入的queue
中读取数据。await queue.get()
会在该客户端队列为空时等待。/stream
端点:当新的客户端连接时,为其创建一个唯一的 ID 和一个专用的队列,将其添加到active_clients
字典,然后启动client_event_generator
并将该队列传递进去。- 客户端断开时:
client_event_generator
中的try...finally
块会捕获CancelledError
并将该客户端从active_clients
中移除。broadcast_events
也会检查队列是否可写,并在异常时移除客户端。
这个广播模式解决了将同一事件发送给所有客户端的问题,并且通过每个客户端自己的队列提供了一定的背压(backpressure)控制:如果一个客户端处理消息很慢导致其队列满了,它不会影响其他客户端。
限制: 这个基于 asyncio.Queue
和全局字典的广播器只适用于单进程的 FastAPI 应用。如果使用 Gunicorn 启动多个 worker 进程,或者部署到多个服务器实例,每个进程都会有自己的 active_clients
字典,它们之间无法直接通信,广播将失效。
扩展到多进程/分布式环境
要在多进程或分布式环境中实现 SSE 广播,必须使用一个外部的、进程间/跨服务器通信的消息队列。流行的选择包括:
- Redis Pub/Sub: 轻量、高性能的发布/订阅系统,非常适合简单的广播场景。
- Kafka: 分布式流处理平台,功能强大,适合处理大量数据和更复杂的场景。
- RabbitMQ: 成熟的消息代理,支持多种消息模式。
基本模式是:
- 事件源: 将事件发布到消息队列的一个特定频道或主题。
- FastAPI 应用: 每个运行的 FastAPI 进程(或 worker)都订阅这个频道或主题。
- 广播器: 每个 FastAPI 进程中有一个协程或任务,负责从消息队列中接收事件。
- 客户端管理: 每个 FastAPI 进程维护连接到该进程的活动客户端列表(如上面的
active_clients
字典)。 - 推送: 当消息队列订阅者接收到事件时,它遍历该进程内的活动客户端列表,并将事件推送到每个客户端的队列或直接通过 SSE 连接发送。
实现这一模式需要集成相应的消息队列客户端库(如 aioredis
用于 Redis,aiokafka
用于 Kafka 等),并在 FastAPI 应用启动时建立订阅连接。
客户端实现 (JavaScript EventSource
)
浏览器端接收 SSE 非常简单,使用原生的 EventSource
API 即可:
“`javascript
// 连接到你的 SSE 端点
const eventSource = new EventSource(‘http://127.0.0.1:8000/stream’);
// 监听 “message” 事件 (默认事件类型)
eventSource.onmessage = function(event) {
console.log(‘Received message:’, event.data);
// event.data 包含服务器发送的数据 (去掉 data: 前缀和\n\n)
};
// 监听特定事件类型 (如果服务器发送了 event:
// eventSource.addEventListener(‘five_second_marker’, function(event) {
// console.log(‘Received five_second_marker:’, event.data);
// });
// 监听连接打开事件
eventSource.onopen = function(event) {
console.log(‘SSE connection opened.’);
};
// 监听错误事件 (连接中断,重连失败等)
eventSource.onerror = function(event) {
console.error(‘SSE error:’, event);
// EventSource 会自动尝试重连,除非遇到永久性错误或服务器未指定 retry 间隔
// 可以根据需要在这里手动关闭并重新创建 EventSource
// eventSource.close();
// setTimeout(() => {
// const newEventSource = new EventSource(‘http://127.0.0.1:8000/stream’);
// // 重新设置事件监听器…
// }, eventSource.retry || 3000); // 使用服务器指定的retry间隔或默认3秒
};
// 可以随时关闭连接
// eventSource.close();
“`
EventSource
对象的 readyState
属性表示连接状态:0 (CONNECTING), 1 (OPEN), 2 (CLOSED)。
实际考虑与最佳实践
- 认证与授权: SSE 端点通常需要保护。可以在建立 SSE 连接前进行标准的认证(如基于 Token 的认证),并在路由处理函数中检查认证信息。如果使用 Token,可以考虑将其作为查询参数或自定义请求头发送(注意:
EventSource
不支持发送自定义头,查询参数是更常见的做法)。 - 代理与防火墙: 某些代理服务器可能会缓冲 HTTP 响应或关闭长时间不活动的连接。确保你的代理(如 Nginx)配置正确,以支持 SSE 长连接(关闭缓冲)。
- 心跳机制: 虽然 SSE 标准本身没有内置心跳,但长时间连接可能会被中间网络设备关闭。可以考虑定期发送一个空的
data: \n\n
或event: heartbeat\ndata: \n\n
消息作为心跳,保持连接活跃。 - 错误处理与日志: 在生成器中捕获异常(特别是
CancelledError
和ConnectionClosedError
,取决于 ASGI 服务器),并记录有用的信息,如哪个客户端断开、断开原因等。 - 资源管理: 当有大量客户端连接时,每个连接都会消耗服务器资源(内存、文件描述符)。设计时要考虑如何高效管理这些资源,并在客户端断开时正确清理。使用消息队列是扩展性的关键。
- 客户端重连: 浏览器
EventSource
默认会自动重连,并遵循retry
字段。确保你的服务器在客户端断开后能正确处理Last-Event-ID
头,从中断处继续发送(如果你的应用需要此功能)。 - 性能: SSE 的开销相对较小,但在连接数非常多时(几万、几十万),总的连接管理开销可能会变大。对于极高并发、低延迟且需要双向通信的场景,WebSockets 可能更合适。
- 使用 Uvicorn/Gunicorn: 使用支持 ASGI 的服务器(如 Uvicorn 或 Gunicorn 配合 Uvicorn worker)来运行 FastAPI 应用,它们能够正确处理异步生成器和长连接。
SSE vs WebSockets
特性 | SSE (Server-Sent Events) | WebSockets |
---|---|---|
通信方向 | 单向(服务器 -> 客户端) | 双向 |
协议 | 基于 HTTP/1.1 长连接 | 独立的 WS 协议 (ws:// 或 wss://) |
复杂性 | 简单,基于 HTTP,浏览器原生 EventSource |
相对复杂,需要握手,通常需要库,更底层的 API |
数据格式 | 文本 (text/event-stream ) |
文本或二进制 |
自动重连 | 内置于 EventSource 客户端 API |
需要手动实现 |
Use Case | 服务器推送更新,通知,数据流(只读) | 需要频繁双向通信,聊天,游戏,协作工具 |
HTTP 兼容性 | 更好,利用现有 HTTP 基础设施(但需注意代理) | 需要升级握手,对代理/防火墙要求更高 |
浏览器支持 | 广泛支持(除旧版 IE) | 现代浏览器普遍支持 |
性能 | 每连接开销相对 WebSocket 稍高一点(HTTP 头) | 每连接开销通常更低 |
选择 SSE 还是 WebSockets 取决于你的具体需求。如果你只需要服务器向客户端推送数据,并且对延迟要求不是极致低,SSE 通常是更简单、更合适的选择。如果需要双向通信、传输二进制数据或极低延迟,WebSockets 是更好的选择。
结论
Server-Sent Events 为 Web 应用提供了高效、简洁的服务器推送能力。结合 FastAPI 强大的异步特性和简洁的 API 设计,实现 SSE 端点变得非常容易。
从基础的 StreamingResponse
和异步生成器,到使用 asyncio.Queue
实现单机广播,再到理解如何通过消息队列扩展到分布式环境,FastAPI 提供了构建各种规模实时推送应用的良好基础。
通过本文的指南,你应该能够理解 SSE 的原理,掌握在 FastAPI 中实现 SSE 的核心技术,并能根据需求选择合适的事件生成和广播策略,从而构建出响应迅速、用户体验更好的实时应用。记住,对于生产环境,尤其是需要处理大量并发连接或分布式部署时,集成一个外部的消息队列是实现可靠和可伸缩 SSE 推送的关键。