FastAPI SSE (Server-Sent Events) 快速入门与深度指南
在现代 Web 应用中,实时数据更新已成为常态。无论是股票报价、聊天消息、通知推送,还是长时间运行任务的进度报告,客户端都需要能够接收服务器主动发送的数据,而无需频繁地向服务器轮询。实现这一目标的几种常见技术包括 WebSockets 和 Server-Sent Events (SSE)。
本文将专注于 Server-Sent Events (SSE) 技术,并详细探讨如何在高性能的 Python Web 框架 FastAPI 中高效地实现和使用 SSE。我们将从基本概念入手,逐步深入到更复杂的应用场景和最佳实践。
文章大纲:
- 
引言:实时通信的需求与 SSE 的地位 - 为什么需要实时通信?
- 常见的实时通信技术概览 (Polling, Long Polling, WebSockets, SSE)
- SSE 的优势与适用场景
 
- 
SSE 工作原理详解 - HTTP 连接的复用
- text/event-streamMIME 类型
- SSE 数据格式 (data,event,id,retry)
- 浏览器 EventSourceAPI
 
- 
为什么选择 FastAPI 实现 SSE? - FastAPI 的异步能力 (asyncio)
- StreamingResponse的妙用
- Python 生成器与异步生成器
- FastAPI 的其他优势 (依赖注入、数据验证等)
 
- FastAPI 的异步能力 (
- 
FastAPI SSE 快速入门:构建第一个 SSE Endpoint - 安装必要的库
- 创建 FastAPI 应用
- 定义一个简单的 SSE 响应
- 客户端如何连接和接收数据 (JavaScript EventSource)
- 运行与测试
 
- 
深入理解与进阶:构建更完善的 SSE 流 - 发送结构化数据 (JSON)
- 使用不同的事件类型 (event:)
- 为事件添加唯一标识符 (id:)
- 控制客户端重连间隔 (retry:)
- 将 SSE 逻辑封装到生成器函数中
 
- 
SSE 应用场景示例: - 实时时间流: 持续发送当前时间
- 任务进度报告: 服务器执行耗时任务时,向客户端报告进度
- 简单通知系统: 服务器触发事件时向客户端发送通知
 
- 
更复杂的 SSE 应用:基于队列实现广播 - 使用 asyncio.Queue管理消息
- 构建一个简单的广播机制 (多个客户端接收同一消息)
- 处理客户端连接与断开
- 生产者-消费者模型在 SSE 中的应用
 
- 使用 
- 
错误处理与客户端断开 - 服务器端如何检测客户端断开
- 处理 ClientDisconnect异常
- 客户端自动重连与服务器端的配合
 
- 
性能与可伸缩性考虑 - 保持连接的开销
- 长连接与代理/负载均衡
- 如何实现大规模广播 (外部消息队列如 Redis Pub/Sub)
- Keep-Alive 机制
 
- 
安全性 - SSE Endpoint 的认证与授权
- 数据加密
 
- 
SSE vs. WebSockets:何时选择哪个? - 单向 vs. 双向通信
- 协议复杂度与浏览器支持
- 头部开销
 
- 
最佳实践与总结 - 使用异步生成器
- 清晰地格式化 SSE 数据
- 妥善处理断开连接
- 考虑 keep-alive
- 规划可伸缩性
 
1. 引言:实时通信的需求与 SSE 的地位
在 Web 1.0 和早期的 Web 2.0 时代,客户端获取服务器数据的基本方式是请求-响应模式。浏览器发送 HTTP 请求,服务器返回数据。要获取最新数据,客户端需要不断地发起新的请求,也就是轮询 (Polling)。这种方式简单直接,但效率低下,尤其是在数据更新不频繁或者需要非常低延迟的场景下,会产生大量不必要的 HTTP 请求和服务器负载。
为了解决轮询的效率问题,出现了一些改进技术:
- 长轮询 (Long Polling): 客户端发起请求后,如果服务器没有新数据,则 удерживает(hold)住连接,直到有新数据或者超时才响应。客户端收到响应后立即发起新的请求。这减少了空闲请求的数量,但仍然是请求-响应模式,且实现相对复杂。
- WebSockets: 提供一个全双工(双向)通信通道,允许服务器和客户端随时互相发送数据。基于 TCP,有自己的协议 (ws://或wss://)。功能强大,适用于需要频繁双向通信的场景(如在线游戏、实时协作应用)。但协议相对复杂,实现和维护成本较高,且对于只需要服务器向客户端推送数据的场景,可能显得“过重”。
- Server-Sent Events (SSE): 允许服务器通过一条持久的 HTTP 连接向客户端推送数据。它基于 HTTP/1.1 或 HTTP/2 协议,使用 text/event-streamMIME 类型。它是一个单向通信技术,只允许服务器向客户端发送数据。
SSE 的优势与适用场景:
- 简单性: 基于标准的 HTTP 协议,易于理解和实现。客户端使用内置的 EventSourceJavaScript API,无需引入第三方库(对于现代浏览器)。
- 高效: 利用单个持久连接,避免了轮询的头部开销和连接建立/关闭开销。
- 内置重连机制: 浏览器 EventSource自动处理连接断开后的重连,开发者无需手动编写复杂的重连逻辑。
- 适用场景: 非常适合那些只需要服务器向客户端单向推送数据的场景,例如:
- 实时报价(股票、加密货币)
- 新闻/社交媒体动态推送
- 通知系统
- 长时间运行任务的进度条或日志输出
- 仪表盘实时数据更新
 
FastAPI 凭借其优秀的异步支持和简洁的 API 设计,非常适合实现 SSE endpoint。
2. SSE 工作原理详解
SSE 的核心在于服务器使用一个特殊的 HTTP 响应。
- HTTP 连接: 客户端通过标准的 HTTP/GET 请求连接到服务器。
- 响应头部: 服务器返回一个响应,其中包含 Content-Type: text/event-stream头部。这个头部告诉浏览器,服务器将通过这个连接持续发送一系列事件流,而不是一个普通的静态文件或 API 响应。
- 持久连接: 服务器不会立即关闭连接,而是保持连接开放。
- 数据格式: 服务器通过这个开放的连接发送特定格式的文本数据块。每个数据块代表一个“事件”。事件数据块之间用两个换行符 (\n\n) 分隔。
SSE 数据格式:
每个事件数据块由一系列以换行符 (\n) 结尾的行组成。常见的行类型有:
- data: [数据]:包含要发送的数据。如果数据跨越多行,可以在每行前都加上- data:。
- event: [事件名称]:指定事件的类型。客户端可以使用- EventSource.addEventListener()监听特定类型的事件。如果省略,则使用默认的- message事件。
- id: [唯一标识符]:为事件设置一个唯一 ID。浏览器会在重连时将最后一个接收到的 ID 发送给服务器(通过- Last-Event-ID头部),服务器可以据此决定从哪里恢复数据流。
- retry: [毫秒数]:指定客户端在连接断开后,等待多少毫秒再尝试重连。
示例事件格式:
“`
data: 这是第一行数据
data: 这是第二行数据
event: 通知
id: 12345
retry: 5000
data: 这是另一个事件的数据
event: 更新
data: 这是一个简单的消息
“`
注意:每个事件块都以 \n\n 结束。空行表示事件块的结束。以冒号 : 开头的行会被视为注释,会被浏览器忽略,常用于发送心跳包防止连接超时。
浏览器 EventSource API:
现代浏览器内置了 EventSource 对象,用于接收 SSE 流。
“`javascript
const eventSource = new EventSource(‘/my-sse-endpoint’);
// 监听默认的 ‘message’ 事件
eventSource.onmessage = function(event) {
console.log(‘Received message:’, event.data);
};
// 监听特定类型的事件 (例如上面例子中的 ‘通知’)
eventSource.addEventListener(‘通知’, function(event) {
console.log(‘Received notification:’, event.data, ‘ID:’, event.lastEventId);
});
// 监听错误
eventSource.onerror = function(err) {
console.error(‘EventSource failed:’, err);
// 浏览器会自动尝试重连,除非服务器返回特定的 HTTP 状态码或头部
};
// 监听连接打开
eventSource.onopen = function(event) {
console.log(‘SSE connection opened’);
};
“`
EventSource 会自动处理连接断开后的重连,并且在重连请求中包含 Last-Event-ID 头部,发送上一个成功接收的事件 ID(如果事件带有 ID 的话)。这是 SSE 一个非常方便的特性。
3. 为什么选择 FastAPI 实现 SSE?
FastAPI 是一个用于构建 API 的现代、快速 (高性能) 的 Web 框架,基于标准 Python 类型提示。它与 Starlette 和 Pydantic 紧密集成。以下是 FastAPI 特别适合实现 SSE 的原因:
- 基于 ASGI: FastAPI 构建在 ASGI (Asynchronous Server Gateway Interface) 之上,原生支持异步操作。SSE 是一个典型的异步场景,需要在不阻塞主事件循环的情况下保持连接并发送数据。FastAPI 的 async def和await语法使得编写异步代码非常自然。
- StreamingResponse: Starlette(FastAPI 的底层)提供了一个- StreamingResponse类,专门用于流式传输响应。它接受一个生成器或异步生成器作为内容,可以非常方便地实现 SSE 这种需要连续发送数据的响应类型。
- Python 生成器/异步生成器: Python 的生成器 (使用 yield关键字) 和异步生成器 (使用async def和yield) 是实现 SSE 流的理想工具。它们允许你在需要发送数据时“暂停”函数执行,并在需要时继续,这完美契合了流式发送数据的模式。FastAPI 的StreamingResponse可以直接消费这些生成器。
- 简洁的代码: FastAPI 的装饰器 @app.get(),@app.post()等使得定义 API endpoint 非常直观。结合类型提示和 Pydantic,即使是异步 streaming endpoint 的定义也十分清晰。
- 依赖注入系统: 如果你的 SSE endpoint 需要访问数据库连接、当前认证用户或其他资源,FastAPI 的依赖注入系统可以像处理普通请求一样方便地处理它们。
总而言之,FastAPI 的异步设计和对流式响应的良好支持,使得实现高性能的 SSE 服务变得简单高效。
4. FastAPI SSE 快速入门:构建第一个 SSE Endpoint
让我们从一个最简单的例子开始:一个不断向客户端发送当前时间的 SSE endpoint。
前提条件:
- Python 3.7+
- 安装 FastAPI 和 Uvicorn(ASGI 服务器):
 bash
 pip install fastapi uvicorn[standard]
代码实现:
创建一个 main.py 文件:
“`python
main.py
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import datetime
app = FastAPI()
这是一个异步生成器函数,将用于生成 SSE 事件流
async def event_stream():
“””
一个异步生成器,每秒产出一个包含当前时间的 SSE 事件。
“””
while True:
# 获取当前时间
now = datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)
data = f”当前时间: {now}”
    # 构造 SSE 数据格式: data: [数据内容]\n\n
    # 每个事件必须以 data: 开头,并以双换行符 \n\n 结束
    sse_message = f"data: {data}\n\n"
    # 通过 yield 发送数据块
    yield sse_message
    # 等待一秒
    await asyncio.sleep(1)
@app.get(“/time-stream”)
async def time_stream():
“””
SSE Endpoint,返回一个持续发送当前时间的事件流。
“””
# 使用 StreamingResponse,指定 content 为异步生成器,media_type 为 text/event-stream
return StreamingResponse(event_stream(), media_type=”text/event-stream”)
— 客户端文件 (可选,用于测试) —
你可以将下面的 HTML/JavaScript 代码保存为 index.html 在浏览器中打开测试
注意:直接打开本地文件可能存在跨域问题,最好通过一个简单的静态文件服务器提供
例如: python -m http.server 8000
或者将 index.html 放在 FastAPI 项目的 static 目录下并通过 FastAPI 提供静态文件服务
from fastapi.staticfiles import StaticFiles
app.mount(“/static”, StaticFiles(directory=”static”), name=”static”)
然后将 index.html 放在 static 目录
“`
客户端代码 (index.html):
“`html
FastAPI SSE 时间流示例
    
“`
运行服务器:
在终端中,导航到 main.py 文件所在的目录,运行 Uvicorn:
bash
uvicorn main:app --reload
服务器将在 http://127.0.0.1:8000 启动。
测试:
- 使用浏览器测试: 将 index.html文件放在一个简单的 HTTP 服务器下(例如使用python -m http.server在index.html所在的目录启动一个服务器,然后通过http://localhost:8000/index.html访问),或者通过 FastAPI 自身提供静态文件服务。打开index.html,你应该能看到页面上的时间每秒更新一次。
- 使用 curl测试: SSE 是基于 HTTP 的,也可以使用curl来查看原始流数据:
 bash
 curl -N http://127.0.0.1:8000/time-stream
 -N选项是为了防止curl缓冲输出,让它在接收到数据时立即显示。你会看到每秒输出一个data: ...\n\n格式的数据块。
代码解释:
- async def event_stream():定义了一个异步生成器函数。- async def表明它是一个协程,可以使用- await。- yield关键字使其成为一个生成器。
- while True:创建一个无限循环,持续生成数据。
- await asyncio.sleep(1):在生成数据后暂停协程1秒,而不是阻塞整个进程。这是异步编程的关键。
- sse_message = f"data: {data}\n\n":严格按照 SSE 格式构造消息。- data:是必须的前缀,- \n\n是每个事件块的终止符。
- yield sse_message: 将构造好的 SSE 格式字符串发送给- StreamingResponse。- StreamingResponse会将其写入 HTTP 响应体并发送给客户端。
- @app.get("/time-stream"): 定义一个 GET 请求的 endpoint。
- return StreamingResponse(event_stream(), media_type="text/event-stream"): 创建并返回一个- StreamingResponse对象。- 第一个参数是生成器(或异步生成器)函数。StreamingResponse会不断从这个生成器中获取数据块并发送。
- media_type="text/event-stream"是 SSE 必需的头部。
 
- 第一个参数是生成器(或异步生成器)函数。
这个例子展示了 FastAPi 实现 SSE 的核心:一个返回 StreamingResponse 的 endpoint,其内容是一个不断 yield 出符合 SSE 格式字符串的异步生成器。
5. 深入理解与进阶:构建更完善的 SSE 流
刚才的例子只使用了 data: 字段。更完善的 SSE 流可以使用 event:, id:, retry: 等字段,并且可以发送更复杂的数据结构,比如 JSON。
发送结构化数据 (JSON):
虽然 SSE 数据是文本格式,但通常我们会将 JSON 字符串放在 data: 后面,以便客户端轻松解析。
“`python
import asyncio
import json # 导入 json 库
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import datetime
app = FastAPI()
async def complex_event_stream():
“””
一个异步生成器,发送包含 JSON 数据、事件类型和 ID 的 SSE 事件。
“””
counter = 0
while True:
counter += 1
# 准备要发送的结构化数据 (Python 字典)
data_payload = {
“timestamp”: datetime.datetime.now().isoformat(),
“sequence”: counter,
“message”: f”这是一个序列 {counter} 的更新”,
“status”: “processing” if counter % 5 != 0 else “completed”
}
    # 将 Python 字典转换为 JSON 字符串
    json_data = json.dumps(data_payload)
    # 构造 SSE 数据块
    # 可以包含 id:, event:, data: 等行
    # 注意每行后面都要有 \n,整个数据块以 \n\n 结束
    sse_message = (
        f"id: {counter}\n" # 添加事件 ID
        f"event: update\n"  # 添加事件类型
        f"data: {json_data}\n\n" # 添加 JSON 数据
    )
    print(f"Sending event:\n{sse_message}") # 服务器端打印发送的事件
    yield sse_message
    await asyncio.sleep(2) # 每2秒发送一次
@app.get(“/complex-stream”)
async def complex_stream():
“””
SSE Endpoint,返回一个发送更复杂事件流。
“””
return StreamingResponse(complex_event_stream(), media_type=”text/event-stream”)
“`
客户端代码 (index.html 部分更新):
“`html
FastAPI SSE 复杂流示例
    
“`
代码解释:
- import json: 导入- json库用于将 Python 字典转换为 JSON 字符串。
- json_data = json.dumps(data_payload): 将 Python 字典序列化为 JSON 格式的字符串。
- sse_message = (f"id: {counter}\n" ... f"data: {json_data}\n\n"): 构建更复杂的 SSE 消息字符串。使用了多行字符串,方便阅读。注意每行末尾的- \n以及整个消息块末尾的- \n\n。
- event: update: 指定了事件类型为- update。客户端通过- eventSource.addEventListener('update', ...)来监听这种类型的事件。
- id: {counter}: 为每个事件设置一个唯一的 ID。浏览器会在重连时发送这个 ID。
- 客户端变化:
- 使用了 eventSource.addEventListener('update', ...)来专门处理服务器发送的event: update类型的消息。
- 在事件处理函数中,使用 JSON.parse(event.data)将接收到的 JSON 字符串解析为 JavaScript 对象,然后可以方便地访问其属性。
- event.lastEventId可以获取当前事件的 ID。
- event.type可以获取当前事件的类型(对应服务器发送的- event:值)。
 
- 使用了 
这个例子展示了如何发送带有类型和 ID 的 JSON 数据,使得 SSE 流更加灵活和有用。
6. SSE 应用场景示例
除了上面的时间流和复杂更新流,SSE 还广泛应用于其他场景。
6.1 实时时间流 (已在快速入门中实现)
这是最基本的例子,适用于任何需要定期推送少量数据的场景。
6.2 任务进度报告
当服务器启动一个耗时任务(如文件处理、数据分析、模型训练等),可以通过 SSE 向客户端实时报告进度。
“`python
import asyncio
import time
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
存储当前活动任务的进度信息(这里用字典模拟,实际可能需要更复杂的结构)
注意:如果应用需要多进程或分布式部署,这个状态不能直接存在内存中,需要外部存储(如 Redis)
task_progress = {} # task_id: current_step
async def progress_stream(task_id: str):
“””
为特定任务 ID 生成进度更新的 SSE 事件流。
“””
# 发送初始状态
yield f”data: {json.dumps({‘status’: ‘started’, ‘task_id’: task_id})}\n\n”
while True:
    if task_id not in task_progress:
        # 任务完成或不存在
        yield f"data: {json.dumps({'status': 'completed', 'task_id': task_id, 'message': 'Task finished or not found'})}\n\n"
        break # 任务完成,停止流
    current_step = task_progress[task_id]
    # 假设任务总共10步
    total_steps = 10
    progress_percentage = (current_step / total_steps) * 100
    data_payload = {
        "task_id": task_id,
        "status": "in_progress",
        "step": current_step,
        "total_steps": total_steps,
        "progress": round(progress_percentage, 2)
    }
    sse_message = f"event: progress_update\ndata: {json.dumps(data_payload)}\n\n"
    yield sse_message
    # 如果进度达到100%,也停止流
    if current_step >= total_steps:
         yield f"data: {json.dumps({'status': 'completed', 'task_id': task_id, 'message': 'Task finished successfully'})}\n\n"
         del task_progress[task_id] # 从字典中移除已完成任务
         break
    await asyncio.sleep(0.5) # 每0.5秒检查并发送一次更新
模拟一个后台任务
async def run_heavy_task(task_id: str):
“””
模拟一个耗时任务,更新 global 的 task_progress。
“””
print(f”Task {task_id} started”)
total_steps = 10
task_progress[task_id] = 0 # 初始化进度
for i in range(1, total_steps + 1):
    await asyncio.sleep(1) # 模拟每一步的耗时
    task_progress[task_id] = i # 更新进度
    print(f"Task {task_id} step {i}/{total_steps}")
# 最后一步完成后, progress_stream 会检测到并发送 completed 事件然后退出
# 或者你可以在这里发送一个最终的 completed 消息,然后等待 stream 自然退出
# 如果 stream 退出,客户端会自动重连,这通常不是我们想要的。
# 更好的方法是让 stream 检测到状态变化并退出。
# await asyncio.sleep(0.1) # 给 stream 发送最后一条消息的时间
# del task_progress[task_id] # 任务完成后清理状态,让 stream 退出
@app.post(“/start-task/{task_id}”)
async def start_task(task_id: str, background_tasks: BackgroundTasks):
“””
启动一个后台任务。
“””
if task_id in task_progress:
return {“message”: f”Task {task_id} is already running.”}
background_tasks.add_task(run_heavy_task, task_id)
return {"message": f"Task {task_id} started in background. Connect to /task-progress/{task_id} for updates."}
@app.get(“/task-progress/{task_id}”)
async def get_task_progress_stream(task_id: str):
“””
为特定任务 ID 提供 SSE 进度流。
“””
if task_id not in task_progress:
# 如果任务不存在或已完成,立即返回一个completed消息并结束流
async def immediate_completion_stream():
yield f”data: {json.dumps({‘status’: ‘completed’, ‘task_id’: task_id, ‘message’: ‘Task not running or already finished’})}\n\n”
# 注意:这里立即结束生成器
    return StreamingResponse(immediate_completion_stream(), media_type="text/event-stream")
# 如果任务正在运行,返回进度流
return StreamingResponse(progress_stream(task_id), media_type="text/event-stream")
“`
使用方式:
- 向 /start-task/{some_task_id}发送 POST 请求,启动任务。
- 客户端连接到 /task-progress/{some_task_id}endpoint,接收进度更新。
客户端处理: 客户端需要监听 progress_update 事件,并解析 JSON 数据来更新进度条或显示日志。
这个例子中,我们使用了 FastAPI 的 BackgroundTasks 来启动后台任务,并在 SSE 流生成器中通过检查一个共享状态(task_progress 字典)来报告进度。
注意: 直接在内存中维护 task_progress 字典只适用于单进程单线程的简单场景。在生产环境中,特别是需要处理多个并发任务或部署到多进程/多服务器环境时,必须使用外部存储(如 Redis、数据库)来共享任务状态和进度信息。
6.3 简单通知系统
可以构建一个简单的 Pub/Sub 系统,服务器在有新通知时发布,所有连接的客户端接收。这个场景与下面的基于队列的广播非常相似。
7. 更复杂的 SSE 应用:基于队列实现广播
一个常见的 SSE 用例是将特定事件广播给所有(或部分)连接的客户端。例如,一个聊天应用的新消息通知,或者一个管理后台的系统级事件通知。
FastAPI 结合 asyncio.Queue 是实现这一目标的一种优雅方式。我们可以为每个连接的客户端创建一个独立的队列。当有新消息需要广播时,生产者将消息放入所有活动客户端的队列中。每个 SSE 生成器(对应一个客户端连接)则从其专属队列中读取消息并发送。
“`python
import asyncio
import json
import uuid # 用于生成唯一的客户端 ID
from typing import Set, Dict
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import StreamingResponse
from fastapi.templating import Jinja2Templates # 如果需要提供 HTML 页面
from fastapi.staticfiles import StaticFiles
app = FastAPI()
存储所有活跃客户端的队列
Key: 客户端 ID (str)
Value: asyncio.Queue
connected_clients: Dict[str, asyncio.Queue] = {}
简单的广播函数
async def broadcast_message(message: str):
“””
将消息广播给所有连接的客户端。
“””
# 注意:在实际应用中,这里可能需要处理连接断开的情况,
# 比如将发送失败的客户端从 connected_clients 中移除。
# 但 asyncio.Queue put_nowait 通常不会失败,除非队列满(这里队列是无限大小)。
# 真正的断开处理会在 stream_for_client 中完成。
for client_id, queue in list(connected_clients.items()): # 遍历时复制列表,避免在迭代过程中修改
print(f”Broadcasting to client {client_id}”)
try:
await queue.put(message) # 异步放入队列
except Exception as e:
print(f”Error putting message to client {client_id} queue: {e}”)
# 这里的错误可能意味着队列已经被关闭,说明客户端已经断开或即将断开
# 清理工作将在 stream_for_client 中执行
async def stream_for_client(client_id: str, client_queue: asyncio.Queue):
“””
为特定客户端生成 SSE 事件流,从其队列中读取消息。
“””
print(f”Client {client_id} connected, starting stream…”)
try:
# 可选:发送一个连接成功的消息
yield f”data: {json.dumps({‘status’: ‘connected’, ‘client_id’: client_id})}\n\n”
    while True:
        # 从队列中获取消息,如果队列为空则等待
        message = await client_queue.get() 
        print(f"Client {client_id} received message from queue: {message}")
        # 构造 SSE 格式消息
        # 可以在这里根据 message 内容设置不同的 event: 或 id:
        sse_message = f"data: {message}\n\n"
        yield sse_message
        # 标记任务完成,释放队列中的项(如果不调用,且队列项数大于最大值,get_nowait 会抛异常)
        # 对于无限大小的队列,不是必须的,但养成习惯更好
        # client_queue.task_done() # 如果使用了 join(),需要调用 task_done()
except asyncio.CancelledError:
    # 当客户端断开时,协程会被取消
    print(f"Client {client_id} stream cancelled (disconnected).")
except Exception as e:
    print(f"Client {client_id} stream encountered error: {e}")
finally:
    # 客户端断开或流结束时执行清理
    print(f"Client {client_id} disconnected. Cleaning up.")
    if client_id in connected_clients:
         # 关闭队列,防止 broadcast_message 继续往里放
         client_queue.put_nowait(None) # 放入一个 None 作为结束标志,让 stream 退出 while True
         # 或者直接关闭队列 (可能导致还在等待的 get() 抛出 asyncio.CancelledError 或 RuntimeError)
         # client_queue.close() 
         del connected_clients[client_id] # 从活跃客户端列表中移除
    print(f"Current connected clients: {len(connected_clients)}")
@app.get(“/stream”)
async def sse_stream():
“””
SSE Endpoint,为每个连接的客户端创建一个 SSE 流。
“””
client_id = str(uuid.uuid4()) # 为新连接生成唯一 ID
client_queue = asyncio.Queue() # 为该客户端创建队列
connected_clients[client_id] = client_queue # 加入活跃客户端列表
print(f"New client connected: {client_id}. Total clients: {len(connected_clients)}")
# 返回 StreamingResponse,使用 stream_for_client 异步生成器
# 注意:stream_for_client 协程会在客户端连接期间一直运行
return StreamingResponse(stream_for_client(client_id, client_queue), media_type="text/event-stream")
示例:一个 endpoint 用于触发广播消息
@app.post(“/send-message”)
async def send_message(message: str):
“””
一个示例 endpoint,用于触发向所有客户端广播消息。
“””
print(f”Received message to broadcast: {message}”)
# 在后台协程中执行广播,避免阻塞当前请求响应
asyncio.create_task(broadcast_message(message))
return {“status”: “broadcasting”, “message”: message}
可以添加一个简单的 HTML 页面来测试广播
templates = Jinja2Templates(directory=”templates”)
app.mount(“/static”, StaticFiles(directory=”static”), name=”static”) # 如果需要提供静态文件 (如js, css)
@app.get(“/”)
async def read_root(request: Request):
return templates.TemplateResponse(“index.html”, {“request”: request})
templates/index.html (示例)
“””
FastAPI SSE 广播示例
    
    
“””
“`
代码解释:
- connected_clients: Dict[str, asyncio.Queue]: 一个字典,维护所有当前连接的客户端及其对应的- asyncio.Queue。字典的键是为每个客户端生成的唯一 ID。
- asyncio.Queue(): 为每个新连接的客户端创建一个无限大小的队列。
- connected_clients[client_id] = client_queue: 将新客户端及其队列添加到字典中。
- async def stream_for_client(...): 这是核心的 SSE 生成器。它接收客户端 ID 和其队列作为参数。- await client_queue.get(): 这是一个阻塞操作,它会暂停当前协程,直到队列中有新的项。一旦有数据,它就会获取该项。
- yield sse_message: 将从队列中获取并格式化好的消息发送给客户端。
- except asyncio.CancelledError:: 这是处理客户端断开连接的关键。当客户端关闭连接时,FastAPI/Starlette 会取消对应的协程,触发- asyncio.CancelledError异常。
- finally:块:无论协程是正常完成、取消还是发生其他异常,- finally块都会执行。这里用于从- connected_clients字典中移除已断开的客户端及其队列,进行清理。
 
- async def broadcast_message(message: str): 这个函数遍历- connected_clients字典,并将消息- put到每个客户端的队列中。
- asyncio.create_task(broadcast_message(message)): 在- /send-messageendpoint 中,我们使用- asyncio.create_task创建一个 新的 协程来执行广播任务。这样做是为了避免- send-messageendpoint 因为等待广播完成而长时间阻塞响应。广播是异步进行的。
- /send-messageendpoint 是一个普通的 FastAPI endpoint,用于接收来自任何地方(例如另一个服务或客户端的 POST 请求)的消息,然后触发广播。
- 客户端连接到 /streamendpoint,服务器会为它创建一个 SSE 流并将其添加到广播列表中。
- 通过 /send-message发送的消息会被推送到所有/stream连接的客户端。
这个例子展示了如何构建一个基于队列的广播系统,是实现实时通知、聊天室等功能的良好起点。在生产环境中,如果需要跨多个服务器实例进行广播,connected_clients 字典和 broadcast_message 函数就需要与外部消息代理(如 Redis Pub/Sub, Kafka, RabbitMQ 等)集成。
8. 错误处理与客户端断开
处理客户端断开连接是构建稳定 SSE 服务的重要一环。如前所述,FastAPI/Starlette 在客户端断开时会取消对应的协程,导致 asyncio.CancelledError。我们需要在生成器中使用 try...except asyncio.CancelledError...finally 结构来捕获取消事件,并在 finally 块中执行清理工作(例如从活跃客户端列表中移除)。
“`python
在 stream_for_client 函数中:
async def stream_for_client(client_id: str, client_queue: asyncio.Queue):
print(f”Client {client_id} connected, starting stream…”)
try:
# … 发送初始消息 …
while True:
message = await client_queue.get()
if message is None: # 约定 None 作为队列结束标志
print(f”Client {client_id} queue received None, closing stream.”)
break # 退出循环,进入 finally
        # ... 处理并格式化消息 ...
        yield sse_message
except asyncio.CancelledError:
    # 客户端断开时触发
    print(f"Client {client_id} stream cancelled (disconnected).")
except Exception as e:
    # 处理生成器内部的其他异常
    print(f"Client {client_id} stream encountered error: {e}")
    # 可以选择发送一个错误事件给客户端,或者直接退出流
    yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
finally:
    # 清理工作,无论如何都会执行
    print(f"Client {client_id} disconnected. Cleaning up.")
    if client_id in connected_clients:
         # 确保队列被标记为完成或关闭,避免泄露
         # 如果队列还在 connected_clients 里,但 stream 已经退出,
         # broadcast_message 可能会尝试往已关闭的队列里放东西
         # 更好的做法是在这里移除
         del connected_clients[client_id]
         # 关闭队列或放入 None,确保如果有其他地方还在等 get() 会退出
         try:
             client_queue.put_nowait(None) 
         except: # 如果队列已经关闭会抛错,忽略
             pass
    print(f"Current connected clients: {len(connected_clients)}")
在 broadcast_message 函数中,为了更健壮,可以在put失败时考虑移除客户端
async def broadcast_message(message: str):
disconnected_clients = []
for client_id, queue in connected_clients.items():
try:
# put_nowait 尝试非阻塞放入,如果队列已满则抛出 asyncio.QueueFull
# 但我们用的是无限大小队列,所以主要考虑 put 可能在队列关闭后失败
# await queue.put(message) # 异步放入
queue.put_nowait(message) # 非阻塞放入
except Exception:
# 如果放入失败,说明客户端可能已经断开或即将断开
disconnected_clients.append(client_id)
for client_id in disconnected_clients:
print(f”Removing disconnected client {client_id} during broadcast.”)
# 在 stream_for_client 的 finally 里也会移除,这里是为了尽快清理
# del connected_clients[client_id] # 小心并发修改字典问题,这里简单的列表移除再删除是安全的
# 更安全的做法是让 stream 自己清理,广播失败只是一个信号
“`
客户端自动重连:
如前所述,EventSource 默认会自动处理网络中断后的重连。如果服务器希望控制重连行为,可以发送 retry: [毫秒数]。如果服务器希望客户端 不 重连,可以:
- 返回 HTTP 204 No Content 状态码。
- 在响应头部中包含 Connection: close。
- 直接关闭连接而不发送 \n\n结束最后一个事件。
通常情况下,依赖 EventSource 的自动重连和服务器端的清理(当检测到连接取消时)就足够了。
9. 性能与可伸缩性考虑
SSE 相比轮询有更好的性能,因为它维护的是长连接。但长连接本身也有开销:
- 内存开销: 每个连接都需要服务器维护一些状态(如套接字、缓冲区、可能还有我们示例中的队列等)。连接数越多,内存占用越高。
- 服务器并发连接限制: 操作系统和服务器软件(如 uvicorn)对并发连接数有限制。
- 代理与负载均衡: HTTP/1.1 下,大多数代理和负载均衡器默认可能会对空闲的长连接设置超时。如果连接长时间没有数据发送,代理可能会断开它。HTTP/2 对多路复用有更好支持,可能缓解部分问题。
可伸缩性:
对于单服务器应用,基于内存队列的广播是可行的。但如果需要部署到多个服务器实例,内存中的 connected_clients 字典无法共享。这时,必须引入外部消息代理:
- Redis Pub/Sub: 一个常用的轻量级选择。所有 FastAPI 实例连接到同一个 Redis 服务器。客户端连接到 任意 一个 FastAPI 实例的 SSE endpoint。当需要广播时,任何一个 FastAPI 实例都可以向 Redis 特定频道发布消息。所有连接到 Redis 并订阅该频道的 FastAPI 实例都会收到消息,然后将消息推送到它们各自维护的连接客户端队列中。
- Kafka, RabbitMQ 等: 更重量级、功能更丰富的消息队列,适用于需要更高吞吐量、持久性、复杂路由或消费者组的场景。
Keep-Alive 机制:
为了防止代理/负载均衡器因为连接空闲而断开,服务器应该定期发送心跳包。SSE 规范中,以冒号 : 开头的行是注释行,浏览器会忽略,但能保持连接活跃。
“`python
在 stream_for_client 或任何 SSE 生成器中
async def event_stream_with_keepalive():
while True:
# 发送真实数据…
yield f”data: some data\n\n”
    # 定期发送心跳 (例如每 15 秒)
    await asyncio.sleep(10) # 假设数据发送间隔小于10秒
    yield ":keepalive\n\n" # 发送一个注释行作为心跳包
    await asyncio.sleep(5) # 等待剩余时间凑满15秒间隔(或者直接 await asyncio.sleep(15))
或者在 while True 循环内部,维护一个计数器或计时器,每隔 N 秒发送一次心跳
“`
在 stream_for_client 的 while True 循环中,除了获取并 yield 消息,还可以定期检查是否需要发送心跳。
“`python
async def stream_for_client_with_keepalive(client_id: str, client_queue: asyncio.Queue):
# … (之前的代码) …
last_data_time = asyncio.get_event_loop().time()
keepalive_interval = 15 # 秒
while True:
    try:
        # 尝试从队列中获取消息,设置一个较短的超时时间
        message = await asyncio.wait_for(client_queue.get(), timeout=keepalive_interval)
        if message is None:
             print(f"Client {client_id} queue received None, closing stream.")
             break
        # 处理并发送数据
        sse_message = f"data: {message}\n\n"
        yield sse_message
        last_data_time = asyncio.get_event_loop().time() # 更新发送数据的时间
    except asyncio.TimeoutError:
        # 超时了,队列里没有新消息,发送心跳
        print(f"Client {client_id} sending keepalive.")
        yield ": keepalive\n\n"
    except asyncio.CancelledError:
        print(f"Client {client_id} stream cancelled (disconnected).")
        break # 退出循环,进入 finally
    except Exception as e:
         print(f"Client {client_id} stream encountered error: {e}")
         yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
         # 可以选择 break 或继续,取决于错误类型
finally:
    # ... (清理代码) ...
“`
这个带超时的 await asyncio.wait_for(client_queue.get(), timeout=...) 模式可以在等待消息的同时实现心跳功能。如果超时,说明队列里没有消息,就发送心跳。
10. 安全性
像其他任何 Web endpoint 一样,SSE endpoint 也需要考虑安全性。
- 
认证与授权: 不是任何人都能连接到你的 SSE 流。使用 FastAPI 的依赖注入系统,可以在路由处理函数中添加认证和授权依赖,例如: 
 “`python
 from fastapi import Depends, HTTPException, status
 from your_auth_module import get_current_user # 假设你有一个认证函数@app.get(“/private-stream”, dependencies=[Depends(get_current_user)]) 
 async def private_sse_stream(current_user: Any = Depends(get_current_user)):
 # 只有认证用户才能连接到这个流
 # 可以根据 current_user 定制化流内容
 return StreamingResponse(user_specific_stream(current_user.id), media_type=”text/event-stream”)
 ``wss://
 这确保了只有通过验证的用户才能建立 SSE 连接。
 * **数据加密:** 始终使用 HTTPS (for WebSockets,https://for SSE) 来加密传输的数据,防止中间人攻击。在生产环境中,这一点至关重要。Uvicorn 可以配置使用 SSL 证书来启用 HTTPS。/task-progress/{task_id}`),需要像处理普通请求一样验证这些参数,防止注入或其他恶意输入。FastAPI 和 Pydantic 提供了强大的验证能力。
 * **输入验证:** 如果 SSE 流的行为取决于请求参数(如
11. SSE vs. WebSockets:何时选择哪个?
| 特性 | Server-Sent Events (SSE) | WebSockets | 
|---|---|---|
| 通信方向 | 单向 (服务器 -> 客户端) | 全双工 (双向) | 
| 协议 | 基于 HTTP/1.1 或 HTTP/2 ( text/event-stream) | 独立协议 ( ws://,wss://) 基于 TCP | 
| 实现复杂度 | 相对简单,基于 HTTP 和 EventSource API | 相对复杂,需要握手,通常需要客户端/服务器库 | 
| 浏览器支持 | 内置 EventSourceAPI,无需额外库 (IE 除外) | 内置 WebSocket API,无需额外库 | 
| 数据格式 | 文本 (特定格式, data:,event:, etc.) | 文本或二进制 | 
| 自动重连 | 内置,可配置 | 需要手动实现 | 
| 头部开销 | 相比 WebSockets,每次发送数据时头部开销稍大 | 握手后头部开销很小 | 
| 代理/负载均衡 | 需要注意长连接超时和 sticky sessions | 相对容易处理 (依赖具体实现和配置) | 
| 适用场景 | 实时通知、进度更新、订阅、仪表盘数据等 | 聊天、在线游戏、实时协作、远程控制等 | 
总结:
- 如果你只需要从服务器向客户端推送数据,并且不关心从客户端向服务器发送实时消息,SSE 通常是更简单、更轻量级的选择。
- 如果需要双向实时通信(客户端需要频繁向服务器发送实时消息),或者需要传输二进制数据,WebSockets 是更好的选择。
在 FastAPI 中实现两者都非常方便,你可以根据具体的应用需求来选择合适的技术。
12. 最佳实践与总结
构建稳定、高效的 FastAPI SSE 应用,请遵循以下最佳实践:
- 使用 StreamingResponse和异步生成器: 这是 FastAPI 实现 SSE 的核心机制,充分利用了异步能力。
- 严格遵守 SSE 数据格式: 每条数据行以 \n结尾,每个事件块以\n\n结尾。使用data:,event:,id:,retry:等字段来构建有意义的事件流。
- 处理客户端断开: 在生成器中使用 try...except asyncio.CancelledError...finally来捕获断开事件,并在finally块中进行必要的资源清理(如移除客户端队列)。
- 发送心跳包: 定期发送注释行 (: some comment\n\n) 来保持连接活跃,防止代理/负载均衡器超时断开连接。
- 考虑可伸缩性: 对于需要水平扩展的应用,不要将连接状态或广播逻辑存储在内存中,而应使用外部消息代理(如 Redis Pub/Sub)。
- 实施认证与授权: 使用 FastAPI 的依赖注入保护你的 SSE endpoint,确保只有授权用户才能连接。
- 使用 HTTPS: 加密数据传输,保护用户隐私和数据安全。
- 适当的错误处理: 在生成器中捕获可能发生的异常,并决定是发送错误事件给客户端,还是优雅地关闭连接。
总结:
FastAPI 提供了强大而灵活的方式来实现 Server-Sent Events。通过利用其异步能力、StreamingResponse 和 Python 的生成器/异步生成器,开发者可以轻松构建高性能的单向实时数据推送服务。从简单的定期更新到复杂的基于队列的广播系统,FastAPI 都能提供坚实的基础。理解 SSE 的工作原理、数据格式以及如何在 FastAPI 中有效利用异步特性,将帮助你构建出满足现代 Web 应用实时需求的优秀服务。希望这篇详细指南能帮助你快速入门并深入掌握 FastAPI SSE 的开发。