FastAPI WebSocket 完整指南
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API,基于 Python 3.7+ 的类型提示。它充分利用了 Python 的异步特性 (async/await),使其在处理并发请求时表现出色。除了标准的 HTTP 请求之外,FastAPI 也完美支持 WebSocket,提供了一种实现实时双向通信的优雅方式。本文将深入探讨如何在 FastAPI 中使用 WebSocket,涵盖从基础概念到高级应用的各个方面。
1. WebSocket 基础
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。与传统的 HTTP 请求/响应模式不同,WebSocket 允许服务器主动向客户端推送数据,而无需客户端显式发起请求。这使得实时应用,例如聊天应用、在线游戏、股票行情推送等成为可能。
2. FastAPI 中的 WebSocket 实现
在 FastAPI 中,使用 WebSocket 非常简单,只需使用 WebSocket
类即可。以下是一个简单的示例:
“`python
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while True:
data = await websocket.receive_text()
await websocket.send_text(f”Message text was: {data}”)
“`
代码解释:
@app.websocket("/ws")
: 定义 WebSocket 路由,客户端可以通过/ws
路径连接。async def websocket_endpoint(websocket: WebSocket)
: 异步函数,处理 WebSocket 连接。await websocket.accept()
: 接受客户端的连接请求。while True
: 保持连接,持续监听客户端消息。data = await websocket.receive_text()
: 接收客户端发送的文本消息。await websocket.send_text(f"Message text was: {data}")
: 将接收到的消息回传给客户端。
3. 发送和接收不同类型的数据
除了文本消息,WebSocket 还支持发送和接收其他类型的数据,例如二进制数据和 JSON 数据。
- 接收二进制数据:
await websocket.receive_bytes()
- 发送二进制数据:
await websocket.send_bytes(data)
- 接收 JSON 数据:
await websocket.receive_json()
- 发送 JSON 数据:
await websocket.send_json(data)
4. 处理连接和断开事件
可以使用 try...except
块来处理连接和断开事件:
python
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Message text was: {data}")
except WebSocketDisconnect:
print("Client disconnected")
5. 广播消息
在实际应用中,我们经常需要将消息广播给多个连接的客户端。一种实现方式是维护一个连接列表:
“`python
connected_clients = set()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
connected_clients.add(websocket)
try:
while True:
data = await websocket.receive_text()
for client in connected_clients:
await client.send_text(f”Client sent: {data}”)
except WebSocketDisconnect:
connected_clients.remove(websocket)
print(“Client disconnected”)
“`
6. 高级应用:结合第三方库
FastAPI 可以与其他 Python 库无缝集成,例如 aioredis
和 websockets
,实现更高级的 WebSocket 功能,例如:
- 使用
aioredis
实现消息队列和发布/订阅: 将 WebSocket 连接与 Redis 集成,实现高效的消息广播和持久化。 - 使用
websockets
进行更底层的 WebSocket 控制:websockets
提供了对 WebSocket 协议的更精细的控制,可以实现自定义的握手流程和消息处理逻辑。
7. 安全 considerations:
- Origin 检查: 限制允许连接的域名,防止跨站 WebSocket hijacking 攻击。
- 身份验证: 使用 OAuth 2.0 或其他身份验证机制,验证客户端身份。
- 数据校验: 对客户端发送的数据进行校验,防止恶意数据注入。
8. 部署 WebSocket 应用:
部署 FastAPI WebSocket 应用与部署普通的 FastAPI 应用类似,可以使用 Uvicorn 或 Gunicorn 等 ASGI 服务器。需要注意的是,为了支持 WebSocket 的持久连接,需要配置服务器以处理长时间运行的请求。
9. 示例:简单的聊天应用
“`python
from fastapi import FastAPI, WebSocket
from typing import List
app = FastAPI()
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f”Client #{client_id} says: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f”Client #{client_id} left the chat”)
“`
这个例子展示了一个简单的聊天应用,客户端可以通过 /ws/{client_id}
连接,发送的消息会广播给所有连接的客户端。
总结:
FastAPI 提供了一种简单而强大的方式来构建 WebSocket 应用。通过结合 Python 的异步特性和 FastAPI 的简洁语法,我们可以轻松地创建高性能、实时的 Web 应用。 本文涵盖了 FastAPI WebSocket 的核心概念和常用技巧,希望能够帮助你快速上手并构建自己的 WebSocket 应用。 记住,安全始终是重要的考虑因素,在开发和部署 WebSocket 应用时,务必采取必要的安全措施。