FastAPI WebSocket 教程:从入门到实战
WebSocket 是一种在客户端和服务器之间提供全双工通信信道的协议。与传统的 HTTP 请求-响应模式不同,WebSocket 允许服务器主动向客户端推送数据,而无需客户端发起请求,这使得它非常适合实时应用程序,例如聊天室、在线游戏、监控仪表盘等。
FastAPI 是一个现代、快速(高性能)的 web 框架,用于构建 API,基于标准的 Python 类型提示。它不仅易于学习和使用,而且集成了许多高级特性,包括对 WebSocket 的强大支持。
本文将带你从入门到实战,详细了解 FastAPI WebSocket 的使用方法,涵盖基本概念、核心代码、高级技巧以及实际应用案例,让你能够轻松构建强大的实时应用程序。
一、WebSocket 基础概念
在深入 FastAPI WebSocket 之前,我们先来了解一些 WebSocket 的基本概念:
- 持久连接: 与 HTTP 的短连接不同,WebSocket 建立的是持久连接。一旦连接建立,客户端和服务器可以双向地发送和接收数据,直到连接关闭。
- 全双工通信: WebSocket 支持全双工通信,意味着客户端和服务器可以同时发送和接收数据,而无需等待对方的响应。
- 协议升级: WebSocket 连接是通过 HTTP 协议升级建立的。客户端首先发送一个包含
Upgrade: websocket
和Connection: Upgrade
头部字段的 HTTP 请求。如果服务器支持 WebSocket,它会返回一个包含101 Switching Protocols
状态码的 HTTP 响应,从而完成协议升级。 - 数据帧: WebSocket 消息被分成一个个的数据帧进行传输。每个数据帧包含一个头部,指示数据类型、数据长度等信息,以及实际的数据负载。
- 事件驱动: WebSocket 通常采用事件驱动的编程模型。客户端和服务器通过监听不同的事件(例如连接建立、消息接收、连接关闭等)来处理 WebSocket 通信。
二、FastAPI WebSocket 入门
接下来,我们来看看如何在 FastAPI 中使用 WebSocket。
- 安装 FastAPI 和 uvicorn:
首先,你需要安装 FastAPI 和 uvicorn,uvicorn 是一个 ASGI 服务器,用于运行 FastAPI 应用程序。
bash
pip install fastapi uvicorn
- 创建 FastAPI 应用:
创建一个名为 main.py
的文件,并编写以下代码:
“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
app = FastAPI()
html = “””
WebSocket Chat
“””
@app.get(“/”)
async def get():
return HTMLResponse(html)
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f”Message text was: {data}”)
except WebSocketDisconnect:
print(“Client disconnected”)
“`
这段代码创建了一个简单的 FastAPI 应用程序,包含以下几个部分:
FastAPI()
: 创建 FastAPI 应用实例。html
: 定义了一个简单的 HTML 页面,包含一个表单用于发送消息,以及一个列表用于显示消息。@app.get("/")
: 定义一个路由处理函数,用于返回 HTML 页面。@app.websocket("/ws")
: 定义一个 WebSocket 端点,用于处理 WebSocket 连接。-
websocket_endpoint(websocket: WebSocket)
: WebSocket 端点处理函数,接收一个WebSocket
对象作为参数,该对象用于与客户端进行通信。await websocket.accept()
: 接受 WebSocket 连接,这是建立 WebSocket 连接的必要步骤。while True:
: 进入一个无限循环,不断接收和发送消息。data = await websocket.receive_text()
: 接收客户端发送的文本消息。await websocket.send_text(f"Message text was: {data}")
: 将接收到的消息发送回客户端。except WebSocketDisconnect:
: 捕获WebSocketDisconnect
异常,该异常在客户端断开连接时抛出。
-
运行 FastAPI 应用:
在终端中运行以下命令:
bash
uvicorn main:app --reload
这将启动 uvicorn 服务器,并监听 localhost:8000
。
- 测试 WebSocket 连接:
打开浏览器,访问 http://localhost:8000/
。你将看到一个简单的聊天页面。在输入框中输入消息,点击 “Send” 按钮,你将看到消息被发送到服务器,并由服务器返回显示在消息列表中。
三、FastAPI WebSocket 实战:聊天室
接下来,我们将使用 FastAPI WebSocket 构建一个简单的聊天室应用。
- 修改
main.py
文件:
“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import List
app = FastAPI()
html = “””
WebSocket Chat
“””
@app.get(“/”)
async def get():
return HTMLResponse(html)
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f”Client says: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f”Client left the chat”)
“`
在这个修改后的代码中,我们添加了一个 ConnectionManager
类,用于管理 WebSocket 连接:
ConnectionManager
类:__init__()
: 初始化一个空的连接列表active_connections
。connect(websocket: WebSocket)
: 接受一个新的 WebSocket 连接,并将其添加到active_connections
列表中。disconnect(websocket: WebSocket)
: 断开一个 WebSocket 连接,并将其从active_connections
列表中移除。send_personal_message(message: str, websocket: WebSocket)
: 向指定的 WebSocket 连接发送消息。broadcast(message: str)
: 向所有连接的客户端广播消息。
在 websocket_endpoint
函数中,我们使用 ConnectionManager
来管理 WebSocket 连接:
await manager.connect(websocket)
: 当一个新的客户端连接时,调用manager.connect()
方法将其添加到连接列表中。await manager.broadcast(f"Client says: {data}")
: 当客户端发送消息时,调用manager.broadcast()
方法将消息广播给所有连接的客户端。manager.disconnect(websocket)
: 当客户端断开连接时,调用manager.disconnect()
方法将其从连接列表中移除。-
await manager.broadcast(f"Client left the chat")
: 当客户端断开连接时,广播一条消息通知其他客户端。 -
运行 FastAPI 应用:
在终端中运行以下命令:
bash
uvicorn main:app --reload
- 测试聊天室:
打开多个浏览器窗口,访问 http://localhost:8000/
。在每个窗口中输入消息,点击 “Send” 按钮,你将看到消息被广播到所有窗口。
四、高级技巧
除了基本的 WebSocket 功能外,FastAPI 还提供了一些高级技巧,可以帮助你构建更强大的实时应用程序。
- 发送 JSON 数据:
除了发送文本消息外,你还可以发送 JSON 数据。使用 websocket.send_json()
方法发送 JSON 数据,并使用 websocket.receive_json()
方法接收 JSON 数据。
python
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_json()
await manager.broadcast(f"Client says: {data}") # Assuming data is a string in this example. You might need to process the JSON accordingly.
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client left the chat")
在客户端,你需要使用 JavaScript 的 JSON.stringify()
方法将 JavaScript 对象转换为 JSON 字符串,然后通过 ws.send()
方法发送。接收到 JSON 数据后,使用 JSON.parse()
方法将其转换为 JavaScript 对象。
- 使用 Pydantic 模型进行数据验证:
可以使用 Pydantic 模型对 WebSocket 消息进行数据验证。定义一个 Pydantic 模型,用于描述消息的结构和类型。在 websocket_endpoint
函数中,使用 parse_obj_as()
方法将接收到的 JSON 数据解析为 Pydantic 模型。
“`python
from pydantic import BaseModel, parse_obj_as
class Message(BaseModel):
username: str
text: str
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_json()
message = parse_obj_as(Message, data)
await manager.broadcast(f”{message.username}: {message.text}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f”Client left the chat”)
“`
这样可以确保接收到的消息符合预期的格式,并防止恶意数据。
- 使用 Background Tasks 进行异步处理:
可以使用 Background Tasks 在后台异步处理 WebSocket 消息。例如,可以将消息保存到数据库,或者发送到其他服务。
“`python
from fastapi import BackgroundTasks
async def save_message(message: str):
# Save message to database
print(f”Saving message: {message}”)
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket, background_tasks: BackgroundTasks):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
background_tasks.add_task(save_message, data)
await manager.broadcast(f”Client says: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f”Client left the chat”)
“`
这样可以避免阻塞 WebSocket 连接,并提高应用程序的性能。
- 身份验证和授权:
可以使用 FastAPI 的依赖注入系统,对 WebSocket 连接进行身份验证和授权。例如,可以使用 JWT(JSON Web Token)来验证客户端的身份,并根据用户的角色授予不同的权限。
“`python
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=”token”)
async def get_current_user(token: str = Depends(oauth2_scheme)):
# Verify JWT token and retrieve user information
# This is a placeholder. Implement actual JWT verification here.
if token == “testtoken”: # Example – replace with actual token verification
return {“username”: “testuser”}
else:
raise HTTPException(status_code=401, detail=”Invalid credentials”)
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket, user: dict = Depends(get_current_user)):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f”{user[‘username’]} says: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f”Client left the chat”)
“`
这样可以确保只有经过身份验证的用户才能访问 WebSocket 端点,并防止未经授权的访问。 重要: 上述 JWT 验证是示例,务必使用安全的 JWT 库和密钥进行实际部署。
五、总结
本文详细介绍了 FastAPI WebSocket 的使用方法,涵盖了基本概念、核心代码、高级技巧以及实际应用案例。通过学习本文,你已经掌握了使用 FastAPI 构建实时应用程序的基本技能。
希望本文能够帮助你更好地理解和使用 FastAPI WebSocket,构建出更加强大的实时应用程序。
下一步可以尝试的方向:
- 更复杂的聊天室: 添加用户身份验证,私聊功能,房间管理等。
- 在线游戏: 使用 WebSocket 进行实时游戏状态同步。
- 实时监控仪表盘: 使用 WebSocket 将服务器指标实时推送到客户端。
- 集成数据库: 将 WebSocket 消息持久化到数据库中。
- 使用 Redis 作为消息代理: 使用 Redis 实现更高效的广播机制。
掌握 FastAPI WebSocket 将为你打开实时应用开发的大门,祝你编码愉快!