精通FastAPI SSE:高级技巧与窍门 – wiki基地

精通FastAPI SSE:高级技巧与窍门

Server-Sent Events (SSE) 是一种轻量级的、基于HTTP的单向通信协议,允许服务器向客户端推送实时更新。与WebSocket的双向通信不同,SSE专注于服务器到客户端的数据流。FastAPI凭借其出色的性能和易用性,为构建SSE端点提供了强大的支持。本文将深入探讨FastAPI中SSE的高级技巧和窍门,帮助您构建高效、可靠且可扩展的实时应用。

1. SSE基础回顾

在深入高级技巧之前,让我们快速回顾一下SSE的基础知识。

1.1 SSE工作原理

SSE建立在HTTP协议之上。客户端通过发送一个带有Accept: text/event-stream头部的HTTP请求来发起连接。服务器接收到请求后,保持连接打开,并周期性地向客户端发送格式化的文本数据。

1.2 SSE数据格式

SSE消息由一个或多个字段组成,每个字段以换行符(\n)分隔。常见的字段包括:

  • data: 消息的主要内容。可以包含多行,每行以data:开头。
  • event: 事件类型。客户端可以使用此字段来区分不同类型的事件。
  • id: 事件的唯一标识符。客户端可以使用此字段来跟踪事件的顺序或检测丢失的事件。
  • retry: 客户端在连接断开后重新连接的等待时间(毫秒)。

一个典型的SSE消息如下所示:

“`
event: message
id: 12345
data: This is the first line of data.
data: This is the second line.

“`
注意空行表示一个事件的结束。

1.3 FastAPI中的简单SSE示例

“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import time

app = FastAPI()

async def event_generator():
“””
一个简单的事件生成器,每秒发送一个带有当前时间的事件。
“””
while True:
yield {
“event”: “message”,
“id”: int(time.time()),
“data”: f”The time is: {time.strftime(‘%H:%M:%S’)}”
}
await asyncio.sleep(1)

@app.get(“/events”)
async def events(request: Request):
“””
SSE端点,使用StreamingResponse发送事件流。
“””
return StreamingResponse(event_generator(), media_type=”text/event-stream”)

``
这个示例中,
event_generator是一个异步生成器函数,它不断产生SSE消息。StreamingResponse将生成器函数作为参数,并设置media_typetext/event-stream`,从而创建了一个SSE端点。

2. 高级技巧与窍门

2.1. 使用 EventSourceResponse(推荐)

FastAPI 实际上提供了一个更专门的 EventSourceResponse 类来处理 SSE。虽然 StreamingResponse 可以工作,但 EventSourceResponse 提供了更好的类型提示和一些内置的便利功能。它还处理了设置正确的标头等细节。

“`python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse # 需要安装 sse-starlette
import asyncio
import time

app = FastAPI()

async def event_generator():
i = 0
while True:
yield {
“event”: “message”,
“id”: i,
“data”: f”The time is: {time.strftime(‘%H:%M:%S’)}”
}
i += 1
await asyncio.sleep(1)

@app.get(“/events”)
async def events():
return EventSourceResponse(event_generator())

“`

优势:

  • 更清晰的意图:EventSourceResponse 明确表示此端点用于 SSE。
  • 类型安全:更好的类型提示和静态分析。
  • 自动头部:自动设置 Content-Type: text/event-stream 和其他相关头部。
  • 内置功能:可以轻松设置 retry 等参数。

2.2. 客户端断开连接检测

在SSE中,服务器需要能够检测客户端何时断开连接,以便停止发送事件并释放资源。FastAPI提供了几种方法来实现这一点:

2.2.1. 使用request.is_disconnected

在生成器函数中,可以使用request.is_disconnected属性来检查客户端是否已断开连接。

“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse # 或者 EventSourceResponse
import asyncio
import time

app = FastAPI()

async def event_generator(request: Request):
try:
while True:
if await request.is_disconnected(): # 重要:必须是 await
print(“Client disconnected!”)
break

        yield {
            "event": "message",
            "id": int(time.time()),
            "data": f"The time is: {time.strftime('%H:%M:%S')}"
        }
        await asyncio.sleep(1)
except asyncio.CancelledError:
    print("Generator cancelled (client disconnected).") # 通常在客户端断开时发生

@app.get(“/events”)
async def events(request: Request):
return StreamingResponse(event_generator(request), media_type=”text/event-stream”)

“`

2.2.2. 异常处理 (更推荐)

更健壮的方法是使用 try...except 块来捕获 asyncio.CancelledError 异常。当客户端断开连接时,FastAPI通常会取消与该请求关联的异步任务,从而触发 CancelledError。 这是处理客户端断开连接的更可靠和推荐的方法。

“`python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse # 或者 EventSourceResponse
import asyncio
import time

app = FastAPI()

async def event_generator():
try:
while True:
yield {
“event”: “message”,
“id”: int(time.time()),
“data”: f”The time is: {time.strftime(‘%H:%M:%S’)}”
}
await asyncio.sleep(1)
except asyncio.CancelledError:
print(“Client disconnected!”)
finally:
print(“Generator cleaning up…”) # 可选:执行清理操作

@app.get(“/events”)
async def events():
return StreamingResponse(event_generator(), media_type=”text/event-stream”)
“`

最佳实践: 优先使用 try...except asyncio.CancelledError 来处理客户端断开连接。它更可靠,并且允许你在 finally 块中执行任何必要的清理操作(例如,关闭数据库连接、释放资源等)。

2.3. 发送不同类型的事件

SSE允许您使用event字段发送不同类型的事件。客户端可以根据事件类型来处理不同的消息。

“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import time
import random

app = FastAPI()

async def event_generator(request: Request):
try:
while True:
if await request.is_disconnected():
break

        event_type = random.choice(["temperature", "humidity", "pressure"])
        data = ""

        if event_type == "temperature":
            data = f"Temperature: {random.uniform(20, 30):.2f}°C"
        elif event_type == "humidity":
            data = f"Humidity: {random.uniform(40, 60):.2f}%"
        else:
            data = f"Pressure: {random.uniform(1000, 1020):.2f} hPa"

        yield {
            "event": event_type,
            "id": int(time.time()),
            "data": data
        }
        await asyncio.sleep(2)
except asyncio.CancelledError:
     print("Client disconnected!")

@app.get(“/events”)
async def events(request: Request):
return StreamingResponse(event_generator(request), media_type=”text/event-stream”)

“`

在客户端,您可以使用JavaScript的EventSource API来监听特定类型的事件:

“`javascript
const eventSource = new EventSource(‘/events’);

eventSource.addEventListener(‘temperature’, (event) => {
console.log(‘Temperature update:’, event.data);
});

eventSource.addEventListener(‘humidity’, (event) => {
console.log(‘Humidity update:’, event.data);
});

eventSource.addEventListener(‘pressure’, (event) => {
console.log(‘Pressure update:’, event.data);
});

eventSource.onmessage = (event) => {
// 捕获所有没有特定事件类型处理程序的消息
console.log(“Generic message:”, event.data)
};

eventSource.onerror = (error) => {
console.error(“EventSource failed:”, error);
eventSource.close(); // 最佳实践:在错误时关闭连接
}

“`

2.4. 设置重试时间

您可以使用retry字段来告诉客户端在连接断开后多长时间尝试重新连接(以毫秒为单位)。

“`python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio

app = FastAPI()

async def event_generator():
try:
i = 0
while True:
yield {
“event”: “message”,
“id”: i,
“data”: f”Event {i}”,
“retry”: 5000 # 告诉客户端在5秒后重试
}
i += 1
await asyncio.sleep(2)
except asyncio.CancelledError:
print(“Client disconnected!”)

@app.get(“/events”)
async def events():
return EventSourceResponse(event_generator())
“`

2.5. 使用Last-Event-ID实现断点续传

客户端可以在请求头中包含Last-Event-ID字段,以指示它接收到的最后一个事件的ID。服务器可以使用此信息来从客户端断开的地方继续发送事件。这对于处理网络中断或客户端重新加载非常有用。

“`python
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse # 或者 EventSourceResponse
import asyncio
import time

app = FastAPI()

假设我们有一个简单的事件存储(在实际应用中,可以使用数据库或消息队列)

events = []

async def event_generator(request: Request, last_event_id: int = None):
try:
current_id = 0

    # 如果提供了Last-Event-ID,则从该ID之后的事件开始发送
    if last_event_id is not None:
        current_id = last_event_id + 1
        # 过滤掉已经发送过的事件(在实际应用中,这应该在数据库查询中完成)
        # events = events[current_id:]

    while True:
        if await request.is_disconnected():
            break

        # 生成新的事件
        new_event = {
            "event": "message",
            "id": current_id,
            "data": f"Event {current_id}"
        }
        events.append(new_event)  # 将事件添加到存储中
        yield new_event

        current_id += 1
        await asyncio.sleep(2)

except asyncio.CancelledError:
    print("Client disconnected!")

@app.get(“/events”)
async def events(request: Request, last_event_id: str = None):
# 从请求头中获取Last-Event-ID
last_id = request.headers.get(“Last-Event-ID”)
start_id = None

if last_id:
    try:
        start_id = int(last_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid Last-Event-ID")


return StreamingResponse(
    event_generator(request, start_id),
    media_type="text/event-stream"
)

“`

客户端代码(JavaScript):

“`javascript
let eventSource;
let lastEventId = null;

function connect() {
let url = ‘/events’;
if (lastEventId) {
url += ‘?last_event_id=’ + lastEventId; // 也可以通过header传递,更标准
const headers = {‘Last-Event-ID’: lastEventId};
eventSource = new EventSource(url, {headers: headers});
}
else{
eventSource = new EventSource(url);
}

eventSource.onmessage = (event) => {
    console.log('Received:', event.data);
    lastEventId = event.lastEventId;  // 更新 lastEventId
};

eventSource.onerror = (error) => {
    console.error('EventSource error:', error);
    eventSource.close();
    // 可以在这里实现自动重连逻辑
    setTimeout(connect, 5000); // 5秒后重连
};

}

connect();

“`

2.6. 与消息队列集成 (RabbitMQ, Redis, Kafka)

对于更复杂的应用场景,您可能需要将SSE与消息队列集成,以实现更可靠的事件传递和水平扩展。

示例 (使用 Redis Pub/Sub):

首先,安装 aioredis: pip install aioredis

“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import aioredis
import json
import os
from dotenv import load_dotenv

load_dotenv() # 加载 .env 文件

app = FastAPI()

Redis 连接配置

REDIS_HOST = os.getenv(“REDIS_HOST”, “localhost”)
REDIS_PORT = int(os.getenv(“REDIS_PORT”, 6379))
REDIS_PASSWORD = os.getenv(“REDIS_PASSWORD”)

async def get_redis_connection():
“””
获取 Redis 连接。
“””
return await aioredis.create_redis_pool(
f”redis://{REDIS_HOST}:{REDIS_PORT}”, password=REDIS_PASSWORD, encoding=”utf-8″
)

async def event_generator(request: Request, redis_channel: str):
“””
从 Redis Pub/Sub 频道接收消息并作为 SSE 事件发送。
“””
redis = await get_redis_connection()
try:
(channel,) = await redis.subscribe(redis_channel) # 订阅频道
while True:
if await request.is_disconnected():
break
try:
message = await channel.get_message(ignore_subscribe_messages=True, timeout=1.0) #使用timeout,避免阻塞
if message:
data = json.loads(message[“data”])
yield {
“event”: data.get(“event”, “message”), # 默认事件类型为 “message”
“id”: data.get(“id”),
“data”: data.get(“data”)
}
except asyncio.TimeoutError:
continue # 超时后继续循环
except Exception as e:
print(f”Error reading from Redis: {e}”)
break

except asyncio.CancelledError:
    print("Client disconnected!")
finally:
    await redis.unsubscribe(redis_channel)
    redis.close()
    await redis.wait_closed()

@app.get(“/events/{channel}”)
async def events(request: Request, channel: str):
“””
SSE 端点,订阅指定的 Redis 频道。
“””
return StreamingResponse(
event_generator(request, channel), media_type=”text/event-stream”
)

示例:发布消息到 Redis 频道(通常在另一个服务或进程中)

async def publish_message(channel: str, message: dict):
redis = await get_redis_connection()
try:
await redis.publish(channel, json.dumps(message))
finally:
redis.close()
await redis.wait_closed()

示例端点:发布消息

@app.post(“/publish/{channel}”)
async def publish(channel: str, message: dict):
await publish_message(channel, message)
return {“message”: “Message published”}

“`
解释:

  • get_redis_connection(): 创建一个 Redis 连接池。
  • event_generator():
    • 订阅指定的 Redis 频道。
    • 循环读取频道中的消息。
      • 重要的: 使用 channel.get_message(ignore_subscribe_messages=True, timeout=1.0),这样可以避免在没有消息时无限期阻塞,并允许检查客户端是否断开连接。
    • 将消息解析为 JSON,并格式化为 SSE 事件。
    • 使用 yield 发送 SSE 事件。
    • finally 块中取消订阅并关闭 Redis 连接。
  • events(): SSE 端点,调用 event_generator() 并传入频道名称。
  • publish_message(): 将消息发布到 Redis 频道的函数(通常在单独的进程或服务中)。
  • /publish/{channel}: 一个示例端点,允许您发布消息到指定的频道(用于测试)。

优点:

  • 可靠性: 消息队列确保事件不会丢失,即使客户端暂时断开连接。
  • 可扩展性: 您可以轻松地添加更多 SSE 服务器实例,它们都从同一个消息队列读取事件。
  • 解耦: 事件生产者和消费者(SSE 服务器)是解耦的,可以独立扩展。
  • 持久性: Redis 可以配置为持久化消息,确保即使服务器重启也不会丢失事件。

2.7. 速率限制

为了防止滥用和资源耗尽,您可能需要对SSE端点进行速率限制。可以使用fastapi-limiter库来实现这一点。
首先,安装 fastapi-limiter: pip install fastapi-limiter

“`python
from fastapi import FastAPI, Request, Depends
from fastapi.responses import StreamingResponse
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
import asyncio
import time
import redis.asyncio as redis
import os
from dotenv import load_dotenv

load_dotenv()

app = FastAPI()

Redis 连接配置

REDIS_HOST = os.getenv(“REDIS_HOST”, “localhost”)
REDIS_PORT = int(os.getenv(“REDIS_PORT”, 6379))
REDIS_PASSWORD = os.getenv(“REDIS_PASSWORD”)

初始化速率限制器 (连接到Redis)

@app.on_event(“startup”)
async def startup():
redis_instance = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD)
await FastAPILimiter.init(redis_instance)

async def event_generator(request: Request):
try:
while True:
if await request.is_disconnected():
break

        yield {
            "event": "message",
            "id": int(time.time()),
            "data": f"The time is: {time.strftime('%H:%M:%S')}"
        }
        await asyncio.sleep(1)
except asyncio.CancelledError:
    print("Client disconnected!")

@app.get(“/events”, dependencies=[Depends(RateLimiter(times=2, seconds=5))]) # 5秒内最多2次请求
async def events(request: Request):
return StreamingResponse(event_generator(request), media_type=”text/event-stream”)
“`

解释:

  • @app.on_event("startup"): 在FastAPI启动时初始化FastAPILimiter,并连接到Redis。
  • dependencies=[Depends(RateLimiter(times=2, seconds=5))]: 将速率限制器应用于/events端点。
  • RateLimiter(times=2, seconds=5): 限制为每5秒2次请求。

2.8. 使用HTTP/2

SSE本身就受益于HTTP/2的多路复用功能。在HTTP/1.1中,浏览器通常对每个域的并发连接数有限制(通常为6个)。这意味着如果您有多个SSE连接或其他HTTP请求,它们可能会相互阻塞。HTTP/2通过在单个连接上多路复用多个请求和响应来解决这个问题。

FastAPI本身并不直接处理HTTP协议版本。这取决于您使用的ASGI服务器(例如Uvicorn或Hypercorn)。

  • Uvicorn: Uvicorn 自动支持 HTTP/2,前提是您使用 --http http1.1|http2 命令行参数启用了它(默认是http1.1),并且客户端也支持 HTTP/2。 您通常还需要配置SSL证书才能使用HTTP/2。
  • Hypercorn: Hypercorn 也支持 HTTP/2,使用方式类似。

重要提示: 为了充分利用 HTTP/2,请确保:

  • 您的 ASGI 服务器已配置为支持 HTTP/2。
  • 您已配置了 SSL 证书(HTTP/2 通常需要 HTTPS)。
  • 您的客户端(浏览器或应用程序)支持 HTTP/2。大多数现代浏览器都支持。

2.9 使用pydantic进行数据验证和序列化

“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
import time

app = FastAPI()

class EventData(BaseModel):
timestamp: float
value: int
message: str

async def event_generator(request:Request):
try:
while True:
if await request.is_disconnected():
break
data = EventData(timestamp=time.time(), value=int(time.time()) % 100, message=”Periodic Update”)
yield {
“event”: “periodic_update”,
“id” : int(time.time()),
“data”: data.json() # 使用.json()方法
}
await asyncio.sleep(2)
except asyncio.CancelledError:
print(“Client Disconnected!”)

@app.get(“/events”)
async def events(request: Request):
return StreamingResponse(event_generator(request=request), media_type = “text/event-stream”)

``
使用 Pydantic 的
BaseModel,您可以轻松地验证传入的数据,并使用.json()` 方法将其序列化为 JSON 字符串,以便通过 SSE 发送。

3. 总结

本文深入探讨了FastAPI中SSE的高级技巧与窍门,包括客户端断开连接检测、发送不同类型的事件、设置重试时间、使用Last-Event-ID实现断点续传、与消息队列集成、速率限制以及利用HTTP/2的优势。通过掌握这些技巧,您可以构建更强大、更可靠、更具扩展性的实时应用。记住,实践是最好的学习方式,请尝试将这些技巧应用到您的项目中,并不断探索FastAPI和SSE的更多可能性。

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部