FastAPI Server-Sent Events (SSE) 教程 – wiki基地


FastAPI Server-Sent Events (SSE) 详细教程

在构建现代 Web 应用时,实时数据传输是常见的需求。无论是聊天应用、股票行情显示、实时通知,还是监控仪表盘,都需要服务器能够主动向客户端推送数据。传统的 HTTP 请求-响应模式是客户端发起请求,服务器返回响应,这种模式不适合需要服务器持续发送更新的场景。为了解决这个问题,出现了多种技术,其中 Server-Sent Events (SSE) 是一种简单而有效的单向数据推送技术。

FastAPI 作为一个高性能、易于使用的 Python Web 框架,基于 ASGI(Asynchronous Server Gateway Interface),天然支持异步操作和流式响应,这使得它成为实现 SSE 的绝佳选择。

本文将详细介绍 Server-Sent Events 的概念、优势,以及如何在 FastAPI 中从零开始实现一个 SSE 服务,包括客户端的接收逻辑,并探讨一些高级话题和最佳实践。

1. 什么是 Server-Sent Events (SSE)?

Server-Sent Events 是一种允许服务器通过 HTTP 连接将数据“推”送到客户端的技术。它与传统的 HTTP 请求-响应模式不同,SSE 建立的是一个持久的 HTTP 连接,服务器可以利用这个连接向客户端发送多条消息,而不需要客户端反复发起请求(轮询)。

SSE 的核心特点:

  1. 单向通信: 数据流向是固定的,从服务器到客户端。这是 SSE 与 WebSockets 的主要区别之一(WebSocket 支持双向通信)。
  2. 基于 HTTP: SSE 运行在标准的 HTTP 或 HTTPS 协议之上,使用普通的 HTTP 请求和响应。
  3. 文本协议: 发送的数据是文本格式,遵循特定的事件流格式。
  4. 自动重连: 客户端浏览器内置了对 SSE 的支持,并提供了自动重连机制,当连接断开时会自动尝试重新连接服务器。
  5. 高效: 相对于短轮询或长轮询,SSE 减少了大量的 HTTP 请求和响应头开销,提高了效率。

SSE 的 MIME 类型:

SSE 使用一个特殊的 MIME 类型 text/event-stream。当服务器返回这种 MIME 类型时,浏览器会将其视为一个事件流,并持续监听该连接以接收新的事件。

SSE 的消息格式:

每个发送到客户端的事件都是一个或多个以换行符结束的字段组成的文本块。块之间用一个空行分隔。常用的字段包括:

  • data:: 消息的数据内容。可以有多行 data: 字段,它们的内容会被连接起来,以换行符分隔,最终作为客户端接收到的单个消息的 data 属性。
  • event:: 可选的事件类型名称。客户端可以根据这个名称来区分不同类型的事件。
  • id:: 可选的事件 ID。客户端会记录最后接收到的事件 ID,并在断开重连时将其发送给服务器(通过 Last-Event-ID 请求头),方便服务器从上次中断的地方继续发送数据。
  • retry:: 可选的重连间隔时间(毫秒)。客户端在连接断开后会等待这个时间间隔后尝试重连。

示例 SSE 消息:

“`
data: 这是第一条消息
id: 1

data: 这是带有自定义事件类型的消息
event: greeting
id: 2

data: 这是分多行的数据
data: 第二行数据

data: 第三条消息
id: 3

“`

2. 为什么选择 SSE?

  • 简单: 相对于 WebSockets,SSE 的协议和实现都更为简单。服务器只需生成特定格式的文本流,客户端使用内置的 EventSource API 即可。
  • 效率: 避免了重复的 HTTP 请求开销,尤其适合数据更新频率不高的场景。
  • 内置特性: 浏览器原生支持 EventSource API,包括自动重连。
  • 基于 HTTP: 可以利用现有的 HTTP infrastructure(如代理服务器、负载均衡)而不需要特殊配置(相较于 WebSockets 可能需要升级连接)。

什么时候使用 SSE?

当你的应用场景主要是服务器向客户端单向推送数据,且不需要客户端频繁向服务器发送数据时,SSE 是一个非常合适的选择。典型的应用包括:

  • 实时通知和提醒
  • 新闻或博客更新推送
  • 股票或加密货币价格实时显示
  • 体育赛事比分更新
  • 任务进度或日志输出实时显示
  • 简单的实时数据仪表盘

什么时候不适合使用 SSE?

如果你的应用需要频繁的双向通信(客户端和服务器都需要主动发送数据),例如在线聊天室、多人游戏、需要客户端实时发送指令的控制系统,那么 WebSockets 是更合适的选择。

3. FastAPI 实现 SSE 的基础

FastAPI 基于 Starlette,而 Starlette 提供了 StreamingResponse 类,非常适合用来实现 SSE。StreamingResponse 可以接受一个异步生成器作为内容,并将生成器产生的数据流式地返回给客户端。

先决条件:

  • Python 3.7+
  • 安装 FastAPI 和 Uvicorn (或 Hypercorn 等 ASGI 服务器):
    bash
    pip install fastapi uvicorn

基本实现步骤:

  1. 创建一个 FastAPI 应用。
  2. 定义一个异步端点(通常是 GET 请求)。
  3. 在这个端点内部,创建一个异步生成器函数,该函数会周期性地生成符合 SSE 格式的字符串。
  4. 使用 StreamingResponse 返回这个生成器,并设置 media_typetext/event-stream

4. 在 FastAPI 中实现一个简单的 SSE 示例 (计数器)

让我们从一个最简单的例子开始:一个服务器端每秒向客户端推送一个递增的计数器值。

服务器端代码 (main.py):

“`python
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()

异步生成器函数,用于生成 SSE 事件流

async def event_generator():
“””
一个简单的异步生成器,每秒生成一个带有当前时间的 SSE 消息。
“””
count = 0
try:
while True:
# 构建 SSE 消息格式
# data: <消息内容>\n\n
# 如果需要event和id,格式如下:
# event: <事件类型>\n
# id: <事件ID>\n
# data: <消息内容>\n\n
current_time = asyncio.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)
message = f”data: Count {count} at {current_time}\n\n”

        yield message # 将消息推送给客户端

        count += 1
        await asyncio.sleep(1) # 等待1秒

except asyncio.CancelledError:
    # 当客户端断开连接时,生成器会被取消
    print("Client disconnected")
except Exception as e:
    print(f"Error in event stream: {e}")

@app.get(“/stream”)
async def stream_events(request: Request):
“””
SSE 端点,返回一个 StreamingResponse。
“””
print(“Client connected”)
return StreamingResponse(event_generator(), media_type=”text/event-stream”)

@app.get(“/”)
async def read_root():
“””
根路径,可以返回一个简单的 HTML 页面来测试 SSE。
“””
html_content = “””
<!DOCTYPE html>


FastAPI SSE Counter

SSE Counter

滚动至顶部