精通FastAPI SSE:高级技巧与窍门
Server-Sent Events (SSE) 是一种轻量级的、基于HTTP的单向通信协议,允许服务器向客户端推送实时更新。与WebSocket的双向通信不同,SSE专注于服务器到客户端的数据流。FastAPI凭借其出色的性能和易用性,为构建SSE端点提供了强大的支持。本文将深入探讨FastAPI中SSE的高级技巧和窍门,帮助您构建高效、可靠且可扩展的实时应用。
1. SSE基础回顾
在深入高级技巧之前,让我们快速回顾一下SSE的基础知识。
1.1 SSE工作原理
SSE建立在HTTP协议之上。客户端通过发送一个带有Accept: text/event-stream
头部的HTTP请求来发起连接。服务器接收到请求后,保持连接打开,并周期性地向客户端发送格式化的文本数据。
1.2 SSE数据格式
SSE消息由一个或多个字段组成,每个字段以换行符(\n
)分隔。常见的字段包括:
- data: 消息的主要内容。可以包含多行,每行以
data:
开头。 - event: 事件类型。客户端可以使用此字段来区分不同类型的事件。
- id: 事件的唯一标识符。客户端可以使用此字段来跟踪事件的顺序或检测丢失的事件。
- retry: 客户端在连接断开后重新连接的等待时间(毫秒)。
一个典型的SSE消息如下所示:
“`
event: message
id: 12345
data: This is the first line of data.
data: This is the second line.
“`
注意空行表示一个事件的结束。
1.3 FastAPI中的简单SSE示例
“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import time
app = FastAPI()
async def event_generator():
“””
一个简单的事件生成器,每秒发送一个带有当前时间的事件。
“””
while True:
yield {
“event”: “message”,
“id”: int(time.time()),
“data”: f”The time is: {time.strftime(‘%H:%M:%S’)}”
}
await asyncio.sleep(1)
@app.get(“/events”)
async def events(request: Request):
“””
SSE端点,使用StreamingResponse发送事件流。
“””
return StreamingResponse(event_generator(), media_type=”text/event-stream”)
``
event_generator
这个示例中,是一个异步生成器函数,它不断产生SSE消息。
StreamingResponse将生成器函数作为参数,并设置
media_type为
text/event-stream`,从而创建了一个SSE端点。
2. 高级技巧与窍门
2.1. 使用 EventSourceResponse
(推荐)
FastAPI 实际上提供了一个更专门的 EventSourceResponse
类来处理 SSE。虽然 StreamingResponse
可以工作,但 EventSourceResponse
提供了更好的类型提示和一些内置的便利功能。它还处理了设置正确的标头等细节。
“`python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse # 需要安装 sse-starlette
import asyncio
import time
app = FastAPI()
async def event_generator():
i = 0
while True:
yield {
“event”: “message”,
“id”: i,
“data”: f”The time is: {time.strftime(‘%H:%M:%S’)}”
}
i += 1
await asyncio.sleep(1)
@app.get(“/events”)
async def events():
return EventSourceResponse(event_generator())
“`
优势:
- 更清晰的意图:
EventSourceResponse
明确表示此端点用于 SSE。 - 类型安全:更好的类型提示和静态分析。
- 自动头部:自动设置
Content-Type: text/event-stream
和其他相关头部。 - 内置功能:可以轻松设置
retry
等参数。
2.2. 客户端断开连接检测
在SSE中,服务器需要能够检测客户端何时断开连接,以便停止发送事件并释放资源。FastAPI提供了几种方法来实现这一点:
2.2.1. 使用request.is_disconnected
在生成器函数中,可以使用request.is_disconnected
属性来检查客户端是否已断开连接。
“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse # 或者 EventSourceResponse
import asyncio
import time
app = FastAPI()
async def event_generator(request: Request):
try:
while True:
if await request.is_disconnected(): # 重要:必须是 await
print(“Client disconnected!”)
break
yield {
"event": "message",
"id": int(time.time()),
"data": f"The time is: {time.strftime('%H:%M:%S')}"
}
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Generator cancelled (client disconnected).") # 通常在客户端断开时发生
@app.get(“/events”)
async def events(request: Request):
return StreamingResponse(event_generator(request), media_type=”text/event-stream”)
“`
2.2.2. 异常处理 (更推荐)
更健壮的方法是使用 try...except
块来捕获 asyncio.CancelledError
异常。当客户端断开连接时,FastAPI通常会取消与该请求关联的异步任务,从而触发 CancelledError
。 这是处理客户端断开连接的更可靠和推荐的方法。
“`python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse # 或者 EventSourceResponse
import asyncio
import time
app = FastAPI()
async def event_generator():
try:
while True:
yield {
“event”: “message”,
“id”: int(time.time()),
“data”: f”The time is: {time.strftime(‘%H:%M:%S’)}”
}
await asyncio.sleep(1)
except asyncio.CancelledError:
print(“Client disconnected!”)
finally:
print(“Generator cleaning up…”) # 可选:执行清理操作
@app.get(“/events”)
async def events():
return StreamingResponse(event_generator(), media_type=”text/event-stream”)
“`
最佳实践: 优先使用 try...except asyncio.CancelledError
来处理客户端断开连接。它更可靠,并且允许你在 finally
块中执行任何必要的清理操作(例如,关闭数据库连接、释放资源等)。
2.3. 发送不同类型的事件
SSE允许您使用event
字段发送不同类型的事件。客户端可以根据事件类型来处理不同的消息。
“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import time
import random
app = FastAPI()
async def event_generator(request: Request):
try:
while True:
if await request.is_disconnected():
break
event_type = random.choice(["temperature", "humidity", "pressure"])
data = ""
if event_type == "temperature":
data = f"Temperature: {random.uniform(20, 30):.2f}°C"
elif event_type == "humidity":
data = f"Humidity: {random.uniform(40, 60):.2f}%"
else:
data = f"Pressure: {random.uniform(1000, 1020):.2f} hPa"
yield {
"event": event_type,
"id": int(time.time()),
"data": data
}
await asyncio.sleep(2)
except asyncio.CancelledError:
print("Client disconnected!")
@app.get(“/events”)
async def events(request: Request):
return StreamingResponse(event_generator(request), media_type=”text/event-stream”)
“`
在客户端,您可以使用JavaScript的EventSource
API来监听特定类型的事件:
“`javascript
const eventSource = new EventSource(‘/events’);
eventSource.addEventListener(‘temperature’, (event) => {
console.log(‘Temperature update:’, event.data);
});
eventSource.addEventListener(‘humidity’, (event) => {
console.log(‘Humidity update:’, event.data);
});
eventSource.addEventListener(‘pressure’, (event) => {
console.log(‘Pressure update:’, event.data);
});
eventSource.onmessage = (event) => {
// 捕获所有没有特定事件类型处理程序的消息
console.log(“Generic message:”, event.data)
};
eventSource.onerror = (error) => {
console.error(“EventSource failed:”, error);
eventSource.close(); // 最佳实践:在错误时关闭连接
}
“`
2.4. 设置重试时间
您可以使用retry
字段来告诉客户端在连接断开后多长时间尝试重新连接(以毫秒为单位)。
“`python
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
app = FastAPI()
async def event_generator():
try:
i = 0
while True:
yield {
“event”: “message”,
“id”: i,
“data”: f”Event {i}”,
“retry”: 5000 # 告诉客户端在5秒后重试
}
i += 1
await asyncio.sleep(2)
except asyncio.CancelledError:
print(“Client disconnected!”)
@app.get(“/events”)
async def events():
return EventSourceResponse(event_generator())
“`
2.5. 使用Last-Event-ID实现断点续传
客户端可以在请求头中包含Last-Event-ID
字段,以指示它接收到的最后一个事件的ID。服务器可以使用此信息来从客户端断开的地方继续发送事件。这对于处理网络中断或客户端重新加载非常有用。
“`python
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse # 或者 EventSourceResponse
import asyncio
import time
app = FastAPI()
假设我们有一个简单的事件存储(在实际应用中,可以使用数据库或消息队列)
events = []
async def event_generator(request: Request, last_event_id: int = None):
try:
current_id = 0
# 如果提供了Last-Event-ID,则从该ID之后的事件开始发送
if last_event_id is not None:
current_id = last_event_id + 1
# 过滤掉已经发送过的事件(在实际应用中,这应该在数据库查询中完成)
# events = events[current_id:]
while True:
if await request.is_disconnected():
break
# 生成新的事件
new_event = {
"event": "message",
"id": current_id,
"data": f"Event {current_id}"
}
events.append(new_event) # 将事件添加到存储中
yield new_event
current_id += 1
await asyncio.sleep(2)
except asyncio.CancelledError:
print("Client disconnected!")
@app.get(“/events”)
async def events(request: Request, last_event_id: str = None):
# 从请求头中获取Last-Event-ID
last_id = request.headers.get(“Last-Event-ID”)
start_id = None
if last_id:
try:
start_id = int(last_id)
except ValueError:
raise HTTPException(status_code=400, detail="Invalid Last-Event-ID")
return StreamingResponse(
event_generator(request, start_id),
media_type="text/event-stream"
)
“`
客户端代码(JavaScript):
“`javascript
let eventSource;
let lastEventId = null;
function connect() {
let url = ‘/events’;
if (lastEventId) {
url += ‘?last_event_id=’ + lastEventId; // 也可以通过header传递,更标准
const headers = {‘Last-Event-ID’: lastEventId};
eventSource = new EventSource(url, {headers: headers});
}
else{
eventSource = new EventSource(url);
}
eventSource.onmessage = (event) => {
console.log('Received:', event.data);
lastEventId = event.lastEventId; // 更新 lastEventId
};
eventSource.onerror = (error) => {
console.error('EventSource error:', error);
eventSource.close();
// 可以在这里实现自动重连逻辑
setTimeout(connect, 5000); // 5秒后重连
};
}
connect();
“`
2.6. 与消息队列集成 (RabbitMQ, Redis, Kafka)
对于更复杂的应用场景,您可能需要将SSE与消息队列集成,以实现更可靠的事件传递和水平扩展。
示例 (使用 Redis Pub/Sub):
首先,安装 aioredis
: pip install aioredis
“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import aioredis
import json
import os
from dotenv import load_dotenv
load_dotenv() # 加载 .env 文件
app = FastAPI()
Redis 连接配置
REDIS_HOST = os.getenv(“REDIS_HOST”, “localhost”)
REDIS_PORT = int(os.getenv(“REDIS_PORT”, 6379))
REDIS_PASSWORD = os.getenv(“REDIS_PASSWORD”)
async def get_redis_connection():
“””
获取 Redis 连接。
“””
return await aioredis.create_redis_pool(
f”redis://{REDIS_HOST}:{REDIS_PORT}”, password=REDIS_PASSWORD, encoding=”utf-8″
)
async def event_generator(request: Request, redis_channel: str):
“””
从 Redis Pub/Sub 频道接收消息并作为 SSE 事件发送。
“””
redis = await get_redis_connection()
try:
(channel,) = await redis.subscribe(redis_channel) # 订阅频道
while True:
if await request.is_disconnected():
break
try:
message = await channel.get_message(ignore_subscribe_messages=True, timeout=1.0) #使用timeout,避免阻塞
if message:
data = json.loads(message[“data”])
yield {
“event”: data.get(“event”, “message”), # 默认事件类型为 “message”
“id”: data.get(“id”),
“data”: data.get(“data”)
}
except asyncio.TimeoutError:
continue # 超时后继续循环
except Exception as e:
print(f”Error reading from Redis: {e}”)
break
except asyncio.CancelledError:
print("Client disconnected!")
finally:
await redis.unsubscribe(redis_channel)
redis.close()
await redis.wait_closed()
@app.get(“/events/{channel}”)
async def events(request: Request, channel: str):
“””
SSE 端点,订阅指定的 Redis 频道。
“””
return StreamingResponse(
event_generator(request, channel), media_type=”text/event-stream”
)
示例:发布消息到 Redis 频道(通常在另一个服务或进程中)
async def publish_message(channel: str, message: dict):
redis = await get_redis_connection()
try:
await redis.publish(channel, json.dumps(message))
finally:
redis.close()
await redis.wait_closed()
示例端点:发布消息
@app.post(“/publish/{channel}”)
async def publish(channel: str, message: dict):
await publish_message(channel, message)
return {“message”: “Message published”}
“`
解释:
get_redis_connection()
: 创建一个 Redis 连接池。event_generator()
:- 订阅指定的 Redis 频道。
- 循环读取频道中的消息。
- 重要的: 使用
channel.get_message(ignore_subscribe_messages=True, timeout=1.0)
,这样可以避免在没有消息时无限期阻塞,并允许检查客户端是否断开连接。
- 重要的: 使用
- 将消息解析为 JSON,并格式化为 SSE 事件。
- 使用
yield
发送 SSE 事件。 - 在
finally
块中取消订阅并关闭 Redis 连接。
events()
: SSE 端点,调用event_generator()
并传入频道名称。publish_message()
: 将消息发布到 Redis 频道的函数(通常在单独的进程或服务中)。/publish/{channel}
: 一个示例端点,允许您发布消息到指定的频道(用于测试)。
优点:
- 可靠性: 消息队列确保事件不会丢失,即使客户端暂时断开连接。
- 可扩展性: 您可以轻松地添加更多 SSE 服务器实例,它们都从同一个消息队列读取事件。
- 解耦: 事件生产者和消费者(SSE 服务器)是解耦的,可以独立扩展。
- 持久性: Redis 可以配置为持久化消息,确保即使服务器重启也不会丢失事件。
2.7. 速率限制
为了防止滥用和资源耗尽,您可能需要对SSE端点进行速率限制。可以使用fastapi-limiter
库来实现这一点。
首先,安装 fastapi-limiter
: pip install fastapi-limiter
“`python
from fastapi import FastAPI, Request, Depends
from fastapi.responses import StreamingResponse
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
import asyncio
import time
import redis.asyncio as redis
import os
from dotenv import load_dotenv
load_dotenv()
app = FastAPI()
Redis 连接配置
REDIS_HOST = os.getenv(“REDIS_HOST”, “localhost”)
REDIS_PORT = int(os.getenv(“REDIS_PORT”, 6379))
REDIS_PASSWORD = os.getenv(“REDIS_PASSWORD”)
初始化速率限制器 (连接到Redis)
@app.on_event(“startup”)
async def startup():
redis_instance = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD)
await FastAPILimiter.init(redis_instance)
async def event_generator(request: Request):
try:
while True:
if await request.is_disconnected():
break
yield {
"event": "message",
"id": int(time.time()),
"data": f"The time is: {time.strftime('%H:%M:%S')}"
}
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Client disconnected!")
@app.get(“/events”, dependencies=[Depends(RateLimiter(times=2, seconds=5))]) # 5秒内最多2次请求
async def events(request: Request):
return StreamingResponse(event_generator(request), media_type=”text/event-stream”)
“`
解释:
@app.on_event("startup")
: 在FastAPI启动时初始化FastAPILimiter
,并连接到Redis。dependencies=[Depends(RateLimiter(times=2, seconds=5))]
: 将速率限制器应用于/events
端点。RateLimiter(times=2, seconds=5)
: 限制为每5秒2次请求。
2.8. 使用HTTP/2
SSE本身就受益于HTTP/2的多路复用功能。在HTTP/1.1中,浏览器通常对每个域的并发连接数有限制(通常为6个)。这意味着如果您有多个SSE连接或其他HTTP请求,它们可能会相互阻塞。HTTP/2通过在单个连接上多路复用多个请求和响应来解决这个问题。
FastAPI本身并不直接处理HTTP协议版本。这取决于您使用的ASGI服务器(例如Uvicorn或Hypercorn)。
- Uvicorn: Uvicorn 自动支持 HTTP/2,前提是您使用
--http http1.1|http2
命令行参数启用了它(默认是http1.1),并且客户端也支持 HTTP/2。 您通常还需要配置SSL证书才能使用HTTP/2。 - Hypercorn: Hypercorn 也支持 HTTP/2,使用方式类似。
重要提示: 为了充分利用 HTTP/2,请确保:
- 您的 ASGI 服务器已配置为支持 HTTP/2。
- 您已配置了 SSL 证书(HTTP/2 通常需要 HTTPS)。
- 您的客户端(浏览器或应用程序)支持 HTTP/2。大多数现代浏览器都支持。
2.9 使用pydantic进行数据验证和序列化
“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import asyncio
import time
app = FastAPI()
class EventData(BaseModel):
timestamp: float
value: int
message: str
async def event_generator(request:Request):
try:
while True:
if await request.is_disconnected():
break
data = EventData(timestamp=time.time(), value=int(time.time()) % 100, message=”Periodic Update”)
yield {
“event”: “periodic_update”,
“id” : int(time.time()),
“data”: data.json() # 使用.json()方法
}
await asyncio.sleep(2)
except asyncio.CancelledError:
print(“Client Disconnected!”)
@app.get(“/events”)
async def events(request: Request):
return StreamingResponse(event_generator(request=request), media_type = “text/event-stream”)
``
BaseModel
使用 Pydantic 的,您可以轻松地验证传入的数据,并使用
.json()` 方法将其序列化为 JSON 字符串,以便通过 SSE 发送。
3. 总结
本文深入探讨了FastAPI中SSE的高级技巧与窍门,包括客户端断开连接检测、发送不同类型的事件、设置重试时间、使用Last-Event-ID实现断点续传、与消息队列集成、速率限制以及利用HTTP/2的优势。通过掌握这些技巧,您可以构建更强大、更可靠、更具扩展性的实时应用。记住,实践是最好的学习方式,请尝试将这些技巧应用到您的项目中,并不断探索FastAPI和SSE的更多可能性。