FastAPI SSE 开发指南 – wiki基地


FastAPI SSE 开发指南:构建实时服务器推送应用

引言

在现代 Web 应用中,实时性变得越来越重要。用户期望能够即时获取最新的数据、通知或状态更新,而无需手动刷新页面。传统的客户端轮询(Polling)机制虽然简单,但效率低下,会给服务器和客户端带来不必要的负担,尤其是在更新不频繁的情况下。

为了解决这一问题,双向通信技术如 WebSockets 应运而生。然而,在许多场景下,我们只需要实现服务器向客户端的单向数据推送,而客户端并不需要频繁地向服务器发送数据。在这种情况下,使用功能更强大、更复杂的 WebSockets 可能显得“杀鸡用牛刀”了。

这时,服务器发送事件(Server-Sent Events,简称 SSE)提供了一种更轻量、更简单的解决方案。SSE 允许服务器通过一个持久的 HTTP 连接向客户端推送数据。它基于标准的 HTTP 协议,并且浏览器原生支持,使用起来比 WebSockets 更简单。

FastAPI 是一个现代、快速(高性能)的 Web 框架,基于标准的 Python 类型提示构建,非常适合构建 API,包括需要实时能力的 API。FastAPI 对异步编程的良好支持,使其与 SSE 的异步推送特性天然契合。

本文将详细介绍如何在 FastAPI 中实现 Server-Sent Events,包括 SSE 的基本原理、在 FastAPI 中的具体实现方法、如何生成事件、管理客户端以及一些高级话题和最佳实践。

什么是 Server-Sent Events (SSE)?

SSE 是一种基于 HTTP 的服务器向客户端推送数据的技术标准。它的核心思想是:

  1. 客户端发起一个标准的 HTTP 请求(通常是 GET 请求)。
  2. 服务器保持连接开放,不立即关闭。
  3. 服务器可以随时通过这个开放的连接向客户端发送格式化的数据流。
  4. 客户端接收到数据后,可以解析并处理。

与 WebSockets 的双向全双工通信不同,SSE 是单向的,数据流只能从服务器流向客户端。这使得它特别适合那些只需要接收服务器更新的场景,例如:

  • 实时股票报价
  • 新闻/社交媒体动态更新
  • 后台任务进度通知
  • 日志流输出

SSE 数据格式

SSE 使用 text/event-stream 作为 MIME 类型。服务器发送的每个事件都由一系列以换行符分隔的字段组成,每个字段以字段名开头,后跟一个冒号 :,然后是字段值。一个事件的结束由一个空行(即连续两个换行符 \n\n)标记。

常见的 SSE 字段包括:

  • data: 事件的数据。如果数据很长,可以分多行发送,每行都以 data: 开头。
  • event: 事件类型。客户端可以通过指定 event 类型来监听不同类型的事件。
  • id: 事件 ID。客户端可以使用这个 ID 来跟踪最后接收到的事件,如果连接中断,可以在重新连接时告知服务器从哪个 ID 开始发送。
  • retry: 客户端在连接中断后应该等待多少毫秒再尝试重新连接。

一个简单的 SSE 数据流示例:

“`
data: This is the first message.

data: This is the second message.
data: It spans multiple lines.

event: user_joined
data: {“username”: “Alice”}
id: 123

event: message
data: {“sender”: “Bob”, “text”: “Hello!”}
id: 124

“`

SSE 的优势与劣势

优势:

  • 简单: 基于 HTTP,使用标准的 EventSource 客户端 API,比 WebSockets 更容易实现。
  • 原生支持: 浏览器内置 EventSource API,无需额外的库(除了旧版本 IE)。
  • 自动重连: EventSource 客户端 API 内置了自动重连机制,并且可以使用 retry 字段控制重连间隔。
  • 基于 HTTP: 可以利用现有的 HTTP 基础设施(如代理、负载均衡),但需要注意配置,避免它们干扰长连接。

劣势:

  • 单向: 客户端不能方便地向服务器发送数据(需要额外的 HTTP 请求)。
  • 限制: 某些旧的浏览器或代理可能存在连接数限制(通常是每个源 6 个连接)。
  • 二进制数据: 主要设计用于传输文本数据,传输二进制数据不如 WebSockets 方便。

在 FastAPI 中实现 SSE

FastAPI 构建在 Starlette 之上,Starlette 对 ASGI 协议提供了很好的支持,包括流式响应。实现 SSE 在 FastAPI 中非常直观,核心就是使用 StreamingResponse 返回一个生成器或异步生成器,该生成器以正确的 SSE 格式 yield 数据块,并将 media_type 设置为 text/event-stream

基本实现步骤

  1. 导入必要的模块:FastAPI, StreamingResponse, asyncio 等。
  2. 创建一个 FastAPI 应用实例。
  3. 定义一个异步路由处理函数。
  4. 在处理函数中定义一个异步生成器函数。
  5. 生成器函数负责按一定频率或在特定事件发生时生成 SSE 格式的字符串(如 data: message\n\n)。
  6. 使用 StreamingResponse 包装这个生成器,并设置 media_type="text/event-stream"

示例:一个简单的计时器

这个示例将创建一个 SSE 端点,每秒向客户端发送当前时间。

“`python
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import datetime

app = FastAPI()

异步生成器函数,用于生成SSE事件

async def event_generator(request: Request):
client_disconnected = False
try:
while True:
# 检查客户端是否已断开连接
# Starlette/FastAPI 内部会通过监听连接状态来处理断开
# 虽然 request.is_disconnected() 可以用,但对于StreamingResponse
# 更可靠的方式是在yield或await操作后,如果连接断开会抛出异常
# yield 语句在连接断开时会触发 ClientDisconnect 异常

        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # SSE 数据格式:data: <数据>\n\n
        # 确保以双换行符 \n\n 结束每个事件
        sse_message = f"data: Current time is {now}\n\n"

        yield sse_message

        # 等待一秒,模拟数据更新间隔
        await asyncio.sleep(1)

except asyncio.CancelledError:
    # 当客户端断开连接时,uvicorn/Starlette 会取消这个任务
    print("Client disconnected via CancelledError")
    client_disconnected = True
except Exception as e:
    # 捕获其他可能的异常
    print(f"An error occurred: {e}")
    client_disconnected = True
finally:
    # 可以在这里进行一些清理工作,如果需要的话
    if client_disconnected:
        print("Generator cleanup completed.")

定义SSE端点

@app.get(“/stream”)
async def sse_endpoint(request: Request):
# 返回StreamingResponse,使用异步生成器和正确的媒体类型
return StreamingResponse(event_generator(request), media_type=”text/event-stream”)

运行应用 (使用 uvicorn)

uvicorn main:app –reload

“`

代码解释:

  1. import asyncio: 导入 asyncio 用于异步操作,特别是 asyncio.sleep
  2. import datetime: 用于生成实时时间戳。
  3. from fastapi import FastAPI, Request: 导入 FastAPI 和 Request 对象(Request 用于获取请求信息,虽然在这个简单示例中主要用于演示如何获取并在生成器中使用)。
  4. from fastapi.responses import StreamingResponse: 导入 StreamingResponse,这是 FastAPI/Starlette 用于返回流式响应的类。
  5. async def event_generator(request: Request): 定义一个异步生成器函数。异步生成器是使用 async def 定义并包含 yield 关键字的函数。它能够与 await 一起使用。
  6. while True:: 创建一个无限循环,使生成器持续产生事件。
  7. now = ...: 获取当前时间。
  8. sse_message = f"data: Current time is {now}\n\n": 构建 SSE 格式的字符串。data: 是必需的字段前缀,\n\n 标志着一个事件的结束。
  9. yield sse_message: 将构建好的 SSE 字符串发送给客户端。yield 关键字使得函数成为一个生成器。
  10. await asyncio.sleep(1): 暂停一秒。await 允许在等待时释放事件循环,处理其他请求。
  11. try...except asyncio.CancelledError...finally:: 这是处理客户端断开连接的关键部分。当客户端关闭连接时, ASGI 服务器(如 Uvicorn)会取消正在执行的与该连接相关的协程。捕获 asyncio.CancelledError 是检测这一情况的标准方法。finally 块用于执行清理任务。
  12. @app.get("/stream"): 定义一个 GET 请求的路由 /stream
  13. async def sse_endpoint(request: Request):: 路由处理函数,标记为 async
  14. return StreamingResponse(...): 创建并返回 StreamingResponse 实例,将 event_generator 作为内容,并将 media_type 设置为 text/event-stream

如何测试:

  1. 保存代码为 main.py
  2. 安装 uvicorn:pip install uvicorn fastapi
  3. 运行应用:uvicorn main:app --reload
  4. 在浏览器中访问 http://127.0.0.1:8000/stream。大多数现代浏览器(Chrome, Firefox, Edge, Safari)都原生支持 SSE,它们会尝试连接并显示接收到的数据流。或者使用 curl 命令:curl -N http://127.0.0.1:8000/stream (-N 选项禁用缓冲)。

你会在浏览器页面(或 curl 输出)中看到每秒更新的时间。关闭浏览器标签页或中断 curl 进程,会在服务器控制台看到 “Client disconnected…” 的输出。

发送不同类型的事件

你可以通过在 data: 字段前添加 event: 字段来发送不同类型的事件:

“`python

在 event_generator 中修改 yield 部分

async def event_generator(request: Request):
count = 0
try:
while True:
count += 1
if count % 5 == 0:
# 每5秒发送一个特殊事件
special_message = f”event: five_second_marker\ndata: It’s been {count} seconds!\n\n”
yield special_message
else:
# 发送常规数据事件
now = datetime.datetime.now().strftime(“%H:%M:%S”)
regular_message = f”data: Ping at {now}\n\n”
yield regular_message

        await asyncio.sleep(1)

except asyncio.CancelledError:
    print("Client disconnected.")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    print("Generator cleanup completed.")

“`

客户端 JavaScript EventSource API 可以通过 addEventListener() 监听特定的事件类型。

添加事件 ID 和重连间隔

可以在 SSE 消息中加入 id:retry: 字段:

“`python

在 event_generator 中修改 yield 部分

async def event_generator(request: Request):
message_id = 0
try:
# 可以设置一个初始的重连间隔,或者在消息中发送
# initial_retry = 2000 # 2秒
# yield f”retry: {initial_retry}\n\n” # 如果需要在开始时指定

    while True:
        message_id += 1
        now = datetime.datetime.now().strftime("%H:%M:%S")

        # 包含 id 字段
        sse_message = f"id: {message_id}\ndata: Message {message_id} at {now}\n\n"

        # 可以根据需要动态调整 retry 间隔,例如在检测到问题时增加
        # if some_condition:
        #     sse_message = f"{sse_message}retry: 5000\n\n" # 尝试5秒后重连

        yield sse_message

        await asyncio.sleep(1)

except asyncio.CancelledError:
    print(f"Client disconnected. Last ID sent: {message_id}")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    print("Generator cleanup completed.")

“`

客户端 EventSource 会自动处理 idretry 字段。当连接中断并自动重连时,浏览器会在新的连接请求头中包含一个 Last-Event-ID 字段,其值是客户端接收到的最后一个事件的 ID。服务器端可以检查这个头,从而从上次中断的地方继续发送事件。在 FastAPI 中,你可以通过 request.headers.get('Last-Event-ID') 获取这个值。

生成事件的策略

上面的示例中,事件生成器本身就是事件的来源(它自己每秒生成一个时间戳)。在更实际的应用中,事件通常来源于外部:

  1. 后台任务: 一个独立的后台协程或线程正在执行某个任务,并在任务有更新时通知 SSE 生成器。
  2. 数据库变化: 监听数据库的变化(例如使用触发器、CDC 等),并在数据更新时推送。
  3. 消息队列: 从消息队列(如 Redis Pub/Sub, Kafka, RabbitMQ)订阅消息,并在接收到消息时推送给客户端。

为了将外部事件传递给 SSE 生成器,我们需要一个中间机制。

使用 asyncio.Queue (适用于单进程/单机)

对于简单的应用或单进程部署,可以使用 asyncio.Queue 在不同的协程之间传递事件。

“`python
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import datetime
import uvicorn

app = FastAPI()

创建一个异步队列来存放待发送的事件

注意:在多进程/多副本部署时,这需要被一个共享的消息队列替换

event_queue = asyncio.Queue()

后台任务:模拟一个每秒产生数据的源

async def generate_data_source():
i = 0
while True:
i += 1
message = f”Background data {i} at {datetime.datetime.now().strftime(‘%H:%M:%S’)}”
# 将数据放入队列
await event_queue.put(message)
print(f”Put data into queue: {message}”)
await asyncio.sleep(1)

异步生成器函数:从队列中获取数据并格式化为 SSE

async def sse_event_publisher(request: Request):
print(“Client connected”)
client_disconnected = False
try:
while True:
# 从队列中获取事件,如果队列为空则等待
# await event_queue.get() 是一个阻塞操作,但不会阻塞整个事件循环
data = await event_queue.get()

        # 格式化为 SSE 消息
        sse_message = f"data: {data}\n\n"

        # 将消息发送给客户端
        yield sse_message

        # 通知队列该项已处理完毕 (如果使用 Queue.task_done)
        # event_queue.task_done()

except asyncio.CancelledError:
    print("Client disconnected.")
    client_disconnected = True
except Exception as e:
    print(f"An error occurred: {e}")
    client_disconnected = True
finally:
    print("Generator cleanup completed.")
    # 清理工作 (例如,如果需要从某个订阅列表中移除客户端)

定义SSE端点

@app.get(“/stream”)
async def sse_endpoint(request: Request):
# 返回StreamingResponse,使用异步生成器和正确的媒体类型
# 注意:每个连接都会启动一个新的 sse_event_publisher 生成器
# 这个简单的 Queue 示例无法将同一个事件广播给所有客户端
# 下面会讨论广播问题
return StreamingResponse(sse_event_publisher(request), media_type=”text/event-stream”)

启动后台任务

@app.on_event(“startup”)
async def startup_event():
print(“Starting background data generator…”)
asyncio.create_task(generate_data_source())

运行应用 (使用 uvicorn)

uvicorn main:app –reload

“`

注意: 上面的 asyncio.Queue 示例只是演示了如何将后台任务产生的数据传递给 SSE 生成器。然而,它存在一个严重的问题:event_queue 是共享的,但 sse_event_publisher 是每个客户端连接创建一个新的。当 generate_data_source 将一个消息放入队列时,只有一个正在等待的 sse_event_publisher 会收到它。这个模式无法实现将同一个事件广播给所有连接的客户端

广播事件给多个客户端

要将事件广播给所有连接的客户端,你需要一个机制来管理所有活动的 SSE 连接,并在事件发生时遍历这些连接,将事件发送给每一个客户端。

一种常见模式是:

  1. 维护一个活动客户端连接(或更准确地说,它们的事件队列或写入通道)的列表或集合。
  2. 当新的 SSE 连接建立时,将其添加到列表中。
  3. 当连接断开时,将其从列表中移除。
  4. 当事件发生时,遍历列表,将事件发送给每个客户端。

这需要更复杂的事件管理类。为了实现高效广播和处理客户端断开,通常会使用每个客户端独立的 asyncio.Queue 或类似的结构,以及一个中心化的广播逻辑。

高级示例:一个简单的广播器 (单机)

“`python
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import datetime
import uuid # 用于给客户端分配唯一ID

app = FastAPI()

存储所有活动客户端的字典 {client_id: asyncio.Queue}

队列用于存储待发送给该客户端的事件

active_clients: dict[str, asyncio.Queue] = {}

后台任务:模拟数据源并将数据广播给所有客户端

async def broadcast_events():
i = 0
while True:
i += 1
message_data = f”Broadcast message {i} at {datetime.datetime.now().strftime(‘%H:%M:%S’)}”
print(f”Broadcasting: {message_data}”)

    # 构建 SSE 消息
    sse_message = f"data: {message_data}\n\n"

    # 遍历所有活动客户端,将消息放入它们的队列
    disconnected_clients = []
    for client_id, queue in active_clients.items():
        try:
            # 使用 put_nowait 以避免广播任务阻塞
            # 如果队列满了,表示客户端处理慢,可以忽略或使用 put 等待
            # 这里简单处理:如果队列满则跳过该客户端
            queue.put_nowait(sse_message)
        except asyncio.QueueFull:
            print(f"Client {client_id} queue is full. Skipping.")
            # 可选:认为客户端不活跃,标记为断开
            # disconnected_clients.append(client_id)
        except Exception as e:
            print(f"Error broadcasting to client {client_id}: {e}")
            disconnected_clients.append(client_id)

    # 移除已断开或处理异常的客户端
    for client_id in disconnected_clients:
        print(f"Removing disconnected client: {client_id}")
        del active_clients[client_id]

    await asyncio.sleep(1) # 模拟广播间隔

SSE 生成器:管理单个客户端连接,从其专属队列中读取事件

async def client_event_generator(client_id: str, queue: asyncio.Queue, request: Request):
print(f”Client {client_id} connected.”)
try:
while True:
# 从客户端专属队列中获取事件,如果队列为空则等待
# get() 会等待直到队列中有项目
sse_message = await queue.get()

        # 检查连接是否仍然活跃 (yield 会处理断开异常)
        yield sse_message

        # 通知队列该项已处理完毕
        queue.task_done()

except asyncio.CancelledError:
    print(f"Client {client_id} disconnected.")
except Exception as e:
    print(f"An error occurred for client {client_id}: {e}")
finally:
    # 客户端断开时,将其从活动客户端列表中移除
    if client_id in active_clients:
        print(f"Removing client {client_id} from active list.")
        del active_clients[client_id]

SSE 端点:创建客户端队列并启动生成器

@app.get(“/stream”)
async def sse_endpoint(request: Request):
# 为新连接生成唯一ID和队列
client_id = str(uuid.uuid4())
client_queue = asyncio.Queue() # 每个客户端一个队列
active_clients[client_id] = client_queue
print(f”New client connected: {client_id}. Total clients: {len(active_clients)}”)

# 返回 StreamingResponse,使用为该客户端创建的生成器
return StreamingResponse(client_event_generator(client_id, client_queue, request), media_type="text/event-stream")

启动后台广播任务

@app.on_event(“startup”)
async def startup_event():
print(“Starting background broadcast task…”)
asyncio.create_task(broadcast_events())

运行应用 (使用 uvicorn)

uvicorn main:app –reload

“`

解释:

  1. active_clients: 一个字典,键是客户端的唯一 ID,值是专门用于该客户端的 asyncio.Queue
  2. broadcast_events(): 这是一个独立的异步任务,负责生成(或从外部获取)事件。它遍历 active_clients 字典,并将事件放入每个客户端的队列中。
  3. client_event_generator(): 这是每个客户端连接的 SSE 生成器。它不再自己生成数据,而是从传入的 queue 中读取数据。await queue.get() 会在该客户端队列为空时等待。
  4. /stream 端点:当新的客户端连接时,为其创建一个唯一的 ID 和一个专用的队列,将其添加到 active_clients 字典,然后启动 client_event_generator 并将该队列传递进去。
  5. 客户端断开时:client_event_generator 中的 try...finally 块会捕获 CancelledError 并将该客户端从 active_clients 中移除。broadcast_events 也会检查队列是否可写,并在异常时移除客户端。

这个广播模式解决了将同一事件发送给所有客户端的问题,并且通过每个客户端自己的队列提供了一定的背压(backpressure)控制:如果一个客户端处理消息很慢导致其队列满了,它不会影响其他客户端。

限制: 这个基于 asyncio.Queue 和全局字典的广播器只适用于单进程的 FastAPI 应用。如果使用 Gunicorn 启动多个 worker 进程,或者部署到多个服务器实例,每个进程都会有自己的 active_clients 字典,它们之间无法直接通信,广播将失效。

扩展到多进程/分布式环境

要在多进程或分布式环境中实现 SSE 广播,必须使用一个外部的、进程间/跨服务器通信的消息队列。流行的选择包括:

  • Redis Pub/Sub: 轻量、高性能的发布/订阅系统,非常适合简单的广播场景。
  • Kafka: 分布式流处理平台,功能强大,适合处理大量数据和更复杂的场景。
  • RabbitMQ: 成熟的消息代理,支持多种消息模式。

基本模式是:

  1. 事件源: 将事件发布到消息队列的一个特定频道或主题。
  2. FastAPI 应用: 每个运行的 FastAPI 进程(或 worker)都订阅这个频道或主题。
  3. 广播器: 每个 FastAPI 进程中有一个协程或任务,负责从消息队列中接收事件。
  4. 客户端管理: 每个 FastAPI 进程维护连接到该进程的活动客户端列表(如上面的 active_clients 字典)。
  5. 推送: 当消息队列订阅者接收到事件时,它遍历该进程内的活动客户端列表,并将事件推送到每个客户端的队列或直接通过 SSE 连接发送。

实现这一模式需要集成相应的消息队列客户端库(如 aioredis 用于 Redis,aiokafka 用于 Kafka 等),并在 FastAPI 应用启动时建立订阅连接。

客户端实现 (JavaScript EventSource)

浏览器端接收 SSE 非常简单,使用原生的 EventSource API 即可:

“`javascript
// 连接到你的 SSE 端点
const eventSource = new EventSource(‘http://127.0.0.1:8000/stream’);

// 监听 “message” 事件 (默认事件类型)
eventSource.onmessage = function(event) {
console.log(‘Received message:’, event.data);
// event.data 包含服务器发送的数据 (去掉 data: 前缀和\n\n)
};

// 监听特定事件类型 (如果服务器发送了 event: )
// eventSource.addEventListener(‘five_second_marker’, function(event) {
// console.log(‘Received five_second_marker:’, event.data);
// });

// 监听连接打开事件
eventSource.onopen = function(event) {
console.log(‘SSE connection opened.’);
};

// 监听错误事件 (连接中断,重连失败等)
eventSource.onerror = function(event) {
console.error(‘SSE error:’, event);
// EventSource 会自动尝试重连,除非遇到永久性错误或服务器未指定 retry 间隔
// 可以根据需要在这里手动关闭并重新创建 EventSource
// eventSource.close();
// setTimeout(() => {
// const newEventSource = new EventSource(‘http://127.0.0.1:8000/stream’);
// // 重新设置事件监听器…
// }, eventSource.retry || 3000); // 使用服务器指定的retry间隔或默认3秒
};

// 可以随时关闭连接
// eventSource.close();
“`

EventSource 对象的 readyState 属性表示连接状态:0 (CONNECTING), 1 (OPEN), 2 (CLOSED)。

实际考虑与最佳实践

  1. 认证与授权: SSE 端点通常需要保护。可以在建立 SSE 连接前进行标准的认证(如基于 Token 的认证),并在路由处理函数中检查认证信息。如果使用 Token,可以考虑将其作为查询参数或自定义请求头发送(注意:EventSource 不支持发送自定义头,查询参数是更常见的做法)。
  2. 代理与防火墙: 某些代理服务器可能会缓冲 HTTP 响应或关闭长时间不活动的连接。确保你的代理(如 Nginx)配置正确,以支持 SSE 长连接(关闭缓冲)。
  3. 心跳机制: 虽然 SSE 标准本身没有内置心跳,但长时间连接可能会被中间网络设备关闭。可以考虑定期发送一个空的 data: \n\nevent: heartbeat\ndata: \n\n 消息作为心跳,保持连接活跃。
  4. 错误处理与日志: 在生成器中捕获异常(特别是 CancelledErrorConnectionClosedError,取决于 ASGI 服务器),并记录有用的信息,如哪个客户端断开、断开原因等。
  5. 资源管理: 当有大量客户端连接时,每个连接都会消耗服务器资源(内存、文件描述符)。设计时要考虑如何高效管理这些资源,并在客户端断开时正确清理。使用消息队列是扩展性的关键。
  6. 客户端重连: 浏览器 EventSource 默认会自动重连,并遵循 retry 字段。确保你的服务器在客户端断开后能正确处理 Last-Event-ID 头,从中断处继续发送(如果你的应用需要此功能)。
  7. 性能: SSE 的开销相对较小,但在连接数非常多时(几万、几十万),总的连接管理开销可能会变大。对于极高并发、低延迟且需要双向通信的场景,WebSockets 可能更合适。
  8. 使用 Uvicorn/Gunicorn: 使用支持 ASGI 的服务器(如 Uvicorn 或 Gunicorn 配合 Uvicorn worker)来运行 FastAPI 应用,它们能够正确处理异步生成器和长连接。

SSE vs WebSockets

特性 SSE (Server-Sent Events) WebSockets
通信方向 单向(服务器 -> 客户端) 双向
协议 基于 HTTP/1.1 长连接 独立的 WS 协议 (ws:// 或 wss://)
复杂性 简单,基于 HTTP,浏览器原生 EventSource 相对复杂,需要握手,通常需要库,更底层的 API
数据格式 文本 (text/event-stream) 文本或二进制
自动重连 内置于 EventSource 客户端 API 需要手动实现
Use Case 服务器推送更新,通知,数据流(只读) 需要频繁双向通信,聊天,游戏,协作工具
HTTP 兼容性 更好,利用现有 HTTP 基础设施(但需注意代理) 需要升级握手,对代理/防火墙要求更高
浏览器支持 广泛支持(除旧版 IE) 现代浏览器普遍支持
性能 每连接开销相对 WebSocket 稍高一点(HTTP 头) 每连接开销通常更低

选择 SSE 还是 WebSockets 取决于你的具体需求。如果你只需要服务器向客户端推送数据,并且对延迟要求不是极致低,SSE 通常是更简单、更合适的选择。如果需要双向通信、传输二进制数据或极低延迟,WebSockets 是更好的选择。

结论

Server-Sent Events 为 Web 应用提供了高效、简洁的服务器推送能力。结合 FastAPI 强大的异步特性和简洁的 API 设计,实现 SSE 端点变得非常容易。

从基础的 StreamingResponse 和异步生成器,到使用 asyncio.Queue 实现单机广播,再到理解如何通过消息队列扩展到分布式环境,FastAPI 提供了构建各种规模实时推送应用的良好基础。

通过本文的指南,你应该能够理解 SSE 的原理,掌握在 FastAPI 中实现 SSE 的核心技术,并能根据需求选择合适的事件生成和广播策略,从而构建出响应迅速、用户体验更好的实时应用。记住,对于生产环境,尤其是需要处理大量并发连接或分布式部署时,集成一个外部的消息队列是实现可靠和可伸缩 SSE 推送的关键。


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部