使用 FastAPI 构建高性能 WebSocket 应用:从入门到实战
在现代 Web 应用中,实时通信变得越来越重要。无论是聊天应用、实时股票行情、多人协作文档还是在线游戏,都离不开服务器与客户端之间的双向、低延迟数据交换。传统的 HTTP 协议是基于请求-响应模式的,无法满足这种实时推送的需求。这时,WebSocket 协议应运而生,它提供了一种在单个 TCP 连接上进行全双工通信的通道。
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API。它基于标准的 Python 类型提示,能够自动生成 API 文档,并充分利用了 ASGI (Asynchronous Server Gateway Interface) 的异步能力。FastAPI 对 WebSocket 的支持非常友好,使得构建实时应用变得高效且相对简单。
本文将深入探讨如何在 FastAPI 中使用 WebSocket,从基本概念、环境搭建,到构建简单的 Echo 服务器,再到实现一个基础的多用户聊天室,并触及一些高级话题,帮助你全面掌握 FastAPI WebSocket 开发。
1. 理解 WebSocket
在深入 FastAPI 的实现之前,我们先回顾一下 WebSocket 的基本工作原理:
- 握手阶段 (Handshake): WebSocket 连接建立的第一步是一个 HTTP 请求。客户端向服务器发送一个带有特定头的 HTTP GET 请求,表明希望将连接升级到 WebSocket 协议。如果服务器支持并同意,它会返回一个特殊的 HTTP 响应,表示握手成功,连接从此切换到 WebSocket 协议。
- 数据传输阶段: 握手成功后,客户端和服务器之间就可以通过这个持久化的 TCP 连接自由地、双向地发送数据帧了。数据帧可以是文本格式 (UTF-8 编码) 或二进制格式。
- 关闭阶段: 任何一方都可以发送一个特殊的控制帧来发起连接关闭请求。
相比于传统的轮询 (Polling) 或长轮询 (Long Polling),WebSocket 显著减少了通信开销(无需重复建立和关闭连接,头部信息更小),降低了延迟,并且支持真正的服务器推送。
2. 为什么选择 FastAPI 构建 WebSocket 应用?
FastAPI 基于 ASGI 规范,天生支持异步编程。WebSocket 操作(如接收、发送数据)通常是 I/O 密集型的,异步非阻塞的特性使得 FastAPI 在处理大量并发 WebSocket 连接时具有出色的性能。
此外,FastAPI 继承了 Starlette 对 WebSocket 的良好支持,提供简洁的 API 来处理 WebSocket 连接、接收和发送不同格式的数据,以及处理连接断开等事件。结合 FastAPI 强大的依赖注入系统和自动文档生成能力(虽然 WebSocket 端点目前不会自动出现在 OpenAPI 文档中,但其核心机制与 HTTP 端点一致),开发体验非常流畅。
3. 环境准备
在开始之前,确保你的开发环境中安装了 Python (推荐 3.7+)。然后安装 FastAPI 和一个 ASGI 服务器,例如 Uvicorn。为了方便 WebSocket 测试,我们可能还需要一个 WebSocket 客户端库或者直接使用浏览器的开发者工具。
bash
pip install fastapi uvicorn websockets python-multipart
fastapi
: FastAPI 框架核心。uvicorn
: 一个高性能的 ASGI 服务器,用于运行 FastAPI 应用。websockets
: 一个 Python WebSocket 库,FastAPI/Starlette 在底层使用了它,并且它本身也可以作为一个独立的客户端或服务器用于测试。python-multipart
: 处理文件上传等,虽然不是 WebSocket 直接必需,但 FastAPI 常用依赖。
4. FastAPI WebSocket 基础
在 FastAPI 中创建一个 WebSocket 端点与创建 HTTP 端点类似,使用 @app.websocket()
装饰器即可。
“`python
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # 接受 WebSocket 连接
# 连接建立后,就可以开始收发数据了
while True:
data = await websocket.receive_text() # 接收文本数据
await websocket.send_text(f”Message text was: {data}”) # 发送文本数据
“`
这段代码定义了一个 /ws
路径的 WebSocket 端点。
@app.websocket("/ws")
: 装饰器,标记此异步函数为一个 WebSocket 端点,监听/ws
路径的 WebSocket 请求。async def websocket_endpoint(websocket: WebSocket):
: 定义一个异步函数来处理连接。FastAPI 会自动将一个WebSocket
对象作为参数注入进来,代表了当前的 WebSocket 连接。await websocket.accept()
: 这是处理 WebSocket 连接的第一步,必须调用它来接受客户端的连接请求。如果服务器端因为某些原因(如认证失败)不想建立连接,可以不调用accept()
,连接将会被关闭。while True:
: 进入一个无限循环,持续监听客户端发送的数据。data = await websocket.receive_text()
: 异步地接收客户端发送的文本数据。这是一个阻塞操作,会暂停协程直到接收到数据。receive_text()
专门用于接收文本数据,还有receive_bytes()
用于接收二进制数据,receive_json()
用于接收 JSON 数据(底层是接收文本/二进制再解析)。await websocket.send_text(...)
: 异步地向客户端发送文本数据。同样有send_bytes()
和send_json()
方法。
5. 处理连接断开
WebSocket 连接可能因为多种原因断开:客户端主动关闭、网络问题、服务器关闭等。当连接断开时,receive_*
方法会抛出 starlette.websockets.WebSocketDisconnect
异常。我们应该捕获这个异常来执行清理工作。
“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
print(f”Received: {data}”)
await websocket.send_text(f”Echo: {data}”)
except WebSocketDisconnect as e:
# 连接断开时的处理
print(f”WebSocket disconnected: {e.code}, {e.reason}”)
except Exception as e:
# 处理其他可能的异常
print(f”An error occurred: {e}”)
finally:
# 无论连接是否正常断开,都会执行此处的清理工作
print(“Connection closed”)
# 在更复杂的应用中,这里可能需要从连接列表中移除该 websocket
“`
将接收数据的逻辑放在 try...except WebSocketDisconnect
块中,可以优雅地处理连接断开事件。WebSocketDisconnect
异常包含 code
和 reason
属性,提供了断开的原因信息。一个通用的做法是在 finally
块中执行资源清理,例如从活跃连接列表中移除当前 websocket
对象。
6. 示例 1:简单的 Echo 服务器
这是一个完整的 Echo 服务器示例,它接收客户端发送的任何文本消息,然后将相同的消息前面加上 “Echo: ” 发送回去。
main.py
:
“`python
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.get(“/”)
async def get():
return {“message”: “Go to /index.html to see the WebSocket example.”}
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
print(“WebSocket connection accepted.”)
try:
while True:
data = await websocket.receive_text()
print(f”Received message: {data}”)
await websocket.send_text(f”Echo: {data}”)
except WebSocketDisconnect as e:
print(f”WebSocket disconnected (Code: {e.code}, Reason: {e.reason})”)
except Exception as e:
print(f”An error occurred: {e}”)
finally:
print(“WebSocket connection closed.”)
if name == “main“:
# 运行应用
# 请确保安装了 uvicorn: pip install uvicorn
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`
客户端测试方法 (使用 Python websockets
库):
安装 websockets
库 (pip install websockets
),然后运行以下 Python 脚本:
“`python
import asyncio
import websockets
async def test_websocket():
uri = “ws://localhost:8000/ws”
async with websockets.connect(uri) as websocket:
await websocket.send(“Hello, FastAPI WebSocket!”)
print(f”> Hello, FastAPI WebSocket!”)
response = await websocket.recv()
print(f"< {response}")
await websocket.send("Another message.")
print(f"> Another message.")
response = await websocket.recv()
print(f"< {response}")
asyncio.get_event_loop().run_until_complete(test_websocket())
“`
客户端测试方法 (使用浏览器开发者工具):
- 打开浏览器的开发者工具 (通常按 F12)。
- 切换到 “Console” 或 “控制台” 选项卡。
-
输入以下 JavaScript 代码并执行:
“`javascript
var ws = new WebSocket(“ws://localhost:8000/ws”);ws.onopen = function(event) {
console.log(“WebSocket connection opened:”, event);
ws.send(“Hello from browser!”); // 发送消息
};ws.onmessage = function(event) {
console.log(“Message from server:”, event.data); // 接收消息
};ws.onerror = function(event) {
console.error(“WebSocket error observed:”, event);
};ws.onclose = function(event) {
if (event.wasClean) {
console.log(WebSocket connection closed cleanly, code=${event.code} reason=${event.reason}
);
} else {
console.error(‘WebSocket connection died’);
}
};// 可以随时通过 ws.send(“你的消息”) 发送更多消息
// 可以通过 ws.close() 关闭连接
“`
运行 main.py
(uvicorn main:app --reload
),然后运行 Python 客户端脚本或在浏览器控制台执行 JavaScript 代码,你就能看到服务器和客户端之间的消息交互了。
7. 处理多个客户端和广播消息
Echo 服务器只处理单个连接,但实际应用通常需要服务器能够与多个客户端通信,甚至向所有连接的客户端广播消息(例如聊天室)。由于每个 WebSocket 连接都会在服务器端对应一个独立的协程实例来处理,这些协程之间默认是隔离的。为了实现广播或向特定客户端发送消息,我们需要一个机制来跟踪所有活跃的连接,并在需要时遍历它们。
一个常见的模式是创建一个 ConnectionManager
类来管理所有活跃的 WebSocket 连接。
“`python
from typing import List
from fastapi import WebSocket
class ConnectionManager:
def init(self):
# 使用 Set 存储活跃连接,Set 的查找和删除操作通常比 List 更高效
self.active_connections: set[WebSocket] = set()
async def connect(self, websocket: WebSocket):
"""接受新连接并添加到列表中"""
await websocket.accept()
self.active_connections.add(websocket)
print(f"New connection accepted. Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
"""从列表中移除断开的连接"""
if websocket in self.active_connections:
self.active_connections.remove(websocket)
print(f"Connection disconnected. Total connections: {len(self.active_connections)}")
async def send_personal_message(self, message: str, websocket: WebSocket):
"""向单个客户端发送消息"""
try:
await websocket.send_text(message)
except Exception as e:
print(f"Error sending message to a connection: {e}")
# 发送失败可能意味着连接已无效,考虑断开处理
# self.disconnect(websocket) # 根据实际情况决定是否在此处断开
async def broadcast(self, message: str):
"""向所有活跃客户端广播消息"""
# 创建一个列表副本,防止在迭代过程中集合被修改(例如,连接断开被移除)
inactive_connections = []
for connection in list(self.active_connections):
try:
await connection.send_text(message)
except Exception as e:
print(f"Error broadcasting to a connection, marking for removal: {e}")
inactive_connections.append(connection) # 标记发送失败的连接
# 移除所有发送失败的连接
for connection in inactive_connections:
self.disconnect(connection)
“`
这个 ConnectionManager
类维护了一个 set
来存储活跃的 WebSocket
对象。它提供了三个核心方法:
* connect()
: 接受新的 WebSocket 连接并将其添加到集合中。
* disconnect()
: 从集合中移除一个断开的连接。
* send_personal_message()
: 向特定的 WebSocket
对象发送消息。
* broadcast()
: 遍历所有连接,向每个连接发送相同的消息。广播时需要注意异常处理,如果向某个连接发送失败,说明该连接可能已经无效,应将其移除。我们通过在一个临时列表中收集无效连接,然后在迭代完成后再移除它们,避免在迭代时修改原集合。
8. 示例 2:基础聊天室应用
使用 ConnectionManager
,我们可以构建一个简单的多用户聊天室。
main.py
:
“`python
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
import logging # 添加日志模块
配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
app = FastAPI()
挂载静态文件目录,用于提供 index.html
app.mount(“/static”, StaticFiles(directory=”static”), name=”static”)
全局 ConnectionManager 实例
class ConnectionManager:
def init(self):
self.active_connections: set[WebSocket] = set()
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.add(websocket)
logger.info(f"New connection accepted. Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
self.active_connections.remove(websocket)
logger.info(f"Connection disconnected. Total connections: {len(self.active_connections)}")
async def send_personal_message(self, message: str, websocket: WebSocket):
try:
await websocket.send_text(message)
except Exception as e:
logger.error(f"Error sending personal message: {e}")
# 在 send_personal_message 中通常不立即断开,因为可能只是瞬时问题
# 断开处理应主要由 receive_* 抛出 WebSocketDisconnect 触发
async def broadcast(self, message: str):
inactive_connections = []
for connection in list(self.active_connections): # Iterate over a copy
try:
await connection.send_text(message)
except Exception as e:
logger.error(f"Error broadcasting to a connection, marking for removal: {e}")
inactive_connections.append(connection)
for connection in inactive_connections:
self.disconnect(connection)
manager = ConnectionManager() # 创建 ConnectionManager 实例
提供一个根路径,用于重定向或显示指引
@app.get(“/”)
async def get(request: Request):
# 返回一个简单的 HTML 页面,或者重定向到 /static/index.html
# return HTMLResponse(content=”
Simple Chat App
Go to static/index.html to access the chat.
“)
# 或者直接提供 HTML 内容
html_content = “””
<!DOCTYPE html>
FastAPI WebSocket Chat
<script>
var client_id = document.getElementById("clientId").value;
var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`); // Pass client_id in path or query
ws.onopen = function(event) {
console.log("WebSocket connection opened:", event);
document.getElementById("chatbox").innerHTML += "<p><em>Connected to chat.</em></p>";
};
ws.onmessage = function(event) {
var messages = document.getElementById('chatbox');
messages.innerHTML += `<p>${event.data}</p>`;
// 滚动到底部
messages.scrollTop = messages.scrollHeight;
};
ws.onerror = function(event) {
console.error("WebSocket error observed:", event);
document.getElementById("chatbox").innerHTML += "<p><em>Connection error!</em></p>";
};
ws.onclose = function(event) {
if (event.wasClean) {
console.log(`WebSocket connection closed cleanly, code=${event.code} reason=${event.reason}`);
document.getElementById("chatbox").innerHTML += "<p><em>Disconnected from chat.</em></p>";
} else {
console.error('WebSocket connection died');
document.getElementById("chatbox").innerHTML += "<p><em>Connection died!</em></p>";
}
};
function sendMessage() {
var input = document.getElementById("messageText");
if (input.value.trim()) {
ws.send(input.value); // 发送消息到服务器
input.value = ''; // 清空输入框
}
}
// 支持回车发送
document.getElementById("messageText").addEventListener("keypress", function(event) {
if (event.key === "Enter") {
event.preventDefault(); // 防止默认的回车换行行为
sendMessage();
}
});
</script>
</body>
</html>
"""
return HTMLResponse(content=html_content)
Chat WebSocket endpoint
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket) # 接受连接并添加到管理器
# 通知所有用户有人加入了
await manager.broadcast(f”Client #{client_id} has joined the chat.”)
logger.info(f”Client #{client_id} connected.”)
try:
while True:
# 接收客户端发送的消息
data = await websocket.receive_text()
# 构建要广播的消息格式
message = f"Client #{client_id}: {data}"
logger.info(f"Received from #{client_id}: {data}")
# 向所有活跃客户端广播消息
await manager.broadcast(message)
except WebSocketDisconnect as e:
# 处理连接断开
manager.disconnect(websocket) # 从管理器中移除连接
# 通知所有用户有人离开了
await manager.broadcast(f"Client #{client_id} has left the chat.")
logger.info(f"Client #{client_id} disconnected (Code: {e.code}, Reason: {e.reason}).")
except Exception as e:
# 处理其他异常
logger.error(f"An error occurred with client #{client_id}: {e}")
# 确保连接从管理器中移除
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} encountered an error and left.")
if name == “main“:
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`
客户端 HTML/JavaScript (index.html
或直接内嵌在根路径的 HTMLResponse 中):
上面的代码直接将 HTML/JavaScript 内嵌在根路径的 HTMLResponse
中了,方便直接通过访问 http://localhost:8000/
来测试。
运行步骤:
- 确保所有依赖已安装 (
pip install fastapi uvicorn websockets python-multipart
). - 保存上面的 Python 代码为
main.py
。 - 运行服务器:
uvicorn main:app --reload
- 打开多个浏览器窗口或标签页,访问
http://localhost:8000/
。 - 在每个窗口的输入框中输入消息并发送,观察消息是否会出现在所有连接的聊天框中。
代码解释:
- 我们创建了一个
ConnectionManager
的全局实例manager
。 - WebSocket 端点
@app.websocket("/ws/{client_id}")
现在接受一个路径参数client_id
,这可以用来标识不同的客户端(虽然这里只是简单地显示,实际应用中需要更安全的身份验证)。 - 当一个新的 WebSocket 连接到来时,首先调用
await manager.connect(websocket)
接受连接并将其添加到管理器。 - 然后向所有客户端广播一条用户加入的消息。
- 进入一个
while True
循环,持续接收来自当前客户端的消息。 - 收到消息后,格式化消息内容,并调用
await manager.broadcast(message)
将消息发送给管理器中的所有连接。 try...except WebSocketDisconnect
块捕获断开事件,调用manager.disconnect(websocket)
移除连接,并广播一条用户离开的消息。- 添加了简单的日志输出,帮助跟踪连接和消息流动。
- 根路径
/
返回一个内嵌了 HTML 和 JavaScript 的响应,方便快速测试。JavaScript 部分创建 WebSocket 连接,处理onopen
,onmessage
,onerror
,onclose
事件,并提供发送消息的函数。
这个聊天室示例非常基础,仅处理文本消息,没有用户名管理、历史记录、房间概念、消息序列化(发送和接收的都是纯文本,实际应用可能需要 JSON)等。但这展示了使用 ConnectionManager
处理多客户端连接的核心模式。
9. 高级话题与注意事项
9.1 认证和授权
WebSocket 握手阶段是基于 HTTP 的,因此可以在握手请求中利用 HTTP 的认证机制(如 Cookie、Header 中的 Token、查询参数)。FastAPI 的依赖注入系统可以在 WebSocket 端点函数中访问请求对象 (Request
),从而获取认证信息。
“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
from starlette.exceptions import HTTPException
app = FastAPI()
async def get_user_from_token(request: Request, token: str = Depends(…)): # Depends logic here
# Verify token, fetch user data
user = {“id”: “user123”, “name”: “Alice”} # Example user
if not user:
raise HTTPException(status_code=401, detail=”Invalid token”)
return user
@app.websocket(“/ws”)
async def websocket_endpoint(
websocket: WebSocket,
# 可以通过 query 参数获取 token
token: str = None # 或者从 request.headers[‘Authorization’] 获取
# user: dict = Depends(get_user_from_token) # 更复杂的认证逻辑
):
# 在 accept() 之前进行认证检查
if token != “mysecrettoken”: # 简单示例
await websocket.close(code=1008) # 1008: Policy Violation
return
await websocket.accept()
# 连接成功后,可以认为用户已认证
# 将用户信息与 websocket 关联起来,例如存在 ConnectionManager 中
print(f"Authenticated connection accepted with token: {token}")
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Authenticated Echo: {data}")
except WebSocketDisconnect:
print("Connection disconnected.")
“`
请注意,await websocket.accept()
必须在发送或接收任何数据之前调用。认证检查应该在 accept()
之前完成。如果认证失败,应使用 await websocket.close(...)
关闭连接,并使用合适的 WebSocket 关闭状态码(例如 1008 表示策略违规)。
9.2 扩展到分布式系统 (多服务器实例)
上面使用的 ConnectionManager
将活跃连接存储在内存中。这对于单服务器实例是有效的。但如果部署多个 FastAPI 实例,每个实例都有自己的 ConnectionManager
,它们之间无法直接通信。这意味着一个用户连接到实例 A,发送一条消息,这条消息只能通过实例 A 广播给连接到实例 A 的其他用户,而连接到实例 B 的用户将收不到这条消息。
解决这个问题通常需要引入一个外部的消息队列或发布/订阅系统,如 Redis Pub/Sub、RabbitMQ、Kafka 等。
架构变为:
1. 每个 FastAPI 实例维护自己本地的活跃连接集合。
2. 当一个实例收到消息时,它将消息发布到消息队列的一个特定主题。
3. 所有 FastAPI 实例都订阅了这个主题。
4. 当一个实例从消息队列收到一条消息时,它会将这条消息广播给它本地维护的所有活跃连接。
这样,消息就能够在所有服务器实例之间传递,最终到达所有连接的客户端。
9.3 使用 JSON 进行数据交换
虽然 WebSocket 支持发送任意文本或二进制数据,但在 Web 开发中,使用 JSON 格式交换结构化数据非常常见。FastAPI 的 WebSocket
对象提供了 send_json()
和 receive_json()
方法,非常方便。
“`python
from fastapi import WebSocket
@app.websocket(“/ws_json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收 JSON 数据,自动解析为 Python 字典/列表
data = await websocket.receive_json()
print(f”Received JSON: {data}”)
# 构建 JSON 响应
response = {"status": "success", "received": data}
# 发送 JSON 数据
await websocket.send_json(response)
except WebSocketDisconnect:
print("JSON connection disconnected.")
except Exception as e:
print(f"JSON error: {e}")
“`
客户端 JavaScript 示例(发送 JSON):
javascript
ws.send(JSON.stringify({ type: "chat", message: "Hello JSON!" }));
客户端 JavaScript 示例(接收 JSON):
javascript
ws.onmessage = function(event) {
try {
const data = JSON.parse(event.data);
console.log("Received JSON:", data);
// 根据 data 的结构处理不同类型的消息
if (data.type === "chat") {
// 显示聊天消息
}
} catch (e) {
console.error("Failed to parse JSON:", event.data, e);
// 处理非 JSON 消息或解析错误
}
};
9.4 后台任务发送消息
在某些情况下,你可能需要在不阻塞 WebSocket 主接收循环的情况下发送消息(例如,由一个外部事件触发的推送)。可以使用 Starlette/FastAPI 的 BackgroundTasks
,或者直接在接收循环外部创建另一个协程来发送消息。不过,对于简单的广播,在接收到消息后立即广播通常是可行的,因为发送操作本身是异步的。
如果发送操作非常耗时或容易失败(例如,需要查询数据库),并且你不希望它影响其他客户端的消息接收,可以考虑将其放入后台任务。但更常见的是,消息推送是由 其他 HTTP 请求或外部系统触发的,这时你需要从 ConnectionManager
的实例中调用 broadcast
或 send_personal_message
方法。这要求你在应用的可访问范围内持有 ConnectionManager
的实例(例如,作为全局变量或通过某种服务定位器模式)。
9.5 性能考虑
- 异步是关键: WebSocket 操作天生适合异步。FastAPI 和 Uvicorn 已经利用了 ASGI 的异步能力。确保你的 WebSocket 处理函数是
async def
。 - 高效的连接管理: 使用
set
来存储活跃连接通常比list
更高效,尤其是在需要频繁地添加、删除和查找连接时。 - 广播效率: 在广播时,迭代连接并逐个发送是必要的。上面
ConnectionManager
中的广播方法已经考虑了在迭代时处理断开连接的情况。 - 消息序列化/反序列化: 使用
send_json
/receive_json
会增加一点点 CPU 开销,但通常比手动处理字符串和解析更方便且安全。选择适合你数据结构的格式。 - 负载均衡与扩展: 如前所述,多实例部署需要外部消息队列。负载均衡器应配置为粘性会话 (Sticky Sessions),尽量将同一用户的连接路由到同一个实例,但这对于 WebSocket 不是强制性的,因为消息队列解决了跨实例通信问题。
10. 结论
FastAPI 结合其底层的 Starlette 和 ASGI 服务器 (如 Uvicorn),为构建高性能的 WebSocket 应用提供了强大而易用的平台。通过简单的 @app.websocket()
装饰器,你可以轻松定义 WebSocket 端点,使用 await websocket.accept()
接受连接,await websocket.receive_*()
接收数据,await websocket.send_*()
发送数据,并通过捕获 WebSocketDisconnect
异常来处理连接断开。
对于需要处理多个客户端并实现广播等功能的复杂应用,引入 ConnectionManager
模式来集中管理活跃连接是有效且常见的做法。随着应用规模的增长,理解如何将应用扩展到多个服务器实例,并引入消息队列等技术将是必不可少的。
掌握了 FastAPI 对 WebSocket 的支持,你就能充分利用其异步高性能的优势,轻松构建各种实时交互式 Web 应用。希望本文提供的详细解释和示例代码能帮助你快速入门并深入实践 FastAPI WebSocket 开发。