FastAPI WebSockets:从入门到实战
在现代 Web 应用中,实时通信已经从一种高级特性转变为许多核心功能的需求,例如在线聊天、实时数据仪表盘、多人协作应用、游戏以及即时通知等。传统的 HTTP 请求-响应模式在这种场景下效率低下,因为它需要客户端不断地进行轮询才能获取最新数据。WebSockets 提供了一种更高效、更低延迟的双向通信方式。
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建基于标准 Python 类型提示的 APIs。其内置的对异步编程(asyncio
)的良好支持,使其成为构建高性能 WebSocket 应用的理想选择。
本文将带你从零开始,深入理解 FastAPI 中的 WebSockets,从基本概念到构建一个简单的实时聊天应用,最终探讨一些高级话题和最佳实践。
第一部分:理解 WebSockets
1.1 什么是 WebSockets?
WebSockets 是一种在单个 TCP 连接上进行全双工通信的协议。这意味着服务器和客户端可以随时向对方发送数据,而无需像 HTTP 那样每次都发起新的请求。
核心特点:
- 持久连接: 一旦建立,连接会保持打开状态,直到一方主动关闭或发生错误。
- 全双工: 数据可以同时在客户端和服务器之间流动。
- 低延迟: 避免了 HTTP 的连接建立、请求头发送等开销,数据传输更快。
- 协议升级: WebSocket 连接通常是通过 HTTP/1.1 的 Upgrade 机制从一个标准的 HTTP 连接升级而来的。
1.2 为什么在 FastAPI 中使用 WebSockets?
- 异步支持: FastAPI 基于 Starlette 和 Pydantic,天然支持 Python 的
asyncio
。WebSockets 的操作(连接、发送、接收)本质上是异步的,FastAPI 的设计与之完美契合,可以高效地处理大量并发连接。 - 高性能: 得益于 Starlette 和 Uvicorn(一个 ASGI 服务器),FastAPI 提供了出色的性能,这对于需要处理高并发 WebSocket 连接的应用至关重要。
- 易于使用: FastAPI 提供了简洁的 API 来定义 WebSocket 终端,使用类型提示使得代码易于阅读和维护。
- 集成性: 可以轻松地将 WebSockets 与 FastAPI 的其他功能(如依赖注入、安全性、数据验证等)结合使用。
1.3 WebSockets 与 HTTP 的区别
特性 | HTTP/1.1 | WebSockets |
---|---|---|
通信方式 | 请求-响应(半双工) | 全双工 |
连接 | 通常是短连接(除非使用 Keep-Alive),每次请求可能需要重新建立连接 | 长连接,一旦建立,保持打开直到关闭 |
开销 | 每次请求都有头部开销 | 初始连接建立开销,之后数据帧开销小 |
数据传输 | 客户端发起请求,服务器响应 | 客户端和服务器都可以随时发起数据传输 |
用途 | 无状态请求、资源获取、API 调用 | 实时数据传输、通知、聊天、多人应用等 |
状态 | 无状态(或使用 Session/Cookies 模拟状态) | 有状态(连接本身代表一个状态) |
对于需要实时、低延迟双向通信的场景,WebSockets 是更优的选择。
第二部分:FastAPI WebSocket 入门
2.1 安装必要的库
你需要安装 FastAPI、一个 ASGI 服务器(如 uvicorn)以及 websockets
库(FastAPI 内部使用它来处理 WebSocket 连接)。
bash
pip install fastapi uvicorn websockets
2.2 创建一个基本的 WebSocket 终端
创建一个 Python 文件(例如 main.py
),编写以下代码:
“`python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
定义一个 WebSocket 终端
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 接受客户端的连接
await websocket.accept()
print(“WebSocket connection accepted.”)
try:
# 循环接收客户端发送的消息
while True:
# 接收文本消息
data = await websocket.receive_text()
print(f"Received message: {data}")
# 向客户端发送一个回复
await websocket.send_text(f"Message text was: {data}")
except WebSocketDisconnect as e:
# 处理 WebSocket 断开连接事件
print(f"WebSocket connection closed: {e.code}, {e.reason}")
except Exception as e:
# 处理其他可能的错误
print(f"An error occurred: {e}")
finally:
# 可选:连接断开后执行清理工作
print("WebSocket connection terminated.")
“`
代码解释:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
: 导入 FastAPI 应用类、WebSocket 类型以及 WebSocketDisconnect 异常。app = FastAPI()
: 创建一个 FastAPI 应用实例。@app.websocket("/ws")
: 这是一个装饰器,将下面的异步函数websocket_endpoint
注册为处理/ws
路径的 WebSocket 连接。async def websocket_endpoint(websocket: WebSocket):
: 定义一个异步函数来处理 WebSocket 连接。FastAPI 会将代表当前连接的WebSocket
对象作为参数传递给它。await websocket.accept()
: 这是一个关键步骤。当客户端尝试连接时,服务器需要通过调用accept()
方法来接受连接。这标志着 WebSocket 握手完成,连接正式建立。while True:
: 进入一个无限循环,以便持续接收客户端发送的消息。data = await websocket.receive_text()
: 异步地等待并接收客户端发送的文本消息。如果接收到的是其他类型的帧(如二进制),或者连接关闭,此方法可能会抛出异常。await websocket.send_text(f"Message text was: {data}")
: 异步地向当前客户端发送一个文本消息。except WebSocketDisconnect as e:
: 这是一个重要的异常处理块。当客户端正常或异常断开连接时,receive_text()
(或其他receive_*
方法)会抛出WebSocketDisconnect
异常。捕获此异常可以让你优雅地处理连接关闭事件。异常对象e
包含关闭的代码和原因。except Exception as e:
: 捕获其他潜在的错误,例如发送/接收数据时发生的网络错误等。finally:
: 无论是否发生异常,这个块的代码都会执行,可以用来执行一些清理操作。
2.3 运行服务器
打开终端,进入 main.py
所在的目录,运行 Uvicorn 服务器:
bash
uvicorn main:app --reload
Uvicorn 会启动服务器,默认监听 http://127.0.0.1:8000
。
2.4 创建一个简单的 WebSocket 客户端(HTML/JavaScript)
为了测试服务器,你需要一个 WebSocket 客户端。最简单的方法是使用浏览器中的 JavaScript。创建一个 HTML 文件(例如 client.html
):
“`html
FastAPI WebSocket Test
Messages:
“`
代码解释:
new WebSocket("ws://127.0.0.1:8000/ws")
: 创建一个 WebSocket 对象,尝试连接到指定的 URL。ws://
是 WebSocket 的标准协议头,对应 HTTP 的http://
。ws.onopen
: 连接成功建立时执行的函数。ws.onmessage
: 收到服务器消息时执行的函数。event.data
包含收到的数据。ws.onerror
: 连接发生错误时执行的函数。ws.onclose
: 连接关闭时执行的函数。event.wasClean
表示是否为正常关闭,event.code
和event.reason
提供关闭信息。ws.send(message)
: 向服务器发送数据。window.onbeforeunload
: 在页面关闭或刷新前尝试关闭 WebSocket 连接,这是一种良好的实践。
2.5 测试连接
- 确保 Uvicorn 服务器正在运行。
- 用浏览器打开
client.html
文件。 - 打开浏览器的开发者工具(通常按 F12),查看控制台输出。
- 在页面输入框中输入消息,点击“Send”。
- 观察浏览器控制台、页面消息区域以及服务器终端的输出。你会看到消息被发送、接收并回复的过程。
恭喜!你已经成功建立并测试了一个基本的 FastAPI WebSocket 连接。
第三部分:处理多种消息类型和多个连接
前面的例子只能处理文本消息,并且只能与单个客户端通信。在实际应用中,你需要处理 JSON 数据并管理多个并发连接。
3.1 发送和接收 JSON 数据
在实时应用中,数据通常是结构化的,JSON 是常用的格式。FastAPI 的 WebSocket
对象提供了方便的方法来处理 JSON。
“`python
main.py (修改部分)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Any
app = FastAPI()
@app.websocket(“/ws/json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
print(“JSON WebSocket connection accepted.”)
try:
while True:
# 接收 JSON 消息
data: Dict[Any, Any] = await websocket.receive_json()
print(f"Received JSON message: {data}")
# 验证接收到的数据(可选,但推荐)
if isinstance(data, dict) and "message" in data:
message_text = data["message"]
# 向客户端发送 JSON 回复
await websocket.send_json({"status": "success", "received_message": message_text})
else:
await websocket.send_json({"status": "error", "message": "Invalid JSON format, expected {'message': '...'}"})
except WebSocketDisconnect as e:
print(f"JSON WebSocket connection closed: {e.code}, {e.reason}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
print("JSON WebSocket connection terminated.")
“`
客户端 (HTML/JavaScript 修改):
你需要修改客户端来发送和接收 JSON。
“`html
FastAPI WebSocket JSON Test
Messages:
“`
要点:
- 服务器端使用
await websocket.receive_json()
接收 JSON,使用await websocket.send_json(data)
发送 JSON。FastAPI/Starlette 会自动处理 JSON 的解析和序列化。 - 客户端 JavaScript 需要手动使用
JSON.stringify()
将 JavaScript 对象转换为 JSON 字符串发送,并使用JSON.parse()
将收到的 JSON 字符串解析回 JavaScript 对象。
3.2 管理多个连接和广播消息
一个 WebSocket 服务器通常需要同时处理来自多个客户端的连接,并可能需要向所有(或部分)客户端广播消息。你需要一种机制来跟踪所有活跃的连接。
一个简单的方法是使用一个列表来存储所有的 WebSocket
对象。为了更好地组织代码,可以创建一个 ConnectionManager
类。
“`python
main.py (再次修改)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
ConnectionManager 类用于管理所有活跃的 WebSocket 连接
class ConnectionManager:
def init(self):
# 存储所有活跃的 WebSocket 连接
self.active_connections: List[WebSocket] = []
# 添加新的连接
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
print(f"New connection added. Total active connections: {len(self.active_connections)}")
# 移除断开的连接
def disconnect(self, websocket: WebSocket):
try:
self.active_connections.remove(websocket)
print(f"Connection removed. Total active connections: {len(self.active_connections)}")
except ValueError:
# 如果连接不在列表中,说明可能已经被移除(例如在另一个地方处理了断开)
print("Attempted to remove a connection that was not in the active list.")
# 向所有活跃连接广播文本消息
async def broadcast(self, message: str):
# 注意:如果在循环发送消息时某个连接断开,可能会抛出异常。
# 更健壮的做法是捕获异常并在异常处理中移除连接。
# 这里的简单实现假设发送过程中不会频繁断开。
# 稍后在聊天室例子中会展示更健壮的处理方式。
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception as e:
# 如果发送失败,说明连接有问题,移除它
print(f"Error sending to a connection: {e}. Removing connection.")
# 这里的移除是同步的,如果在 async for 循环中修改列表,可能会有问题
# 更好的方法是收集要移除的连接列表,然后在循环外部移除
# 为了简单,这里暂时不处理并发修改列表的问题,后续例子会改进
pass # 在简单广播中先忽略错误,或在更复杂场景下处理
# 向所有活跃连接广播 JSON 消息
async def broadcast_json(self, data: dict):
for connection in self.active_connections:
try:
await connection.send_json(data)
except Exception as e:
print(f"Error sending JSON to a connection: {e}. Removing connection.")
pass # 同上,简单处理
创建一个全局的连接管理器实例
manager = ConnectionManager()
修改 /ws 终端使用连接管理器
@app.websocket(“/ws/broadcast”)
async def websocket_broadcast_endpoint(websocket: WebSocket):
# 使用管理器来接受并存储连接
await manager.connect(websocket)
try:
while True:
# 接收客户端发送的消息
data = await websocket.receive_text()
# 将收到的消息广播给所有连接
await manager.broadcast(f"Client says: {data}")
except WebSocketDisconnect as e:
# 连接断开时,从管理器中移除
manager.disconnect(websocket)
# 可选:广播一条消息通知大家有人离开了
await manager.broadcast(f"A client left the chat. Code: {e.code}")
except Exception as e:
print(f"An error occurred in broadcast endpoint: {e}")
# 如果是发送错误导致断开,尝试移除连接
try:
manager.disconnect(websocket)
except:
pass # 忽略移除失败
await manager.broadcast(f"A client encountered an error and disconnected.")
“`
代码解释:
ConnectionManager
类:active_connections
: 一个列表,用来保存所有连接到服务器的WebSocket
对象。connect(websocket)
: 接受一个新的 WebSocket 连接,并将其添加到active_connections
列表中。disconnect(websocket)
: 从active_connections
列表中移除一个断开的连接。broadcast(message)
/broadcast_json(data)
: 遍历active_connections
列表,并向列表中的每个WebSocket
对象发送消息。
manager = ConnectionManager()
: 在应用启动时创建一个ConnectionManager
的单例实例。- 在
@app.websocket("/ws/broadcast")
终端函数中:- 连接建立时,调用
await manager.connect(websocket)
。 - 在消息接收循环中,接收到消息后,不再是回复给发送者,而是调用
await manager.broadcast(...)
将消息发送给所有连接。 - 在
WebSocketDisconnect
异常块中,调用manager.disconnect(websocket)
移除断开的连接,并可选地广播一条通知消息。
- 连接建立时,调用
客户端 (HTML/JavaScript 修改):
你需要修改客户端来连接新的终端,并且不再期望收到发送消息后的直接回复,而是接收来自服务器的广播消息。
“`html
FastAPI WebSocket Broadcast Test
Messages:
“`
测试广播:
- 确保服务器运行新的代码 (
uvicorn main:app --reload
)。 - 打开两个或多个浏览器窗口,都打开
client_broadcast.html
。 - 在一个窗口输入消息并发送。
- 观察所有打开的窗口,它们都应该收到并显示这条消息(由服务器广播)。
这个例子展示了如何使用一个简单的 ConnectionManager
来管理多个连接并实现广播功能。
第四部分:构建一个简单的聊天室应用
现在,我们将把前面学到的概念结合起来,构建一个更实用的简单聊天室应用。每个用户进入聊天室时指定一个用户名,发送的消息会带上用户名广播给其他用户。
“`python
main.py (聊天室应用)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse
from typing import List, Dict
import uuid # 用于生成唯一的客户端ID
app = FastAPI()
稍微改进的 ConnectionManager,存储客户端ID和对应的 WebSocket 对象
class ConnectionManager:
def init(self):
# 使用字典存储连接,key为客户端ID,value为 WebSocket 对象
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, client_id: str, websocket: WebSocket):
await websocket.accept()
self.active_connections[client_id] = websocket
print(f"Client {client_id} connected. Total: {len(self.active_connections)}")
def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
print(f"Client {client_id} disconnected. Total: {len(self.active_connections)}")
else:
print(f"Attempted to disconnect client {client_id} not found in active list.")
# 向所有活跃连接广播文本消息
async def broadcast(self, message: str):
disconnected_clients = []
for client_id, connection in self.active_connections.items():
try:
await connection.send_text(message)
except WebSocketDisconnect:
# 捕获断开连接异常
print(f"Client {client_id} disconnected during broadcast.")
disconnected_clients.append(client_id)
except Exception as e:
# 捕获其他发送错误
print(f"Error sending to client {client_id}: {e}")
disconnected_clients.append(client_id)
# 在广播循环外部移除断开的连接
for client_id in disconnected_clients:
self.disconnect(client_id)
manager = ConnectionManager()
提供一个简单的 HTML 页面作为聊天室客户端
html = “””
FastAPI Simple Chat
“””
提供 HTML 页面
@app.get(“/”)
async def get():
return HTMLResponse(html)
聊天室 WebSocket 终端
@app.websocket(“/ws/chat/{client_id}”)
async def websocket_chat_endpoint(websocket: WebSocket, client_id: str):
# 接受连接并添加到管理器
await manager.connect(client_id, websocket)
# 广播用户加入消息
await manager.broadcast(f”System: Client {client_id} joined the chat.”)
try:
while True:
# 接收消息
data = await websocket.receive_text()
# 广播聊天消息 (带上发送者ID)
await manager.broadcast(f”{client_id}: {data}”)
except WebSocketDisconnect:
# 连接断开,从管理器移除
manager.disconnect(client_id)
# 广播用户离开消息
await manager.broadcast(f"System: Client {client_id} left the chat.")
except Exception as e:
print(f"An error occurred with client {client_id}: {e}")
# 如果发生其他错误,也尝试移除连接并广播离开消息
manager.disconnect(client_id)
await manager.broadcast(f"System: Client {client_id} encountered an error and left the chat.")
“`
代码解释:
- 服务器端 (
main.py
):ConnectionManager
现在使用字典active_connections: Dict[str, WebSocket]
来存储连接,以客户端 ID(在此例中是用户名)作为键,方便查找和管理特定用户的连接。connect
方法接受client_id
参数,并在字典中存储client_id: websocket
。disconnect
方法根据client_id
移除字典中的连接。broadcast
方法现在增加了对WebSocketDisconnect
和其他异常的捕获,并在循环外部批量移除断开的连接,使其更健壮。@app.get("/")
: 添加了一个标准的 HTTP GET 终端,用于提供聊天室的 HTML 页面。@app.websocket("/ws/chat/{client_id}")
: WebSocket 终端现在包含一个路径参数{client_id}
,客户端连接时会提供自己的 ID(用户名)。- 在 WebSocket 终端函数中,连接成功后广播 “joined” 消息,接收到消息后广播带有用户名的消息,断开时广播 “left” 消息。异常处理确保连接断开后能正确地从管理器中移除。
- 客户端端 (内置 HTML 的 JavaScript):
- 增加了用户名输入框和连接/断开按钮。
- 用户输入用户名后,点击“Connect”按钮,调用
connectWebSocket()
函数,该函数获取用户名并以此连接到/ws/chat/{client_id}
终端。 updateButtonStates
函数用于根据连接状态启用/禁用输入框和按钮。log
函数用于向聊天框添加消息,并实现自动滚动。ws.onmessage
直接将收到的消息添加到聊天框。sendMessage
函数获取输入框内容并通过 WebSocket 发送。- 增加了对回车键发送消息的支持。
运行和测试聊天室:
- 确保服务器运行最新的代码 (
uvicorn main:app --reload
)。 - 打开浏览器访问
http://127.0.0.1:8000/
。 - 打开两个或多个浏览器窗口/标签页,访问同一个地址。
- 在每个窗口中输入不同的用户名,然后点击“Connect”。
- 连接成功后,在一个窗口输入消息并发送。
- 观察所有窗口,消息应该以 “[用户名]: [消息内容]” 的格式出现在所有聊天框中。
- 尝试关闭一个窗口,其他窗口应该会收到 “[用户名] left the chat.” 的系统消息。
这个聊天室例子虽然简单,但包含了 WebSocket 应用中的核心概念:连接管理、多客户端通信和广播。
第五部分:高级话题与实战考量
构建生产级别的 WebSocket 应用还需要考虑更多因素。
5.1 身份验证和授权
WebSocket 连接本身不像 HTTP 请求那样容易携带 Header(通常是在连接升级时携带一次)。如何在 WebSocket 连接上实现身份验证和授权?
- 基于查询参数或路径参数: 在连接 URL 中携带 token 或用户 ID(如聊天室例子)。简单但不安全,因为这些信息可能被记录在日志中。
- 发送初始认证消息: 连接建立后,客户端发送一条包含 token 或凭证的认证消息。服务器验证通过后,才将该连接视为已认证用户,并允许其发送/接收受限消息。这是更推荐的方式。在
ConnectionManager
中可以维护一个已认证连接的列表或字典。 - 使用 Cookie: 如果 WebSocket 连接与 HTTP 站点在同一域下,浏览器可能会自动发送相关的 Cookie,服务器端可以读取这些 Cookie 进行认证(例如,读取 session ID)。
- 在 HTTP 握手阶段认证: 利用 HTTP Upgrade 请求携带 Header,服务器在接受 WebSocket 连接前进行认证。Starlette/FastAPI 可以访问到发起连接的 Request 对象,可以在接受连接前检查 Header 或 Cookie。
“`python
示例:在连接前检查 header 中的 token
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Header, Depends, status
from fastapi.exceptions import HTTPException
from typing import Optional
app = FastAPI()
简单的 token 验证函数
async def verify_token(x_token: str = Header(…)):
# 实际应用中,这里会查询数据库或缓存来验证 token 的有效性
if x_token != “fake-super-secret-token”:
# 注意:直接在 websocket endpoint 依赖中抛出 HTTPException 不会像 HTTP endpoint 那样返回标准错误响应
# 连接会直接失败,客户端会收到 1006 错误码
# 更友好的方式是在 accept() 之前手动检查并拒绝
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=”Invalid token”)
return x_token
在 WebSocket 终端中使用依赖(会在 accept() 之前运行)
@app.websocket(“/ws/auth”)
async def websocket_authenticated_endpoint(
websocket: WebSocket,
token: str = Depends(verify_token) # 在连接建立前调用 verify_token
):
# 如果 verify_token 成功,token 变量会被赋值
print(f”Authenticated connection with token: {token}”)
await websocket.accept()
print(“Authenticated WebSocket connection accepted.”)
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Received authorized message: {data}")
except WebSocketDisconnect as e:
print(f"Authenticated connection closed: {e.code}, {e.reason}")
except Exception as e:
print(f"An error occurred: {e}")
更直接的在 accept() 前手动检查和拒绝
@app.websocket(“/ws/auth_manual”)
async def websocket_auth_manual_endpoint(websocket: WebSocket):
# 在 accept() 之前获取 headers
headers = websocket.headers
token = headers.get(“x-token”)
if token != "fake-super-secret-token":
# 如果认证失败,拒绝连接并发送关闭码
# 1008 (Policy Violation) 是一个合适的关闭码
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Authentication failed")
print("Authentication failed. Connection rejected.")
return # 结束函数执行,不接受连接
# 如果认证成功,接受连接
print("Authentication successful. Accepting connection.")
await websocket.accept()
print("WebSocket connection accepted.")
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Received authorized message: {data}")
except WebSocketDisconnect as e:
print(f"Authenticated connection closed: {e.code}, {e.reason}")
except Exception as e:
print(f"An error occurred: {e}")
``
websocket_authenticated_endpoint
**注意:** 在中使用
Depends并抛出
HTTPException会导致连接直接中断,客户端收到 1006 错误码(Abnormal Closure),原因通常不明确。而
websocket_auth_manual_endpoint中在
accept()之前手动检查并调用
await websocket.close(…)` 可以发送更具体的关闭码和原因(如 1008 Policy Violation),这通常对客户端调试更有帮助。
5.2 扩展 ConnectionManager
简单列表或字典适用于单进程应用。但对于生产环境,你可能需要:
- 分组: 按房间、频道或用户组管理连接,以便向特定组广播消息。
- 持久化/共享状态: 如果你的应用部署在多个服务器实例后面,每个实例都有自己的
ConnectionManager
实例,它们之间无法直接通信。这意味着一个实例收到的消息无法广播给连接到其他实例的客户端。 - 心跳机制: 客户端和服务器之间可以定期发送小的数据包(心跳)来检测连接是否仍然活跃,即使没有实际数据传输。如果一段时间内没有收到心跳,可以认为连接已断开并进行清理。
5.3 WebSocket 扩展和子协议
WebSockets 协议支持扩展和子协议。子协议用于标识应用层协议(例如,聊天协议、股票行情协议),以便客户端和服务器都能理解消息的格式和语义。扩展可以添加额外的功能,如消息压缩。
在 FastAPI 中,你可以在 accept()
方法中指定子协议:
python
@app.websocket("/ws/subprotocol")
async def websocket_subprotocol_endpoint(websocket: WebSocket):
# 接受连接,并指定支持的子协议
# 客户端请求的子协议必须在服务器支持的列表中
await websocket.accept(subprotocol="myprotocol")
print(f"Accepted with subprotocol: {websocket.subprotocol}")
# ...
客户端也需要在连接时指定子协议:
javascript
// client.html
var ws = new WebSocket("ws://127.0.0.1:8000/ws/subprotocol", "myprotocol");
如果服务器接受了客户端指定的子协议,websocket.subprotocol
会是该协议的名称;否则,websocket.subprotocol
会是 None
。
5.4 异步发送和接收
在 while True
循环中,你可能需要同时处理接收消息和在后台发送消息(例如,服务器推送通知)。这需要更复杂的异步协程管理,例如使用 asyncio.gather
或管理多个任务。
“`python
import asyncio
@app.websocket(“/ws/advanced”)
async def websocket_advanced_endpoint(websocket: WebSocket):
await websocket.accept()
async def receive_messages():
try:
while True:
data = await websocket.receive_text()
print(f"Received: {data}")
await manager.broadcast(f"Received from a client: {data}") # 广播接收到的消息
except WebSocketDisconnect:
print("Receive task detected disconnect.")
except Exception as e:
print(f"Receive task error: {e}")
async def send_periodic_messages():
# 示例:每隔5秒向这个特定客户端发送消息
try:
while True:
await asyncio.sleep(5)
# 假设你想发送给这个特定的 websocket 实例
await websocket.send_text("Server periodic update!")
print("Sent periodic update.")
except WebSocketDisconnect:
print("Send periodic task detected disconnect.")
except Exception as e:
print(f"Send periodic task error: {e}")
# 创建并运行两个异步任务:一个接收消息,一个发送周期消息
# 当其中一个任务因为连接断开(WebSocketDisconnect)或其他错误结束时,取消另一个任务
receive_task = asyncio.create_task(receive_messages())
send_task = asyncio.create_task(send_periodic_messages())
try:
# 等待两个任务完成(通常是因为连接断开)
await asyncio.gather(receive_task, send_task)
except Exception as e:
print(f"Gather task failed: {e}")
finally:
# 确保任务被取消
if not receive_task.done():
receive_task.cancel()
if not send_task.done():
send_task.cancel()
# 在任务结束后处理连接断开(如果尚未处理)
# 这里的示例 manager 没有根据 task 结束自动 disconnect
# 实际应用中,receive_messages 捕获 WebSocketDisconnect 后应该通知 manager 移除连接
print("WebSocket connection closed by gather.")
``
asyncio.create_task
这个例子展示了如何在一个 WebSocket 终端函数中使用和
asyncio.gather` 同时处理接收消息和执行其他异步操作(如定时发送)。在实际应用中,你需要更精细地管理这些任务和连接状态。
5.5 扩展到分布式系统(水平伸缩)
前面提到的 ConnectionManager
只在单个进程中有效。如果部署多个 FastAPI 实例,连接会分散到不同的实例上。要实现跨实例的广播,你需要一个发布/订阅(Pub/Sub)系统。
- Redis Pub/Sub: 一个常用的选择。当一个实例收到消息需要广播时,它不是直接遍历本地连接列表,而是将消息发布到一个 Redis 频道。其他所有实例都订阅这个频道,收到消息后,再向它们 本地 的连接广播。
- Kafka, RabbitMQ 等消息队列: 也可以用于构建更复杂的消息路由和处理系统。
“`python
概念示例 (需要安装 redis-py):
pip install redis websockets fastapi uvicorn
main.py (伪代码,需要补充 Redis 连接和 Pub/Sub 逻辑)
import asyncio
import redis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict
… (ConnectionManager 和其他 FastAPI setup) …
假设 Redis 连接已建立
r = redis.asyncio.from_url(“redis://localhost”) # 对于 asyncio
p = r.pubsub()
await p.subscribe(“chat_channel”)
修改 ConnectionManager 以集成 Redis
class DistributedConnectionManager:
def init(self, redis_client):
self.active_connections: Dict[str, WebSocket] = {}
self.redis = redis_client
self.pubsub_task = None # 用于运行 Redis pubsub 监听任务
async def connect(self, client_id: str, websocket: WebSocket):
await websocket.accept()
self.active_connections[client_id] = websocket
print(f"[Instance] Client {client_id} connected.")
# 可选:在连接时发布一个事件到 Redis
# await self.redis.publish("chat_events", f"user_joined:{client_id}")
def disconnect(self, client_id: str):
if client_id in self.active_connections:
del self.active_connections[client_id]
print(f"[Instance] Client {client_id} disconnected.")
# 可选:发布一个事件到 Redis
# asyncio.create_task(self.redis.publish("chat_events", f"user_left:{client_id}")) # 在 async def 外部使用 create_task
# 这个方法在本实例收到消息时调用
async def process_incoming_message(self, client_id: str, message: str):
# 收到消息后,发布到 Redis 频道
redis_message = f"chat_message:{client_id}:{message}"
print(f"[Instance] Publishing to Redis: {redis_message}")
await self.redis.publish("chat_channel", redis_message)
# 这个方法由 Redis Pub/Sub 监听任务调用
async def handle_redis_message(self, message):
if message and message['type'] == 'message':
data = message['data'].decode('utf-8')
print(f"[Instance] Received from Redis: {data}")
# 假设收到的是广播消息,向所有本地连接广播
await self.broadcast_local(f"[Broadcast from other instance] {data}")
# 只向本实例的连接广播
async def broadcast_local(self, message: str):
disconnected_clients = []
for client_id, connection in list(self.active_connections.items()): # 迭代副本防止在循环中修改
try:
await connection.send_text(message)
except (WebSocketDisconnect, RuntimeError, Exception) as e: # 捕获更多发送错误
print(f"[Instance] Error sending to client {client_id} locally: {e}")
disconnected_clients.append(client_id)
for client_id in disconnected_clients:
self.disconnect(client_id)
async def run_pubsub_listener(self):
# 运行 Redis Pub/Sub 监听循环
pubsub = self.redis.pubsub()
await pubsub.subscribe("chat_channel", "chat_events") # 订阅相关频道
print("[Instance] Starting Redis Pub/Sub listener.")
async for message in pubsub.listen():
# await self.handle_redis_message(message) # 处理收到的消息
# 为了避免阻塞 pubsub 监听,最好在单独任务中处理
asyncio.create_task(self.handle_redis_message(message))
在应用启动时连接 Redis 并启动监听任务
@app.on_event(“startup”)
async def startup_event():
# r = redis.asyncio.from_url(“redis://localhost”) # 实际连接
# app.state.redis = r # 将 Redis 连接存储在 app.state 中
# app.state.manager = DistributedConnectionManager(app.state.redis)
# app.state.manager.pubsub_task = asyncio.create_task(app.state.manager.run_pubsub_listener())
print(“App startup: Redis connection and Pub/Sub listener should be started.”)
# 这里仅作示例,实际需要完整的 Redis 集成
在应用关闭时关闭 Redis 连接并取消监听任务
@app.on_event(“shutdown”)
async def shutdown_event():
# if hasattr(app.state, ‘redis’) and app.state.redis:
# await app.state.redis.close()
# if hasattr(app.state, ‘manager’) and app.state.manager.pubsub_task:
# app.state.manager.pubsub_task.cancel()
# try:
# await app.state.manager.pubsub_task # 等待任务取消
# except asyncio.CancelledError:
# print(“Pub/Sub listener task cancelled.”)
print(“App shutdown: Redis connection and Pub/Sub listener should be closed.”)
聊天室终端 (使用分布式管理器)
@app.websocket(“/ws/chat/{client_id}”)
async def websocket_chat_endpoint_distributed(websocket: WebSocket, client_id: str):
# manager = app.state.manager # 获取管理器实例
await manager.connect(client_id, websocket)
# 广播用户加入消息 (通过 Redis)
await manager.process_incoming_message(“System”, f”Client {client_id} joined the chat.”) # 将系统消息也通过广播流程发送
try:
while True:
data = await websocket.receive_text()
# 收到消息后,通过管理器发布到 Redis
await manager.process_incoming_message(client_id, data)
except WebSocketDisconnect:
manager.disconnect(client_id)
# 广播用户离开消息 (通过 Redis)
await manager.process_incoming_message("System", f"Client {client_id} left the chat.")
except Exception as e:
print(f"An error occurred with client {client_id}: {e}")
manager.disconnect(client_id)
await manager.process_incoming_message("System", f"Client {client_id} encountered an error and left the chat.")
``
ConnectionManager
这个分布式概念示例展示了如何将 Redis Pub/Sub 集成到中。每个 FastAPI 实例都会:
chat_channel`)。
1. 维护自己的本地连接列表。
2. 订阅一个 Redis 频道(例如
3. 当从本地连接收到消息时,将消息发布到 Redis 频道。
4. 当从 Redis 频道收到消息时,向 本地 的所有连接广播这条消息。
这样,即使客户端连接到不同的 FastAPI 实例,它们也能通过 Redis 实现消息的互通。
第六部分:总结与展望
WebSockets 为现代 Web 应用提供了强大的实时通信能力。FastAPI 凭借其异步特性、高性能和简洁的语法,是构建 WebSocket 服务器的优秀选择。
本文从 WebSocket 的基本概念出发,逐步深入到在 FastAPI 中如何建立连接、处理不同类型的消息、管理多个连接以及实现消息广播。通过一个简单的聊天室例子,我们展示了这些概念如何在实际应用中结合。最后,我们探讨了身份验证、连接管理的高级方面、以及如何通过 Pub/Sub 系统实现分布式部署下的 WebSocket 水平伸缩。
要构建一个健壮、高性能、可伸缩的生产级 WebSocket 应用,你还需要深入研究:
- 更复杂的连接管理策略: 如何按频道、主题或用户角色对连接进行分组。
- 错误处理和重连: 如何在客户端和服务器端优雅地处理网络错误、连接中断,并实现自动重连。
- 心跳和 Keep-Alive: 实现机制以检测不活跃或断开的连接。
- 安全加固: 防止拒绝服务攻击、消息注入等。使用 WSS (WebSocket Secure,基于 TLS/SSL) 是必须的。
- 性能调优和监控: 使用工具监控连接数、消息吞吐量、延迟等指标。
- 选择合适的 Pub/Sub 或消息队列: 根据应用规模、吞吐量和可靠性需求选择合适的技术栈(Redis, Kafka, RabbitMQ, NATS 等)。
FastAPI 提供了坚实的基础,但构建复杂的实时系统本身是一个具有挑战性的任务,需要仔细设计和持续优化。希望本文能为你开启 FastAPI WebSocket 开发之旅提供详实的指导。