使用 FastAPI 搭建高性能 WebSocket 服务:从入门到进阶
引言:拥抱实时通信,为何选择 FastAPI 构建 WebSocket 服务?
在现代 Web 应用中,实时性变得越来越重要。传统的 HTTP 请求-响应模式是无状态的,客户端需要不断地发送请求(如轮询或长轮询)来获取服务器的最新状态,这不仅效率低下,而且给服务器带来了不必要的压力。
WebSocket 协议应运而生,它提供了一种在单个 TCP 连接上进行全双工、双向通信的机制。一旦连接建立,服务器和客户端可以随时互相发送数据,无需重复建立连接和发送冗余的 HTTP 头。这使得构建聊天应用、实时数据展示、游戏、协作工具等实时应用成为可能。
选择 FastAPI 来构建 WebSocket 服务具有显著的优势:
- 高性能: FastAPI 基于 Starlette 构建,而 Starlette 是一个轻量级且高性能的 ASGI (Asynchronous Server Gateway Interface) 框架。它原生支持异步操作,能够高效处理大量并发连接,这对于 WebSocket 服务至关重要。
- 易用性与现代性: FastAPI 充分利用 Python 3.6+ 的类型提示,结合 Pydantic 进行数据验证和序列化,提供了自动生成 API 文档(Swagger UI/ReDoc)等强大功能,极大地提高了开发效率。
- 集成度高: WebSocket 功能是 Starlette 的核心组成部分,因此在 FastAPI 中使用 WebSocket 非常自然和方便。
- 异步支持: FastAPI 的
async/await
语法使得编写并发的 WebSocket 处理逻辑变得直观。
本文将详细介绍如何使用 FastAPI 从零开始搭建一个 WebSocket 服务,包括基本概念、连接管理、消息广播以及一些进阶考虑。
前置准备
在开始之前,确保你已经安装了 Python 3.6+。然后安装 FastAPI 和一个 ASGI 服务器,例如 Uvicorn:
bash
pip install fastapi uvicorn
基础 WebSocket 端点
首先,我们创建一个最简单的 WebSocket 端点。这个端点将接收客户端发送的文本消息,并将其作为回应发送回去(一个简单的 Echo 服务)。
创建一个 Python 文件,例如 main.py
:
“`python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
创建 FastAPI 应用实例
app = FastAPI()
定义一个 WebSocket 端点
@app.websocket(“/ws”) 表示这个路径是一个 WebSocket 连接的入口
async def websocket_endpoint(…) 是处理 WebSocket 连接的异步函数
websocket: WebSocket 是 FastAPI 自动注入的 WebSocket 对象,用于与客户端通信
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 接受客户端的连接。这是建立 WebSocket 连接的关键一步。
# 在接受连接之前,客户端和服务器之间的连接尚未完全建立为 WebSocket 连接。
await websocket.accept()
print(f”WebSocket connection accepted.”) # 打印连接建立信息
try:
# 循环接收客户端发送的消息
while True:
# receive_text() 是一个异步方法,用于接收文本类型的消息。
# 它会阻塞,直到接收到一条消息或连接关闭。
data = await websocket.receive_text()
print(f"Received message: {data}") # 打印接收到的消息
# 将接收到的消息发送回客户端
# send_text() 是一个异步方法,用于发送文本类型的消息。
await websocket.send_text(f"Message text was: {data}")
print(f"Sent message: Message text was: {data}") # 打印发送信息
except WebSocketDisconnect as e:
# 当客户端断开连接时,会抛出 WebSocketDisconnect 异常
# 这个异常对象 e 包含断开连接的代码和原因
print(f"WebSocket disconnected: code={e.code}, reason={e.reason}")
except Exception as e:
# 捕获其他可能的异常
print(f"An error occurred: {e}")
finally:
# 无论连接正常关闭还是异常断开,最终都会执行到这里
# 在更复杂的应用中,这里可以执行一些清理工作,
# 例如从连接列表中移除客户端
print(f"WebSocket connection closed.")
“`
代码解释:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
: 导入所需的类。WebSocket
代表一个客户端的 WebSocket 连接,WebSocketDisconnect
是连接断开时抛出的异常。app = FastAPI()
: 创建 FastAPI 应用实例。@app.websocket("/ws")
: 这是一个装饰器,将websocket_endpoint
函数注册为处理/ws
路径上 WebSocket 连接的处理器。async def websocket_endpoint(websocket: WebSocket):
: 定义一个异步函数来处理连接。FastAPI 会自动将代表当前连接的WebSocket
对象作为参数传递进来。await websocket.accept()
: 重要步骤! 在开始与客户端交换数据之前,必须调用accept()
方法来完成 WebSocket 连接的握手。try...except WebSocketDisconnect
: WebSocket 连接的生命周期通常通过一个无限循环来维护,直到连接断开。当客户端断开连接时,FastAPI/Starlette 会抛出WebSocketDisconnect
异常。我们使用try...except
块来优雅地处理连接断开。while True:
: 持续接收客户端的消息。await websocket.receive_text()
: 接收客户端发送的文本消息。还有receive_bytes()
和receive_json()
等方法用于接收其他类型的数据。await websocket.send_text(...)
: 向客户端发送文本消息。类似地,有send_bytes()
和send_json()
等方法。finally:
: 确保在连接处理结束时执行一些清理操作。
运行基础服务
保存 main.py
文件,然后在终端中运行 Uvicorn 服务器:
bash
uvicorn main:app --reload
这将启动一个 ASGI 服务器,监听在 http://127.0.0.1:8000
。--reload
标志会在代码更改时自动重启服务器,方便开发。
现在,你的 WebSocket 服务正在运行。你需要一个 WebSocket 客户端来连接和测试它。
测试基础服务(客户端)
你可以使用各种工具或库来测试 WebSocket 连接:
- 在线 WebSocket 测试工具: 搜索 “online websocket tester”,有很多网站提供这样的服务。输入 WebSocket 地址(例如
ws://127.0.0.1:8000/ws
),连接后发送消息并查看响应。 -
浏览器控制台: 在现代浏览器的开发者工具控制台中,可以使用 JavaScript 创建 WebSocket 连接:
“`javascript
// 在浏览器控制台中运行
let ws = new WebSocket(“ws://127.0.0.1:8000/ws”);ws.onopen = function(event) {
console.log(“WebSocket connection opened”);
ws.send(“Hello Server!”); // 连接成功后发送一条消息
};ws.onmessage = function(event) {
console.log(“Message from server: “, event.data); // 接收服务器消息
};ws.onerror = function(event) {
console.error(“WebSocket error observed:”, event); // 错误处理
};ws.onclose = function(event) {
if (event.wasClean) {
console.log([close] Connection closed cleanly, code=${event.code} reason=${event.reason}
);
} else {
console.error(‘[close] Connection died’); // 例如,服务器进程被杀死
}
};// 随时可以通过 ws.send(“你的消息”) 发送消息
// 可以通过 ws.close() 关闭连接
``
websockets` 库。
3. **Python 客户端库:** 例如
通过客户端发送消息,你应该能在服务器端看到 “Received message: …” 的输出,并在客户端收到 “Message text was: …” 的回复。当客户端关闭连接时,服务器端会打印 “WebSocket disconnected…” 信息。
管理多个连接:构建一个连接管理器
在实际应用中,你通常需要管理多个并发的 WebSocket 连接。例如,在一个聊天室应用中,服务器接收到一条消息后,需要将这条消息广播给所有在线的用户。基础的 Echo 服务无法实现这一点,因为它只处理单个连接的生命周期。
为了管理多个连接,我们可以创建一个简单的类来存储活动的连接,并提供广播、发送给特定用户等功能。
“`python
main.py (在之前代码的基础上增加 ConnectionManager)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json # 导入 json 用于处理 JSON 消息,虽然 FastAPI 可以自动处理 Pydantic 模型
app = FastAPI()
定义一个连接管理器类
class ConnectionManager:
def init(self):
# 使用一个列表来存储所有活跃的 WebSocket 连接对象
# 列表是简单的数据结构,对于需要根据用户ID或其他标识符管理连接时,可以使用字典
self.active_connections: List[WebSocket] = []
# 新连接建立时调用
async def connect(self, websocket: WebSocket):
# 在接受连接前先将其添加到列表,确保即使 accept 失败也能处理清理( हालांकि accept 一般不会失败在这里)
# 更常见且安全的做法是在 accept() 成功后再添加
await websocket.accept() # 接受连接
self.active_connections.append(websocket) # 将新的连接添加到活跃连接列表
print(f"Client connected. Total active connections: {len(self.active_connections)}")
# 连接断开时调用
def disconnect(self, websocket: WebSocket):
# 从活跃连接列表中移除断开的连接
try:
self.active_connections.remove(websocket)
print(f"Client disconnected. Total active connections: {len(self.active_connections)}")
except ValueError:
# 如果连接不在列表中,说明可能在添加之前就断开了,或者重复移除了
print(f"Attempted to remove a connection not in the list.")
# 向单个客户端发送文本消息
async def send_personal_message(self, message: str, websocket: WebSocket):
try:
await websocket.send_text(message)
# print(f"Sent personal message '{message}' to a client.") # 调试信息,可能很频繁
except Exception as e:
print(f"Error sending personal message: {e}. Connection might be closed.")
# 这里可以考虑调用 disconnect(websocket) 移除这个失败的连接
# 向所有活跃客户端广播文本消息
async def broadcast(self, message: str):
print(f"Broadcasting message: '{message}' to {len(self.active_connections)} connections.")
# 创建一个任务列表,并发地向所有客户端发送消息
# 使用 list() 复制列表,避免在循环过程中列表被修改(例如,有客户端断开被移除)
disconnected_clients = [] # 存放发送失败(已断开)的客户端
for connection in list(self.active_connections):
try:
await connection.send_text(message)
except WebSocketDisconnect:
# 客户端已经断开,虽然我们已经在 endpoint 的 except 中处理,
# 但在广播时也可能遇到,记录下来稍后移除
print(f"Broadcast failed for a client due to disconnection.")
disconnected_clients.append(connection)
except Exception as e:
# 捕获其他发送异常
print(f"Broadcast failed for a client: {e}")
disconnected_clients.append(connection)
# 在广播结束后,移除所有发送失败的连接
for client in disconnected_clients:
self.disconnect(client)
创建一个连接管理器实例。这个实例应该在整个应用生命周期中是唯一的。
manager = ConnectionManager()
修改 WebSocket 端点以使用连接管理器
@app.websocket(“/ws/{client_id}”) # 示例:通过路径参数区分客户端(虽然简单,实际可能用token等)
async def websocket_endpoint(websocket: WebSocket, client_id: int): # 获取 client_id
# 将新连接添加到管理器
await manager.connect(websocket)
# 通知所有客户端有新用户加入 (可选)
await manager.broadcast(f”Client #{client_id} joined the chat.”)
try:
# 持续接收该客户端的消息
while True:
data = await websocket.receive_text()
# 接收到消息后,可以发送给特定客户端,或者广播给所有人
# await manager.send_personal_message(f"You sent: {data}", websocket) # 发送给发消息的客户端
await manager.broadcast(f"Client #{client_id} says: {data}") # 广播给所有客户端
except WebSocketDisconnect:
# 客户端断开时,从管理器中移除
manager.disconnect(websocket)
# 通知所有客户端有用户离开 (可选)
await manager.broadcast(f"Client #{client_id} left the chat.")
except Exception as e:
# 捕获其他可能的异常
print(f"Error in websocket_endpoint for client #{client_id}: {e}")
manager.disconnect(websocket) # 异常发生也尝试移除
“`
代码解释:
ConnectionManager
类:active_connections: List[WebSocket]
: 存储所有当前连接的WebSocket
对象。使用列表是最简单的方式,如果需要按用户 ID 或其他标识符查找连接,可以使用字典{client_id: websocket}
。connect(self, websocket: WebSocket)
: 接受连接并将WebSocket
对象添加到列表。disconnect(self, websocket: WebSocket)
: 从列表中移除断开的连接。send_personal_message(self, message: str, websocket: WebSocket)
: 向指定的WebSocket
对象发送消息。broadcast(self, message: str)
: 遍历所有活跃连接,向每个连接发送消息。这里使用了list()
创建列表的副本进行迭代,以避免在迭代过程中因客户端断开导致列表修改引发错误。同时增加了对发送失败的连接进行记录和移除的逻辑,增强健壮性。
manager = ConnectionManager()
: 在 FastAPI 应用实例创建后,创建一个全局的ConnectionManager
实例。所有连接都会共用这一个管理器。@app.websocket("/ws/{client_id}")
: 我们在路径中引入了一个路径参数client_id
,这样每个连接可以有一个简单的标识符(虽然在实际应用中可能更复杂)。async def websocket_endpoint(websocket: WebSocket, client_id: int):
: 端点函数现在接收client_id
参数。await manager.connect(websocket)
: 在函数开始时,将新的连接添加到管理器。manager.disconnect(websocket)
: 在WebSocketDisconnect
异常处理块中,从管理器中移除断开的连接。await manager.broadcast(...)
: 接收到消息后,不再是简单回显,而是调用管理器的broadcast
方法将消息发送给所有连接。- 广播消息时,我们通过
f"Client #{client_id} says: {data}"
的格式,将发送消息的客户端 ID 也包含在内,方便客户端区分。
现在,使用 uvicorn main:app --reload
再次运行服务。使用多个客户端(不同的浏览器标签页或工具)连接 ws://127.0.0.1:8000/ws/123
,ws://127.0.0.1:8000/ws/456
等不同的 client_id
。在一个客户端发送消息,应该能在所有连接的客户端上看到消息。
处理结构化消息(JSON)
仅仅发送和接收文本消息在实际应用中是不够的。通常,客户端和服务器会通过发送结构化的数据,如 JSON,来表示不同的消息类型(例如,聊天消息、用户加入通知、状态更新等)。
FastAPI 可以很好地结合 Pydantic 来处理 JSON 数据。
“`python
main.py (在 ConnectionManager 的基础上修改)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict, Any
from pydantic import BaseModel # 导入 BaseModel
app = FastAPI()
定义一个 Pydantic 模型来表示 JSON 消息的结构
class Message(BaseModel):
type: str # 消息类型,例如 “chat”, “status”, “join”, “leave”
payload: Dict[str, Any] # 消息的具体内容,可以是任意字典
修改 ConnectionManager 以支持发送 JSON (虽然 send_json 已经支持字典)
也可以增加 send_json_message 方法
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = [] # 依然使用列表,方便广播
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
print(f"Client connected. Total active connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
try:
self.active_connections.remove(websocket)
print(f"Client disconnected. Total active connections: {len(self.active_connections)}")
except ValueError:
print(f"Attempted to remove a connection not in the list.")
# 发送任意 JSON 数据 (字典形式) 给单个客户端
async def send_json_message(self, data: Dict[str, Any], websocket: WebSocket):
try:
await websocket.send_json(data)
except Exception as e:
print(f"Error sending JSON message: {e}. Connection might be closed.")
# 广播任意 JSON 数据 (字典形式) 给所有客户端
async def broadcast_json(self, data: Dict[str, Any]):
print(f"Broadcasting JSON message: {data} to {len(self.active_connections)} connections.")
disconnected_clients = []
for connection in list(self.active_connections):
try:
await connection.send_json(data) # 使用 send_json 发送字典
except WebSocketDisconnect:
print(f"Broadcast failed for a client (JSON) due to disconnection.")
disconnected_clients.append(connection)
except Exception as e:
print(f"Broadcast failed for a client (JSON): {e}")
disconnected_clients.append(connection)
for client in disconnected_clients:
self.disconnect(client)
manager = ConnectionManager()
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
# 新用户加入时,发送一个结构化的加入消息
await manager.broadcast_json(
{"type": "status", "payload": {"message": f"Client #{client_id} joined the chat.", "user_count": len(manager.active_connections)}}
)
try:
while True:
# 使用 receive_json() 接收 JSON 消息
data = await websocket.receive_json()
print(f"Received JSON message from client #{client_id}: {data}")
# 可以选择将接收到的字典验证或解析为 Pydantic 模型,这里直接使用字典
# validated_message = Message(**data) # 如果需要 Pydantic 验证
# 假设客户端发送的消息格式是 {"type": "chat", "payload": {"text": "..."}}
if data.get("type") == "chat" and isinstance(data.get("payload"), dict) and "text" in data["payload"]:
chat_text = data["payload"]["text"]
# 广播聊天消息,包含发送者 ID
await manager.broadcast_json(
{"type": "chat", "payload": {"sender_id": client_id, "text": chat_text}}
)
else:
# 处理未知或格式错误的消息
print(f"Received unknown or malformed message from client #{client_id}: {data}")
await manager.send_json_message(
{"type": "error", "payload": {"message": "Unknown or malformed message format."}},
websocket
)
except WebSocketDisconnect:
manager.disconnect(websocket)
# 用户离开时,发送一个结构化的离开消息
await manager.broadcast_json(
{"type": "status", "payload": {"message": f"Client #{client_id} left the chat.", "user_count": len(manager.active_connections)}}
)
except Exception as e:
print(f"Error processing message for client #{client_id}: {e}")
manager.disconnect(websocket)
“`
代码解释:
from pydantic import BaseModel
: 导入BaseModel
。class Message(BaseModel): ...
: 定义一个简单的 Pydantic 模型来规范消息结构。虽然在这个例子中我们直接处理接收到的字典,但在更复杂的应用中,使用 Pydantic 模型进行数据验证和自动文档生成非常有益。await websocket.receive_json()
: 接收客户端发送的 JSON 消息,FastAPI 会自动将其解析为 Python 字典。await websocket.send_json(data)
: 向客户端发送 Python 字典,FastAPI 会自动将其序列化为 JSON 字符串。- 根据接收到的 JSON 消息的
type
字段,我们可以实现不同的业务逻辑(例如,处理聊天消息、用户在线状态等)。 - 广播消息时,我们使用
broadcast_json
方法发送 JSON 字典,包含消息类型、发送者 ID、内容等信息。 - 在连接建立和断开时,发送带有
type: "status"
的 JSON 消息,通知所有客户端用户状态变化。
使用支持发送 JSON 的 WebSocket 客户端(例如,使用 JavaScript 的 ws.send(JSON.stringify({...}))
)连接并测试。发送如 {"type": "chat", "payload": {"text": "Hello everyone!"}}
这样的 JSON 消息,看是否能正确广播。
进阶考虑与局限性
上面构建的 WebSocket 服务在一个进程中运行,连接管理器将所有活跃连接保存在内存中。这种方法对于开发和中小规模应用是可行的,但存在一些局限性:
- 单进程限制: 如果你需要运行多个 FastAPI 进程(例如,为了利用多核 CPU 或在不同服务器上部署),每个进程都会有自己的
ConnectionManager
实例和独立的连接列表。这意味着运行在不同进程上的客户端无法互相通信或接收广播消息。 - 非持久性: 如果服务器进程崩溃或重启,所有内存中的连接信息将丢失,活跃的 WebSocket 连接也会断开。
为了解决这些问题,实现水平扩展和更健壮的服务,你需要引入外部的消息代理(Message Broker),例如:
- Redis Pub/Sub: Redis 的发布/订阅功能是一个轻量级的选择,适用于在多个进程或服务器之间转发消息进行广播。当一个进程接收到 WebSocket 消息时,它将消息发布到 Redis 的某个频道;其他所有连接到该频道的进程都会收到消息,然后通过各自管理的 WebSocket 连接将消息转发给客户端。
- RabbitMQ, Kafka 等: 这些是更成熟、功能更丰富的消息队列系统,提供持久性、保证消息投递等特性,适用于更复杂和关键的应用场景。
在 FastAPI 应用中使用消息代理,你需要:
- 在 FastAPI 应用启动时连接到消息代理。
- 创建一个后台任务(例如使用
asyncio.create_task
或 FastAPI 的 DependenciesBackgroundTasks
的概念,虽然后台任务更常用)来订阅消息代理的频道。 - 当后台任务从消息代理接收到消息时,通过 ConnectionManager 的方法(可能需要修改以通过用户 ID 等查找连接)将消息发送给相关的客户端。
- 当 FastAPI 接收到客户端的 WebSocket 消息时,将消息发布到消息代理。
这部分内容超出了本文入门的范畴,但理解这一扩展路径对于构建可伸缩的 WebSocket 服务至关重要。
其他进阶考虑:
- 认证和授权: 如何验证 WebSocket 连接的客户端身份?如何控制客户端可以执行的操作(例如,只有登录用户才能发送聊天消息)?这通常可以在 WebSocket 握手阶段(例如,通过 URL 参数传递 token,或从 HTTP headers 中获取 cookies/auth headers – Starlette/FastAPI 在握手阶段允许访问这些)进行处理。
- 心跳机制: 为了保持连接活跃和检测断开,客户端和服务器可能需要定期发送小型的心跳消息。
- 错误处理和重连: 客户端和服务器需要实现适当的错误处理和自动重连逻辑。
- 不同消息类型的处理: 使用 Pydantic 和基于消息
type
字段的条件逻辑是处理多种消息类型的有效方式。
总结
通过本文,我们详细学习了如何使用 FastAPI 搭建 WebSocket 服务。我们从一个简单的 Echo 端点开始,逐步引入了连接管理器的概念,使其能够处理多个客户端并实现消息广播。接着,我们探讨了如何使用 JSON 和 Pydantic 来处理结构化的消息。最后,我们讨论了单进程内存在管理器方案的局限性以及如何通过引入消息代理来实现服务的水平扩展。
FastAPI 凭借其高性能的 ASGI 基础和易用性,是构建现代、实时 WebSocket 服务的优秀选择。掌握 FastAPI 的 WebSocket 功能,将为你打开构建丰富交互式应用的大门。希望这篇文章能为你提供坚实的基础,助你在实时通信的领域走得更远。