FastAPI WebSocket 开发实战:构建高性能实时应用
在现代 Web 应用中,实时性需求日益增加,例如在线聊天、实时数据看板、游戏、协作编辑等。传统的 HTTP 请求/响应模式难以高效地实现这些功能。这时,WebSocket 就派上了用场。WebSocket 协议提供了客户端与服务器之间的全双工通信通道,允许服务器主动向客户端推送数据,而无需客户端频繁地轮询。
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建基于标准 Python 类型提示的 API。得益于 Starlette 的强大异步能力,FastAPI 对 WebSocket 的支持非常出色且易于使用。本文将深入探讨如何在 FastAPI 中进行 WebSocket 开发,从基础概念到实战技巧,帮助你构建高性能的实时应用。
1. 理解 WebSocket
在深入 FastAPI 实现之前,我们先回顾一下 WebSocket 的核心概念:
- 全双工通信 (Full-Duplex): 客户端和服务器可以在同一时间相互发送数据,类似于电话通话。
- 持久连接 (Persistent Connection): 一旦连接建立,它就会保持开放,直到其中一方关闭连接或发生错误。这避免了 HTTP 中每个请求都需要建立新连接的开销。
- 基于帧 (Frame-based): 数据以小块(帧)的形式传输,可以是文本、二进制等格式。
- 握手 (Handshake): WebSocket 连接建立始于一个特殊的 HTTP 请求(通常是 GET),客户端在请求头中包含
Upgrade: websocket
和Connection: Upgrade
。如果服务器支持 WebSocket,它会返回一个特殊的响应,完成协议升级,之后连接就切换到 WebSocket 协议。
相比于 HTTP 的无状态和请求-响应模型,WebSocket 更适合需要低延迟、高频率双向数据交换的场景。
2. FastAPI 对 WebSocket 的支持
FastAPI 构建在 Starlette 之上,而 Starlette 提供了强大的 ASGI (Asynchronous Server Gateway Interface) 支持,这使得处理 WebSocket 连接变得非常自然。FastAPI 通过 WebSocket
类型和 @app.websocket()
装饰器,将底层的 ASGI WebSocket 事件抽象成易于使用的 Python 接口。
核心组件:
@app.websocket("/ws")
装饰器: 用于定义 WebSocket 终端(endpoint)。async def websocket_endpoint(websocket: WebSocket):
: WebSocket 终端处理函数。它接收一个WebSocket
类型的参数,代表当前的客户端连接。websocket: WebSocket
对象: 这个对象提供了与客户端通信的核心方法:accept()
: 接受传入的 WebSocket 连接请求。这是处理 WebSocket 连接的第一步,也是必须的一步。receive_text()
: 接收客户端发送的文本消息。这是一个异步操作,会阻塞直到接收到消息。send_text(data)
: 向客户端发送文本消息。receive_json()
: 接收客户端发送的 JSON 消息,并自动解析为 Python 数据结构(字典、列表等)。send_json(data)
: 向客户端发送 Python 数据结构,并自动序列化为 JSON 文本。receive_bytes()
: 接收客户端发送的二进制消息。send_bytes(data)
: 向客户端发送二进制消息。close(code: int = 1000, reason: str = None)
: 关闭连接。可以指定关闭码和原因。disconnect()
: 这是一个异常类型 (websockets.exceptions.WebSocketDisconnect
),当客户端断开连接时会被抛出。
3. 构建一个简单的 Echo Server
最简单的 WebSocket 应用是 Echo Server,它接收到什么消息就原样发回去。
首先,确保你安装了 FastAPI 和 ASGI 服务器(如 uvicorn):
bash
pip install fastapi uvicorn websockets
创建 main.py
文件:
“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn
import logging
配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
“””
简单的 Echo Server WebSocket 终端
接收到文本消息后原样发送回去
“””
# 1. 接受连接
await websocket.accept()
logger.info(f”WebSocket connection accepted: {websocket.client}”)
try:
# 2. 持续接收和发送消息
while True:
# 接收文本消息
data = await websocket.receive_text()
logger.info(f"Received message from {websocket.client}: {data}")
# 发送消息回客户端
await websocket.send_text(f"Message text was: {data}")
logger.info(f"Sent message back to {websocket.client}")
except WebSocketDisconnect as e:
# 3. 处理客户端断开连接
logger.info(f"Client disconnected: {websocket.client}, code: {e.code}, reason: {e.reason}")
except Exception as e:
# 4. 处理其他异常
logger.error(f"WebSocket error for client {websocket.client}: {e}")
# 可以在这里选择关闭连接,如果它尚未关闭
# try:
# await websocket.close()
# except Exception:
# pass # Connection might already be closed
finally:
# 任何清理工作(例如从连接列表中移除)通常在这里做,
# 但在 Echo Server 中没有连接列表
logger.info(f"WebSocket connection closed for {websocket.client}")
if name == “main“:
# 使用 uvicorn 运行应用
# –host 0.0.0.0 允许从外部访问(如果需要)
uvicorn.run(app, host=”127.0.0.1″, port=8000, log_level=”info”)
“`
运行代码:
bash
uvicorn main:app --reload
测试:
你可以使用 WebSocket 客户端(如 Postman 的 WebSocket 功能、浏览器的开发者工具或专门的 WebSocket 测试工具)连接到 ws://127.0.0.1:8000/ws
。发送任何文本消息,你应该会收到服务器回复的带有前缀的相同消息。当你关闭客户端连接时,服务器端会打印断开连接的日志。
代码解释:
@app.websocket("/ws")
: 定义/ws
路径作为 WebSocket 终端。async def websocket_endpoint(websocket: WebSocket):
: 异步函数处理 WebSocket 连接。await websocket.accept()
: 这是关键的第一步。在处理任何接收/发送操作之前,必须先接受连接。如果服务器不支持 WebSocket 或请求不合法,FastAPI/Starlette 会自动处理握手失败,不会调用此函数。while True:
: 创建一个循环,以便持续接收客户端发送的消息。WebSocket 连接建立后通常会保持开放,直到显式关闭或出现错误。data = await websocket.receive_text()
: 在循环内,等待接收客户端发送的下一条文本消息。这是一个阻塞调用,直到消息到达。await websocket.send_text(...)
: 接收到消息后,立即向同一个客户端发送回复。except WebSocketDisconnect:
: 这个异常非常重要,用于优雅地处理客户端主动断开连接的情况。当客户端关闭连接时,receive_*
方法会抛出此异常。except Exception:
: 捕获其他可能的异常,例如网络错误等。finally:
: 无论连接是正常关闭还是因异常中断,finally
块中的代码都会执行,适合进行资源清理。
4. 处理多个客户端:Connection Manager
Echo Server 只处理单个连接。实际应用中,通常需要管理多个同时在线的 WebSocket 连接,并能够向特定客户端或所有客户端广播消息。一个常见的模式是使用一个 ConnectionManager
类来集中管理这些连接。
“`python
main.py 中继续添加或修改
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import uvicorn
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
app = FastAPI()
ConnectionManager 类来管理所有活跃的连接
class ConnectionManager:
def init(self):
# 使用列表存储 WebSocket 连接对象
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
"""
接受新连接并添加到列表中
"""
await websocket.accept()
self.active_connections.append(websocket)
logger.info(f"Client connected: {websocket.client}. Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
"""
从列表中移除断开的连接
"""
try:
self.active_connections.remove(websocket)
logger.info(f"Client disconnected: {websocket.client}. Total connections: {len(self.active_connections)}")
except ValueError:
# 如果连接已经因为某种原因不在列表里(例如重复调用disconnect)
logger.warning(f"Attempted to disconnect a non-existent connection: {websocket.client}")
async def send_personal_message(self, message: str, websocket: WebSocket):
"""
向指定客户端发送消息
"""
try:
await websocket.send_text(message)
# logger.info(f"Sent personal message to {websocket.client}") # 太频繁可能影响日志可读性
except Exception as e:
logger.error(f"Failed to send personal message to {websocket.client}: {e}")
# 如果发送失败,可能是客户端已经断开,可以考虑在这里调用 disconnect
# self.disconnect(websocket) # 注意:在遍历广播时调用 disconnect 可能会导致问题
async def broadcast(self, message: str):
"""
向所有活跃客户端广播消息
"""
disconnected_clients = []
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception as e:
# 如果向某个客户端发送失败,说明它可能已经断开了
logger.error(f"Failed to broadcast message to {connection.client}: {e}")
disconnected_clients.append(connection)
# 移除那些发送失败的连接,它们很可能已经断开
for connection in disconnected_clients:
self.disconnect(connection)
创建一个 ConnectionManager 实例
manager = ConnectionManager()
@app.websocket(“/ws/chat”)
async def websocket_chat_endpoint(websocket: WebSocket):
“””
简单的聊天室 WebSocket 终端
新连接加入时通知所有人
收到消息时广播给所有人
断开连接时通知所有人
“””
# 1. 接受连接并添加到管理器
await manager.connect(websocket)
try:
# 通知所有客户端有新用户加入
await manager.broadcast(f"用户 {websocket.client.host}:{websocket.client.port} 进入聊天室")
# 2. 持续接收和广播消息
while True:
# 接收文本消息
data = await websocket.receive_text()
logger.info(f"Received message from {websocket.client}: {data}")
# 广播消息给所有客户端
# 注意:这里将接收到的消息原样广播,实际应用中可能需要包含发送者信息
await manager.broadcast(f"来自 {websocket.client.host}:{websocket.client.port} 的消息: {data}")
except WebSocketDisconnect as e:
# 3. 处理客户端断开连接
manager.disconnect(websocket)
logger.info(f"Client disconnected: {websocket.client}, code: {e.code}, reason: {e.reason}")
# 通知所有客户端有用户离开
await manager.broadcast(f"用户 {websocket.client.host}:{websocket.client.port} 离开了聊天室")
except Exception as e:
# 4. 处理其他异常
logger.error(f"WebSocket error for client {websocket.client}: {e}")
# 如果发生其他异常,也应该从管理器中移除连接
manager.disconnect(websocket)
await manager.broadcast(f"用户 {websocket.client.host}:{websocket.client.port} 因异常离开了聊天室")
如果你还想保留 Echo Server,可以改成不同的路径
@app.websocket(“/ws/echo”)
async def websocket_echo_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f”Echo: {data}”)
except WebSocketDisconnect:
pass
if name == “main“:
uvicorn.run(app, host=”127.0.0.1″, port=8000, log_level=”info”)
“`
代码解释:
ConnectionManager
: 这是一个类,负责维护一个active_connections
列表,存储所有活跃的WebSocket
对象。connect(websocket)
: 当新的 WebSocket 连接建立时,调用此方法。它先调用await websocket.accept()
接受连接,然后将 WebSocket 对象添加到列表中。disconnect(websocket)
: 当客户端断开连接时,调用此方法从列表中移除对应的 WebSocket 对象。send_personal_message(message, websocket)
: 向列表中特定的websocket
对象发送消息。broadcast(message)
: 遍历active_connections
列表,尝试向每个连接发送消息。这里增加了错误处理:如果在广播过程中向某个客户端发送失败(很可能是因为该客户端已经意外断开但尚未被WebSocketDisconnect
捕获并移除),则捕获异常,并将该连接标记为待移除。遍历完成后,再统一移除这些发送失败的连接。这种方式比在遍历过程中直接移除更安全。- 在
@app.websocket("/ws/chat")
终端函数中:- 不再直接调用
websocket.accept()
,而是调用manager.connect(websocket)
,由管理器处理接受和添加连接。 - 在
try
块内部,当接收到消息时,调用manager.broadcast()
将消息发送给 所有 在线客户端(包括发送者自己,你可以在broadcast
方法中稍作修改以排除发送者)。 - 在
except WebSocketDisconnect
块中,调用manager.disconnect(websocket)
从管理器中移除该连接,并通知其他人该用户已离开。
- 不再直接调用
测试:
运行修改后的代码,使用多个 WebSocket 客户端连接到 ws://127.0.0.1:8000/ws/chat
。在一个客户端发送消息,其他所有客户端都会收到该消息。当一个客户端断开连接时,其他客户端会收到“用户离开”的通知。
5. 发送和接收 JSON 数据
WebSocket 连接不仅可以传输文本,还可以传输 JSON 数据。这在前后端需要结构化数据交换时非常有用。FastAPI 的 WebSocket
对象提供了便捷的 send_json()
和 receive_json()
方法。
“`python
main.py (在 ConnectionManager 和 FastAPI app 定义之后)
假设我们需要发送和接收结构化消息,例如包含类型和内容的字典
@app.websocket(“/ws/json”)
async def websocket_json_endpoint(websocket: WebSocket):
“””
处理 JSON 消息的 WebSocket 终端
接收 JSON 消息,处理后发送 JSON 回复
“””
await websocket.accept()
logger.info(f”WebSocket JSON connection accepted: {websocket.client}”)
try:
while True:
# 接收 JSON 消息
data = await websocket.receive_json()
logger.info(f"Received JSON message from {websocket.client}: {data}")
# 验证接收到的数据(简单示例)
if isinstance(data, dict) and 'action' in data and 'payload' in data:
action = data.get('action')
payload = data.get('payload')
# 根据 action 进行处理并发送 JSON 回复
response = {"status": "success", "received_action": action, "processed_payload": str(payload).upper()}
await websocket.send_json(response)
logger.info(f"Sent JSON response to {websocket.client}: {response}")
else:
# 如果数据格式不符合预期
error_response = {"status": "error", "message": "Invalid JSON format"}
await websocket.send_json(error_response)
logger.warning(f"Received invalid JSON from {websocket.client}: {data}")
except WebSocketDisconnect as e:
logger.info(f"Client disconnected from JSON endpoint: {websocket.client}, code: {e.code}, reason: {e.reason}")
except Exception as e:
logger.error(f"WebSocket JSON error for client {websocket.client}: {e}")
finally:
logger.info(f"WebSocket JSON connection closed for {websocket.client}")
在 uvicorn.run 中确保 log_level 为 info 或 debug 才能看到上面的 logger.info 输出
if name == “main“:
uvicorn.run(app, host=”127.0.0.1″, port=8000, log_level=”info”)
“`
代码解释:
await websocket.receive_json()
: 等待接收 JSON 消息,并将其解析为 Python 对象(字典、列表、字符串、数字、布尔值、None)。如果收到的不是有效的 JSON,可能会抛出异常。await websocket.send_json(response)
: 将 Python 对象序列化为 JSON 字符串,并发送给客户端。- 在示例中,我们简单地检查了接收到的 JSON 是否是包含
'action'
和'payload'
键的字典,并根据这些数据构造了一个回复 JSON。
测试:
使用 WebSocket 客户端连接到 ws://127.0.0.1:8000/ws/json
。发送一个 JSON 字符串,例如:
json
{"action": "process", "payload": "hello world"}
你应该会收到一个 JSON 回复,例如:
json
{"status": "success", "received_action": "process", "processed_payload": "HELLO WORLD"}
发送非 JSON 文本或格式错误的 JSON,你会收到一个包含错误信息的 JSON 回复。
6. 从服务器主动发送消息(后台任务)
在聊天室示例中,服务器只在接收到客户端消息后才广播。但很多时候,我们需要服务器根据自己的逻辑(例如定时器、数据库更新、其他服务通知)主动向客户端推送消息。这需要在 FastAPI 应用的后台运行一个异步任务。
我们可以结合 ConnectionManager
和 asyncio.create_task
来实现这个功能。
“`python
main.py (在 ConnectionManager 和 FastAPI app 定义之后)
import asyncio
import datetime # 用于示例中的时间戳发送
假设使用上面定义的 ConnectionManager
添加一个后台任务,定期向所有客户端发送时间戳
async def send_timestamp_periodically():
“””
一个后台任务,每隔几秒向所有客户端广播当前时间戳
“””
while True:
# 每隔 5 秒发送一次
await asyncio.sleep(5)
timestamp = datetime.datetime.now().strftime(“%Y-%m-%d %H:%M:%S”)
message = f”Server timestamp: {timestamp}”
# 使用 ConnectionManager 广播消息
# 注意:这个任务需要访问到 manager 实例
# 我们可以让任务在 FastAPI 启动时创建,并将 manager 传递进去
# 或者通过其他方式(如全局变量或依赖注入)访问 manager
# 简单示例使用全局变量
if manager: # 确保 manager 已经被实例化
await manager.broadcast(message)
logger.info(f"Broadcasted timestamp: {timestamp}")
@app.on_event(“startup”)
async def startup_event():
“””
在 FastAPI 应用启动时创建后台任务
“””
logger.info(“Starting FastAPI application…”)
# 创建后台任务
asyncio.create_task(send_timestamp_periodically())
logger.info(“Timestamp broadcasting task started.”)
@app.on_event(“shutdown”)
def shutdown_event():
“””
在 FastAPI 应用关闭时执行清理
实际应用中可能需要更复杂的任务取消逻辑
“””
logger.info(“Shutting down FastAPI application…”)
# 在更复杂的应用中,你可能需要取消之前创建的 asyncio task
# 但对于这个简单的示例,Python 进程退出时会自动清理
确保 /ws/chat endpoint 仍然存在并使用 manager
@app.websocket(“/ws/chat”)
async def websocket_chat_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
await manager.broadcast(f”用户 {websocket.client.host}:{websocket.client.port} 进入聊天室”)
while True:
data = await websocket.receive_text()
logger.info(f”Received message from {websocket.client}: {data}”)
await manager.broadcast(f”来自 {websocket.client.host}:{websocket.client.port} 的消息: {data}”)
except WebSocketDisconnect as e:
manager.disconnect(websocket)
logger.info(f”Client disconnected: {websocket.client}, code: {e.code}, reason: {e.reason}”)
await manager.broadcast(f”用户 {websocket.client.host}:{websocket.client.port} 离开了聊天室”)
except Exception as e:
logger.error(f”WebSocket error for client {websocket.client}: {e}”)
manager.disconnect(websocket)
await manager.broadcast(f”用户 {websocket.client.host}:{websocket.client.port} 因异常离开了聊天室”)
全局 manager 实例(确保在使用前已实例化)
manager = ConnectionManager()
if name == “main“:
uvicorn.run(app, host=”127.0.0.1″, port=8000, log_level=”info”)
“`
代码解释:
send_timestamp_periodically()
: 这是一个独立的异步函数,包含一个无限循环。在循环中,它等待 5 秒 (await asyncio.sleep(5)
),然后获取当前时间戳,构建消息。manager.broadcast(message)
: 在任务中,通过访问全局的manager
实例,调用broadcast
方法将消息发送给所有连接的客户端。@app.on_event("startup")
: FastAPI 提供的一个钩子函数,在应用启动时执行。asyncio.create_task(send_timestamp_periodically())
: 在启动事件中,使用asyncio.create_task()
创建并启动send_timestamp_periodically
任务。这个任务会在事件循环中独立运行,不会阻塞 FastAPI 的主线程。
测试:
运行代码,连接到 ws://127.0.0.1:8000/ws/chat
。除了聊天消息外,你还会每隔 5 秒收到服务器推送的当前时间戳消息。
7. 客户端实现
为了测试上面的服务器代码,我们需要一个 WebSocket 客户端。一个简单的 HTML/JavaScript 页面就可以实现。
创建一个 index.html
文件:
“`html
FastAPI WebSocket Chat Example
“`
使用方法:
- 确保 FastAPI 应用正在运行 (
uvicorn main:app --reload
). - 在浏览器中打开
index.html
文件(可以直接双击打开,或者通过简单的 HTTP 服务器打开)。 - 你应该看到“System: Connected to chat server.”的消息。
- 在输入框中输入消息,点击“Send”或按回车键。
- 消息会被发送到 FastAPI 服务器,然后服务器会广播给所有连接的客户端(包括你自己的浏览器标签页)。
这个客户端示例展示了 WebSocket JavaScript API 的基本用法 (onopen
, onmessage
, onerror
, onclose
, send
).
8. 进一步考虑和进阶话题
- 认证和授权: 如何知道连接的客户端是谁?WebSocket 握手是基于 HTTP 的,你可以在 HTTP 握手阶段使用 FastAPI 的依赖注入 (
Depends
) 来验证用户身份(例如检查 Header 中的 Token 或 Cookie)。一旦握手成功并接受连接,你可以在websocket
对象上存储用户的身份信息,以便在后续消息处理中使用。 - 连接唯一标识: 在
ConnectionManager
中使用WebSocket
对象本身作为标识符是可行的,但有时候需要更稳定的、与业务逻辑相关的标识(如用户 ID)。你可以将active_connections
存储为{user_id: WebSocket}
的字典,或者存储包含 WebSocket 对象和用户 ID 的自定义对象列表。 - Pydantic 模型: 虽然
receive_json()
会将 JSON 解析为 Python 原生类型,但它不会自动验证或解析到 Pydantic 模型。如果你需要严格的数据结构,可以在receive_json()
后手动使用 Pydantic 模型进行解析和验证。 - 扩展性: 简单的列表管理连接在单进程单线程或多进程单线程模式下工作良好。但如果使用多进程模式 (
uvicorn --workers N
),每个工作进程都会有自己的ConnectionManager
实例,连接状态无法共享。这需要更高级的解决方案,例如使用 Redis Pub/Sub 或其他消息队列来协调不同进程间的 WebSocket 消息广播。 - 心跳机制: 为了检测客户端是否意外断开(例如网络突然中断),可以实现心跳机制。客户端或服务器定期发送一个小的“心跳”消息。如果一方在预定时间内没有收到对方的心跳,就认为连接已死并主动关闭它。
- 错误处理和重连: 客户端和服务器都应该健壮地处理错误和断开连接。客户端应该有重连逻辑,服务器应该能优雅地清理死连接。
- 二进制数据: 对于图片、音频等二进制数据,使用
send_bytes()
和receive_bytes()
。 - 子协议 (Subprotocols): WebSocket 允许定义子协议来规范消息格式和行为。可以通过
websocket.accept(subprotocol='myprotocol')
来指定使用的子协议。 - 部署: 部署包含 WebSocket 的 FastAPI 应用时,通常需要一个支持 WebSocket 协议升级的反向代理服务器(如 Nginx、Traefik)。反向代理负责处理初始的 HTTP 握手并将连接切换到 WebSocket 模式,然后将流量转发给后端的 ASGI 服务器。
9. 总结
FastAPI 凭借其异步能力和基于 Starlette 的 WebSocket 支持,使得构建高性能实时应用变得相对简单。通过 WebSocket
对象提供的方法,我们可以轻松地进行连接接受、消息发送、消息接收和断开处理。结合 ConnectionManager
模式,可以有效地管理多个客户端连接,并实现消息广播和点对点发送。通过 asyncio.create_task
和 FastAPI 的事件钩子,可以方便地集成后台任务,实现服务器主动推送数据。
虽然本文涵盖了 FastAPI WebSocket 开发的核心实战内容,但构建生产级别的实时应用还需要考虑认证、扩展性、错误处理、心跳、部署等更多方面。希望本文能为你奠定坚实的基础,让你能够 confidently 地在 FastAPI 中开启你的 WebSocket 开发之旅!