用 FastAPI 构建实时应用:WebSocket 指南
在现代 Web 应用中,实时性变得越来越重要。从在线聊天、多人协作文档到实时数据看板、游戏和物联网监控,用户期望能够即时接收到信息更新,而无需手动刷新页面。传统的 HTTP 请求-响应模型在这种场景下显得效率低下且难以实现。这时,WebSocket 协议应运而生,它提供了一种在客户端和服务器之间进行双向、持久通信的方式。
而 FastAPI,作为一个高性能的 Python Web 框架,凭借其异步能力和简洁的语法,成为了构建实时应用的绝佳选择。本文将深入探讨如何利用 FastAPI 和 WebSocket 协议来构建强大的实时应用程序。
1. 理解实时应用与 WebSocket
1.1 什么是实时应用?
实时应用是指能够以极低的延迟响应事件并向用户提供最新信息的应用程序。与传统的 Web 应用(用户发起请求,服务器响应该请求)不同,实时应用中,服务器可以在数据发生变化时主动将信息推送给客户端。
常见的实时应用场景包括:
- 在线聊天: 用户发送消息后,其他用户立即收到。
- 实时通知: 新邮件、社交动态等即时提醒。
- 协作工具: 多人同时编辑文档,看到彼此的修改。
- 数据看板: 股票价格、监控指标等实时更新。
- 在线游戏: 多人互动,同步游戏状态。
1.2 为什么传统的 HTTP 不适合实时应用?
HTTP 是一个无状态的请求-响应协议。客户端发送一个请求,服务器处理并返回一个响应,然后连接通常就会关闭。要模拟实时性,传统的方法有:
- 轮询 (Polling): 客户端每隔一定时间(例如几秒钟)向服务器发送请求,询问是否有新数据。这种方式简单,但效率低下,会产生大量冗余请求,尤其是在数据不经常更新时,并且延迟较高。
- 长轮询 (Long Polling): 客户端发送请求,服务器保持连接打开,直到有新数据可用或超时。有新数据后,服务器发送响应并关闭连接。客户端收到响应后立即发送新的请求。这比短轮询效率高一些,但仍然有连接建立/关闭的开销,且实现复杂。
- Server-Sent Events (SSE): 允许服务器单向地向客户端推送文本数据流。适用于服务器只需要向客户端发送更新的场景(如新闻推送),但客户端无法向服务器发送数据(除非通过单独的 HTTP 请求)。
这些方法都无法像 WebSocket 那样提供真正的双向、低延迟、持久连接。
1.3 WebSocket 协议简介
WebSocket 协议(RFC 6455)提供了一个在单个 TCP 连接上进行全双工(双向)通信的信道。与 HTTP 不同,WebSocket 连接一旦建立,就会保持打开状态,直到一方主动关闭或连接中断。
WebSocket 的优势在于:
- 双向通信: 客户端和服务器都可以随时向对方发送数据。
- 持久连接: 避免了 HTTP 频繁建立和关闭连接的开销。
- 较低开销: 一旦连接建立,数据传输的帧开销远小于 HTTP 请求头。
- 实时性: 数据可以即时从服务器推送到客户端。
WebSocket 连接的建立过程通常是从一个 HTTP 请求开始的,称为“握手”(Handshake)。客户端发送一个带有特定头部(如 Upgrade: websocket
, Connection: Upgrade
, Sec-WebSocket-Key
)的 HTTP GET 请求,服务器如果支持 WebSocket,则返回一个特定的 HTTP 响应(状态码 101 Switching Protocols),表示协议切换成功。握手完成后,底层的 TCP 连接就升级为了 WebSocket 连接,后续的数据传输都通过这个连接进行。
2. FastAPI 对 WebSocket 的支持
FastAPI 基于 Starlette 和 Pydantic,天生支持异步编程(async/await)。Starlette 提供了对 WebSocket 的一流支持,FastAPI 则在其基础上构建了更易用的接口。
在 FastAPI 中处理 WebSocket 连接,主要涉及以下几个方面:
- 声明 WebSocket 路由: 使用
@app.websocket()
或@router.websocket()
装饰器来定义一个 WebSocket 接口。 - 处理 WebSocket 连接: 在异步函数中接收一个
WebSocket
类型的参数,该对象代表了当前的连接。 - 接受连接: 在处理连接之前,必须调用
await websocket.accept()
来完成 WebSocket 握手。 - 发送和接收数据: 使用
await websocket.send_text()
,await websocket.send_json()
,await websocket.receive_text()
,await websocket.receive_json()
等方法在连接上发送和接收数据。 - 处理断开连接: 使用
try...except WebSocketDisconnect
块来捕获客户端断开连接的事件。 - 关闭连接: 使用
await websocket.close()
从服务器端关闭连接。
3. 构建一个基本的 WebSocket 服务器 (Echo 服务)
我们先从一个最简单的例子开始:一个 Echo 服务。客户端发送什么消息,服务器就原样返回。
“`python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
“””
处理 WebSocket 连接的端点。
当有新的 WebSocket 连接到 /ws 时,此函数会被调用。
“””
await websocket.accept() # 接受客户端的 WebSocket 连接请求
print(f”Client connected: {websocket.client}”)
try:
# 循环接收客户端发送的消息
while True:
data = await websocket.receive_text() # 接收文本消息
print(f"Received message: {data} from {websocket.client}")
await websocket.send_text(f"Message text was: {data}") # 将接收到的消息原样发送回客户端
except WebSocketDisconnect as e:
# 客户端断开连接时抛出 WebSocketDisconnect 异常
print(f"Client disconnected: {websocket.client} with code {e.code}, reason: {e.reason}")
except Exception as e:
# 处理其他可能的异常
print(f"An error occurred: {e}")
finally:
# 无论如何,当循环结束或发生异常时,确保连接被处理
# 注意:WebSocketDisconnect 已经被捕获,这里的 finally 更多是清理其他潜在状态
# 在 WebSocketDisconnect 发生后,连接实际上已经断开,不需要显式调用 close()
# 但如果是非 WebSocketDisconnect 的异常,或者需要在连接生命周期结束时执行清理,可以在这里做
print(f"Connection handler finished for {websocket.client}")
运行应用 (使用 uvicorn)
在终端中运行: uvicorn main:app –reload
“`
代码解释:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
: 导入必要的模块。WebSocket
类型用于类型提示,WebSocketDisconnect
是处理断开连接时需要捕获的异常。@app.websocket("/ws")
: 这个装饰器将websocket_endpoint
函数注册为处理/ws
路径上的 WebSocket 连接的端点。async def websocket_endpoint(websocket: WebSocket):
: 定义一个异步函数来处理连接。FastAPI 会将代表当前 WebSocket 连接的WebSocket
对象作为参数传递给这个函数。函数必须是异步的,因为 WebSocket 操作(如accept
,send
,receive
)都是异步的。await websocket.accept()
: 这是 WebSocket 连接建立的第二步(握手)。在处理任何收发消息之前,必须先调用此方法来接受连接。while True:
: 进入一个无限循环,持续监听客户端发送的消息。data = await websocket.receive_text()
: 异步地接收客户端发送的下一条文本消息。如果客户端发送的是其他类型(如二进制),或者没有消息,这个方法会等待。如果连接断开,它会抛出WebSocketDisconnect
异常。await websocket.send_text(...)
: 异步地向客户端发送文本消息。try...except WebSocketDisconnect as e:
: 这个块用来优雅地处理客户端断开连接的情况。当客户端关闭连接时(例如,关闭浏览器标签页),receive_text()
或send_*()
方法会抛出WebSocketDisconnect
异常。捕获这个异常可以让我们执行一些清理工作,而不会导致服务器崩溃。finally:
: 无论是否发生异常,finally 块中的代码都会执行。在这里可以做一些最终的清理,比如从活动连接列表中移除连接(我们稍后会讨论)。
运行和测试:
- 保存上面的代码为
main.py
。 - 安装 FastAPI 和 Uvicorn:
pip install fastapi uvicorn
- 在终端中运行服务器:
uvicorn main:app --reload
- 你需要一个 WebSocket 客户端来测试。可以在浏览器中使用 JavaScript 或使用像
websocat
(命令行工具) 这样的工具。-
JavaScript (在浏览器控制台或一个简单的 HTML 文件中):
“`javascript
let ws = new WebSocket(“ws://localhost:8000/ws”);ws.onopen = function(event) {
console.log(“WebSocket connection opened:”, event);
ws.send(“Hello Server!”);
};ws.onmessage = function(event) {
console.log(“Message from server:”, event.data);
};ws.onerror = function(event) {
console.error(“WebSocket error observed:”, event);
};ws.onclose = function(event) {
if (event.wasClean) {
console.log(WebSocket connection closed cleanly, code=${event.code} reason=${event.reason}
);
} else {
console.error(‘WebSocket connection died’);
}
};// 发送更多消息
// ws.send(“Another message”);// 关闭连接
// ws.close();
``
cargo install websocat
* **websocat (命令行):**
安装:(需要 Rust 环境) 或 从 Releases 下载二进制文件。
websocat ws://localhost:8000/ws`
连接并发送消息:
然后你可以在终端中输入消息,按回车发送,服务器的回复会显示出来。按 Ctrl+C 断开连接。
-
通过这个例子,我们看到了如何在 FastAPI 中定义一个基本的 WebSocket 端点并进行简单的消息收发。
4. 处理多个客户端连接:连接管理器
真正的实时应用通常需要同时处理多个客户端。例如,在一个聊天室中,一个用户发送的消息需要被广播给所有其他在线用户。这意味着服务器需要维护一个当前所有活动 WebSocket 连接的列表或集合。
我们可以创建一个简单的连接管理器类来封装这些逻辑:存储连接、添加连接、移除连接、广播消息。
“`python
managers.py (创建一个新文件)
from typing import List
from fastapi import WebSocket
class ConnectionManager:
“””
管理 WebSocket 连接的类。
保存所有活动连接,并提供添加、移除和广播功能。
“””
def init(self):
# 使用一个列表来存储所有活动的 WebSocket 连接对象
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
"""
处理新连接。接受连接并将其添加到活动连接列表中。
"""
await websocket.accept() # 接受连接
self.active_connections.append(websocket) # 添加到列表
print(f"Client connected: {websocket.client}. Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
"""
处理连接断开。从活动连接列表中移除连接。
"""
try:
self.active_connections.remove(websocket)
print(f"Client disconnected: {websocket.client}. Total connections: {len(self.active_connections)}")
except ValueError:
# 如果连接不在列表中,说明可能已经被移除了 (比如在广播时发现断开)
print(f"Attempted to remove already removed connection: {websocket.client}")
async def send_personal_message(self, message: str, websocket: WebSocket):
"""
向特定连接发送消息。
"""
try:
await websocket.send_text(message)
except Exception as e:
# 发送失败可能意味着连接已断开,虽然 WebSocketDisconnect 更常见,但也处理其他异常
print(f"Failed to send message to {websocket.client}: {e}")
# 可选:如果发送失败,考虑移除此连接
# self.disconnect(websocket)
async def broadcast(self, message: str):
"""
向所有活动连接广播消息。
"""
# 遍历活动连接列表
# 注意:在遍历列表时修改列表是不安全的,但这里我们在发现断开连接时进行移除,
# 这是一个常见的模式,需要注意潜在的并发问题 (例如,如果多个协程同时广播)。
# 更健壮的方式是在一个单独的任务中处理移除。
disconnected_clients = []
for connection in self.active_connections:
try:
await connection.send_text(message)
except WebSocketDisconnect:
# 在尝试发送时发现连接已断开
print(f"Client disconnected during broadcast: {connection.client}")
disconnected_clients.append(connection)
except Exception as e:
# 处理其他可能的发送错误
print(f"Error broadcasting to {connection.client}: {e}")
disconnected_clients.append(connection)
# 在遍历结束后,移除所有已断开的连接
for client in disconnected_clients:
self.disconnect(client)
“`
现在,我们可以在主应用中使用这个连接管理器。
“`python
main.py (修改)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from managers import ConnectionManager # 导入连接管理器
app = FastAPI()
创建一个连接管理器实例
注意:在实际应用中,如果使用多进程或多服务器部署,需要更高级的连接管理方式
(如使用 Redis Pub/Sub),但对于单进程应用,这个管理器足够了。
manager = ConnectionManager()
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
“””
处理 WebSocket 连接的端点,包含客户端 ID。
“””
# 连接成功时,使用连接管理器进行处理
await manager.connect(websocket)
await manager.broadcast(f”Client #{client_id} joined the chat”)
try:
# 循环接收客户端发送的消息
while True:
data = await websocket.receive_text() # 接收文本消息
# 收到消息后,向所有其他连接广播
await manager.send_personal_message(f"You wrote: {data}", websocket) # 发送给发送者确认
await manager.broadcast(f"Client #{client_id} says: {data}") # 广播给所有人
except WebSocketDisconnect:
# 客户端断开连接时,使用连接管理器进行处理
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
except Exception as e:
print(f"An error occurred with client #{client_id}: {e}")
manager.disconnect(websocket) # 发生其他异常也尝试断开连接
“`
代码解释:
from managers import ConnectionManager
: 导入我们创建的连接管理器。manager = ConnectionManager()
: 创建一个全局的连接管理器实例。注意,这是一个单例,在单进程 FastAPI 应用中有效。@app.websocket("/ws/{client_id}")
: 我们修改了路径,增加了路径参数{client_id}
。这样可以在连接处理函数中获取到客户端的标识。async def websocket_endpoint(websocket: WebSocket, client_id: int):
: 函数签名中增加了client_id: int
参数,FastAPI 会自动从路径中提取并转换类型。await manager.connect(websocket)
: 在接受连接后,立即调用管理器的方法来处理新连接。await manager.broadcast(...)
: 在连接成功后,向所有客户端广播一条消息通知新成员加入。while True:
循环中:data = await websocket.receive_text()
: 接收客户端消息。await manager.send_personal_message(...)
: 给发送消息的客户端一个确认。await manager.broadcast(...)
: 将客户端发送的消息广播给所有(包括发送者自己,如果需要的话,管理器可以改进排除发送者)其他连接。
except WebSocketDisconnect:
: 捕获断开连接异常后,调用管理器的方法来移除该连接,并广播一条消息通知该用户离开。
现在,你可以运行 uvicorn main:app --reload
并用多个客户端连接到 ws://localhost:8000/ws/1
, ws://localhost:8000/ws/2
等不同的路径(client_id
不同),然后在一个客户端发送消息,观察其他客户端是否收到了广播的消息。
5. 发送和接收 JSON 数据
在实际应用中,客户端和服务器之间通常会交换结构化的数据,而不是简单的文本。WebSocket 协议支持发送文本和二进制数据。发送 JSON 数据时,通常将 Python 字典或列表序列化为 JSON 字符串,然后作为文本数据发送。接收时则解析 JSON 字符串。
FastAPI 的 WebSocket
对象提供了方便的 send_json()
和 receive_json()
方法。
“`python
main.py (在上面的基础上修改 receive/send 逻辑)
… (imports and ConnectionManager class remain the same)
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
# 广播加入消息,使用 JSON 格式
await manager.broadcast_json({“type”: “status”, “message”: f”Client #{client_id} joined the chat”}) # 需要在 manager 里实现 broadcast_json
try:
while True:
# 接收 JSON 格式的消息
data = await websocket.receive_json()
print(f"Received JSON: {data} from {websocket.client}")
# 假设客户端发送的消息是 {'type': 'message', 'text': '...' } 格式
if data.get("type") == "message" and "text" in data:
message_text = data["text"]
# 发送确认消息回发送者,使用 JSON 格式
await manager.send_personal_json({"type": "status", "message": f"You sent: {message_text}"}, websocket) # 需要在 manager 里实现 send_personal_json
# 广播消息给所有人,使用 JSON 格式
await manager.broadcast_json({"type": "message", "sender": client_id, "text": message_text})
else:
# 处理未知格式的消息
await manager.send_personal_json({"type": "error", "message": "Unknown message format"}, websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} left the chat"})
except Exception as e:
print(f"An error occurred with client #{client_id}: {e}")
# 发生其他异常也尝试发送错误信息 (如果连接还活着) 并断开
try:
await websocket.send_json({"type": "error", "message": f"Server error: {e}"})
except Exception:
pass # 如果连发送错误信息都失败了,忽略
finally:
manager.disconnect(websocket)
managers.py (添加 JSON 相关方法)
from typing import List, Dict, Any
from fastapi import WebSocket
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
print(f"Client connected: {websocket.client}. Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
try:
self.active_connections.remove(websocket)
print(f"Client disconnected: {websocket.client}. Total connections: {len(self.active_connections)}")
except ValueError:
print(f"Attempted to remove already removed connection: {websocket.client}")
async def send_personal_text(self, message: str, websocket: WebSocket):
# 原来的 send_personal_message 改名以区分
try:
await websocket.send_text(message)
except Exception as e:
print(f"Failed to send text message to {websocket.client}: {e}")
async def send_personal_json(self, message: Dict[str, Any], websocket: WebSocket):
"""
向特定连接发送 JSON 消息。
"""
try:
await websocket.send_json(message)
except Exception as e:
print(f"Failed to send JSON message to {websocket.client}: {e}")
# 可选:如果发送失败,考虑移除此连接
# self.disconnect(websocket)
async def broadcast_text(self, message: str):
# 原来的 broadcast 改名以区分
disconnected_clients = []
for connection in self.active_connections:
try:
await connection.send_text(message)
except WebSocketDisconnect:
print(f"Client disconnected during text broadcast: {connection.client}")
disconnected_clients.append(connection)
except Exception as e:
print(f"Error broadcasting text to {connection.client}: {e}")
disconnected_clients.append(connection)
for client in disconnected_clients:
self.disconnect(client)
async def broadcast_json(self, message: Dict[str, Any]):
"""
向所有活动连接广播 JSON 消息。
"""
disconnected_clients = []
for connection in self.active_connections:
try:
await connection.send_json(message)
except WebSocketDisconnect:
print(f"Client disconnected during JSON broadcast: {connection.client}")
disconnected_clients.append(connection)
except Exception as e:
print(f"Error broadcasting JSON to {connection.client}: {e}")
disconnected_clients.append(connection)
for client in disconnected_clients:
self.disconnect(client)
“`
现在,客户端需要发送 JSON 格式的消息,例如:
“`javascript
// JavaScript client example sending JSON
let ws = new WebSocket(“ws://localhost:8000/ws/123”); // Use a client_id
ws.onopen = function(event) {
console.log(“WebSocket connection opened:”, event);
// 发送一个类型为 message 的 JSON 对象
ws.send(JSON.stringify({ type: “message”, text: “Hello from client 123!” }));
};
ws.onmessage = function(event) {
// 接收到的可能是文本(旧的 Echo)或 JSON 字符串
try {
const data = JSON.parse(event.data);
console.log(“Message from server (JSON):”, data);
// 根据 data.type 进行不同的处理
if (data.type === “message”) {
console.log(New message from ${data.sender}: ${data.text}
);
} else if (data.type === “status”) {
console.log(“Status update:”, data.message);
} else if (data.type === “error”) {
console.error(“Server error:”, data.message);
}
} catch (e) {
console.log(“Message from server (non-JSON):”, event.data);
}
};
// … onerror and onclose as before
// 发送另一个 JSON 消息
// ws.send(JSON.stringify({ type: “message”, text: “How are you?” }));
// 发送一个不同类型的 JSON
// ws.send(JSON.stringify({ type: “command”, action: “ping” }));
“`
通过使用 send_json()
和 receive_json()
,我们可以更方便地处理结构化数据,这在构建复杂的实时应用时非常有用。
6. 改进结构:使用 APIRouter
随着应用的增长,所有 WebSocket 逻辑都放在一个 main.py
文件中会变得难以管理。FastAPI 提供了 APIRouter
来组织路由,这同样适用于 WebSocket 路由。
“`python
websocket_router.py (创建一个新文件)
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
from managers import ConnectionManager
创建连接管理器实例
注意:这里的 manager 实例需要与 main.py 中的是同一个,或者以某种方式传递/共享
最简单的方式是在 main.py 中创建并在 include_router 时传递,或者使用依赖注入
这里为了简化,我们先假设可以通过某种方式访问到全局唯一的 manager 实例
在大型应用中,建议使用依赖注入
manager = ConnectionManager() # 警告:这是一个简单的示例,实际应用请确保 manager 是单例
router = APIRouter(
prefix=”/ws”, # 所有路由都带上 /ws 前缀
tags=[“websockets”]
)
@router.websocket(“/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
await manager.broadcast_json({“type”: “status”, “message”: f”Client #{client_id} joined the chat”})
try:
while True:
data = await websocket.receive_json()
print(f"Received JSON: {data} from {websocket.client}")
if data.get("type") == "message" and "text" in data:
message_text = data["text"]
await manager.send_personal_json({"type": "status", "message": f"You sent: {message_text}"}, websocket)
await manager.broadcast_json({"type": "message", "sender": client_id, "text": message_text})
else:
await manager.send_personal_json({"type": "error", "message": "Unknown message format"}, websocket)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} left the chat"})
except Exception as e:
print(f"An error occurred with client #{client_id}: {e}")
try:
await websocket.send_json({"type": "error", "message": f"Server error: {e}"})
except Exception:
pass
finally:
manager.disconnect(websocket)
main.py (修改)
from fastapi import FastAPI
from managers import ConnectionManager
from websocket_router import router as websocket_router # 导入路由器
app = FastAPI()
如果在 websocket_router.py 中实例化了 manager,这里就不用再实例化了,
但要确保它是同一个实例。更好的方式是用依赖注入。
managers.py 中的 manager 实例需要被主应用共享。
一种简单的共享方式 (非 DI):
manager = ConnectionManager() # 在 main.py 中创建
假设 ConnectionManager 是一个模块级别的单例或者在某个地方统一创建
并在需要的地方导入使用
app.include_router(websocket_router) # 包含 WebSocket 路由器
可以添加其他的 HTTP 路由
@app.get(“/”)
async def read_root():
return {“Hello”: “World”}
运行: uvicorn main:app –reload
“`
将 WebSocket 路由放在单独的模块和 APIRouter
中,使主应用代码更清晰,更易于管理大型项目。注意,ConnectionManager 的实例需要在整个应用中是唯一的,以便所有 WebSocket 连接都能被同一个管理器跟踪。依赖注入是实现这一目标的最佳实践。
7. 高级话题与考量
构建健壮、可扩展的实时应用还需要考虑许多高级话题。
7.1 身份验证和授权
WebSocket 连接通常需要知道是哪个用户在连接。握手阶段是一个自然的进行身份验证的时机。
- 查询参数 (Query Parameters): 客户端可以在 WebSocket URL 中包含 token 或 session ID,例如
ws://localhost:8000/ws/123?token=abc
。服务器在websocket_endpoint
函数中可以通过websocket.query_params.get("token")
获取。安全性提示: 避免在 URL 中传递敏感信息,因为它们可能会被记录在日志中。 - HTTP Headers: WebSocket 握手是基于 HTTP 的,客户端可以在握手请求中发送自定义 HTTP 头(如
Authorization
)。服务器可以通过websocket.headers.get("Authorization")
获取。这是比查询参数更安全的传递 token 的方式。 - 基于消息的认证: 连接建立后,客户端发送第一条消息包含认证凭据。服务器验证后,将连接标记为已认证,并可能关联用户 ID。这种方式的缺点是在认证完成前,连接是未认证状态,需要服务器小心处理。
一旦连接认证成功,你需要将用户信息(如用户 ID)与该 WebSocket
连接关联起来。可以在 ConnectionManager
中维护一个映射,例如 Dict[WebSocket, User]
或 Dict[str, WebSocket]
(用户 ID 到连接的映射)。
授权(判断用户是否有权限执行某个操作,例如加入某个聊天室)可以在收到客户端消息时进行检查。
7.2 状态管理
实时应用通常需要维护一些状态,例如:
- 在线用户列表: 哪些用户当前在线。
- 聊天室成员: 每个聊天室有哪些用户。
- 用户相关数据: 用户的配置、当前所在的游戏房间等。
ConnectionManager
就是一种基本的连接状态管理。对于更复杂的应用,你可能需要将状态存储在数据库、缓存(如 Redis)或内存数据结构中。确保在连接建立、消息收发和断开连接时及时更新这些状态。
7.3 扩展性和水平伸缩
前面实现的 ConnectionManager
只适用于单进程、单服务器的应用。如果需要部署多个 FastAPI 实例来处理高并发,会遇到问题:
- 连接分散: 不同的客户端连接可能分布在不同的服务器实例上。
- 广播失效: 一个实例上的
manager.broadcast()
只能发送给连接到 该实例 的客户端,无法发送给连接到 其他实例 的客户端。
解决这个问题的常见方案是使用一个外部的消息队列或发布/订阅系统,例如 Redis Pub/Sub、Kafka 或 RabbitMQ。
Redis Pub/Sub 方案:
- 每个 FastAPI 实例都连接到同一个 Redis 服务器。
- 每个实例都订阅一个或多个 Redis 频道(例如,一个广播频道、一个特定聊天室频道)。
- 当一个实例收到客户端消息需要广播时,它不是直接遍历本地连接,而是将消息发布到相应的 Redis 频道。
- 所有订阅了该频道的 FastAPI 实例都会收到这条消息。
- 收到 Redis 消息的实例,然后遍历其本地连接,将消息发送给相关的客户端。
这种方式解耦了消息的发送和接收,使得多个实例可以协同工作。
粘性会话 (Sticky Sessions): 如果你的负载均衡器支持,可以将同一个客户端的后续连接(包括 WebSocket 握手和随后的数据帧)都路由到同一个后端实例。这在某些情况下可以简化状态管理,但并不能解决跨实例广播的问题。通常结合消息队列使用。
7.4 错误处理和断开连接的健壮性
- 平滑关闭: 当服务器需要关闭时,应该尝试优雅地关闭所有 WebSocket 连接,而不是直接中断。可以在应用接收到停止信号时,遍历活动连接并调用
await connection.close()
。 - 心跳机制 (Ping/Pong): WebSocket 协议内置了心跳机制。客户端和服务器可以定期发送 Ping 帧,接收方必须回应 Pong 帧。这有助于检测死连接(例如,客户端网络断开但没有发送关闭帧),避免服务器持有大量僵尸连接。FastAPI/Starlette 在底层处理了 Ping/Pong,但应用层有时也需要实现自己的心跳来确保应用层协议的活跃性。
- 客户端重连: 客户端应该实现自动重连逻辑,在连接断开后尝试重新建立连接。
7.5 性能优化
- 异步操作: 确保在 WebSocket 处理函数中使用
await
进行所有 I/O 操作(send
,receive
, 数据库查询等),避免阻塞事件循环。FastAPI/Starlette 强制要求 WebSocket 端点是异步的,这有助于避免常见的性能陷阱。 - 数据序列化/反序列化: JSON 的解析和生成是有开销的。对于高性能要求的场景,可以考虑使用更快的 JSON 库(如
orjson
,ujson
)或甚至使用 Protocol Buffers、MessagePack 等更高效的序列化格式(但这需要在客户端和服务器端都支持)。 - 消息压缩: WebSocket 协议支持 Permessage-Deflate 扩展进行消息压缩,可以减少传输的数据量,尤其适用于发送大量重复数据。可以在服务器端启用此扩展。
7.6 测试
测试 WebSocket 端点比测试标准的 HTTP 端点更复杂。FastAPI 的 TestClient
支持 WebSocket 连接:
“`python
from fastapi.testclient import TestClient
from main import app # 假设你的 FastAPI 应用在 main.py 中
client = TestClient(app)
def test_websocket_echo():
# 使用 client.websocket_connect() 建立连接
with client.websocket_connect(“/ws/test_user”) as websocket:
# 发送消息
websocket.send_json({“type”: “message”, “text”: “Hello via WebSocket”})
# 接收消息
data = websocket.receive_json()
# 进行断言
assert data == {“type”: “status”, “message”: “You sent: Hello via WebSocket”} # 假设回显是这个格式
data = websocket.receive_json() # 接收广播的消息
assert data == {“type”: “message”, “sender”: “test_user”, “text”: “Hello via WebSocket”}
“`
需要注意,TestClient
的 WebSocket 支持是在同一个进程中模拟连接,可能无法完全复制真实网络环境中的所有情况(如并发问题、网络延迟等),但对于大部分业务逻辑测试是足够的。
8. 总结与展望
使用 FastAPI 和 WebSocket 构建实时应用是一个强大且高效的组合。FastAPI 提供的异步能力和简洁的 WebSocket API 使得处理实时通信变得相对容易。
我们从基本的 Echo 服务开始,逐步引入了连接管理器来处理多个客户端,学会了发送和接收 JSON 数据,并讨论了如何使用 APIRouter
来组织代码。最后,我们深入探讨了身份验证、状态管理、水平扩展、错误处理、性能优化和测试等关键的高级话题。
通过掌握本文介绍的概念和技术,你已经具备了使用 FastAPI 构建各种实时应用的基础。下一步,可以根据你的具体需求,深入研究 Redis Pub/Sub 集成、更精细的状态管理方案、客户端的 WebSocket 实现(例如,使用 JavaScript 的 WebSocket API、Socket.IO 客户端或特定框架的实时库),以及更复杂的实时协议设计。
实时应用的世界充满挑战,但也充满机遇。FastAPI 提供了一个坚实的基础,帮助你在这个领域大展拳脚。祝你在实时应用的开发旅程中一切顺利!