深入理解 FastAPI WebSocket:构建实时应用的基石
在现代 Web 应用中,实时性变得越来越重要。传统的 HTTP 请求/响应模式适用于一次性的数据交换,但对于聊天应用、实时数据仪表盘、游戏或协作工具等需要服务器主动推送数据给客户端的场景,效率较低且实现复杂(通常需要轮询)。这时,WebSocket 技术应运而生,它提供了一种在客户端和服务器之间建立持久性、双向通信通道的能力。
FastAPI,作为一款现代、快速(高性能)的 Web 框架,天然支持异步编程,这使得它与 WebSockets 的异步特性完美契合。本篇文章将带你深入了解如何在 FastAPI 中构建 WebSocket 应用程序。
一、什么是 WebSocket?为什么在 FastAPI 中使用它?
WebSocket 是一种网络传输协议,它在客户端和服务器之间提供全双工(即双向)通信通道。一旦连接建立,服务器和客户端都可以随时向对方发送数据,而无需像 HTTP 那样每次通信都建立新的连接。这大大减少了延迟和开销,特别适合需要频繁、低延迟数据交换的场景。
为什么在 FastAPI 中使用 WebSocket?
- 异步原生支持: FastAPI 基于 Starlette 构建,而 Starlette 是为异步框架设计的。WebSocket 操作(如接受连接、接收消息、发送消息)本质上是 I/O 密集型的,异步编程可以让你在等待一个连接的 I/O 操作时,同时处理其他连接或任务,从而提高服务器的并发能力和整体性能。
- 简洁的API: FastAPI 提供了非常直观和易用的 API 来处理 WebSocket 连接和消息。
- 高性能: 结合 Uvicorn、Hypercorn 等 ASGI 服务器,FastAPI 的异步特性使得 WebSocket 应用能够处理大量并发连接。
- 集成性: FastAPI 强大的依赖注入系统、数据验证(Pydantic)、安全性特性等都可以与 WebSocket endpoint 结合使用。
二、准备工作
在开始之前,请确保你已经安装了 Python 3.7+ 和 FastAPI 以及一个 ASGI 服务器(如 Uvicorn)。
bash
pip install fastapi uvicorn websockets
注意:websockets
库是 FastAPI 处理 WebSocket 连接的底层依赖,尽管你直接使用 fastapi
和 uvicorn
即可启动应用,但显式安装 websockets
是一个好习惯。
三、构建一个基本的 WebSocket Endpoint
我们将从一个最简单的例子开始:一个 WebSocket endpoint,它接受连接,然后循环接收来自客户端的消息,并将收到的消息原样发回给客户端(一个简单的 echo 服务器)。
创建一个 Python 文件,例如 main.py
:
“`python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
“””
处理 WebSocket 连接的 endpoint
“””
await websocket.accept() # 接受客户端的连接
print(“WebSocket 连接已建立”)
try:
while True:
# 循环接收客户端发送的文本消息
data = await websocket.receive_text()
print(f”收到消息: {data}”)
# 将收到的消息发送回客户端
await websocket.send_text(f”消息内容是: {data}”)
except WebSocketDisconnect:
# 捕获连接断开异常
print(“WebSocket 连接已断开”)
# 可以在这里执行一些清理工作,例如从连接列表中移除该连接
except Exception as e:
# 捕获其他可能的异常
print(f”WebSocket 发生错误: {e}”)
# 可以选择关闭连接
# await websocket.close() # 根据需要是否显式关闭
“`
代码解释:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
: 导入必要的类和异常。app = FastAPI()
: 创建 FastAPI 应用实例。@app.websocket("/ws")
: 这是一个新的 FastAPI 装饰器,用于注册一个 WebSocket endpoint。它指定了客户端连接时使用的路径。async def websocket_endpoint(websocket: WebSocket):
: 定义一个异步函数来处理 WebSocket 连接。FastAPI 会自动将代表当前连接的WebSocket
对象作为参数传递给这个函数。使用类型提示websocket: WebSocket
是推荐的做法。await websocket.accept()
: 这是建立 WebSocket 连接的关键步骤。服务器必须调用accept()
方法来完成握手,客户端才会认为连接成功建立。while True:
: 进入一个无限循环,持续监听来自客户端的消息。data = await websocket.receive_text()
: 异步地等待并接收客户端发送的下一条文本消息。这个方法会阻塞,直到收到消息或连接断开。FastAPI 的WebSocket
对象提供了多种接收方法,如receive_text()
(接收文本)、receive_bytes()
(接收二进制数据)、receive_json()
(接收 JSON 数据并自动解析)。await websocket.send_text(f"消息内容是: {data}")
: 异步地向客户端发送一条文本消息。同样,也有send_bytes()
和send_json()
方法。except WebSocketDisconnect:
: 当客户端正常或非正常断开连接时,receive_text()
(或其他接收方法)会抛出WebSocketDisconnect
异常。我们捕获这个异常,可以在这里记录日志或执行清理操作。except Exception as e:
: 捕获处理过程中可能发生的其他异常,例如消息解析错误等。
四、运行应用程序
使用 Uvicorn 运行这个应用:
bash
uvicorn main:app --reload
这会启动一个服务器,监听默认端口 8000。
五、测试 WebSocket 连接
你不能直接在浏览器地址栏访问 http://127.0.0.1:8000/ws
来测试 WebSocket,因为这是不同的协议。你需要一个 WebSocket 客户端。
以下是几种测试方法:
1. 使用浏览器开发者工具 (Console)
打开一个空白页面,按 F12 打开开发者工具,切换到 Console
标签页。输入以下 JavaScript 代码:
“`javascript
// 创建 WebSocket 连接
var ws = new WebSocket(“ws://127.0.0.1:8000/ws”);
// 连接成功时触发
ws.onopen = function(event) {
console.log(“WebSocket 连接成功”);
// 连接成功后发送消息
ws.send(“Hello FastAPI!”);
};
// 收到消息时触发
ws.onmessage = function(event) {
console.log(“收到服务器消息: “, event.data);
};
// 连接错误时触发
ws.onerror = function(event) {
console.error(“WebSocket 错误: “, event);
};
// 连接关闭时触发
ws.onclose = function(event) {
console.log(“WebSocket 连接已关闭: “, event.code, event.reason);
};
// 手动关闭连接 (可选)
// ws.close();
“`
执行这些代码,你会在控制台看到连接建立的日志,发送的消息,以及收到服务器回显的消息。在运行 Uvicorn 的终端也会看到相应的日志输出。
2. 使用 Python 客户端库
你可以编写一个简单的 Python 脚本作为客户端:
“`python
client.py
import asyncio
import websockets
async def connect_and_send():
uri = “ws://127.0.0.1:8000/ws”
try:
async with websockets.connect(uri) as websocket:
print(“WebSocket 连接成功”)
# 发送消息
message = "你好,服务器!"
await websocket.send(message)
print(f"已发送: {message}")
# 接收回显消息
response = await websocket.recv()
print(f"收到回显: {response}")
# 可以继续发送和接收
message2 = "第二条消息"
await websocket.send(message2)
print(f"已发送: {message2}")
response2 = await websocket.recv()
print(f"收到回显: {response2}")
except websockets.exceptions.ConnectionClosedOK:
print("WebSocket 连接正常关闭")
except websockets.exceptions.ConnectionClosedError as e:
print(f"WebSocket 连接异常关闭: {e}")
except Exception as e:
print(f"发生其他错误: {e}")
if name == “main“:
asyncio.run(connect_and_send())
“`
运行 python client.py
,同样可以看到与服务器的交互过程。
3. 使用命令行工具
例如,可以使用 wscat
(需要 Node.js 环境安装 npm install -g wscat
):
bash
wscat -c ws://127.0.0.1:8000/ws
连接成功后,你可以在终端输入文本,按回车发送,服务器的回显消息也会显示在终端。
六、处理多个客户端连接
上面的例子只能处理一个连接。在实际应用中,服务器需要同时处理多个 WebSocket 连接,并且可能需要实现广播消息(发给所有连接的客户端)或向特定客户端发送消息的功能(私聊)。
为了管理这些连接,我们可以创建一个 ConnectionManager
类来跟踪所有活跃的连接。
修改 main.py
:
“`python
main.py (处理多个客户端)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
class ConnectionManager:
“””
用于管理 WebSocket 连接的类
“””
def init(self):
# 使用列表存储所有活跃的 WebSocket 连接
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
"""
处理新连接的建立
"""
await websocket.accept() # 接受连接
self.active_connections.append(websocket) # 将新连接添加到列表
def disconnect(self, websocket: WebSocket):
"""
处理连接断开
"""
try:
self.active_connections.remove(websocket) # 从列表中移除断开的连接
except ValueError:
# 如果连接不在列表中,说明已经被移除了(可能在广播时因断开移除)
pass
async def send_personal_message(self, message: str, websocket: WebSocket):
"""
向单个客户端发送消息
"""
await websocket.send_text(message)
async def broadcast(self, message: str):
"""
向所有活跃客户端广播消息
"""
# 遍历所有活跃连接并发送消息
# 注意:在遍历时如果连接断开,send_text() 会抛出异常
# 为了安全,我们可以在发送前检查,或捕获异常后移除
disconnected_connections = []
for connection in self.active_connections:
try:
await connection.send_text(message)
except RuntimeError as e:
# 如果在异步操作中遇到连接已关闭的错误
# print(f"发送消息失败,连接可能已关闭: {e}")
disconnected_connections.append(connection)
except Exception as e:
print(f"向连接 {connection} 发送消息时发生错误: {e}")
disconnected_connections.append(connection)
# 移除发送失败的连接
for connection in disconnected_connections:
self.disconnect(connection)
创建一个全局或应用级别的连接管理器实例
manager = ConnectionManager()
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
“””
处理带有客户端ID的 WebSocket 连接
“””
await manager.connect(websocket) # 使用管理器建立连接
print(f”客户端 {client_id} 连接已建立”)
try:
# 通知所有客户端有新用户加入 (广播)
await manager.broadcast(f”客户端 {client_id} 加入了聊天”)
while True:
# 接收客户端消息
data = await websocket.receive_text()
print(f"收到来自客户端 {client_id} 的消息: {data}")
# 将消息广播给所有其他客户端 (模拟聊天室)
await manager.broadcast(f"客户端 {client_id} 说: {data}")
# 也可以向特定客户端发送消息 (如果知道其连接对象或ID映射)
# await manager.send_personal_message(f"你发送了: {data}", websocket)
except WebSocketDisconnect:
# 处理连接断开
manager.disconnect(websocket) # 使用管理器移除连接
print(f"客户端 {client_id} 连接已断开")
# 通知所有客户端有用户离开 (广播)
await manager.broadcast(f"客户端 {client_id} 离开了聊天")
except Exception as e:
print(f"客户端 {client_id} WebSocket 发生错误: {e}")
# 发生其他错误时也断开连接
manager.disconnect(websocket)
await manager.broadcast(f"客户端 {client_id} 因错误退出")
除了 WebSocket endpoint,你还可以有标准的 HTTP endpoint
@app.get(“/”)
async def get():
return {“message”: “Hello HTTP”}
或者一个简单的 HTML 页面,包含 JavaScript WebSocket 客户端代码,方便测试
from fastapi.responses import HTMLResponse
html = “””
<!DOCTYPE html>
Chat
WebSocket Chat
Client ID:
Message:
var ws = null;
function connectWebSocket() {
var clientId = document.getElementById("clientId").value;
if (ws && ws.readyState === WebSocket.OPEN) {
console.log("Already connected.");
return;
}
var url = "ws://localhost:8000/ws/" + clientId;
ws = new WebSocket(url);
ws.onopen = function(event) {
console.log("WebSocket connection opened.");
};
ws.onmessage = function(event) {
var messages = document.getElementById('messages');
var message = document.createElement('li');
var content = document.createTextNode(event.data);
message.appendChild(content);
messages.appendChild(message);
};
ws.onerror = function(event) {
console.error("WebSocket error observed:", event);
};
ws.onclose = function(event) {
console.log("WebSocket connection closed:", event.code, event.reason);
ws = null; // Reset ws on close
};
}
function sendMessage() {
if (!ws || ws.readyState !== WebSocket.OPEN) {
console.log("WebSocket is not connected.");
return;
}
var input = document.getElementById("messageText");
var message = input.value;
ws.send(message);
input.value = '';
}
“””
@app.get(“/chat”, response_class=HTMLResponse)
async def get_chat_page():
return html
“`
代码解释:
ConnectionManager
类:active_connections: List[WebSocket]
: 存储所有当前活跃的WebSocket
对象。connect(websocket)
: 将新的连接添加到列表。disconnect(websocket)
: 从列表中移除断开的连接。send_personal_message(message, websocket)
: 向指定的单个连接发送消息。broadcast(message)
: 遍历列表,向所有连接发送消息。这里增加了简单的错误处理,如果在发送过程中发现连接已失效,会将其从列表中移除。
manager = ConnectionManager()
: 创建一个ConnectionManager
实例。在实际应用中,如果有多 worker 或分布式部署,需要使用 Redis Pub/Sub 或其他消息队列来实现跨进程/机器的广播。这里简化为单进程内存管理。@app.websocket("/ws/{client_id}")
: WebSocket endpoint 现在可以接收路径参数,例如用于标识不同的客户端或房间。这里我们接收一个client_id
。- 在
websocket_endpoint
函数内部:await manager.connect(websocket)
: 使用管理器接受连接并将其添加到管理列表。manager.disconnect(websocket)
: 在WebSocketDisconnect
异常中,使用管理器移除连接。await manager.broadcast(...)
: 使用管理器的广播方法发送消息给所有连接。
现在你可以启动服务器,并使用多个客户端连接 ws://127.0.0.1:8000/ws/1
, ws://127.0.0.1:8000/ws/2
等。你会发现一个客户端发送的消息会被广播给所有连接的客户端。
七、发送和接收不同类型的数据
除了文本 (send_text
, receive_text
),FastAPI 的 WebSocket
对象还支持:
- 二进制数据: 使用
send_bytes()
和receive_bytes()
。 - JSON 数据: 使用
send_json()
和receive_json()
。这非常方便,FastAPI 会自动处理 JSON 的序列化和反序列化。
示例:发送和接收 JSON
“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket(“/ws_json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
print(“WebSocket JSON 连接已建立”)
try:
while True:
# 接收 JSON 数据
data = await websocket.receive_json()
print(f”收到 JSON 数据: {data}”)
# 构建 JSON 响应
response = {"received": data, "status": "success"}
# 发送 JSON 数据
await websocket.send_json(response)
except WebSocketDisconnect:
print("WebSocket JSON 连接已断开")
except Exception as e:
print(f"WebSocket JSON 发生错误: {e}")
“`
客户端可以使用相应的 JSON 发送/接收方法或手动序列化/反序列化。例如,在 JavaScript 客户端中使用 JSON.stringify()
和 JSON.parse()
。
八、错误处理与连接状态
WebSocketDisconnect
是处理连接正常或异常关闭的主要异常。始终建议在try...except WebSocketDisconnect
块中包含 WebSocket 的接收循环。- 在异步发送消息 (
send_text
,send_json
, etc.) 时,如果连接在发送过程中断开,可能会抛出RuntimeError
或其他与连接状态相关的异常。在处理多个连接并广播时,遍历列表发送消息时需要小心处理这些异常,以避免一个断开的连接导致整个广播失败,并在捕获异常后及时将该连接从管理列表中移除。 - 可以使用
websocket.close()
方法从服务器端主动关闭连接。
九、进阶话题(简述)
- 身份验证与授权: 在 WebSocket 握手阶段或接收消息后,可以像处理 HTTP 请求一样进行用户身份验证。例如,可以在连接建立前检查请求头、Cookie 或 URL 参数中的 token。一旦连接建立,可以基于用户的身份控制其行为(例如,只允许发送给特定房间)。
- 使用 Pydantic 进行数据验证: 结合
receive_json()
,可以利用 Pydantic 模型自动验证收到的 JSON 数据格式。 - 后台任务: 对于需要服务器定期向客户端推送数据的场景(例如实时股票报价),可以使用 FastAPI 的后台任务(
@app.on_event("startup")
启动一个 loop)或asyncio
任务来完成,这些任务可以访问ConnectionManager
并向客户端发送数据。 - 跨进程/机器扩展: 如前所述,简单的内存
ConnectionManager
不适用于多 worker 或分布式部署。需要使用外部消息代理(如 Redis Pub/Sub, RabbitMQ, Kafka)来实现跨多个服务器实例的广播和消息路由。Starlette (FastAPI 的底层) 有一些社区库(如starlette-redis-pubsub
)可以帮助实现这一点。
十、总结
FastAPI 为构建高性能、易于维护的 WebSocket 应用程序提供了强大的支持。通过其简洁的 API 和异步特性,你可以轻松地实现实时数据传输、构建聊天室等功能。理解 WebSocket 的全双工通信模式、如何在 FastAPI 中定义 endpoint、处理连接生命周期(接受、接收、发送、断开)以及如何管理多个连接是构建复杂实时应用的关键。
希望本教程能帮助你迈出使用 FastAPI 构建 WebSocket 应用的第一步!