FastAPI SSE 快速入门与指南 – wiki基地


FastAPI SSE (Server-Sent Events) 快速入门与深度指南

在现代 Web 应用中,实时数据更新已成为常态。无论是股票报价、聊天消息、通知推送,还是长时间运行任务的进度报告,客户端都需要能够接收服务器主动发送的数据,而无需频繁地向服务器轮询。实现这一目标的几种常见技术包括 WebSockets 和 Server-Sent Events (SSE)。

本文将专注于 Server-Sent Events (SSE) 技术,并详细探讨如何在高性能的 Python Web 框架 FastAPI 中高效地实现和使用 SSE。我们将从基本概念入手,逐步深入到更复杂的应用场景和最佳实践。

文章大纲:

  1. 引言:实时通信的需求与 SSE 的地位

    • 为什么需要实时通信?
    • 常见的实时通信技术概览 (Polling, Long Polling, WebSockets, SSE)
    • SSE 的优势与适用场景
  2. SSE 工作原理详解

    • HTTP 连接的复用
    • text/event-stream MIME 类型
    • SSE 数据格式 (data, event, id, retry)
    • 浏览器 EventSource API
  3. 为什么选择 FastAPI 实现 SSE?

    • FastAPI 的异步能力 (asyncio)
    • StreamingResponse 的妙用
    • Python 生成器与异步生成器
    • FastAPI 的其他优势 (依赖注入、数据验证等)
  4. FastAPI SSE 快速入门:构建第一个 SSE Endpoint

    • 安装必要的库
    • 创建 FastAPI 应用
    • 定义一个简单的 SSE 响应
    • 客户端如何连接和接收数据 (JavaScript EventSource)
    • 运行与测试
  5. 深入理解与进阶:构建更完善的 SSE 流

    • 发送结构化数据 (JSON)
    • 使用不同的事件类型 (event:)
    • 为事件添加唯一标识符 (id:)
    • 控制客户端重连间隔 (retry:)
    • 将 SSE 逻辑封装到生成器函数中
  6. SSE 应用场景示例:

    • 实时时间流: 持续发送当前时间
    • 任务进度报告: 服务器执行耗时任务时,向客户端报告进度
    • 简单通知系统: 服务器触发事件时向客户端发送通知
  7. 更复杂的 SSE 应用:基于队列实现广播

    • 使用 asyncio.Queue 管理消息
    • 构建一个简单的广播机制 (多个客户端接收同一消息)
    • 处理客户端连接与断开
    • 生产者-消费者模型在 SSE 中的应用
  8. 错误处理与客户端断开

    • 服务器端如何检测客户端断开
    • 处理 ClientDisconnect 异常
    • 客户端自动重连与服务器端的配合
  9. 性能与可伸缩性考虑

    • 保持连接的开销
    • 长连接与代理/负载均衡
    • 如何实现大规模广播 (外部消息队列如 Redis Pub/Sub)
    • Keep-Alive 机制
  10. 安全性

    • SSE Endpoint 的认证与授权
    • 数据加密
  11. SSE vs. WebSockets:何时选择哪个?

    • 单向 vs. 双向通信
    • 协议复杂度与浏览器支持
    • 头部开销
  12. 最佳实践与总结

    • 使用异步生成器
    • 清晰地格式化 SSE 数据
    • 妥善处理断开连接
    • 考虑 keep-alive
    • 规划可伸缩性

1. 引言:实时通信的需求与 SSE 的地位

在 Web 1.0 和早期的 Web 2.0 时代,客户端获取服务器数据的基本方式是请求-响应模式。浏览器发送 HTTP 请求,服务器返回数据。要获取最新数据,客户端需要不断地发起新的请求,也就是轮询 (Polling)。这种方式简单直接,但效率低下,尤其是在数据更新不频繁或者需要非常低延迟的场景下,会产生大量不必要的 HTTP 请求和服务器负载。

为了解决轮询的效率问题,出现了一些改进技术:

  • 长轮询 (Long Polling): 客户端发起请求后,如果服务器没有新数据,则 удерживает(hold)住连接,直到有新数据或者超时才响应。客户端收到响应后立即发起新的请求。这减少了空闲请求的数量,但仍然是请求-响应模式,且实现相对复杂。
  • WebSockets: 提供一个全双工(双向)通信通道,允许服务器和客户端随时互相发送数据。基于 TCP,有自己的协议 (ws://wss://)。功能强大,适用于需要频繁双向通信的场景(如在线游戏、实时协作应用)。但协议相对复杂,实现和维护成本较高,且对于只需要服务器向客户端推送数据的场景,可能显得“过重”。
  • Server-Sent Events (SSE): 允许服务器通过一条持久的 HTTP 连接向客户端推送数据。它基于 HTTP/1.1 或 HTTP/2 协议,使用 text/event-stream MIME 类型。它是一个单向通信技术,只允许服务器向客户端发送数据。

SSE 的优势与适用场景:

  • 简单性: 基于标准的 HTTP 协议,易于理解和实现。客户端使用内置的 EventSource JavaScript API,无需引入第三方库(对于现代浏览器)。
  • 高效: 利用单个持久连接,避免了轮询的头部开销和连接建立/关闭开销。
  • 内置重连机制: 浏览器 EventSource 自动处理连接断开后的重连,开发者无需手动编写复杂的重连逻辑。
  • 适用场景: 非常适合那些只需要服务器向客户端单向推送数据的场景,例如:
    • 实时报价(股票、加密货币)
    • 新闻/社交媒体动态推送
    • 通知系统
    • 长时间运行任务的进度条或日志输出
    • 仪表盘实时数据更新

FastAPI 凭借其优秀的异步支持和简洁的 API 设计,非常适合实现 SSE endpoint。

2. SSE 工作原理详解

SSE 的核心在于服务器使用一个特殊的 HTTP 响应。

  1. HTTP 连接: 客户端通过标准的 HTTP/GET 请求连接到服务器。
  2. 响应头部: 服务器返回一个响应,其中包含 Content-Type: text/event-stream 头部。这个头部告诉浏览器,服务器将通过这个连接持续发送一系列事件流,而不是一个普通的静态文件或 API 响应。
  3. 持久连接: 服务器不会立即关闭连接,而是保持连接开放。
  4. 数据格式: 服务器通过这个开放的连接发送特定格式的文本数据块。每个数据块代表一个“事件”。事件数据块之间用两个换行符 (\n\n) 分隔。

SSE 数据格式:

每个事件数据块由一系列以换行符 (\n) 结尾的行组成。常见的行类型有:

  • data: [数据]:包含要发送的数据。如果数据跨越多行,可以在每行前都加上 data:
  • event: [事件名称]:指定事件的类型。客户端可以使用 EventSource.addEventListener() 监听特定类型的事件。如果省略,则使用默认的 message 事件。
  • id: [唯一标识符]:为事件设置一个唯一 ID。浏览器会在重连时将最后一个接收到的 ID 发送给服务器(通过 Last-Event-ID 头部),服务器可以据此决定从哪里恢复数据流。
  • retry: [毫秒数]:指定客户端在连接断开后,等待多少毫秒再尝试重连。

示例事件格式:

“`
data: 这是第一行数据
data: 这是第二行数据
event: 通知
id: 12345
retry: 5000

data: 这是另一个事件的数据
event: 更新

data: 这是一个简单的消息
“`

注意:每个事件块都以 \n\n 结束。空行表示事件块的结束。以冒号 : 开头的行会被视为注释,会被浏览器忽略,常用于发送心跳包防止连接超时。

浏览器 EventSource API:

现代浏览器内置了 EventSource 对象,用于接收 SSE 流。

“`javascript
const eventSource = new EventSource(‘/my-sse-endpoint’);

// 监听默认的 ‘message’ 事件
eventSource.onmessage = function(event) {
console.log(‘Received message:’, event.data);
};

// 监听特定类型的事件 (例如上面例子中的 ‘通知’)
eventSource.addEventListener(‘通知’, function(event) {
console.log(‘Received notification:’, event.data, ‘ID:’, event.lastEventId);
});

// 监听错误
eventSource.onerror = function(err) {
console.error(‘EventSource failed:’, err);
// 浏览器会自动尝试重连,除非服务器返回特定的 HTTP 状态码或头部
};

// 监听连接打开
eventSource.onopen = function(event) {
console.log(‘SSE connection opened’);
};
“`

EventSource 会自动处理连接断开后的重连,并且在重连请求中包含 Last-Event-ID 头部,发送上一个成功接收的事件 ID(如果事件带有 ID 的话)。这是 SSE 一个非常方便的特性。

3. 为什么选择 FastAPI 实现 SSE?

FastAPI 是一个用于构建 API 的现代、快速 (高性能) 的 Web 框架,基于标准 Python 类型提示。它与 Starlette 和 Pydantic 紧密集成。以下是 FastAPI 特别适合实现 SSE 的原因:

  • 基于 ASGI: FastAPI 构建在 ASGI (Asynchronous Server Gateway Interface) 之上,原生支持异步操作。SSE 是一个典型的异步场景,需要在不阻塞主事件循环的情况下保持连接并发送数据。FastAPI 的 async defawait 语法使得编写异步代码非常自然。
  • StreamingResponse Starlette(FastAPI 的底层)提供了一个 StreamingResponse 类,专门用于流式传输响应。它接受一个生成器或异步生成器作为内容,可以非常方便地实现 SSE 这种需要连续发送数据的响应类型。
  • Python 生成器/异步生成器: Python 的生成器 (使用 yield 关键字) 和异步生成器 (使用 async defyield) 是实现 SSE 流的理想工具。它们允许你在需要发送数据时“暂停”函数执行,并在需要时继续,这完美契合了流式发送数据的模式。FastAPI 的 StreamingResponse 可以直接消费这些生成器。
  • 简洁的代码: FastAPI 的装饰器 @app.get(), @app.post() 等使得定义 API endpoint 非常直观。结合类型提示和 Pydantic,即使是异步 streaming endpoint 的定义也十分清晰。
  • 依赖注入系统: 如果你的 SSE endpoint 需要访问数据库连接、当前认证用户或其他资源,FastAPI 的依赖注入系统可以像处理普通请求一样方便地处理它们。

总而言之,FastAPI 的异步设计和对流式响应的良好支持,使得实现高性能的 SSE 服务变得简单高效。

4. FastAPI SSE 快速入门:构建第一个 SSE Endpoint

让我们从一个最简单的例子开始:一个不断向客户端发送当前时间的 SSE endpoint。

前提条件:

  • Python 3.7+
  • 安装 FastAPI 和 Uvicorn(ASGI 服务器):
    bash
    pip install fastapi uvicorn[standard]

代码实现:

创建一个 main.py 文件:

“`python

main.py

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import datetime

app = FastAPI()

这是一个异步生成器函数,将用于生成 SSE 事件流

async def event_stream():
“””
一个异步生成器,每秒产出一个包含当前时间的 SSE 事件。
“””
while True:
# 获取当前时间
now = datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)
data = f”当前时间: {now}”

    # 构造 SSE 数据格式: data: [数据内容]\n\n
    # 每个事件必须以 data: 开头,并以双换行符 \n\n 结束
    sse_message = f"data: {data}\n\n"

    # 通过 yield 发送数据块
    yield sse_message

    # 等待一秒
    await asyncio.sleep(1)

@app.get(“/time-stream”)
async def time_stream():
“””
SSE Endpoint,返回一个持续发送当前时间的事件流。
“””
# 使用 StreamingResponse,指定 content 为异步生成器,media_type 为 text/event-stream
return StreamingResponse(event_stream(), media_type=”text/event-stream”)

— 客户端文件 (可选,用于测试) —

你可以将下面的 HTML/JavaScript 代码保存为 index.html 在浏览器中打开测试

注意:直接打开本地文件可能存在跨域问题,最好通过一个简单的静态文件服务器提供

例如: python -m http.server 8000

或者将 index.html 放在 FastAPI 项目的 static 目录下并通过 FastAPI 提供静态文件服务

from fastapi.staticfiles import StaticFiles

app.mount(“/static”, StaticFiles(directory=”static”), name=”static”)

然后将 index.html 放在 static 目录

“`

客户端代码 (index.html):

“`html




FastAPI SSE 时间流

FastAPI SSE 时间流示例

等待接收服务器时间…



“`

运行服务器:

在终端中,导航到 main.py 文件所在的目录,运行 Uvicorn:

bash
uvicorn main:app --reload

服务器将在 http://127.0.0.1:8000 启动。

测试:

  1. 使用浏览器测试:index.html 文件放在一个简单的 HTTP 服务器下(例如使用 python -m http.serverindex.html 所在的目录启动一个服务器,然后通过 http://localhost:8000/index.html 访问),或者通过 FastAPI 自身提供静态文件服务。打开 index.html,你应该能看到页面上的时间每秒更新一次。
  2. 使用 curl 测试: SSE 是基于 HTTP 的,也可以使用 curl 来查看原始流数据:
    bash
    curl -N http://127.0.0.1:8000/time-stream

    -N 选项是为了防止 curl 缓冲输出,让它在接收到数据时立即显示。你会看到每秒输出一个 data: ...\n\n 格式的数据块。

代码解释:

  • async def event_stream(): 定义了一个异步生成器函数。async def 表明它是一个协程,可以使用 awaityield 关键字使其成为一个生成器。
  • while True: 创建一个无限循环,持续生成数据。
  • await asyncio.sleep(1):在生成数据后暂停协程1秒,而不是阻塞整个进程。这是异步编程的关键。
  • sse_message = f"data: {data}\n\n":严格按照 SSE 格式构造消息。data: 是必须的前缀,\n\n 是每个事件块的终止符。
  • yield sse_message: 将构造好的 SSE 格式字符串发送给 StreamingResponseStreamingResponse 会将其写入 HTTP 响应体并发送给客户端。
  • @app.get("/time-stream"): 定义一个 GET 请求的 endpoint。
  • return StreamingResponse(event_stream(), media_type="text/event-stream"): 创建并返回一个 StreamingResponse 对象。
    • 第一个参数是生成器(或异步生成器)函数。StreamingResponse 会不断从这个生成器中获取数据块并发送。
    • media_type="text/event-stream" 是 SSE 必需的头部。

这个例子展示了 FastAPi 实现 SSE 的核心:一个返回 StreamingResponse 的 endpoint,其内容是一个不断 yield 出符合 SSE 格式字符串的异步生成器。

5. 深入理解与进阶:构建更完善的 SSE 流

刚才的例子只使用了 data: 字段。更完善的 SSE 流可以使用 event:, id:, retry: 等字段,并且可以发送更复杂的数据结构,比如 JSON。

发送结构化数据 (JSON):

虽然 SSE 数据是文本格式,但通常我们会将 JSON 字符串放在 data: 后面,以便客户端轻松解析。

“`python
import asyncio
import json # 导入 json 库
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import datetime

app = FastAPI()

async def complex_event_stream():
“””
一个异步生成器,发送包含 JSON 数据、事件类型和 ID 的 SSE 事件。
“””
counter = 0
while True:
counter += 1
# 准备要发送的结构化数据 (Python 字典)
data_payload = {
“timestamp”: datetime.datetime.now().isoformat(),
“sequence”: counter,
“message”: f”这是一个序列 {counter} 的更新”,
“status”: “processing” if counter % 5 != 0 else “completed”
}

    # 将 Python 字典转换为 JSON 字符串
    json_data = json.dumps(data_payload)

    # 构造 SSE 数据块
    # 可以包含 id:, event:, data: 等行
    # 注意每行后面都要有 \n,整个数据块以 \n\n 结束
    sse_message = (
        f"id: {counter}\n" # 添加事件 ID
        f"event: update\n"  # 添加事件类型
        f"data: {json_data}\n\n" # 添加 JSON 数据
    )

    print(f"Sending event:\n{sse_message}") # 服务器端打印发送的事件

    yield sse_message

    await asyncio.sleep(2) # 每2秒发送一次

@app.get(“/complex-stream”)
async def complex_stream():
“””
SSE Endpoint,返回一个发送更复杂事件流。
“””
return StreamingResponse(complex_event_stream(), media_type=”text/event-stream”)

“`

客户端代码 (index.html 部分更新):

“`html




FastAPI SSE 复杂流

FastAPI SSE 复杂流示例

等待接收服务器消息…



“`

代码解释:

  • import json: 导入 json 库用于将 Python 字典转换为 JSON 字符串。
  • json_data = json.dumps(data_payload): 将 Python 字典序列化为 JSON 格式的字符串。
  • sse_message = (f"id: {counter}\n" ... f"data: {json_data}\n\n"): 构建更复杂的 SSE 消息字符串。使用了多行字符串,方便阅读。注意每行末尾的 \n 以及整个消息块末尾的 \n\n
  • event: update: 指定了事件类型为 update。客户端通过 eventSource.addEventListener('update', ...) 来监听这种类型的事件。
  • id: {counter}: 为每个事件设置一个唯一的 ID。浏览器会在重连时发送这个 ID。
  • 客户端变化:
    • 使用了 eventSource.addEventListener('update', ...) 来专门处理服务器发送的 event: update 类型的消息。
    • 在事件处理函数中,使用 JSON.parse(event.data) 将接收到的 JSON 字符串解析为 JavaScript 对象,然后可以方便地访问其属性。
    • event.lastEventId 可以获取当前事件的 ID。
    • event.type 可以获取当前事件的类型(对应服务器发送的 event: 值)。

这个例子展示了如何发送带有类型和 ID 的 JSON 数据,使得 SSE 流更加灵活和有用。

6. SSE 应用场景示例

除了上面的时间流和复杂更新流,SSE 还广泛应用于其他场景。

6.1 实时时间流 (已在快速入门中实现)

这是最基本的例子,适用于任何需要定期推送少量数据的场景。

6.2 任务进度报告

当服务器启动一个耗时任务(如文件处理、数据分析、模型训练等),可以通过 SSE 向客户端实时报告进度。

“`python
import asyncio
import time
from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

存储当前活动任务的进度信息(这里用字典模拟,实际可能需要更复杂的结构)

注意:如果应用需要多进程或分布式部署,这个状态不能直接存在内存中,需要外部存储(如 Redis)

task_progress = {} # task_id: current_step

async def progress_stream(task_id: str):
“””
为特定任务 ID 生成进度更新的 SSE 事件流。
“””
# 发送初始状态
yield f”data: {json.dumps({‘status’: ‘started’, ‘task_id’: task_id})}\n\n”

while True:
    if task_id not in task_progress:
        # 任务完成或不存在
        yield f"data: {json.dumps({'status': 'completed', 'task_id': task_id, 'message': 'Task finished or not found'})}\n\n"
        break # 任务完成,停止流

    current_step = task_progress[task_id]
    # 假设任务总共10步
    total_steps = 10
    progress_percentage = (current_step / total_steps) * 100

    data_payload = {
        "task_id": task_id,
        "status": "in_progress",
        "step": current_step,
        "total_steps": total_steps,
        "progress": round(progress_percentage, 2)
    }

    sse_message = f"event: progress_update\ndata: {json.dumps(data_payload)}\n\n"

    yield sse_message

    # 如果进度达到100%,也停止流
    if current_step >= total_steps:
         yield f"data: {json.dumps({'status': 'completed', 'task_id': task_id, 'message': 'Task finished successfully'})}\n\n"
         del task_progress[task_id] # 从字典中移除已完成任务
         break

    await asyncio.sleep(0.5) # 每0.5秒检查并发送一次更新

模拟一个后台任务

async def run_heavy_task(task_id: str):
“””
模拟一个耗时任务,更新 global 的 task_progress。
“””
print(f”Task {task_id} started”)
total_steps = 10
task_progress[task_id] = 0 # 初始化进度

for i in range(1, total_steps + 1):
    await asyncio.sleep(1) # 模拟每一步的耗时
    task_progress[task_id] = i # 更新进度
    print(f"Task {task_id} step {i}/{total_steps}")

# 最后一步完成后, progress_stream 会检测到并发送 completed 事件然后退出
# 或者你可以在这里发送一个最终的 completed 消息,然后等待 stream 自然退出
# 如果 stream 退出,客户端会自动重连,这通常不是我们想要的。
# 更好的方法是让 stream 检测到状态变化并退出。
# await asyncio.sleep(0.1) # 给 stream 发送最后一条消息的时间
# del task_progress[task_id] # 任务完成后清理状态,让 stream 退出

@app.post(“/start-task/{task_id}”)
async def start_task(task_id: str, background_tasks: BackgroundTasks):
“””
启动一个后台任务。
“””
if task_id in task_progress:
return {“message”: f”Task {task_id} is already running.”}

background_tasks.add_task(run_heavy_task, task_id)
return {"message": f"Task {task_id} started in background. Connect to /task-progress/{task_id} for updates."}

@app.get(“/task-progress/{task_id}”)
async def get_task_progress_stream(task_id: str):
“””
为特定任务 ID 提供 SSE 进度流。
“””
if task_id not in task_progress:
# 如果任务不存在或已完成,立即返回一个completed消息并结束流
async def immediate_completion_stream():
yield f”data: {json.dumps({‘status’: ‘completed’, ‘task_id’: task_id, ‘message’: ‘Task not running or already finished’})}\n\n”
# 注意:这里立即结束生成器

    return StreamingResponse(immediate_completion_stream(), media_type="text/event-stream")

# 如果任务正在运行,返回进度流
return StreamingResponse(progress_stream(task_id), media_type="text/event-stream")

“`

使用方式:

  1. /start-task/{some_task_id} 发送 POST 请求,启动任务。
  2. 客户端连接到 /task-progress/{some_task_id} endpoint,接收进度更新。

客户端处理: 客户端需要监听 progress_update 事件,并解析 JSON 数据来更新进度条或显示日志。

这个例子中,我们使用了 FastAPI 的 BackgroundTasks 来启动后台任务,并在 SSE 流生成器中通过检查一个共享状态(task_progress 字典)来报告进度。

注意: 直接在内存中维护 task_progress 字典只适用于单进程单线程的简单场景。在生产环境中,特别是需要处理多个并发任务或部署到多进程/多服务器环境时,必须使用外部存储(如 Redis、数据库)来共享任务状态和进度信息。

6.3 简单通知系统

可以构建一个简单的 Pub/Sub 系统,服务器在有新通知时发布,所有连接的客户端接收。这个场景与下面的基于队列的广播非常相似。

7. 更复杂的 SSE 应用:基于队列实现广播

一个常见的 SSE 用例是将特定事件广播给所有(或部分)连接的客户端。例如,一个聊天应用的新消息通知,或者一个管理后台的系统级事件通知。

FastAPI 结合 asyncio.Queue 是实现这一目标的一种优雅方式。我们可以为每个连接的客户端创建一个独立的队列。当有新消息需要广播时,生产者将消息放入所有活动客户端的队列中。每个 SSE 生成器(对应一个客户端连接)则从其专属队列中读取消息并发送。

“`python
import asyncio
import json
import uuid # 用于生成唯一的客户端 ID
from typing import Set, Dict
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import StreamingResponse
from fastapi.templating import Jinja2Templates # 如果需要提供 HTML 页面
from fastapi.staticfiles import StaticFiles

app = FastAPI()

存储所有活跃客户端的队列

Key: 客户端 ID (str)

Value: asyncio.Queue

connected_clients: Dict[str, asyncio.Queue] = {}

简单的广播函数

async def broadcast_message(message: str):
“””
将消息广播给所有连接的客户端。
“””
# 注意:在实际应用中,这里可能需要处理连接断开的情况,
# 比如将发送失败的客户端从 connected_clients 中移除。
# 但 asyncio.Queue put_nowait 通常不会失败,除非队列满(这里队列是无限大小)。
# 真正的断开处理会在 stream_for_client 中完成。
for client_id, queue in list(connected_clients.items()): # 遍历时复制列表,避免在迭代过程中修改
print(f”Broadcasting to client {client_id}”)
try:
await queue.put(message) # 异步放入队列
except Exception as e:
print(f”Error putting message to client {client_id} queue: {e}”)
# 这里的错误可能意味着队列已经被关闭,说明客户端已经断开或即将断开
# 清理工作将在 stream_for_client 中执行

async def stream_for_client(client_id: str, client_queue: asyncio.Queue):
“””
为特定客户端生成 SSE 事件流,从其队列中读取消息。
“””
print(f”Client {client_id} connected, starting stream…”)
try:
# 可选:发送一个连接成功的消息
yield f”data: {json.dumps({‘status’: ‘connected’, ‘client_id’: client_id})}\n\n”

    while True:
        # 从队列中获取消息,如果队列为空则等待
        message = await client_queue.get() 
        print(f"Client {client_id} received message from queue: {message}")

        # 构造 SSE 格式消息
        # 可以在这里根据 message 内容设置不同的 event: 或 id:
        sse_message = f"data: {message}\n\n"

        yield sse_message

        # 标记任务完成,释放队列中的项(如果不调用,且队列项数大于最大值,get_nowait 会抛异常)
        # 对于无限大小的队列,不是必须的,但养成习惯更好
        # client_queue.task_done() # 如果使用了 join(),需要调用 task_done()

except asyncio.CancelledError:
    # 当客户端断开时,协程会被取消
    print(f"Client {client_id} stream cancelled (disconnected).")
except Exception as e:
    print(f"Client {client_id} stream encountered error: {e}")
finally:
    # 客户端断开或流结束时执行清理
    print(f"Client {client_id} disconnected. Cleaning up.")
    if client_id in connected_clients:
         # 关闭队列,防止 broadcast_message 继续往里放
         client_queue.put_nowait(None) # 放入一个 None 作为结束标志,让 stream 退出 while True
         # 或者直接关闭队列 (可能导致还在等待的 get() 抛出 asyncio.CancelledError 或 RuntimeError)
         # client_queue.close() 
         del connected_clients[client_id] # 从活跃客户端列表中移除
    print(f"Current connected clients: {len(connected_clients)}")

@app.get(“/stream”)
async def sse_stream():
“””
SSE Endpoint,为每个连接的客户端创建一个 SSE 流。
“””
client_id = str(uuid.uuid4()) # 为新连接生成唯一 ID
client_queue = asyncio.Queue() # 为该客户端创建队列

connected_clients[client_id] = client_queue # 加入活跃客户端列表
print(f"New client connected: {client_id}. Total clients: {len(connected_clients)}")

# 返回 StreamingResponse,使用 stream_for_client 异步生成器
# 注意:stream_for_client 协程会在客户端连接期间一直运行
return StreamingResponse(stream_for_client(client_id, client_queue), media_type="text/event-stream")

示例:一个 endpoint 用于触发广播消息

@app.post(“/send-message”)
async def send_message(message: str):
“””
一个示例 endpoint,用于触发向所有客户端广播消息。
“””
print(f”Received message to broadcast: {message}”)
# 在后台协程中执行广播,避免阻塞当前请求响应
asyncio.create_task(broadcast_message(message))
return {“status”: “broadcasting”, “message”: message}

可以添加一个简单的 HTML 页面来测试广播

templates = Jinja2Templates(directory=”templates”)
app.mount(“/static”, StaticFiles(directory=”static”), name=”static”) # 如果需要提供静态文件 (如js, css)

@app.get(“/”)
async def read_root(request: Request):
return templates.TemplateResponse(“index.html”, {“request”: request})

templates/index.html (示例)

“””




FastAPI SSE 广播示例

FastAPI SSE 广播示例

等待接收消息…




“””

“`

代码解释:

  • connected_clients: Dict[str, asyncio.Queue]: 一个字典,维护所有当前连接的客户端及其对应的 asyncio.Queue。字典的键是为每个客户端生成的唯一 ID。
  • asyncio.Queue(): 为每个新连接的客户端创建一个无限大小的队列。
  • connected_clients[client_id] = client_queue: 将新客户端及其队列添加到字典中。
  • async def stream_for_client(...): 这是核心的 SSE 生成器。它接收客户端 ID 和其队列作为参数。
    • await client_queue.get(): 这是一个阻塞操作,它会暂停当前协程,直到队列中有新的项。一旦有数据,它就会获取该项。
    • yield sse_message: 将从队列中获取并格式化好的消息发送给客户端。
    • except asyncio.CancelledError:: 这是处理客户端断开连接的关键。当客户端关闭连接时,FastAPI/Starlette 会取消对应的协程,触发 asyncio.CancelledError 异常。
    • finally: 块:无论协程是正常完成、取消还是发生其他异常,finally 块都会执行。这里用于从 connected_clients 字典中移除已断开的客户端及其队列,进行清理。
  • async def broadcast_message(message: str): 这个函数遍历 connected_clients 字典,并将消息 put 到每个客户端的队列中。
  • asyncio.create_task(broadcast_message(message)): 在 /send-message endpoint 中,我们使用 asyncio.create_task 创建一个 新的 协程来执行广播任务。这样做是为了避免 send-message endpoint 因为等待广播完成而长时间阻塞响应。广播是异步进行的。
  • /send-message endpoint 是一个普通的 FastAPI endpoint,用于接收来自任何地方(例如另一个服务或客户端的 POST 请求)的消息,然后触发广播。
  • 客户端连接到 /stream endpoint,服务器会为它创建一个 SSE 流并将其添加到广播列表中。
  • 通过 /send-message 发送的消息会被推送到所有 /stream 连接的客户端。

这个例子展示了如何构建一个基于队列的广播系统,是实现实时通知、聊天室等功能的良好起点。在生产环境中,如果需要跨多个服务器实例进行广播,connected_clients 字典和 broadcast_message 函数就需要与外部消息代理(如 Redis Pub/Sub, Kafka, RabbitMQ 等)集成。

8. 错误处理与客户端断开

处理客户端断开连接是构建稳定 SSE 服务的重要一环。如前所述,FastAPI/Starlette 在客户端断开时会取消对应的协程,导致 asyncio.CancelledError。我们需要在生成器中使用 try...except asyncio.CancelledError...finally 结构来捕获取消事件,并在 finally 块中执行清理工作(例如从活跃客户端列表中移除)。

“`python

在 stream_for_client 函数中:

async def stream_for_client(client_id: str, client_queue: asyncio.Queue):
print(f”Client {client_id} connected, starting stream…”)
try:
# … 发送初始消息 …
while True:
message = await client_queue.get()
if message is None: # 约定 None 作为队列结束标志
print(f”Client {client_id} queue received None, closing stream.”)
break # 退出循环,进入 finally

        # ... 处理并格式化消息 ...
        yield sse_message

except asyncio.CancelledError:
    # 客户端断开时触发
    print(f"Client {client_id} stream cancelled (disconnected).")
except Exception as e:
    # 处理生成器内部的其他异常
    print(f"Client {client_id} stream encountered error: {e}")
    # 可以选择发送一个错误事件给客户端,或者直接退出流
    yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
finally:
    # 清理工作,无论如何都会执行
    print(f"Client {client_id} disconnected. Cleaning up.")
    if client_id in connected_clients:
         # 确保队列被标记为完成或关闭,避免泄露
         # 如果队列还在 connected_clients 里,但 stream 已经退出,
         # broadcast_message 可能会尝试往已关闭的队列里放东西
         # 更好的做法是在这里移除
         del connected_clients[client_id]
         # 关闭队列或放入 None,确保如果有其他地方还在等 get() 会退出
         try:
             client_queue.put_nowait(None) 
         except: # 如果队列已经关闭会抛错,忽略
             pass
    print(f"Current connected clients: {len(connected_clients)}")

在 broadcast_message 函数中,为了更健壮,可以在put失败时考虑移除客户端

async def broadcast_message(message: str):

disconnected_clients = []

for client_id, queue in connected_clients.items():

try:

# put_nowait 尝试非阻塞放入,如果队列已满则抛出 asyncio.QueueFull

# 但我们用的是无限大小队列,所以主要考虑 put 可能在队列关闭后失败

# await queue.put(message) # 异步放入

queue.put_nowait(message) # 非阻塞放入

except Exception:

# 如果放入失败,说明客户端可能已经断开或即将断开

disconnected_clients.append(client_id)

for client_id in disconnected_clients:

print(f”Removing disconnected client {client_id} during broadcast.”)

# 在 stream_for_client 的 finally 里也会移除,这里是为了尽快清理

# del connected_clients[client_id] # 小心并发修改字典问题,这里简单的列表移除再删除是安全的

# 更安全的做法是让 stream 自己清理,广播失败只是一个信号

“`

客户端自动重连:

如前所述,EventSource 默认会自动处理网络中断后的重连。如果服务器希望控制重连行为,可以发送 retry: [毫秒数]。如果服务器希望客户端 重连,可以:

  • 返回 HTTP 204 No Content 状态码。
  • 在响应头部中包含 Connection: close
  • 直接关闭连接而不发送 \n\n 结束最后一个事件。

通常情况下,依赖 EventSource 的自动重连和服务器端的清理(当检测到连接取消时)就足够了。

9. 性能与可伸缩性考虑

SSE 相比轮询有更好的性能,因为它维护的是长连接。但长连接本身也有开销:

  • 内存开销: 每个连接都需要服务器维护一些状态(如套接字、缓冲区、可能还有我们示例中的队列等)。连接数越多,内存占用越高。
  • 服务器并发连接限制: 操作系统和服务器软件(如 uvicorn)对并发连接数有限制。
  • 代理与负载均衡: HTTP/1.1 下,大多数代理和负载均衡器默认可能会对空闲的长连接设置超时。如果连接长时间没有数据发送,代理可能会断开它。HTTP/2 对多路复用有更好支持,可能缓解部分问题。

可伸缩性:

对于单服务器应用,基于内存队列的广播是可行的。但如果需要部署到多个服务器实例,内存中的 connected_clients 字典无法共享。这时,必须引入外部消息代理:

  • Redis Pub/Sub: 一个常用的轻量级选择。所有 FastAPI 实例连接到同一个 Redis 服务器。客户端连接到 任意 一个 FastAPI 实例的 SSE endpoint。当需要广播时,任何一个 FastAPI 实例都可以向 Redis 特定频道发布消息。所有连接到 Redis 并订阅该频道的 FastAPI 实例都会收到消息,然后将消息推送到它们各自维护的连接客户端队列中。
  • Kafka, RabbitMQ 等: 更重量级、功能更丰富的消息队列,适用于需要更高吞吐量、持久性、复杂路由或消费者组的场景。

Keep-Alive 机制:

为了防止代理/负载均衡器因为连接空闲而断开,服务器应该定期发送心跳包。SSE 规范中,以冒号 : 开头的行是注释行,浏览器会忽略,但能保持连接活跃。

“`python

在 stream_for_client 或任何 SSE 生成器中

async def event_stream_with_keepalive():
while True:
# 发送真实数据…
yield f”data: some data\n\n”

    # 定期发送心跳 (例如每 15 秒)
    await asyncio.sleep(10) # 假设数据发送间隔小于10秒
    yield ":keepalive\n\n" # 发送一个注释行作为心跳包
    await asyncio.sleep(5) # 等待剩余时间凑满15秒间隔(或者直接 await asyncio.sleep(15))

或者在 while True 循环内部,维护一个计数器或计时器,每隔 N 秒发送一次心跳

“`

stream_for_clientwhile True 循环中,除了获取并 yield 消息,还可以定期检查是否需要发送心跳。

“`python
async def stream_for_client_with_keepalive(client_id: str, client_queue: asyncio.Queue):
# … (之前的代码) …
last_data_time = asyncio.get_event_loop().time()
keepalive_interval = 15 # 秒

while True:
    try:
        # 尝试从队列中获取消息,设置一个较短的超时时间
        message = await asyncio.wait_for(client_queue.get(), timeout=keepalive_interval)
        if message is None:
             print(f"Client {client_id} queue received None, closing stream.")
             break

        # 处理并发送数据
        sse_message = f"data: {message}\n\n"
        yield sse_message
        last_data_time = asyncio.get_event_loop().time() # 更新发送数据的时间

    except asyncio.TimeoutError:
        # 超时了,队列里没有新消息,发送心跳
        print(f"Client {client_id} sending keepalive.")
        yield ": keepalive\n\n"
    except asyncio.CancelledError:
        print(f"Client {client_id} stream cancelled (disconnected).")
        break # 退出循环,进入 finally
    except Exception as e:
         print(f"Client {client_id} stream encountered error: {e}")
         yield f"event: error\ndata: {json.dumps({'error': str(e)})}\n\n"
         # 可以选择 break 或继续,取决于错误类型

finally:
    # ... (清理代码) ...

“`

这个带超时的 await asyncio.wait_for(client_queue.get(), timeout=...) 模式可以在等待消息的同时实现心跳功能。如果超时,说明队列里没有消息,就发送心跳。

10. 安全性

像其他任何 Web endpoint 一样,SSE endpoint 也需要考虑安全性。

  • 认证与授权: 不是任何人都能连接到你的 SSE 流。使用 FastAPI 的依赖注入系统,可以在路由处理函数中添加认证和授权依赖,例如:
    “`python
    from fastapi import Depends, HTTPException, status
    from your_auth_module import get_current_user # 假设你有一个认证函数

    @app.get(“/private-stream”, dependencies=[Depends(get_current_user)])
    async def private_sse_stream(current_user: Any = Depends(get_current_user)):
    # 只有认证用户才能连接到这个流
    # 可以根据 current_user 定制化流内容
    return StreamingResponse(user_specific_stream(current_user.id), media_type=”text/event-stream”)
    ``
    这确保了只有通过验证的用户才能建立 SSE 连接。
    * **数据加密:** 始终使用 HTTPS (
    wss://for WebSockets,https://for SSE) 来加密传输的数据,防止中间人攻击。在生产环境中,这一点至关重要。Uvicorn 可以配置使用 SSL 证书来启用 HTTPS。
    * **输入验证:** 如果 SSE 流的行为取决于请求参数(如
    /task-progress/{task_id}`),需要像处理普通请求一样验证这些参数,防止注入或其他恶意输入。FastAPI 和 Pydantic 提供了强大的验证能力。

11. SSE vs. WebSockets:何时选择哪个?

特性 Server-Sent Events (SSE) WebSockets
通信方向 单向 (服务器 -> 客户端) 全双工 (双向)
协议 基于 HTTP/1.1 或 HTTP/2 (text/event-stream) 独立协议 (ws://, wss://) 基于 TCP
实现复杂度 相对简单,基于 HTTP 和 EventSource API 相对复杂,需要握手,通常需要客户端/服务器库
浏览器支持 内置 EventSource API,无需额外库 (IE 除外) 内置 WebSocket API,无需额外库
数据格式 文本 (特定格式,data:, event:, etc.) 文本或二进制
自动重连 内置,可配置 需要手动实现
头部开销 相比 WebSockets,每次发送数据时头部开销稍大 握手后头部开销很小
代理/负载均衡 需要注意长连接超时和 sticky sessions 相对容易处理 (依赖具体实现和配置)
适用场景 实时通知、进度更新、订阅、仪表盘数据等 聊天、在线游戏、实时协作、远程控制等

总结:

  • 如果你只需要从服务器向客户端推送数据,并且不关心从客户端向服务器发送实时消息,SSE 通常是更简单、更轻量级的选择
  • 如果需要双向实时通信(客户端需要频繁向服务器发送实时消息),或者需要传输二进制数据,WebSockets 是更好的选择

在 FastAPI 中实现两者都非常方便,你可以根据具体的应用需求来选择合适的技术。

12. 最佳实践与总结

构建稳定、高效的 FastAPI SSE 应用,请遵循以下最佳实践:

  • 使用 StreamingResponse 和异步生成器: 这是 FastAPI 实现 SSE 的核心机制,充分利用了异步能力。
  • 严格遵守 SSE 数据格式: 每条数据行以 \n 结尾,每个事件块以 \n\n 结尾。使用 data:, event:, id:, retry: 等字段来构建有意义的事件流。
  • 处理客户端断开: 在生成器中使用 try...except asyncio.CancelledError...finally 来捕获断开事件,并在 finally 块中进行必要的资源清理(如移除客户端队列)。
  • 发送心跳包: 定期发送注释行 (: some comment\n\n) 来保持连接活跃,防止代理/负载均衡器超时断开连接。
  • 考虑可伸缩性: 对于需要水平扩展的应用,不要将连接状态或广播逻辑存储在内存中,而应使用外部消息代理(如 Redis Pub/Sub)。
  • 实施认证与授权: 使用 FastAPI 的依赖注入保护你的 SSE endpoint,确保只有授权用户才能连接。
  • 使用 HTTPS: 加密数据传输,保护用户隐私和数据安全。
  • 适当的错误处理: 在生成器中捕获可能发生的异常,并决定是发送错误事件给客户端,还是优雅地关闭连接。

总结:

FastAPI 提供了强大而灵活的方式来实现 Server-Sent Events。通过利用其异步能力、StreamingResponse 和 Python 的生成器/异步生成器,开发者可以轻松构建高性能的单向实时数据推送服务。从简单的定期更新到复杂的基于队列的广播系统,FastAPI 都能提供坚实的基础。理解 SSE 的工作原理、数据格式以及如何在 FastAPI 中有效利用异步特性,将帮助你构建出满足现代 Web 应用实时需求的优秀服务。希望这篇详细指南能帮助你快速入门并深入掌握 FastAPI SSE 的开发。


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部