Python FastAPI WebSocket 编程终极指南
在现代 Web 开发中,实时通信已成为不可或缺的一部分。从在线聊天、实时通知、股票行情更新到协同编辑工具,用户期望获得即时、动态的交互体验。HTTP 协议的请求-响应模式在这种场景下显得力不从心,而 WebSocket 则以其持久、双向的通信能力,成为了实现这一切的黄金标准。
FastAPI,作为近年来 Python Web 框架的翘楚,凭借其卓越的性能、现代化的异步支持以及建立在 Starlette 和 Pydantic 之上的强大功能,为 WebSocket 编程提供了无与伦比的简洁与高效。
本指南将带你从 WebSocket 的基础概念出发,深入探索 FastAPI 中 WebSocket 的使用方法,涵盖从简单的“Hello, World”到构建复杂的多人聊天室,再到身份验证、连接管理和生产环境部署等高级主题。无论你是初学者还是有经验的开发者,这篇终极指南都将为你提供全面而深刻的见解。
目录
- WebSocket 基础知识
- WebSocket 是什么?为什么需要它?
- 与 HTTP 的区别
- 工作流程:握手与数据传输
- 环境准备与第一个 WebSocket 连接
- 安装必要的库
- 创建你的第一个 WebSocket 端点
- 编写一个简单的 HTML/JavaScript 客户端进行测试
- 深入数据交换
- 发送和接收文本数据
- 发送和接收 JSON 数据 (结合 Pydantic)
- 处理二进制数据
- 优雅地处理连接断开
- 核心模式:连接管理与消息广播
- 构建一个
ConnectionManager
- 实现单用户消息发送
- 实现向所有用户广播消息
- 实战:构建一个多人在线聊天室
- 构建一个
- 高级主题与最佳实践
- WebSocket 中的身份验证与授权
- 处理连接状态与关闭代码
- 依赖注入 (Dependencies) 的妙用
- 跨进程/服务器扩展 (Redis Pub/Sub)
- 客户端实现考量
- 更健壮的 JavaScript 客户端
- 使用 Python 客户端进行测试或后端通信
- 总结
1. WebSocket 基础知识
WebSocket 是什么?为什么需要它?
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。与传统的 HTTP 不同,WebSocket 允许服务器主动向客户端推送信息,也允许客户端随时向服务器发送信息,而无需每次都发起新的请求。
为什么需要它?
- 低延迟:一旦连接建立,数据帧可以直接在客户端和服务器之间传输,减少了 HTTP 请求中包含的头部开销。
- 实时性:服务器可以立即将更新推送给客户端,非常适合需要实时数据的应用,如在线游戏、金融交易平台和监控仪表盘。
- 双向通信:客户端和服务器处于平等地位,都可以是通信的发起方。
- 状态保持:连接是持久的,这使得服务器可以轻松地跟踪每个连接的状态。
与 HTTP 的区别
特性 | HTTP | WebSocket |
---|---|---|
连接模型 | 无状态,请求-响应 | 有状态,持久连接 |
通信方向 | 单向(客户端发起) | 双向(全双工) |
延迟 | 较高(每次请求都有头部开销) | 极低(连接建立后开销小) |
协议头 | http:// 或 https:// |
ws:// 或 wss:// |
适用场景 | 传统的网页浏览、API 调用 | 实时聊天、通知、数据流 |
工作流程:握手与数据传输
WebSocket 连接的生命周期始于一个 HTTP 请求,这个请求被称为“握手”(Handshake)。客户端发送一个特殊的 HTTP GET 请求,其中包含 Upgrade: websocket
和 Connection: Upgrade
等头部信息。
如果服务器支持 WebSocket,它会返回一个 101 Switching Protocols
的响应。此后,底层的 TCP 连接就不再用于 HTTP 通信,而是转变为 WebSocket 数据通道,双方可以通过这个通道自由地发送数据帧。
2. 环境准备与第一个 WebSocket 连接
现在,让我们卷起袖子,用 FastAPI 编写第一个 WebSocket 应用。
安装必要的库
你需要安装 FastAPI、一个 ASGI 服务器(如 Uvicorn),以及 python-websockets
库(FastAPI 在底层使用它来处理 WebSocket)。
bash
pip install fastapi "uvicorn[standard]" websockets
创建你的第一个 WebSocket 端点
创建一个名为 main.py
的文件,并写入以下代码:
“`python
main.py
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
app = FastAPI()
html = “””
WebSocket Chat
“””
@app.get(“/”)
async def get():
return HTMLResponse(html)
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 接受客户端连接
await websocket.accept()
try:
while True:
# 等待接收客户端发来的消息
data = await websocket.receive_text()
# 向客户端发送消息
await websocket.send_text(f”Message text was: {data}”)
except Exception as e:
print(f”Error: {e}”)
finally:
print(“Client disconnected”)
“`
代码解析:
@app.websocket("/ws")
:这是一个装饰器,它告诉 FastAPI/ws
路径是一个 WebSocket 端点,而不是一个普通的 HTTP 端点。async def websocket_endpoint(websocket: WebSocket)
:端点函数必须是异步的 (async def
)。它接收一个WebSocket
类的实例作为参数,这个对象包含了处理 WebSocket 连接的所有方法。await websocket.accept()
:这是建立连接的关键一步。在调用此方法之前,你无法与客户端进行数据交换。你可以在accept()
之前执行身份验证等逻辑。while True:
:我们使用一个无限循环来持续监听来自客户端的消息。data = await websocket.receive_text()
:异步等待并接收一条文本消息。await websocket.send_text(...)
:异步向客户端发送一条文本消息。try...finally
:这个结构确保了即使在连接异常断开(例如用户关闭浏览器)时,我们也能执行一些清理代码。
编写一个简单的 HTML/JavaScript 客户端进行测试
为了方便,上面的代码已经包含了一个简单的 HTML 页面。这个页面通过根路径 /
提供服务。
运行应用:
在终端中运行以下命令:
bash
uvicorn main:app --reload
现在,在浏览器中打开 http://localhost:8000
。你会看到一个简单的聊天界面。在输入框中输入任何内容并点击 “Send”,服务器会接收到你的消息,并返回一条格式化后的消息,显示在下方的列表中。你已经成功建立了第一个 FastAPI WebSocket 连接!
3. 深入数据交换
FastAPI 的 WebSocket
对象提供了处理不同数据类型的便捷方法。
发送和接收文本数据
这正是我们在第一个例子中做的:
* await websocket.receive_text()
* await websocket.send_text("some string")
发送和接收 JSON 数据 (结合 Pydantic)
在实际应用中,我们通常交换结构化的 JSON 数据。FastAPI 让这个过程变得异常简单。
await websocket.receive_json()
: 接收 JSON 数据并将其自动解析为 Python 字典或列表。await websocket.send_json({"key": "value"})
: 将 Python 字典或列表序列化为 JSON 字符串并发送。
你可以结合 Pydantic 模型来验证接收到的数据,这体现了 FastAPI 的核心优势。
“`python
from pydantic import BaseModel
from fastapi import WebSocket, WebSocketDisconnect
class Message(BaseModel):
user: str
text: str
… 在你的 WebSocket 端点中 …
@app.websocket(“/ws_json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
try:
# 使用 Pydantic 模型进行验证
message = Message(**data)
response = {“user”: message.user, “text”: f”Echo: {message.text}”}
await websocket.send_json(response)
except Exception:
# 如果数据格式不正确
await websocket.send_json({“error”: “Invalid data format”})
except WebSocketDisconnect:
print(“Client disconnected”)
“`
处理二进制数据
如果你需要传输图片、音频或其他二进制文件,可以使用 bytes
方法:
await websocket.receive_bytes()
await websocket.send_bytes(b"some binary data")
优雅地处理连接断开
当客户端断开连接时,receive_*
方法会抛出一个 WebSocketDisconnect
异常。捕获这个异常是进行清理工作的最佳时机,比如将用户从活动连接列表中移除。
“`python
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f”Client #{client_id} says: {data}”)
except WebSocketDisconnect:
print(f”Client #{client_id} disconnected”)
# 在这里执行清理操作
“`
4. 核心模式:连接管理与消息广播
在真实世界的应用中,一个 WebSocket 端点通常需要服务多个客户端,并且需要能够将消息从一个客户端广播到其他所有(或部分)客户端。要实现这一点,我们需要一个集中的连接管理器。
构建一个 ConnectionManager
让我们创建一个单例类来管理所有活动的 WebSocket 连接。
“`python
main.py (扩展)
from typing import List
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
“`
这个 ConnectionManager
类提供了以下功能:
* connect
: 接受新连接并将其存储在列表中。
* disconnect
: 从列表中移除断开的连接。
* send_personal_message
: 向单个特定的 WebSocket 连接发送消息。
* broadcast
: 向所有活动的连接广播消息。
我们创建了一个全局实例 manager
,以便在整个应用中共享。
实战:构建一个多人在线聊天室
现在,让我们使用 ConnectionManager
来构建一个功能齐全的聊天室。
“`python
main.py (完整聊天室示例)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import List
app = FastAPI()
HTML 页面部分省略,与第一个示例相同,但 WebSocket URL 需要更新
html = “””… (同上, 但 WebSocket URL 改为 ws://localhost:8000/ws/CLIENT_ID) …”””
class ConnectionManager:
# … (同上) …
manager = ConnectionManager()
@app.get(“/”)
async def get():
# 简单的 HTML 响应,实际应用中会更复杂
return HTMLResponse(“
Visit /chat/{client_id} with a unique client_id
“)
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
await manager.broadcast(f”Client #{client_id} has entered the chat”)
try:
while True:
data = await websocket.receive_text()
# 向发送者发送回执
# await manager.send_personal_message(f”You wrote: {data}”, websocket)
# 向所有其他人广播消息
await manager.broadcast(f”Client #{client_id} says: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f”Client #{client_id} has left the chat”)
“`
运行与测试:
- 运行
uvicorn main:app --reload
。 - 在两个或多个不同的浏览器标签页中,分别打开
http://localhost:8000/ws/1
、http://localhost:8000/ws/2
等(你需要一个能与 WebSocket 交互的客户端,或修改 HTML 让用户输入 client_id)。 - 在一个标签页中发送消息,你会看到所有其他标签页都会收到这条消息。当一个标签页关闭时,其他页面会收到它离开的通知。
这个模式是构建几乎所有实时应用的基础。
5. 高级主题与最佳实践
WebSocket 中的身份验证与授权
保护 WebSocket 端点至关重要。由于 WebSocket 握手是基于 HTTP 的,我们可以重用 FastAPI 强大的依赖注入系统。
最常见的方法是通过查询参数传递令牌(token)。
“`python
from fastapi import Depends, Query, HTTPException, status
假设你已经有一个用于验证 token 的函数
async def get_current_user(token: str = Query(…)):
user = fake_users_db.get(token) # 伪代码:从数据库验证 token
if not user:
# 注意:在 WebSocket 中,我们不能直接返回 HTTP 错误
# 抛出异常或直接关闭连接是更好的选择
return None
return user
@app.websocket(“/ws_secure/{client_id}”)
async def websocket_secure_endpoint(
websocket: WebSocket,
client_id: int,
token: str = Query(…)
):
# 简单的 token 验证
if token != “secret-token”:
# 关闭连接并提供原因
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return
await manager.connect(websocket)
# ... (后续逻辑) ...
“`
更优雅的方式是使用 Depends
,但这在 WebSocket 中有一点不同。依赖项不能直接返回一个响应。如果验证失败,它应该抛出 HTTPException
(Starlette 会将其转换为 WebSocket 关闭帧),或者依赖项本身可以调用 websocket.close()
。
“`python
async def get_token(websocket: WebSocket, token: str = Query(None)):
if token is None:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return token
@app.websocket(“/ws_secure_depends”)
async def websocket_endpoint_depends(
websocket: WebSocket,
token: str = Depends(get_token)
):
# 此时 token 已经被验证
await websocket.accept(headers=[(b’X-Token’, token.encode())])
# …
“`
处理连接状态与关闭代码
WebSocket
对象有一个 client_state
和 application_state
属性,可以查询其状态(CONNECTING
, CONNECTED
, DISCONNECTED
)。
当你主动关闭连接时,可以使用 await websocket.close(code=1000, reason="Normal closure")
。WebSocket 协议定义了一系列的关闭代码,使用它们可以向客户端传达更明确的关闭原因。
代码 | 名称 | 描述 |
---|---|---|
1000 |
NORMAL_CLOSURE |
正常关闭 |
1001 |
GOING_AWAY |
端点正在离开(例如服务器关闭) |
1002 |
PROTOCOL_ERROR |
协议错误 |
1008 |
POLICY_VIOLATION |
收到违反策略的消息 |
FastAPI 在 fastapi.status
模块中提供了这些代码的常量,例如 status.WS_1000_NORMAL_CLOSURE
。
跨进程/服务器扩展 (Redis Pub/Sub)
我们之前实现的 ConnectionManager
存在一个巨大限制:它只在单个 Python 进程中有效。当你使用 Gunicorn 或 Uvicorn 运行多个工作进程,或者将服务部署到多台服务器时,每个进程/服务器都有自己独立的连接列表。一个进程中的用户无法向另一个进程中的用户广播消息。
解决方案是使用一个外部消息代理(Message Broker),如 Redis、RabbitMQ 或 Kafka。Redis 的 Pub/Sub(发布/订阅)功能非常适合这个场景。
架构思路:
- 连接:每个 FastAPI 实例正常接受 WebSocket 连接,并将连接对象保存在其本地的
ConnectionManager
中。 - 发布:当一个实例(例如,实例A)收到来自客户端的消息需要广播时,它不再直接遍历本地连接列表。相反,它将这条消息发布(Publish)到 Redis 的一个特定频道(Channel),例如 “chat_channel”。
- 订阅:所有 FastAPI 实例在启动时都会订阅(Subscribe)”chat_channel”。
- 接收与广播:所有实例(包括实例A自身)都会从 Redis 收到这条消息。收到消息后,每个实例会遍历其本地的
ConnectionManager
中的连接,并将消息发送给它所管理的所有客户端。
这样,通过 Redis 作为中间人,就实现了跨进程、跨服务器的消息广播。实现这个需要一个异步的 Redis 客户端库,如 redis-py
(asyncio extra) 或 aioredis
。
6. 客户端实现考量
更健壮的 JavaScript 客户端
一个生产级的客户端需要处理更多情况:
* 连接生命周期事件:onopen
, onmessage
, onclose
, onerror
。
* 自动重连:当连接因网络问题意外关闭(onclose
事件),客户端应尝试在一段时间后自动重新连接。
* 心跳检测:为了防止代理或防火墙因长时间不活动而关闭连接,客户端和服务器可以约定定期发送“心跳”消息(ping/pong)。
“`javascript
function connect() {
const ws = new WebSocket(“ws://localhost:8000/ws/my-id”);
ws.onopen = function(event) {
console.log("WebSocket connection established.");
// 可以发送一条初始化消息
ws.send(JSON.stringify({type: "auth", token: "my-secret-token"}));
};
ws.onmessage = function(event) {
const data = JSON.parse(event.data);
console.log("Received message:", data);
// 更新 UI
};
ws.onclose = function(event) {
console.log("WebSocket connection closed. Code:", event.code, "Reason:", event.reason);
// 尝试 5 秒后重连
setTimeout(connect, 5000);
};
ws.onerror = function(error) {
console.error("WebSocket error:", error);
ws.close(); // 发生错误时确保关闭
};
}
connect(); // 初始连接
“`
使用 Python 客户端进行测试或后端通信
websockets
库不仅是 FastAPI 的后端依赖,它本身也是一个强大的客户端库。这对于编写自动化测试或实现服务间的 WebSocket 通信非常有用。
“`python
import asyncio
import websockets
async def test_chat():
uri = “ws://localhost:8000/ws/python-client”
async with websockets.connect(uri) as websocket:
await websocket.send(“Hello from Python client!”)
# 监听服务器消息
while True:
try:
message = await websocket.recv()
print(f"< Received: {message}")
except websockets.ConnectionClosed:
print("Connection closed.")
break
if name == “main“:
asyncio.run(test_chat())
“`
7. 总结
FastAPI 为 Python 世界的 WebSocket 编程带来了前所未有的简洁与高效。通过本指南,我们学习了:
- 基础:如何使用
@app.websocket
装饰器创建端点,并使用accept
,receive_*
,send_*
方法进行基本通信。 - 核心模式:
ConnectionManager
是管理多个连接并实现消息广播的关键设计模式。 - 高级应用:我们探讨了如何利用 FastAPI 的依赖注入系统实现身份验证,如何优雅地处理连接关闭,以及最重要的——如何通过 Redis 等消息代理来扩展 WebSocket 应用以支持多进程/多服务器部署。
- 健壮性:无论是前端还是后端,都需要考虑连接的生命周期管理、错误处理和自动重连机制。
WebSocket 是构建现代、高度互动 Web 应用的基石。FastAPI 以其原生的异步支持和优雅的 API 设计,无疑是 Python 开发者拥抱这一技术的最佳选择。现在,你已经掌握了从基础到高级的完整知识体系,是时候去构建你自己的实时应用了!