FastAPI WebSocket 开发指南:构建实时应用的基石
在现代 Web 应用中,实时交互变得越来越重要。无论是聊天应用、股票行情、在线游戏还是协作工具,都需要服务器能够主动向客户端推送数据,而无需客户端频繁地发送请求。传统的 HTTP 协议是无状态的、基于请求-响应模式的,无法高效地满足这种需求。这时,WebSocket 就应运而生了。
WebSocket 提供了一种在单个 TCP 连接上进行全双工通信的通道,允许服务器和客户端之间的数据交换变得更加高效和实时。FastAPI,作为一款高性能、易于学习、快速开发 API 的 Python Web 框架,凭借其优秀的异步支持,成为了构建 WebSocket 应用的绝佳选择。
本文将带你深入了解如何在 FastAPI 中开发 WebSocket 应用,从基础概念到高级用法,一步步构建你的实时交互功能。
1. 什么是 WebSocket?为什么在 FastAPI 中使用它?
在深入开发之前,我们先快速回顾一下 WebSocket 的核心概念:
- 全双工通信 (Full-Duplex Communication): 与 HTTP 的请求-响应模式不同,WebSocket 建立连接后,数据可以在客户端和服务器之间随时双向流动,无需等待对方的请求或响应。
- 持久连接 (Persistent Connection): WebSocket 连接一旦建立,就会保持开放状态,直到一方主动关闭或发生错误。这减少了 HTTP 连接频繁建立和断开的开销。
- 低延迟 (Low Latency): 由于连接是持久的且全双工,数据可以在第一时间被推送或接收,大大降低了通信延迟。
- 基于 TCP: WebSocket 协议本身是建立在 TCP 协议之上的,利用了 TCP 的可靠传输特性。
- 通过 HTTP 握手启动: WebSocket 连接的建立始于一个特殊的 HTTP 请求(带有
Upgrade
和Connection: Upgrade
头),服务器同意升级协议后,连接便切换到 WebSocket 协议。
为什么在 FastAPI 中使用 WebSocket?
FastAPI 构建在 Starlette 之上,而 Starlette 是一个轻量级的 ASGI (Asynchronous Server Gateway Interface) 框架。ASGI 是 Python 异步 Web 应用的标准接口,它原生支持异步编程和 WebSockets。
FastAPI 继承了 Starlette 的所有 ASGI 特性,包括对 WebSockets 的一流支持。结合 FastAPI 自身的优势:
- 异步支持 (async/await): WebSockets 是典型的 I/O 密集型任务(等待数据发送/接收)。FastAPI 的异步能力使其能够高效地处理大量并发的 WebSocket 连接,而不会阻塞主事件循环。
- 简洁的路由定义: 使用
@app.websocket("/ws")
装饰器,可以像定义 HTTP 路径操作一样简单地定义 WebSocket 端点。 - 依赖注入 (Dependency Injection): 可以方便地将 WebSocket 连接作为依赖注入到函数中,并利用依赖注入来处理连接管理、认证等逻辑。
- Pydantic 支持 (用于 JSON 消息): Embora WebSockets原生处理文本和二进制数据,当使用 JSON 格式交换数据时,FastAPI 结合 Starlette 的
receive_json
和send_json
方法,可以方便地与 Pydantic 模型结合使用(尽管不像 HTTP 请求体那样自动验证,但概念相似)。 - 自动文档 (非直接): 虽然 Swagger/OpenAPI 标准目前不直接包含 WebSocket 定义,但 FastAPI 的结构清晰,便于手动或借助第三方工具记录 WebSocket 端点。
综上所述,FastAPI 提供了构建高性能、可扩展、易于维护的 WebSocket 应用所需的一切工具和特性。
2. 准备工作
在开始编写代码之前,确保你已经安装了必要的库:
- Python 3.7+
- FastAPI
- 一个 ASGI 服务器,如 Uvicorn 或 Hypercorn
你可以使用 pip 安装它们:
bash
pip install fastapi uvicorn python-multipart # python-multipart 对于某些依赖可能需要
3. 构建最基本的 WebSocket 端点
让我们从一个最简单的 WebSocket 端点开始。它将接收客户端发送的文本消息,然后将消息原文加上一些前缀发回给客户端(一个简单的 Echo 服务)。
创建一个 Python 文件,例如 main.py
:
“`python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
定义一个 WebSocket 端点,路径为 “/ws”
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 1. 接受 WebSocket 连接
await websocket.accept()
print(“Client connected”)
try:
# 2. 进入一个循环,持续接收客户端发送的消息
while True:
# 接收文本消息
data = await websocket.receive_text()
print(f"Received message: {data}")
# 3. 将处理后的消息发回给客户端
response_message = f"Message text was: {data}"
await websocket.send_text(response_message)
print(f"Sent message: {response_message}")
except WebSocketDisconnect:
# 4. 处理客户端断开连接的情况
print("Client disconnected")
except Exception as e:
# 处理其他可能的异常
print(f"An error occurred: {e}")
你可以在这里定义其他的 HTTP 路径操作
@app.get(“/”)
async def read_root():
return {“Hello”: “World”}
“`
代码解释:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
: 导入必要的类。WebSocket
类用于表示一个活动的 WebSocket 连接实例,WebSocketDisconnect
是在连接断开时可能抛出的异常。@app.websocket("/ws")
: 这个装饰器告诉 FastAPI 在/ws
路径上监听 WebSocket 连接请求。async def websocket_endpoint(websocket: WebSocket):
: 定义一个异步函数来处理 WebSocket 连接。FastAPI 会将建立的 WebSocket 连接实例作为websocket
参数传递给这个函数。使用async
是必须的,因为 WebSocket 操作(接受连接、发送、接收)都是异步的 I/O 操作。await websocket.accept()
: 这是处理 WebSocket 连接的第一步。你必须调用这个方法来接受传入的连接。如果连接未被接受,FastAPI 会在函数返回时自动关闭连接。while True:
: 进入一个无限循环,以便持续监听该客户端发送的消息。一个 WebSocket 端点函数通常会在一个持久的循环中运行,直到连接断开。data = await websocket.receive_text()
: 这是一个异步方法,用于接收客户端发送的文本消息。它会暂停函数的执行,直到接收到一条消息。await websocket.send_text(response_message)
: 这是一个异步方法,用于向该客户端发送文本消息。try...except WebSocketDisconnect:
: 这是处理 WebSocket 连接断开的标准方式。当客户端关闭连接时,receive_text()
(或其他receive_*
方法)会抛出WebSocketDisconnect
异常。在except
块中,你可以执行清理工作,例如从活动连接列表中移除该客户端。except Exception as e:
: 捕获其他潜在的异常,例如网络错误等。- 在循环外部(或
except
块中)的任何清理代码会在连接断开或发生异常时执行。
运行服务器:
打开终端,导航到你的文件目录,然后运行 Uvicorn:
bash
uvicorn main:app --reload
Uvicorn 会启动一个本地服务器,默认监听 http://127.0.0.1:8000
。
测试 WebSocket 端点:
你不能直接在浏览器中访问 http://127.0.0.1:8000/ws
来测试 WebSocket。你需要一个 WebSocket 客户端。你可以使用在线 WebSocket 测试工具,或者编写一个简单的 HTML/JavaScript 页面。
以下是一个简单的 HTML/JavaScript 客户端示例:
“`html
FastAPI WebSocket Test
“`
将这段代码保存为 index.html
,用浏览器打开它。输入消息,点击发送,你应该能在页面上看到你发送的消息和服务器返回的消息。
4. 处理不同类型的数据:文本、二进制和 JSON
上面的例子只处理了文本消息。FastAPI 的 WebSocket
对象提供了更多方法来发送和接收不同类型的数据:
await websocket.send_text(data: str)
: 发送文本数据。await websocket.receive_text() -> str
: 接收文本数据。await websocket.send_bytes(data: bytes)
: 发送二进制数据。await websocket.receive_bytes() -> bytes
: 接收二进制数据。await websocket.send_json(data: Any, mode: str = 'text')
: 发送 JSON 数据。data
可以是任何可 JSON 序列化的 Python 对象。mode
可以是'text'
或'binary'
,决定底层 WebSocket 帧的类型(通常使用'text'
)。await websocket.receive_json(mode: str = 'text') -> Any
: 接收 JSON 数据,并自动反序列化为 Python 对象。mode
可以是'text'
或'binary'
。
示例:收发 JSON 数据
假设客户端和服务器之间约定使用 JSON 格式交换结构化消息。客户端发送 {"message": "hello", "sender": "user1"}
,服务器返回 {"status": "received", "echo": "hello from user1"}
。
“`python
main.py (修改 websocket_endpoint)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket(“/ws_json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
print(“Client connected (JSON endpoint)”)
try:
while True:
# 接收 JSON 数据,自动反序列化为 Python 字典/列表等
data = await websocket.receive_json()
print(f"Received JSON: {data}")
# 假设期望接收 {"message": "...", "sender": "..."}
message = data.get("message", "N/A")
sender = data.get("sender", "Anonymous")
# 构建 JSON 响应
response = {
"status": "received",
"echo": f"{message} from {sender}"
}
# 发送 JSON 数据,自动序列化
await websocket.send_json(response)
print(f"Sent JSON: {response}")
except WebSocketDisconnect:
print("Client disconnected (JSON endpoint)")
except Exception as e:
print(f"An error occurred in JSON endpoint: {e}")
… (其他 HTTP 路径操作)
“`
对应的 JavaScript 客户端需要使用 JSON.stringify()
发送数据,并解析接收到的数据:
“`javascript
// … (index.html script)
var websocket_json = new WebSocket(“ws://127.0.0.1:8000/ws_json”);
websocket_json.onopen = function(event) {
console.log(“WebSocket JSON connection opened.”);
};
websocket_json.onmessage = function(event) {
console.log(“JSON message from server:”, event.data);
try {
const data = JSON.parse(event.data);
document.getElementById(“messages”).innerHTML += “
Received JSON: ” + JSON.stringify(data) + “
“;
} catch (e) {
console.error(“Failed to parse JSON:”, event.data);
document.getElementById(“messages”).innerHTML += “
Received non-JSON: ” + event.data + “
“;
}
};
// … (error and close handlers similar to the first example)
function sendJsonMessage() {
var input = document.getElementById(“messageInput”); // Assuming reuse the same input for simplicity
var message = input.value;
if (message) {
const jsonData = {
“message”: message,
“sender”: “web_client” // Example sender
};
websocket_json.send(JSON.stringify(jsonData));
console.log(“Sent JSON:”, jsonData);
document.getElementById(“messages”).innerHTML += “
Sent JSON: ” + JSON.stringify(jsonData) + “
“;
input.value = “”;
}
}
// Add a button and input for the JSON example in your HTML
//
“`
使用 send_json
和 receive_json
是处理结构化数据时非常方便的方式,推荐使用。
5. 管理多个客户端连接 (广播)
WebSocket 应用的常见场景是服务器需要向多个甚至所有连接的客户端发送消息,这称为广播 (Broadcasting)。由于每个 WebSocket 连接在服务器端都是一个独立的 WebSocket
对象实例,我们需要一种方式来跟踪这些活动连接,并在需要时遍历它们发送消息。
一个简单的方法是维护一个全局的连接列表。然而,更推荐的做法是创建一个连接管理器类来封装连接的添加、移除和广播逻辑。
连接管理器类示例:
“`python
main.py (添加 ConnectionManager 类)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List # 导入 List
class ConnectionManager:
def init(self):
# 使用列表来存储活跃的 WebSocket 连接
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
# 接受连接
await websocket.accept()
# 将新的连接添加到列表中
self.active_connections.append(websocket)
print(f"Client connected. Total active connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
# 从列表中移除断开的连接
try:
self.active_connections.remove(websocket)
print(f"Client disconnected. Total active connections: {len(self.active_connections)}")
except ValueError:
# 可能连接已经在广播时被移除
pass
async def send_personal_message(self, message: str, websocket: WebSocket):
# 向特定的客户端发送消息
try:
await websocket.send_text(message)
except WebSocketDisconnect:
# 如果发送时发现断开,则清理
self.disconnect(websocket)
except Exception as e:
print(f"Error sending personal message: {e}")
self.disconnect(websocket)
async def broadcast(self, message: str):
# 向所有活跃的客户端广播消息
# 注意:在遍历时修改列表是不安全的,所以我们创建一个副本
# 或者更安全地,在发送失败时移除
disconnected_clients = []
for connection in self.active_connections:
try:
await connection.send_text(message)
except WebSocketDisconnect:
# 如果发送时发现客户端已断开,标记移除
disconnected_clients.append(connection)
except Exception as e:
# 处理其他发送错误
print(f"Error broadcasting to a client: {e}")
disconnected_clients.append(connection)
# 移除所有已标记断开的客户端
for client in disconnected_clients:
self.disconnect(client)
在应用启动时创建连接管理器实例
manager = ConnectionManager()
修改 websocket_endpoint 使用连接管理器
@app.websocket(“/ws_manager”)
async def websocket_endpoint_with_manager(websocket: WebSocket):
# 使用管理器处理连接
await manager.connect(websocket)
try:
while True:
# 接收消息
data = await websocket.receive_text()
# 向发送者回复私信
await manager.send_personal_message(f"You wrote: {data}", websocket)
# 向所有客户端广播消息(除了发送者自己,如果需要可以加判断)
await manager.broadcast(f"Client says: {data}")
except WebSocketDisconnect:
# 在断开连接时从管理器中移除
manager.disconnect(websocket)
except Exception as e:
print(f"An error occurred in /ws_manager: {e}")
manager.disconnect(websocket) # 确保发生其他错误时也移除连接
… (其他路径操作)
“`
代码解释:
ConnectionManager
类:active_connections: List[WebSocket]
: 一个列表,用于存储当前所有连接着的WebSocket
对象。connect(self, websocket: WebSocket)
: 接受传入的websocket
连接,然后将其添加到active_connections
列表中。disconnect(self, websocket: WebSocket)
: 从active_connections
列表中移除断开的websocket
。send_personal_message(self, message: str, websocket: WebSocket)
: 向特定的一个客户端发送消息。增加了try...except
块来捕获发送时的断开错误。broadcast(self, message: str)
: 遍历active_connections
列表,向每个连接发送消息。重要: 在循环中处理WebSocketDisconnect
异常,并将断开的连接收集起来,在循环结束后再统一移除。直接在循环中修改列表(如self.active_connections.remove(connection)
)可能会导致问题。
manager = ConnectionManager()
: 创建一个ConnectionManager
实例。在实际应用中,这个实例应该是单例的,以便所有 WebSocket 连接共享同一个管理器。@app.websocket("/ws_manager")
: 新的 WebSocket 端点。await manager.connect(websocket)
: 在端点函数的开始,调用管理器的connect
方法来处理连接接受和列表添加。- 在
try
块中,接收消息后,我们分别调用manager.send_personal_message
和manager.broadcast
来发送消息。 - 在
except WebSocketDisconnect:
块中,调用manager.disconnect(websocket)
来从列表中移除断开的连接。这确保了管理器只保留活跃的连接。
使用这个管理器,你可以更方便地构建聊天室、实时通知等需要向多个客户端推送消息的应用。
思考题: 这种基于列表的连接管理器有什么局限性?
- 单进程限制: 如果你的应用使用 Uvicorn 或 Gunicorn 启动了多个 worker 进程,每个进程都会有自己的
ConnectionManager
实例和连接列表。一个进程接收到的消息无法直接广播给另一个进程的客户端。 - 内存占用: 大量连接会占用较多内存。
- 可靠性: 如果服务器崩溃,所有连接信息会丢失。
对于需要更高可扩展性和可靠性的生产环境应用,你需要使用外部的 Pub/Sub 系统(如 Redis Pub/Sub, RabbitMQ, Kafka 等)来管理跨进程/跨服务器的消息广播。FastAPI 后端只负责处理单个连接的接收和发送,消息的路由和分发由外部系统负责。但这超出了本文的基础范围。
6. 处理断开连接和错误
前面我们已经使用了 try...except WebSocketDisconnect:
来处理客户端断开连接。这是非常重要的,因为它允许你在客户端离开时执行清理工作(例如从连接管理器中移除)。
WebSocketDisconnect
异常对象包含连接断开的原因代码 (code
) 和可选的理由 (reason
)。你可以在异常处理块中访问这些信息:
python
except WebSocketDisconnect as e:
print(f"Client disconnected with code: {e.code}, reason: {e.reason}")
manager.disconnect(websocket)
常见的 WebSocket 关闭代码包括:
1000
: Normal Closure (正常关闭)1001
: Going Away (端点正离开,例如页面导航)1006
: Abormal Closure (异常关闭,无明确代码,如网络中断)- 其他代码用于特定的协议错误或应用错误。
除了 WebSocketDisconnect
,还应该考虑其他可能发生的网络或应用错误,并适当地捕获和处理它们,例如在 send_*
或 receive_*
调用时捕获 RuntimeError
或其他 I/O 相关的异常。在发生任何错误时,通常都应该尝试从连接管理器中移除对应的 websocket
,以避免向已失效的连接发送数据导致更多错误。
7. 在 WebSocket 端点中使用依赖注入
FastAPI 强大的依赖注入系统同样适用于 WebSocket 端点。你可以像 HTTP 路径操作一样,在 WebSocket 函数的参数中声明依赖。
例如,如果你需要从查询参数中获取一个用户 ID,或者注入一个配置对象,可以这样做:
“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from typing import Optional # 导入 Optional
… (ConnectionManager 类)
@app.websocket(“/ws_auth/{client_id}”)
async def websocket_auth_endpoint(
websocket: WebSocket,
client_id: int, # 从路径参数获取 client_id
token: str = Query(…, description=”Authentication token”) # 从查询参数获取 token
):
# 在 accept() 之前进行认证和授权检查
if token != “secrettoken123″: # 简单的硬编码检查示例
print(f”Authentication failed for client_id {client_id}”)
# 使用特定的错误代码关闭连接,例如 4003 表示 “Bad Data” 或自定义代码
# WebSocket spec recommends codes 4000-4999 for application errors
await websocket.close(code=4001) # Use a specific code for auth failure
return # 停止处理该连接
# 如果认证通过,接受连接
await manager.connect(websocket)
print(f"Client {client_id} connected with token {token}")
# 你可以将 client_id 或用户对象与 websocket 关联起来存储在 manager 中
# 例如,manager 可以维护一个 websocket 到 client_id 的映射
try:
await manager.broadcast(f"Client {client_id} joined the chat.")
while True:
data = await websocket.receive_text()
print(f"Client {client_id} sent: {data}")
# 广播消息,包含发送者 ID
await manager.broadcast(f"Client {client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client {client_id} left the chat.")
print(f"Client {client_id} disconnected.")
except Exception as e:
print(f"An error occurred for client {client_id}: {e}")
manager.disconnect(websocket)
await manager.broadcast(f"Client {client_id} experienced an error and left.")
“`
代码解释:
client_id: int
: 从路径参数{client_id}
中提取整数类型的client_id
。token: str = Query(..., description="Authentication token")
: 使用Query
依赖从查询参数中提取名为token
的字符串。...
表示它是必需的。- 认证/授权时机: 在调用
await websocket.accept()
之前进行认证和授权检查非常重要。如果检查失败,不要调用accept()
,而是直接调用await websocket.close(...)
并return
。 await websocket.close(code=4001)
: 使用close()
方法以指定的 WebSocket 状态码关闭连接。- 关联用户/客户端信息: 在实际应用中,你可能需要将认证通过的用户信息(如用户 ID、用户名)与
websocket
实例关联起来存储在连接管理器中,这样在接收到消息或广播时,就知道消息来自哪个用户,或者可以向特定用户发送私信。这需要在ConnectionManager
类中进行相应的修改(例如使用字典{websocket: user_id}
)。
8. 构建更复杂的应用:一个简单的聊天室
让我们综合以上知识,构建一个稍微复杂一点的简单内存聊天室。
“`python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict
class ConnectionManager:
def init(self):
# 存储活跃连接,key 是 WebSocket 对象,value 可以是用户ID或用户名
self.active_connections: Dict[WebSocket, str] = {}
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.active_connections[websocket] = client_id
print(f"Client {client_id} connected. Total: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
client_id = self.active_connections[websocket]
del self.active_connections[websocket]
print(f"Client {client_id} disconnected. Total: {len(self.active_connections)}")
return client_id
return None
async def send_personal_message(self, message: str, websocket: WebSocket):
try:
await websocket.send_text(message)
except WebSocketDisconnect:
self.disconnect(websocket)
except Exception as e:
print(f"Error sending personal message: {e}")
self.disconnect(websocket)
async def broadcast(self, message: str):
disconnected_clients = []
for connection in list(self.active_connections): # Iterate over a copy
try:
await connection.send_text(message)
except WebSocketDisconnect:
disconnected_clients.append(connection)
except Exception as e:
print(f"Error broadcasting: {e}")
disconnected_clients.append(connection)
for client in disconnected_clients:
self.disconnect(client)
manager = ConnectionManager()
app = FastAPI()
@app.websocket(“/ws/chat/{client_id}”)
async def websocket_chat_endpoint(
websocket: WebSocket,
client_id: str # 假设 client_id 从路径参数获取,实际应用可能通过认证获取
):
# 简单的客户端ID重复检查(此处仅为示例,生产环境需更严谨)
# if client_id in manager.active_connections.values():
# await websocket.close(code=4000, reason=”Client ID already in use”)
# return
await manager.connect(websocket, client_id)
try:
# 广播新用户加入消息
await manager.broadcast(f"Chat: Client {client_id} joined.")
while True:
data = await websocket.receive_text() # 接收文本消息作为聊天内容
print(f"Received from {client_id}: {data}")
# 广播聊天消息
await manager.broadcast(f"Chat from {client_id}: {data}")
except WebSocketDisconnect:
disconnected_client_id = manager.disconnect(websocket)
if disconnected_client_id:
await manager.broadcast(f"Chat: Client {disconnected_client_id} left.")
except Exception as e:
print(f"An error occurred for {client_id}: {e}")
disconnected_client_id = manager.disconnect(websocket)
if disconnected_client_id:
await manager.broadcast(f"Chat: Client {disconnected_client_id} left due to error.")
Optional: Serve a simple HTML client for the chat
from fastapi.responses import HTMLResponse
html = “””
FastAPI Chat
“””
@app.get(“/chat_client”, response_class=HTMLResponse)
async def get_chat_client():
return html
“`
运行 Uvicorn uvicorn main:app --reload
。访问 http://127.0.0.1:8000/chat_client
。打开多个浏览器窗口或标签页,输入不同的客户端 ID 并连接,然后就可以互相发送消息了。
注意: 这个聊天室是基于内存的,服务器重启会丢失所有连接信息。且如前所述,不适用于多进程部署。
9. 路由和结构化应用
随着 WebSocket 端点数量的增加,你可以像组织 HTTP 路径操作一样,使用 APIRouter
来组织 WebSocket 端点。
“`python
routers/websocket_router.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
Import your ConnectionManager or other necessary components
from ..main import manager # Example if manager is in main.py
router = APIRouter()
Assuming manager is accessible, maybe passed during router creation or is a global/dependency
For this example, let’s assume manager is imported or somehow available
In a real app, you’d inject dependencies or manage state properly.
Let’s redefine a minimal manager for demonstration within the router file
class RouterConnectionManager:
def init(self):
self.active_connections = []
async def connect(self, ws): await ws.accept(); self.active_connections.append(ws)
def disconnect(self, ws): self.active_connections.remove(ws)
async def broadcast(self, msg):
disconnected = []
for conn in list(self.active_connections):
try: await conn.send_text(msg)
except WebSocketDisconnect: disconnected.append(conn)
except Exception as e: print(f”Router broadcast error: {e}”); disconnected.append(conn)
for conn in disconnected: self.disconnect(conn)
router_manager = RouterConnectionManager()
@router.websocket(“/ws/items”)
async def websocket_item_endpoint(websocket: WebSocket):
await router_manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
print(f”Item endpoint received: {data}”)
await router_manager.broadcast(f”Item update: {data}”)
except WebSocketDisconnect:
router_manager.disconnect(websocket)
print(“Item endpoint client disconnected”)
main.py (updated)
from fastapi import FastAPI
from .routers import websocket_router # Assuming router is in routers/websocket_router.py
app = FastAPI()
Mount the router
app.include_router(websocket_router.router) # Uncomment this line
… (Other HTTP paths or WebSocket endpoints if needed)
“`
在 main.py
中,通过 app.include_router(websocket_router.router)
即可将定义在其他文件中的 WebSocket 路由包含进来。这有助于保持代码的模块化和可维护性。
10. 测试 WebSocket 端点
FastAPI 的 TestClient
主要用于测试 HTTP 端点。要测试 WebSocket 端点,你需要使用支持 WebSocket 协议的测试库,例如 websockets
库本身,通常结合 pytest
和 pytest-asyncio
。
以下是一个使用 websockets
进行简单测试的示例:
“`python
test_main.py
import pytest
import uvicorn
import asyncio
import threading
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import websockets # Make sure websockets is installed (pip install websockets pytest-asyncio)
Assuming your FastAPI app is defined in main.py as app
from main import app, manager # Import your app and manager
For standalone test, redefine a minimal app and manager
app = FastAPI()
class TestConnectionManager:
def init(self): self.active_connections = []
async def connect(self, ws): await ws.accept(); self.active_connections.append(ws)
def disconnect(self, ws): self.active_connections.remove(ws)
async def broadcast(self, msg):
disconnected = []
for conn in list(self.active_connections):
try: await conn.send_text(msg)
except WebSocketDisconnect: disconnected.append(conn)
except Exception as e: print(f”Test broadcast error: {e}”); disconnected.append(conn)
for conn in disconnected: self.disconnect(conn)
test_manager = TestConnectionManager()
@app.websocket(“/ws/test”)
async def websocket_test_endpoint(websocket: WebSocket):
await test_manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await test_manager.send_personal_message(f”Echo: {data}”, websocket)
except WebSocketDisconnect:
test_manager.disconnect(websocket)
Helper to run the FastAPI app in a separate thread for testing
def run_server():
config = uvicorn.Config(app, host=”127.0.0.1″, port=8001, log_level=”info”)
server = uvicorn.Server(config)
server.run()
server_thread = threading.Thread(target=run_server, daemon=True)
@pytest.fixture(scope=”module”, autouse=True)
def start_server():
# Start the server in a separate thread
server_thread.start()
# Wait a bit for the server to start
import time
time.sleep(1)
yield
# Server stops when the thread is daemon and test finishes
@pytest.mark.asyncio # Use pytest-asyncio marker
async def test_websocket_echo():
uri = “ws://127.0.0.1:8001/ws/test”
try:
async with websockets.connect(uri) as websocket:
message = “Hello WebSocket!”
await websocket.send(message)
print(f”> {message}”)
response = await websocket.recv()
print(f"< {response}")
assert response == f"Echo: {message}"
# Test sending another message
message2 = "Another test"
await websocket.send(message2)
print(f"> {message2}")
response2 = await websocket.recv()
print(f"< {response2}")
assert response2 == f"Echo: {message2}"
except Exception as e:
pytest.fail(f"WebSocket test failed: {e}")
@pytest.mark.asyncio
async def test_multiple_connections():
uri = “ws://127.0.0.1:8001/ws/test”
clients = []
try:
# Connect multiple clients
client1 = await websockets.connect(uri)
clients.append(client1)
client2 = await websockets.connect(uri)
clients.append(client2)
assert len(test_manager.active_connections) == 2 # Assuming test_manager is accessible
# Client 1 sends message
await client1.send("Msg from 1")
response1 = await client1.recv()
assert response1 == "Echo: Msg from 1"
# Client 2 sends message
await client2.send("Msg from 2")
response2 = await client2.recv()
assert response2 == "Echo: Msg from 2"
# Test disconnect
await client1.close()
# Give server a moment to process disconnect
await asyncio.sleep(0.1)
assert len(test_manager.active_connections) == 1
except Exception as e:
pytest.fail(f"Multiple connections test failed: {e}")
finally:
# Ensure connections are closed
for client in clients:
if not client.closed:
await client.close()
“`
运行 pytest -s
即可执行测试。这个示例展示了如何启动一个独立的 Uvicorn 服务器实例,然后使用 websockets
客户端连接并发送/接收消息。
11. 部署注意事项
部署 FastAPI WebSocket 应用时,需要注意以下几点:
- ASGI Server: 确保你使用的 ASGI 服务器(如 Uvicorn、Hypercorn)支持 WebSockets。Uvicorn 默认支持。
- Reverse Proxy: 如果你在 FastAPI 应用前面使用了反向代理(如 Nginx, Traefik, Caddy),需要配置代理来正确处理 WebSocket 握手。这通常涉及将
Upgrade
和Connection
头从客户端传递给后端服务。- Nginx 示例配置片段:
nginx
location /ws {
proxy_pass http://your_fastapi_upstream;
proxy_http_version 1.1;
proxy_set_header Upgrade $websocket_upgrade;
proxy_set_header Connection "upgrade";
proxy_read_timeout 86400s; # Keep the connection open
}
- Nginx 示例配置片段:
- Scaling: 如前所述,基于内存的连接管理器不适合多进程或多服务器部署。对于需要水平扩展的应用,必须引入外部的 Pub/Sub 系统(如 Redis)来协调跨进程/跨服务器的消息广播。每个 FastAPI 实例只维护与其直接连接的客户端列表,并通过 Pub/Sub 系统发布和接收需要广播的消息。
- Load Balancing: 负载均衡器也需要支持 WebSocket 协议,通常使用 TCP 级别的负载均衡(如 ELB 在 TCP 模式下)或者支持 WebSocket 的 HTTP 负载均衡器(如 Nginx, HAProxy)。确保粘性会话 (sticky sessions) 不是必须的,因为 Pub/Sub 模式下任何客户端都可以通过任何后端实例收到消息。
- Connection Limits: 注意操作系统和服务器对最大文件描述符数量的限制,这会影响你能同时处理的连接数量。
12. 总结
FastAPI 凭借其异步能力和简洁的 API,为构建高性能的 WebSocket 应用提供了坚实的基础。通过本文的学习,你应该已经掌握了:
- 如何定义基本的 WebSocket 端点。
- 如何接受和关闭 WebSocket 连接。
- 如何发送和接收文本、二进制和 JSON 数据。
- 如何使用连接管理器来管理多个客户端连接并实现广播。
- 如何处理客户端断开连接和潜在错误。
- 如何在 WebSocket 端点中使用 FastAPI 的依赖注入。
- 构建一个简单的 WebSocket 聊天室的思路。
- 组织 WebSocket 路由和进行基本测试的方法。
- 部署 WebSocket 应用时需要考虑的关键点。
WebSocket 是构建实时 Web 应用的强大工具,而 FastAPI 使其开发变得更加愉快和高效。在此基础上,你可以进一步探索更高级的话题,如使用外部消息队列进行扩展、实现更复杂的实时协议、将 WebSocket 与现有认证系统集成等。
祝你在使用 FastAPI 构建实时应用的旅程中一切顺利!