FastAPI WebSocket 示例:构建实时数据流
在现代Web应用程序开发中,实时通信变得越来越重要。例如,在线游戏、实时数据仪表盘、协作编辑工具等都需要服务器能够主动向客户端推送数据,而不是仅仅响应客户端的请求。WebSocket 是一种在单个 TCP 连接上提供全双工通信协议,非常适合构建实时数据流应用。
FastAPI 是一个现代、快速(高性能)的 Web 框架,用于基于标准 Python 类型提示构建 API。它不仅易于学习和使用,还提供了强大的 WebSocket 支持。本文将深入探讨如何使用 FastAPI 构建一个实时数据流应用,包括 WebSocket 连接的建立、数据的发送与接收、错误处理、身份验证、以及如何将其应用于实际场景。
一、WebSocket 简介
在深入 FastAPI 示例之前,让我们先了解一些 WebSocket 的基本概念。
- 全双工通信: WebSocket 提供全双工通信,这意味着客户端和服务器可以同时发送和接收数据,而无需像 HTTP 那样需要先发送请求才能接收响应。
- 持久连接: WebSocket 连接一旦建立,就会保持打开状态,直到客户端或服务器主动关闭连接。这避免了 HTTP 连接的频繁建立和关闭带来的开销。
- 基于 HTTP 握手: WebSocket 连接的建立是通过 HTTP 握手实现的。客户端发送一个特殊的 HTTP 请求,服务器如果支持 WebSocket 协议,则会返回一个特殊的 HTTP 响应,从而建立 WebSocket 连接。
- 帧 (Frames): WebSocket 数据以帧的形式发送。每一帧包含控制信息(例如,是否为最后一帧、帧的类型)和实际的数据。
二、FastAPI WebSocket 支持
FastAPI 提供了 WebSocket
类和 WebSocketRoute
类,可以方便地处理 WebSocket 连接。
WebSocket
类: 提供了accept()
,send_text()
,send_bytes()
,receive_text()
,receive_bytes()
,close()
等方法,用于处理 WebSocket 连接的建立、数据的发送和接收,以及连接的关闭。WebSocketRoute
类: 用于将 WebSocket 端点注册到 FastAPI 应用中。
三、构建一个简单的实时数据流示例
让我们从一个简单的例子开始,创建一个能够向客户端推送实时时间数据的 FastAPI WebSocket 应用。
“`python
from fastapi import FastAPI, WebSocket
import asyncio
import datetime
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
now = datetime.datetime.now().isoformat()
await websocket.send_text(f”Current Time: {now}”)
await asyncio.sleep(1) # 每秒发送一次数据
except Exception as e:
print(f”Error: {e}”)
finally:
await websocket.close()
“`
代码解释:
- 导入必要的模块: 导入
FastAPI
,WebSocket
,asyncio
, 和datetime
模块。asyncio
用于处理异步操作,datetime
用于获取当前时间。 - 创建 FastAPI 应用: 创建 FastAPI 应用实例
app
. - 定义 WebSocket 端点: 使用
@app.websocket("/ws")
装饰器定义一个 WebSocket 端点,路径为/ws
。 websocket_endpoint
函数: 这个函数负责处理 WebSocket 连接。websocket: WebSocket
参数: FastAPI 会自动将WebSocket
对象注入到这个参数中,通过这个对象可以进行 WebSocket 通信。await websocket.accept()
: 接受 WebSocket 连接。这是建立 WebSocket 连接的关键一步。try...except...finally
块: 使用try...except...finally
块来处理可能发生的异常,并确保连接在发生错误时能够正确关闭。while True
循环: 进入一个无限循环,持续向客户端发送数据。now = datetime.datetime.now().isoformat()
: 获取当前时间,并将其格式化为 ISO 字符串。await websocket.send_text(f"Current Time: {now}")
: 将包含当前时间的文本消息发送给客户端。send_text()
方法用于发送文本数据。await asyncio.sleep(1)
: 暂停 1 秒,防止 CPU 占用过高。
await websocket.close()
: 在循环结束后或发生错误时,关闭 WebSocket 连接。
运行示例:
-
安装 FastAPI 和 Uvicorn:
bash
pip install fastapi uvicorn -
运行 FastAPI 应用:
bash
uvicorn main:app --reload(假设上面的代码保存在
main.py
文件中)
客户端示例(HTML + JavaScript):
“`html
实时时间
“`
客户端代码解释:
- 创建 WebSocket 对象:
const websocket = new WebSocket("ws://localhost:8000/ws");
创建一个 WebSocket 对象,连接到 FastAPI 服务器上的/ws
端点。 onopen
事件: 当 WebSocket 连接建立成功时触发,打印一条消息到控制台。onmessage
事件: 当服务器发送消息时触发。event.data
包含接收到的消息内容。这段代码将消息内容更新到页面上的div
元素中。onclose
事件: 当 WebSocket 连接关闭时触发,打印一条消息到控制台。onerror
事件: 当发生 WebSocket 错误时触发,打印错误信息到控制台。
四、处理不同数据类型
除了文本数据,WebSocket 还可以发送二进制数据。FastAPI 提供了 send_bytes()
和 receive_bytes()
方法来处理二进制数据。
服务端示例(发送二进制数据):
“`python
from fastapi import FastAPI, WebSocket
import asyncio
import datetime
import json
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
now = datetime.datetime.now().isoformat()
data = {“time”: now, “message”: “Hello from FastAPI!”}
# Convert dictionary to JSON string then encode to bytes
data_bytes = json.dumps(data).encode(‘utf-8’)
await websocket.send_bytes(data_bytes)
await asyncio.sleep(1) # 每秒发送一次数据
except Exception as e:
print(f”Error: {e}”)
finally:
await websocket.close()
“`
客户端示例(接收二进制数据):
“`html
实时数据
“`
关键点:
- 服务端: 将数据转换为 JSON 字符串,然后将其编码为字节流,并使用
send_bytes()
发送。 - 客户端: 设置
websocket.binaryType = 'arraybuffer';
以确保浏览器将接收到的二进制数据作为 ArrayBuffer 处理。然后,将 ArrayBuffer 转换为 Uint8Array,并使用TextDecoder
解码为 UTF-8 字符串,最后解析 JSON 字符串。
五、处理连接错误
WebSocket 连接可能会因为各种原因而中断。应该在服务器端和客户端都处理这些错误。
服务端错误处理:
在上面的示例中,我们使用了 try...except...finally
块来处理异常。在 except
块中,可以记录错误信息,或者采取其他补救措施。在 finally
块中,确保 WebSocket 连接被正确关闭。
客户端错误处理:
客户端通过 onerror
事件处理 WebSocket 错误。在 onerror
事件处理函数中,可以显示错误消息,或者尝试重新连接。
六、身份验证
在许多情况下,需要对 WebSocket 连接进行身份验证,以确保只有授权用户才能访问实时数据。FastAPI 提供了多种身份验证方法,例如:
- 基于查询参数: 在 WebSocket 连接 URL 中传递身份验证令牌。
- 基于 Cookies: 使用 HTTP Cookies 进行身份验证。
- 基于 JWT: 使用 JSON Web Tokens 进行身份验证。
基于查询参数的身份验证示例:
服务端:
“`python
from fastapi import FastAPI, WebSocket, Query
from typing import Optional
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket, token: Optional[str] = Query(None)):
if token != “secret_token”:
await websocket.close(code=1008, reason=”Unauthorized”) # 1008: Policy Violation
return
await websocket.accept()
try:
while True:
now = datetime.datetime.now().isoformat()
await websocket.send_text(f"Current Time: {now}")
await asyncio.sleep(1)
except Exception as e:
print(f"Error: {e}")
finally:
await websocket.close()
“`
客户端:
“`html
实时时间
“`
代码解释:
- 服务端: 使用
Query(None)
获取查询参数token
。如果token
不正确,则关闭连接,并发送错误代码 1008 (Policy Violation)。 - 客户端: 在 WebSocket 连接 URL 中添加
token
查询参数。在onclose
事件处理函数中,检查关闭代码是否为 1008,如果是,则显示身份验证失败的提示。
七、实际应用场景
FastAPI WebSocket 可以用于构建各种实时数据流应用,例如:
- 实时数据仪表盘: 服务器可以实时推送数据更新到客户端,客户端可以动态更新图表和指标。
- 在线游戏: WebSocket 可以用于实时同步游戏状态,例如玩家位置、得分等。
- 协作编辑工具: 多个用户可以同时编辑同一个文档,WebSocket 可以用于实时同步编辑内容。
- 实时聊天应用: 用户可以实时发送和接收消息。
- 监控系统: 服务器可以实时推送系统状态信息到客户端,以便进行监控和报警。
八、更复杂的示例:Chatroom
以下是一个更完整的示例,展示如何创建一个简单的聊天室。
server.py:
“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f”Client #{client_id} says: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f”Client #{client_id} left the chat”)
“`
client.html:
“`html
WebSocket Chat
“`
关键点:
- ConnectionManager: 负责管理所有活动的 WebSocket 连接。它提供
connect()
,disconnect()
,send_personal_message()
, 和broadcast()
方法。 - WebSocketDisconnect: 用于捕获客户端断开连接的异常。
- Broadcast: 向所有连接的客户端发送消息。
- Random Client ID: 客户端生成一个随机ID,用于区分不同的客户端。
九、总结
FastAPI 提供了简单易用的 WebSocket 支持,可以方便地构建各种实时数据流应用。通过本文的学习,你应该能够:
- 理解 WebSocket 的基本概念和工作原理。
- 使用 FastAPI 创建 WebSocket 端点。
- 发送和接收文本和二进制数据。
- 处理 WebSocket 连接错误。
- 对 WebSocket 连接进行身份验证。
- 将 WebSocket 应用于实际场景。
在实际开发中,可能需要根据具体需求对这些示例进行修改和扩展。例如,可以使用更复杂的身份验证机制,可以实现更高级的数据协议,或者可以集成其他技术(例如消息队列)来提高系统的可扩展性和可靠性。希望本文能够帮助你入门 FastAPI WebSocket 开发,构建强大的实时数据流应用。