FastAPI SSE (Server-Sent Events) 快速入门与深度指南
在现代 Web 应用中,实时数据更新已成为常态。无论是股票报价、聊天消息、通知推送,还是长时间运行任务的进度报告,客户端都需要能够接收服务器主动发送的数据,而无需频繁地向服务器轮询。实现这一目标的几种常见技术包括 WebSockets 和 Server-Sent Events (SSE)。
本文将专注于 Server-Sent Events (SSE) 技术,并详细探讨如何在高性能的 Python Web 框架 FastAPI 中高效地实现和使用 SSE。我们将从基本概念入手,逐步深入到更复杂的应用场景和最佳实践。
文章大纲:
-
引言:实时通信的需求与 SSE 的地位
- 为什么需要实时通信?
- 常见的实时通信技术概览 (Polling, Long Polling, WebSockets, SSE)
- SSE 的优势与适用场景
-
SSE 工作原理详解
- HTTP 连接的复用
text/event-stream
MIME 类型- SSE 数据格式 (
data
,event
,id
,retry
) - 浏览器
EventSource
API
-
为什么选择 FastAPI 实现 SSE?
- FastAPI 的异步能力 (
asyncio
) StreamingResponse
的妙用- Python 生成器与异步生成器
- FastAPI 的其他优势 (依赖注入、数据验证等)
- FastAPI 的异步能力 (
-
FastAPI SSE 快速入门:构建第一个 SSE Endpoint
- 安装必要的库
- 创建 FastAPI 应用
- 定义一个简单的 SSE 响应
- 客户端如何连接和接收数据 (JavaScript
EventSource
) - 运行与测试
-
深入理解与进阶:构建更完善的 SSE 流
- 发送结构化数据 (JSON)
- 使用不同的事件类型 (
event:
) - 为事件添加唯一标识符 (
id:
) - 控制客户端重连间隔 (
retry:
) - 将 SSE 逻辑封装到生成器函数中
-
SSE 应用场景示例:
- 实时时间流: 持续发送当前时间
- 任务进度报告: 服务器执行耗时任务时,向客户端报告进度
- 简单通知系统: 服务器触发事件时向客户端发送通知
-
更复杂的 SSE 应用:基于队列实现广播
- 使用
asyncio.Queue
管理消息 - 构建一个简单的广播机制 (多个客户端接收同一消息)
- 处理客户端连接与断开
- 生产者-消费者模型在 SSE 中的应用
- 使用
-
错误处理与客户端断开
- 服务器端如何检测客户端断开
- 处理
ClientDisconnect
异常 - 客户端自动重连与服务器端的配合
-
性能与可伸缩性考虑
- 保持连接的开销
- 长连接与代理/负载均衡
- 如何实现大规模广播 (外部消息队列如 Redis Pub/Sub)
- Keep-Alive 机制
-
安全性
- SSE Endpoint 的认证与授权
- 数据加密
-
SSE vs. WebSockets:何时选择哪个?
- 单向 vs. 双向通信
- 协议复杂度与浏览器支持
- 头部开销
-
最佳实践与总结
- 使用异步生成器
- 清晰地格式化 SSE 数据
- 妥善处理断开连接
- 考虑 keep-alive
- 规划可伸缩性
1. 引言:实时通信的需求与 SSE 的地位
在 Web 1.0 和早期的 Web 2.0 时代,客户端获取服务器数据的基本方式是请求-响应模式。浏览器发送 HTTP 请求,服务器返回数据。要获取最新数据,客户端需要不断地发起新的请求,也就是轮询 (Polling)。这种方式简单直接,但效率低下,尤其是在数据更新不频繁或者需要非常低延迟的场景下,会产生大量不必要的 HTTP 请求和服务器负载。
为了解决轮询的效率问题,出现了一些改进技术:
- 长轮询 (Long Polling): 客户端发起请求后,如果服务器没有新数据,则 удерживает(hold)住连接,直到有新数据或者超时才响应。客户端收到响应后立即发起新的请求。这减少了空闲请求的数量,但仍然是请求-响应模式,且实现相对复杂。
- WebSockets: 提供一个全双工(双向)通信通道,允许服务器和客户端随时互相发送数据。基于 TCP,有自己的协议 (
ws://
或wss://
)。功能强大,适用于需要频繁双向通信的场景(如在线游戏、实时协作应用)。但协议相对复杂,实现和维护成本较高,且对于只需要服务器向客户端推送数据的场景,可能显得“过重”。 - Server-Sent Events (SSE): 允许服务器通过一条持久的 HTTP 连接向客户端推送数据。它基于 HTTP/1.1 或 HTTP/2 协议,使用
text/event-stream
MIME 类型。它是一个单向通信技术,只允许服务器向客户端发送数据。
SSE 的优势与适用场景:
- 简单性: 基于标准的 HTTP 协议,易于理解和实现。客户端使用内置的
EventSource
JavaScript API,无需引入第三方库(对于现代浏览器)。 - 高效: 利用单个持久连接,避免了轮询的头部开销和连接建立/关闭开销。
- 内置重连机制: 浏览器
EventSource
自动处理连接断开后的重连,开发者无需手动编写复杂的重连逻辑。 - 适用场景: 非常适合那些只需要服务器向客户端单向推送数据的场景,例如:
- 实时报价(股票、加密货币)
- 新闻/社交媒体动态推送
- 通知系统
- 长时间运行任务的进度条或日志输出
- 仪表盘实时数据更新
FastAPI 凭借其优秀的异步支持和简洁的 API 设计,非常适合实现 SSE endpoint。
2. SSE 工作原理详解
SSE 的核心在于服务器使用一个特殊的 HTTP 响应。
- HTTP 连接: 客户端通过标准的 HTTP/GET 请求连接到服务器。
- 响应头部: 服务器返回一个响应,其中包含
Content-Type: text/event-stream
头部。这个头部告诉浏览器,服务器将通过这个连接持续发送一系列事件流,而不是一个普通的静态文件或 API 响应。 - 持久连接: 服务器不会立即关闭连接,而是保持连接开放。
- 数据格式: 服务器通过这个开放的连接发送特定格式的文本数据块。每个数据块代表一个“事件”。事件数据块之间用两个换行符 (
\n\n
) 分隔。
SSE 数据格式:
每个事件数据块由一系列以换行符 (\n
) 结尾的行组成。常见的行类型有:
data: [数据]
:包含要发送的数据。如果数据跨越多行,可以在每行前都加上data:
。event: [事件名称]
:指定事件的类型。客户端可以使用EventSource.addEventListener()
监听特定类型的事件。如果省略,则使用默认的message
事件。id: [唯一标识符]
:为事件设置一个唯一 ID。浏览器会在重连时将最后一个接收到的 ID 发送给服务器(通过Last-Event-ID
头部),服务器可以据此决定从哪里恢复数据流。retry: [毫秒数]
:指定客户端在连接断开后,等待多少毫秒再尝试重连。
示例事件格式:
“`
data: 这是第一行数据
data: 这是第二行数据
event: 通知
id: 12345
retry: 5000
data: 这是另一个事件的数据
event: 更新
data: 这是一个简单的消息
“`
注意:每个事件块都以 \n\n
结束。空行表示事件块的结束。以冒号 :
开头的行会被视为注释,会被浏览器忽略,常用于发送心跳包防止连接超时。
浏览器 EventSource
API:
现代浏览器内置了 EventSource
对象,用于接收 SSE 流。
“`javascript
const eventSource = new EventSource(‘/my-sse-endpoint’);
// 监听默认的 ‘message’ 事件
eventSource.onmessage = function(event) {
console.log(‘Received message:’, event.data);
};
// 监听特定类型的事件 (例如上面例子中的 ‘通知’)
eventSource.addEventListener(‘通知’, function(event) {
console.log(‘Received notification:’, event.data, ‘ID:’, event.lastEventId);
});
// 监听错误
eventSource.onerror = function(err) {
console.error(‘EventSource failed:’, err);
// 浏览器会自动尝试重连,除非服务器返回特定的 HTTP 状态码或头部
};
// 监听连接打开
eventSource.onopen = function(event) {
console.log(‘SSE connection opened’);
};
“`
EventSource
会自动处理连接断开后的重连,并且在重连请求中包含 Last-Event-ID
头部,发送上一个成功接收的事件 ID(如果事件带有 ID 的话)。这是 SSE 一个非常方便的特性。
3. 为什么选择 FastAPI 实现 SSE?
FastAPI 是一个用于构建 API 的现代、快速 (高性能) 的 Web 框架,基于标准 Python 类型提示。它与 Starlette 和 Pydantic 紧密集成。以下是 FastAPI 特别适合实现 SSE 的原因:
- 基于 ASGI: FastAPI 构建在 ASGI (Asynchronous Server Gateway Interface) 之上,原生支持异步操作。SSE 是一个典型的异步场景,需要在不阻塞主事件循环的情况下保持连接并发送数据。FastAPI 的
async def
和await
语法使得编写异步代码非常自然。 StreamingResponse
: Starlette(FastAPI 的底层)提供了一个StreamingResponse
类,专门用于流式传输响应。它接受一个生成器或异步生成器作为内容,可以非常方便地实现 SSE 这种需要连续发送数据的响应类型。- Python 生成器/异步生成器: Python 的生成器 (使用
yield
关键字) 和异步生成器 (使用async def
和yield
) 是实现 SSE 流的理想工具。它们允许你在需要发送数据时“暂停”函数执行,并在需要时继续,这完美契合了流式发送数据的模式。FastAPI 的StreamingResponse
可以直接消费这些生成器。 - 简洁的代码: FastAPI 的装饰器
@app.get()
,@app.post()
等使得定义 API endpoint 非常直观。结合类型提示和 Pydantic,即使是异步 streaming endpoint 的定义也十分清晰。 - 依赖注入系统: 如果你的 SSE endpoint 需要访问数据库连接、当前认证用户或其他资源,FastAPI 的依赖注入系统可以像处理普通请求一样方便地处理它们。
总而言之,FastAPI 的异步设计和对流式响应的良好支持,使得实现高性能的 SSE 服务变得简单高效。
4. FastAPI SSE 快速入门:构建第一个 SSE Endpoint
让我们从一个最简单的例子开始:一个不断向客户端发送当前时间的 SSE endpoint。
前提条件:
- Python 3.7+
- 安装 FastAPI 和 Uvicorn(ASGI 服务器):
bash
pip install fastapi uvicorn[standard]
代码实现:
创建一个 main.py
文件:
“`python
main.py
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import datetime
app = FastAPI()
这是一个异步生成器函数,将用于生成 SSE 事件流
async def event_stream():
“””
一个异步生成器,每秒产出一个包含当前时间的 SSE 事件。
“””
while True:
# 获取当前时间
now = datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)
data = f”当前时间: {now}”
# 构造 SSE 数据格式: data: [数据内容]\n\n
# 每个事件必须以 data: 开头,并以双换行符 \n\n 结束
sse_message = f"data: {data}\n\n"
# 通过 yield 发送数据块
yield sse_message
# 等待一秒
await asyncio.sleep(1)
@app.get(“/time-stream”)
async def time_stream():
“””
SSE Endpoint,返回一个持续发送当前时间的事件流。
“””
# 使用 StreamingResponse,指定 content 为异步生成器,media_type 为 text/event-stream
return StreamingResponse(event_stream(), media_type=”text/event-stream”)
— 客户端文件 (可选,用于测试) —
你可以将下面的 HTML/JavaScript 代码保存为 index.html 在浏览器中打开测试
注意:直接打开本地文件可能存在跨域问题,最好通过一个简单的静态文件服务器提供
例如: python -m http.server 8000
或者将 index.html 放在 FastAPI 项目的 static 目录下并通过 FastAPI 提供静态文件服务
from fastapi.staticfiles import StaticFiles
app.mount(“/static”, StaticFiles(directory=”static”), name=”static”)
然后将 index.html 放在 static 目录
“`
客户端代码 (index.html
):
“`html
FastAPI SSE 时间流示例
“`
运行服务器:
在终端中,导航到 main.py
文件所在的目录,运行 Uvicorn:
bash
uvicorn main:app --reload
服务器将在 http://127.0.0.1:8000
启动。
测试:
- 使用浏览器测试: 将
index.html
文件放在一个简单的 HTTP 服务器下(例如使用python -m http.server
在index.html
所在的目录启动一个服务器,然后通过http://localhost:8000/index.html
访问),或者通过 FastAPI 自身提供静态文件服务。打开index.html
,你应该能看到页面上的时间每秒更新一次。 - 使用
curl
测试: SSE 是基于 HTTP 的,也可以使用curl
来查看原始流数据:
bash
curl -N http://127.0.0.1:8000/time-stream
-N
选项是为了防止curl
缓冲输出,让它在接收到数据时立即显示。你会看到每秒输出一个data: ...\n\n
格式的数据块。
代码解释:
async def event_stream():
定义了一个异步生成器函数。async def
表明它是一个协程,可以使用await
。yield
关键字使其成为一个生成器。while True:
创建一个无限循环,持续生成数据。await asyncio.sleep(1)
:在生成数据后暂停协程1秒,而不是阻塞整个进程。这是异步编程的关键。sse_message = f"data: {data}\n\n"
:严格按照 SSE 格式构造消息。data:
是必须的前缀,\n\n
是每个事件块的终止符。yield sse_message
: 将构造好的 SSE 格式字符串发送给StreamingResponse
。StreamingResponse
会将其写入 HTTP 响应体并发送给客户端。@app.get("/time-stream")
: 定义一个 GET 请求的 endpoint。return StreamingResponse(event_stream(), media_type="text/event-stream")
: 创建并返回一个StreamingResponse
对象。- 第一个参数是生成器(或异步生成器)函数。
StreamingResponse
会不断从这个生成器中获取数据块并发送。 media_type="text/event-stream"
是 SSE 必需的头部。
- 第一个参数是生成器(或异步生成器)函数。
这个例子展示了 FastAPi 实现 SSE 的核心:一个返回 StreamingResponse
的 endpoint,其内容是一个不断 yield
出符合 SSE 格式字符串的异步生成器。
5. 深入理解与进阶:构建更完善的 SSE 流
刚才的例子只使用了 data:
字段。更完善的 SSE 流可以使用 event:
, id:
, retry:
等字段,并且可以发送更复杂的数据结构,比如 JSON。
发送结构化数据 (JSON):
虽然 SSE 数据是文本格式,但通常我们会将 JSON 字符串放在 data:
后面,以便客户端轻松解析。
“`python
import asyncio
import json # 导入 json 库
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import datetime
app = FastAPI()
async def complex_event_stream():
“””
一个异步生成器,发送包含 JSON 数据、事件类型和 ID 的 SSE 事件。
“””
counter = 0
while True:
counter += 1
# 准备要发送的结构化数据 (Python 字典)
data_payload = {
“timestamp”: datetime.datetime.now().isoformat(),
“sequence”: counter,
“message”: f”这是一个序列 {counter} 的更新”,
“status”: “processing” if counter % 5 != 0 else “completed”
}
# 将 Python 字典转换为 JSON 字符串
json_data = json.dumps(data_payload)
# 构造 SSE 数据块
# 可以包含 id:, event:, data: 等行
# 注意每行后面都要有 \n,整个数据块以 \n\n 结束
sse_message = (
f"id: {counter}\n" # 添加事件 ID
f"event: update\n" # 添加事件类型
f"data: {json_data}\n\n" # 添加 JSON 数据
)
print(f"Sending event:\n{sse_message}") # 服务器端打印发送的事件
yield sse_message
await asyncio.sleep(2) # 每2秒发送一次
@app.get(“/complex-stream”)
async def complex_stream():
“””
SSE Endpoint,返回一个发送更复杂事件流。
“””
return StreamingResponse(complex_event_stream(), media_type=”text/event-stream”)
“`
客户端代码 (index.html
部分更新):
“`html
FastAPI SSE 复杂流示例
“`
代码解释:
import json
: 导入json
库用于将 Python 字典转换为 JSON 字符串。json_data = json.dumps(data_payload)
: 将 Python 字典序列化为 JSON 格式的字符串。sse_message = (f"id: {counter}\n" ... f"data: {json_data}\n\n")
: 构建更复杂的 SSE 消息字符串。使用了多行字符串,方便阅读。注意每行末尾的\n
以及整个消息块末尾的\n\n
。event: update
: 指定了事件类型为update
。客户端通过eventSource.addEventListener('update', ...)
来监听这种类型的事件。id: {counter}
: 为每个事件设置一个唯一的 ID。浏览器会在重连时发送这个 ID。- 客户端变化:
- 使用了
eventSource.addEventListener('update', ...)
来专门处理服务器发送的event: update
类型的消息。 - 在事件处理函数中,使用
JSON.parse(event.data)
将接收到的 JSON 字符串解析为 JavaScript 对象,然后可以方便地访问其属性。 event.lastEventId
可以获取当前事件的 ID。event.type
可以获取当前事件的类型(对应服务器发送的event:
值)。
- 使用了
这个例子展示了如何发送带有类型和 ID 的 JSON 数据,使得 SSE 流更加灵活和有用。
6. SSE 应用场景示例
除了上面的时间流和复杂更新流,SSE 还广泛应用于其他场景。
6.1 实时时间流 (已在快速入门中实现)
这是最基本的例子,适用于任何需要定期推送少量数据的场景。
6.2 任务进度报告
当服务器启动一个耗时任务(如文件处理、数据分析、模型训练等),可以通过 SSE 向客户端实时报告进度。
“`python
import asyncio
import time
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
存储当前活动任务的进度信息(这里用字典模拟,实际可能需要更复杂的结构)
注意:如果应用需要多进程或分布式部署,这个状态不能直接存在内存中,需要外部存储(如 Redis)
task_progress = {} # task_id: current_step
async def progress_stream(task_id: str):
“””
为特定任务 ID 生成进度更新的 SSE 事件流。
“””
# 发送初始状态
yield f”data: {json.dumps({‘status’: ‘started’, ‘task_id’: task_id})}\n\n”
while True:
if task_id not in task_progress:
# 任务完成或不存在
yield f"data: {json.dumps({'status': 'completed', 'task_id': task_id, 'message': 'Task finished or not found'})}\n\n"
break # 任务完成,停止流
current_step = task_progress[task_id]
# 假设任务总共10步
total_steps = 10
progress_percentage = (current_step / total_steps) * 100
data_payload = {
"task_id": task_id,
"status": "in_progress",
"step": current_step,
"total_steps": total_steps,
"progress": round(progress_percentage, 2)
}
sse_message = f"event: progress_update\ndata: {json.dumps(data_payload)}\n\n"
yield sse_message
# 如果进度达到100%,也停止流
if current_step >= total_steps:
yield f"data: {json.dumps({'status': 'completed', 'task_id': task_id, 'message': 'Task finished successfully'})}\n\n"
del task_progress[task_id] # 从字典中移除已完成任务
break
await asyncio.sleep(0.5) # 每0.5秒检查并发送一次更新
模拟一个后台任务
async def run_heavy_task(task_id: str):
“””
模拟一个耗时任务,更新 global 的 task_progress。
“””
print(f”Task {task_id} started”)
total_steps = 10
task_progress[task_id] = 0 # 初始化进度
for i in range(1, total_steps + 1):
await asyncio.sleep(1) # 模拟每一步的耗时
task_progress[task_id] = i # 更新进度
print(f"Task {task_id} step {i}/{total_steps}")
# 最后一步完成后, progress_stream 会检测到并发送 completed 事件然后退出
# 或者你可以在这里发送一个最终的 completed 消息,然后等待 stream 自然退出
# 如果 stream 退出,客户端会自动重连,这通常不是我们想要的。
# 更好的方法是让 stream 检测到状态变化并退出。
# await asyncio.sleep(0.1) # 给 stream 发送最后一条消息的时间
# del task_progress[task_id] # 任务完成后清理状态,让 stream 退出
@app.post(“/start-task/{task_id}”)
async def start_task(task_id: str, background_tasks: BackgroundTasks):
“””
启动一个后台任务。
“””
if task_id in task_progress:
return {“message”: f”Task {task_id} is already running.”}
background_tasks.add_task(run_heavy_task, task_id)
return {"message": f"Task {task_id} started in background. Connect to /task-progress/{task_id} for updates."}
@app.get(“/task-progress/{task_id}”)
async def get_task_progress_stream(task_id: str):
“””
为特定任务 ID 提供 SSE 进度流。
“””
if task_id not in task_progress:
# 如果任务不存在或已完成,立即返回一个completed消息并结束流
async def immediate_completion_stream():
yield f”data: {json.dumps({‘status’: ‘completed’, ‘task_id’: task_id, ‘message’: ‘Task not running or already finished’})}\n\n”
# 注意:这里立即结束生成器
return StreamingResponse(immediate_completion_stream(), media_type="text/event-stream")
# 如果任务正在运行,返回进度流
return StreamingResponse(progress_stream(task_id), media_type="text/event-stream")
“`
使用方式:
- 向
/start-task/{some_task_id}
发送 POST 请求,启动任务。 - 客户端连接到
/task-progress/{some_task_id}
endpoint,接收进度更新。
客户端处理: 客户端需要监听 progress_update
事件,并解析 JSON 数据来更新进度条或显示日志。
这个例子中,我们使用了 FastAPI 的 BackgroundTasks
来启动后台任务,并在 SSE 流生成器中通过检查一个共享状态(task_progress
字典)来报告进度。
注意: 直接在内存中维护 task_progress
字典只适用于单进程单线程的简单场景。在生产环境中,特别是需要处理多个并发任务或部署到多进程/多服务器环境时,必须使用外部存储(如 Redis、数据库)来共享任务状态和进度信息。
6.3 简单通知系统
可以构建一个简单的 Pub/Sub 系统,服务器在有新通知时发布,所有连接的客户端接收。这个场景与下面的基于队列的广播非常相似。
7. 更复杂的 SSE 应用:基于队列实现广播
一个常见的 SSE 用例是将特定事件广播给所有(或部分)连接的客户端。例如,一个聊天应用的新消息通知,或者一个管理后台的系统级事件通知。
FastAPI 结合 asyncio.Queue
是实现这一目标的一种优雅方式。我们可以为每个连接的客户端创建一个独立的队列。当有新消息需要广播时,生产者将消息放入所有活动客户端的队列中。每个 SSE 生成器(对应一个客户端连接)则从其专属队列中读取消息并发送。
“`python
import asyncio
import json
import uuid # 用于生成唯一的客户端 ID
from typing import Set, Dict
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import StreamingResponse
from fastapi.templating import Jinja2Templates # 如果需要提供 HTML 页面
from fastapi.staticfiles import StaticFiles
app = FastAPI()
存储所有活跃客户端的队列
Key: 客户端 ID (str)
Value: asyncio.Queue
connected_clients: Dict[str, asyncio.Queue] = {}
简单的广播函数
async def broadcast_message(message: str):
“””
将消息广播给所有连接的客户端。
“””
# 注意:在实际应用中,这里可能需要处理连接断开的情况,
# 比如将发送失败的客户端从 connected_clients 中移除。
# 但 asyncio.Queue put_nowait 通常不会失败,除非队列满(这里队列是无限大小)。
# 真正的断开处理会在 stream_for_client 中完成。
for client_id, queue in list(connected_clients.items()): # 遍历时复制列表,避免在迭代过程中修改
print(f”Broadcasting to client {client_id}”)
try:
await queue.put(message) # 异步放入队列
except Exception as e:
print(f”Error putting message to client {client_id} queue: {e}”)
# 这里的错误可能意味着队列已经被关闭,说明客户端已经断开或即将断开
# 清理工作将在 stream_for_client 中执行
async def stream_for_client(client_id: str, client_queue: asyncio.Queue):
“””
为特定客户端生成 SSE 事件流,从其队列中读取消息。
“””
print(f”Client {client_id} connected, starting stream…”)
try:
# 可选:发送一个连接成功的消息
yield f”data: {json.dumps({‘status’: ‘connected’, ‘client_id’: client_id})}\n\n”
while True:
# 从队列中获取消息,如果队列为空则等待
message = await client_queue.get()
print(f"Client {client_id} received message from queue: {message}")
# 构造 SSE 格式消息
# 可以在这里根据 message 内容设置不同的 event: 或 id:
sse_message = f"data: {message}\n\n"
yield sse_message
# 标记任务完成,释放队列中的项(如果不调用,且队列项数大于最大值,get_nowait 会抛异常)
# 对于无限大小的队列,不是必须的,但养成习惯更好
# client_queue.task_done() # 如果使用了 join(),需要调用 task_done()
except asyncio.CancelledError:
# 当客户端断开时,协程会被取消
print(f"Client {client_id} stream cancelled (disconnected).")
except Exception as e:
print(f"Client {client_id} stream encountered error: {e}")
finally:
# 客户端断开或流结束时执行清理
print(f"Client {client_id} disconnected. Cleaning up.")
if client_id in connected_clients:
# 关闭队列,防止 broadcast_message 继续往里放
client_queue.put_nowait(None) # 放入一个 None 作为结束标志,让 stream 退出 while True
# 或者直接关闭队列 (可能导致还在等待的 get() 抛出 asyncio.CancelledError 或 RuntimeError)
# client_queue.close()
del connected_clients[client_id] # 从活跃客户端列表中移除
print(f"Current connected clients: {len(connected_clients)}")
@app.get(“/stream”)
async def sse_stream():
“””
SSE Endpoint,为每个连接的客户端创建一个 SSE 流。
“””
client_id = str(uuid.uuid4()) # 为新连接生成唯一 ID
client_queue = asyncio.Queue() # 为该客户端创建队列
connected_clients[client_id] = client_queue # 加入活跃客户端列表
print(f"New client connected: {client_id}. Total clients: {len(connected_clients)}")
# 返回 StreamingResponse,使用 stream_for_client 异步生成器
# 注意:stream_for_client 协程会在客户端连接期间一直运行
return StreamingResponse(stream_for_client(client_id, client_queue), media_type="text/event-stream")
示例:一个 endpoint 用于触发广播消息
@app.post(“/send-message”)
async def send_message(message: str):
“””
一个示例 endpoint,用于触发向所有客户端广播消息。
“””
print(f”Received message to broadcast: {message}”)
# 在后台协程中执行广播,避免阻塞当前请求响应
asyncio.create_task(broadcast_message(message))
return {“status”: “broadcasting”, “message”: message}
可以添加一个简单的 HTML 页面来测试广播
templates = Jinja2Templates(directory=”templates”)
app.mount(“/static”, StaticFiles(directory=”static”), name=”static”) # 如果需要提供静态文件 (如js, css)
@app.get(“/”)
async def read_root(request: Request):
return templates.TemplateResponse(“index.html”, {“request”: request})
templates/index.html (示例)
“””
FastAPI SSE 广播示例
“””
“`
代码解释:
connected_clients: Dict[str, asyncio.Queue]
: 一个字典,维护所有当前连接的客户端及其对应的asyncio.Queue
。字典的键是为每个客户端生成的唯一 ID。asyncio.Queue()
: 为每个新连接的客户端创建一个无限大小的队列。connected_clients[client_id] = client_queue
: 将新客户端及其队列添加到字典中。async def stream_for_client(...)
: 这是核心的 SSE 生成器。它接收客户端 ID 和其队列作为参数。await client_queue.get()
: 这是一个阻塞操作,它会暂停当前协程,直到队列中有新的项。一旦有数据,它就会获取该项。yield sse_message
: 将从队列中获取并格式化好的消息发送给客户端。except asyncio.CancelledError:
: 这是处理客户端断开连接的关键。当客户端关闭连接时,FastAPI/Starlette 会取消对应的协程,触发asyncio.CancelledError
异常。finally:
块:无论协程是正常完成、取消还是发生其他异常,finally
块都会执行。这里用于从connected_clients
字典中移除已断开的客户端及其队列,进行清理。
async def broadcast_message(message: str)
: 这个函数遍历connected_clients
字典,并将消息put
到每个客户端的队列中。asyncio.create_task(broadcast_message(message))
: 在/send-message
endpoint 中,我们使用asyncio.create_task
创建一个 新的 协程来执行广播任务。这样做是为了避免send-message
endpoint 因为等待广播完成而长时间阻塞响应。广播是异步进行的。/send-message
endpoint 是一个普通的 FastAPI endpoint,用于接收来自任何地方(例如另一个服务或客户端的 POST 请求)的消息,然后触发广播。- 客户端连接到
/stream
endpoint,服务器会为它创建一个 SSE 流并将其添加到广播列表中。 - 通过
/send-message
发送的消息会被推送到所有/stream
连接的客户端。
这个例子展示了如何构建一个基于队列的广播系统,是实现实时通知、聊天室等功能的良好起点。在生产环境中,如果需要跨多个服务器实例进行广播,connected_clients
字典和 broadcast_message
函数就需要与外部消息代理(如 Redis Pub/Sub, Kafka, RabbitMQ 等)集成。
8. 错误处理与客户端断开
处理客户端断开连接是构建稳定 SSE 服务的重要一环。如前所述,FastAPI/Starlette 在客户端断开时会取消对应的协程,导致 asyncio.CancelledError
。我们需要在生成器中使用 try...except asyncio.CancelledError...finally
结构来捕获取消事件,并在 finally
块中执行清理工作(例如从活跃客户端列表中移除)。
“`python
在 stream_for_client 函数中:
async def stream_for_client(client_id: str, client_queue: asyncio.Queue):
print(f”Client {client_id} connected, starting stream…”)
try:
# … 发送初始消息 …
while True:
message = await client_queue.get()
if message is None: # 约定 None 作为队列结束标志
print(f”Client {client_id} queue received None, closing stream.”)
break # 退出循环,进入 finally
# ... 处理并格式化消息 ...
yield sse_message
except asyncio.CancelledError:
# 客户端断开时触发
print(f"Client {client_id} stream cancelled (disconnected).")
except Exception as e:
# 处理生成器内部的其他异常
print(f"Client {client_id} stream encountered error: {e}")
# 可以选择发送一个错误事件给客户端,或者直接退出流
yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
finally:
# 清理工作,无论如何都会执行
print(f"Client {client_id} disconnected. Cleaning up.")
if client_id in connected_clients:
# 确保队列被标记为完成或关闭,避免泄露
# 如果队列还在 connected_clients 里,但 stream 已经退出,
# broadcast_message 可能会尝试往已关闭的队列里放东西
# 更好的做法是在这里移除
del connected_clients[client_id]
# 关闭队列或放入 None,确保如果有其他地方还在等 get() 会退出
try:
client_queue.put_nowait(None)
except: # 如果队列已经关闭会抛错,忽略
pass
print(f"Current connected clients: {len(connected_clients)}")
在 broadcast_message 函数中,为了更健壮,可以在put失败时考虑移除客户端
async def broadcast_message(message: str):
disconnected_clients = []
for client_id, queue in connected_clients.items():
try:
# put_nowait 尝试非阻塞放入,如果队列已满则抛出 asyncio.QueueFull
# 但我们用的是无限大小队列,所以主要考虑 put 可能在队列关闭后失败
# await queue.put(message) # 异步放入
queue.put_nowait(message) # 非阻塞放入
except Exception:
# 如果放入失败,说明客户端可能已经断开或即将断开
disconnected_clients.append(client_id)
for client_id in disconnected_clients:
print(f”Removing disconnected client {client_id} during broadcast.”)
# 在 stream_for_client 的 finally 里也会移除,这里是为了尽快清理
# del connected_clients[client_id] # 小心并发修改字典问题,这里简单的列表移除再删除是安全的
# 更安全的做法是让 stream 自己清理,广播失败只是一个信号
“`
客户端自动重连:
如前所述,EventSource
默认会自动处理网络中断后的重连。如果服务器希望控制重连行为,可以发送 retry: [毫秒数]
。如果服务器希望客户端 不 重连,可以:
- 返回 HTTP 204 No Content 状态码。
- 在响应头部中包含
Connection: close
。 - 直接关闭连接而不发送
\n\n
结束最后一个事件。
通常情况下,依赖 EventSource
的自动重连和服务器端的清理(当检测到连接取消时)就足够了。
9. 性能与可伸缩性考虑
SSE 相比轮询有更好的性能,因为它维护的是长连接。但长连接本身也有开销:
- 内存开销: 每个连接都需要服务器维护一些状态(如套接字、缓冲区、可能还有我们示例中的队列等)。连接数越多,内存占用越高。
- 服务器并发连接限制: 操作系统和服务器软件(如 uvicorn)对并发连接数有限制。
- 代理与负载均衡: HTTP/1.1 下,大多数代理和负载均衡器默认可能会对空闲的长连接设置超时。如果连接长时间没有数据发送,代理可能会断开它。HTTP/2 对多路复用有更好支持,可能缓解部分问题。
可伸缩性:
对于单服务器应用,基于内存队列的广播是可行的。但如果需要部署到多个服务器实例,内存中的 connected_clients
字典无法共享。这时,必须引入外部消息代理:
- Redis Pub/Sub: 一个常用的轻量级选择。所有 FastAPI 实例连接到同一个 Redis 服务器。客户端连接到 任意 一个 FastAPI 实例的 SSE endpoint。当需要广播时,任何一个 FastAPI 实例都可以向 Redis 特定频道发布消息。所有连接到 Redis 并订阅该频道的 FastAPI 实例都会收到消息,然后将消息推送到它们各自维护的连接客户端队列中。
- Kafka, RabbitMQ 等: 更重量级、功能更丰富的消息队列,适用于需要更高吞吐量、持久性、复杂路由或消费者组的场景。
Keep-Alive 机制:
为了防止代理/负载均衡器因为连接空闲而断开,服务器应该定期发送心跳包。SSE 规范中,以冒号 :
开头的行是注释行,浏览器会忽略,但能保持连接活跃。
“`python
在 stream_for_client 或任何 SSE 生成器中
async def event_stream_with_keepalive():
while True:
# 发送真实数据…
yield f”data: some data\n\n”
# 定期发送心跳 (例如每 15 秒)
await asyncio.sleep(10) # 假设数据发送间隔小于10秒
yield ":keepalive\n\n" # 发送一个注释行作为心跳包
await asyncio.sleep(5) # 等待剩余时间凑满15秒间隔(或者直接 await asyncio.sleep(15))
或者在 while True 循环内部,维护一个计数器或计时器,每隔 N 秒发送一次心跳
“`
在 stream_for_client
的 while True
循环中,除了获取并 yield 消息,还可以定期检查是否需要发送心跳。
“`python
async def stream_for_client_with_keepalive(client_id: str, client_queue: asyncio.Queue):
# … (之前的代码) …
last_data_time = asyncio.get_event_loop().time()
keepalive_interval = 15 # 秒
while True:
try:
# 尝试从队列中获取消息,设置一个较短的超时时间
message = await asyncio.wait_for(client_queue.get(), timeout=keepalive_interval)
if message is None:
print(f"Client {client_id} queue received None, closing stream.")
break
# 处理并发送数据
sse_message = f"data: {message}\n\n"
yield sse_message
last_data_time = asyncio.get_event_loop().time() # 更新发送数据的时间
except asyncio.TimeoutError:
# 超时了,队列里没有新消息,发送心跳
print(f"Client {client_id} sending keepalive.")
yield ": keepalive\n\n"
except asyncio.CancelledError:
print(f"Client {client_id} stream cancelled (disconnected).")
break # 退出循环,进入 finally
except Exception as e:
print(f"Client {client_id} stream encountered error: {e}")
yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
# 可以选择 break 或继续,取决于错误类型
finally:
# ... (清理代码) ...
“`
这个带超时的 await asyncio.wait_for(client_queue.get(), timeout=...)
模式可以在等待消息的同时实现心跳功能。如果超时,说明队列里没有消息,就发送心跳。
10. 安全性
像其他任何 Web endpoint 一样,SSE endpoint 也需要考虑安全性。
-
认证与授权: 不是任何人都能连接到你的 SSE 流。使用 FastAPI 的依赖注入系统,可以在路由处理函数中添加认证和授权依赖,例如:
“`python
from fastapi import Depends, HTTPException, status
from your_auth_module import get_current_user # 假设你有一个认证函数@app.get(“/private-stream”, dependencies=[Depends(get_current_user)])
async def private_sse_stream(current_user: Any = Depends(get_current_user)):
# 只有认证用户才能连接到这个流
# 可以根据 current_user 定制化流内容
return StreamingResponse(user_specific_stream(current_user.id), media_type=”text/event-stream”)
``
wss://
这确保了只有通过验证的用户才能建立 SSE 连接。
* **数据加密:** 始终使用 HTTPS (for WebSockets,
https://for SSE) 来加密传输的数据,防止中间人攻击。在生产环境中,这一点至关重要。Uvicorn 可以配置使用 SSL 证书来启用 HTTPS。
/task-progress/{task_id}`),需要像处理普通请求一样验证这些参数,防止注入或其他恶意输入。FastAPI 和 Pydantic 提供了强大的验证能力。
* **输入验证:** 如果 SSE 流的行为取决于请求参数(如
11. SSE vs. WebSockets:何时选择哪个?
特性 | Server-Sent Events (SSE) | WebSockets |
---|---|---|
通信方向 | 单向 (服务器 -> 客户端) | 全双工 (双向) |
协议 | 基于 HTTP/1.1 或 HTTP/2 (text/event-stream ) |
独立协议 (ws:// , wss:// ) 基于 TCP |
实现复杂度 | 相对简单,基于 HTTP 和 EventSource API | 相对复杂,需要握手,通常需要客户端/服务器库 |
浏览器支持 | 内置 EventSource API,无需额外库 (IE 除外) |
内置 WebSocket API,无需额外库 |
数据格式 | 文本 (特定格式,data: , event: , etc.) |
文本或二进制 |
自动重连 | 内置,可配置 | 需要手动实现 |
头部开销 | 相比 WebSockets,每次发送数据时头部开销稍大 | 握手后头部开销很小 |
代理/负载均衡 | 需要注意长连接超时和 sticky sessions | 相对容易处理 (依赖具体实现和配置) |
适用场景 | 实时通知、进度更新、订阅、仪表盘数据等 | 聊天、在线游戏、实时协作、远程控制等 |
总结:
- 如果你只需要从服务器向客户端推送数据,并且不关心从客户端向服务器发送实时消息,SSE 通常是更简单、更轻量级的选择。
- 如果需要双向实时通信(客户端需要频繁向服务器发送实时消息),或者需要传输二进制数据,WebSockets 是更好的选择。
在 FastAPI 中实现两者都非常方便,你可以根据具体的应用需求来选择合适的技术。
12. 最佳实践与总结
构建稳定、高效的 FastAPI SSE 应用,请遵循以下最佳实践:
- 使用
StreamingResponse
和异步生成器: 这是 FastAPI 实现 SSE 的核心机制,充分利用了异步能力。 - 严格遵守 SSE 数据格式: 每条数据行以
\n
结尾,每个事件块以\n\n
结尾。使用data:
,event:
,id:
,retry:
等字段来构建有意义的事件流。 - 处理客户端断开: 在生成器中使用
try...except asyncio.CancelledError...finally
来捕获断开事件,并在finally
块中进行必要的资源清理(如移除客户端队列)。 - 发送心跳包: 定期发送注释行 (
: some comment\n\n
) 来保持连接活跃,防止代理/负载均衡器超时断开连接。 - 考虑可伸缩性: 对于需要水平扩展的应用,不要将连接状态或广播逻辑存储在内存中,而应使用外部消息代理(如 Redis Pub/Sub)。
- 实施认证与授权: 使用 FastAPI 的依赖注入保护你的 SSE endpoint,确保只有授权用户才能连接。
- 使用 HTTPS: 加密数据传输,保护用户隐私和数据安全。
- 适当的错误处理: 在生成器中捕获可能发生的异常,并决定是发送错误事件给客户端,还是优雅地关闭连接。
总结:
FastAPI 提供了强大而灵活的方式来实现 Server-Sent Events。通过利用其异步能力、StreamingResponse
和 Python 的生成器/异步生成器,开发者可以轻松构建高性能的单向实时数据推送服务。从简单的定期更新到复杂的基于队列的广播系统,FastAPI 都能提供坚实的基础。理解 SSE 的工作原理、数据格式以及如何在 FastAPI 中有效利用异步特性,将帮助你构建出满足现代 Web 应用实时需求的优秀服务。希望这篇详细指南能帮助你快速入门并深入掌握 FastAPI SSE 的开发。