FastAPI WebSocket 示例:构建实时数据流 – wiki基地

FastAPI WebSocket 示例:构建实时数据流

在现代Web应用程序开发中,实时通信变得越来越重要。例如,在线游戏、实时数据仪表盘、协作编辑工具等都需要服务器能够主动向客户端推送数据,而不是仅仅响应客户端的请求。WebSocket 是一种在单个 TCP 连接上提供全双工通信协议,非常适合构建实时数据流应用。

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于基于标准 Python 类型提示构建 API。它不仅易于学习和使用,还提供了强大的 WebSocket 支持。本文将深入探讨如何使用 FastAPI 构建一个实时数据流应用,包括 WebSocket 连接的建立、数据的发送与接收、错误处理、身份验证、以及如何将其应用于实际场景。

一、WebSocket 简介

在深入 FastAPI 示例之前,让我们先了解一些 WebSocket 的基本概念。

  • 全双工通信: WebSocket 提供全双工通信,这意味着客户端和服务器可以同时发送和接收数据,而无需像 HTTP 那样需要先发送请求才能接收响应。
  • 持久连接: WebSocket 连接一旦建立,就会保持打开状态,直到客户端或服务器主动关闭连接。这避免了 HTTP 连接的频繁建立和关闭带来的开销。
  • 基于 HTTP 握手: WebSocket 连接的建立是通过 HTTP 握手实现的。客户端发送一个特殊的 HTTP 请求,服务器如果支持 WebSocket 协议,则会返回一个特殊的 HTTP 响应,从而建立 WebSocket 连接。
  • 帧 (Frames): WebSocket 数据以帧的形式发送。每一帧包含控制信息(例如,是否为最后一帧、帧的类型)和实际的数据。

二、FastAPI WebSocket 支持

FastAPI 提供了 WebSocket 类和 WebSocketRoute 类,可以方便地处理 WebSocket 连接。

  • WebSocket 类: 提供了 accept(), send_text(), send_bytes(), receive_text(), receive_bytes(), close() 等方法,用于处理 WebSocket 连接的建立、数据的发送和接收,以及连接的关闭。
  • WebSocketRoute 类: 用于将 WebSocket 端点注册到 FastAPI 应用中。

三、构建一个简单的实时数据流示例

让我们从一个简单的例子开始,创建一个能够向客户端推送实时时间数据的 FastAPI WebSocket 应用。

“`python
from fastapi import FastAPI, WebSocket
import asyncio
import datetime

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
now = datetime.datetime.now().isoformat()
await websocket.send_text(f”Current Time: {now}”)
await asyncio.sleep(1) # 每秒发送一次数据
except Exception as e:
print(f”Error: {e}”)
finally:
await websocket.close()
“`

代码解释:

  1. 导入必要的模块: 导入 FastAPI, WebSocket, asyncio, 和 datetime 模块。asyncio 用于处理异步操作,datetime 用于获取当前时间。
  2. 创建 FastAPI 应用: 创建 FastAPI 应用实例 app.
  3. 定义 WebSocket 端点: 使用 @app.websocket("/ws") 装饰器定义一个 WebSocket 端点,路径为 /ws
  4. websocket_endpoint 函数: 这个函数负责处理 WebSocket 连接。
    • websocket: WebSocket 参数: FastAPI 会自动将 WebSocket 对象注入到这个参数中,通过这个对象可以进行 WebSocket 通信。
    • await websocket.accept() 接受 WebSocket 连接。这是建立 WebSocket 连接的关键一步。
    • try...except...finally 块: 使用 try...except...finally 块来处理可能发生的异常,并确保连接在发生错误时能够正确关闭。
    • while True 循环: 进入一个无限循环,持续向客户端发送数据。
      • now = datetime.datetime.now().isoformat() 获取当前时间,并将其格式化为 ISO 字符串。
      • await websocket.send_text(f"Current Time: {now}") 将包含当前时间的文本消息发送给客户端。send_text() 方法用于发送文本数据。
      • await asyncio.sleep(1) 暂停 1 秒,防止 CPU 占用过高。
    • await websocket.close() 在循环结束后或发生错误时,关闭 WebSocket 连接。

运行示例:

  1. 安装 FastAPI 和 Uvicorn:

    bash
    pip install fastapi uvicorn

  2. 运行 FastAPI 应用:

    bash
    uvicorn main:app --reload

    (假设上面的代码保存在 main.py 文件中)

客户端示例(HTML + JavaScript):

“`html




FastAPI WebSocket Example

实时时间



“`

客户端代码解释:

  1. 创建 WebSocket 对象: const websocket = new WebSocket("ws://localhost:8000/ws"); 创建一个 WebSocket 对象,连接到 FastAPI 服务器上的 /ws 端点。
  2. onopen 事件: 当 WebSocket 连接建立成功时触发,打印一条消息到控制台。
  3. onmessage 事件: 当服务器发送消息时触发。event.data 包含接收到的消息内容。这段代码将消息内容更新到页面上的 div 元素中。
  4. onclose 事件: 当 WebSocket 连接关闭时触发,打印一条消息到控制台。
  5. onerror 事件: 当发生 WebSocket 错误时触发,打印错误信息到控制台。

四、处理不同数据类型

除了文本数据,WebSocket 还可以发送二进制数据。FastAPI 提供了 send_bytes()receive_bytes() 方法来处理二进制数据。

服务端示例(发送二进制数据):

“`python
from fastapi import FastAPI, WebSocket
import asyncio
import datetime
import json

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
now = datetime.datetime.now().isoformat()
data = {“time”: now, “message”: “Hello from FastAPI!”}
# Convert dictionary to JSON string then encode to bytes
data_bytes = json.dumps(data).encode(‘utf-8’)
await websocket.send_bytes(data_bytes)
await asyncio.sleep(1) # 每秒发送一次数据
except Exception as e:
print(f”Error: {e}”)
finally:
await websocket.close()
“`

客户端示例(接收二进制数据):

“`html




FastAPI WebSocket Example (Binary Data)

实时数据



“`

关键点:

  • 服务端: 将数据转换为 JSON 字符串,然后将其编码为字节流,并使用 send_bytes() 发送。
  • 客户端: 设置 websocket.binaryType = 'arraybuffer'; 以确保浏览器将接收到的二进制数据作为 ArrayBuffer 处理。然后,将 ArrayBuffer 转换为 Uint8Array,并使用 TextDecoder 解码为 UTF-8 字符串,最后解析 JSON 字符串。

五、处理连接错误

WebSocket 连接可能会因为各种原因而中断。应该在服务器端和客户端都处理这些错误。

服务端错误处理:

在上面的示例中,我们使用了 try...except...finally 块来处理异常。在 except 块中,可以记录错误信息,或者采取其他补救措施。在 finally 块中,确保 WebSocket 连接被正确关闭。

客户端错误处理:

客户端通过 onerror 事件处理 WebSocket 错误。在 onerror 事件处理函数中,可以显示错误消息,或者尝试重新连接。

六、身份验证

在许多情况下,需要对 WebSocket 连接进行身份验证,以确保只有授权用户才能访问实时数据。FastAPI 提供了多种身份验证方法,例如:

  • 基于查询参数: 在 WebSocket 连接 URL 中传递身份验证令牌。
  • 基于 Cookies: 使用 HTTP Cookies 进行身份验证。
  • 基于 JWT: 使用 JSON Web Tokens 进行身份验证。

基于查询参数的身份验证示例:

服务端:

“`python
from fastapi import FastAPI, WebSocket, Query
from typing import Optional

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket, token: Optional[str] = Query(None)):
if token != “secret_token”:
await websocket.close(code=1008, reason=”Unauthorized”) # 1008: Policy Violation
return

await websocket.accept()
try:
    while True:
        now = datetime.datetime.now().isoformat()
        await websocket.send_text(f"Current Time: {now}")
        await asyncio.sleep(1)
except Exception as e:
    print(f"Error: {e}")
finally:
    await websocket.close()

“`

客户端:

“`html




FastAPI WebSocket Example (Authentication)

实时时间



“`

代码解释:

  • 服务端: 使用 Query(None) 获取查询参数 token。如果 token 不正确,则关闭连接,并发送错误代码 1008 (Policy Violation)。
  • 客户端: 在 WebSocket 连接 URL 中添加 token 查询参数。在 onclose 事件处理函数中,检查关闭代码是否为 1008,如果是,则显示身份验证失败的提示。

七、实际应用场景

FastAPI WebSocket 可以用于构建各种实时数据流应用,例如:

  • 实时数据仪表盘: 服务器可以实时推送数据更新到客户端,客户端可以动态更新图表和指标。
  • 在线游戏: WebSocket 可以用于实时同步游戏状态,例如玩家位置、得分等。
  • 协作编辑工具: 多个用户可以同时编辑同一个文档,WebSocket 可以用于实时同步编辑内容。
  • 实时聊天应用: 用户可以实时发送和接收消息。
  • 监控系统: 服务器可以实时推送系统状态信息到客户端,以便进行监控和报警。

八、更复杂的示例:Chatroom

以下是一个更完整的示例,展示如何创建一个简单的聊天室。

server.py:

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
    await websocket.accept()
    self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
    self.active_connections.remove(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):
    await websocket.send_text(message)

async def broadcast(self, message: str):
    for connection in self.active_connections:
        await connection.send_text(message)

manager = ConnectionManager()

@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(f”Client #{client_id} says: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f”Client #{client_id} left the chat”)
“`

client.html:

“`html




WebSocket Chat

WebSocket Chat




“`

关键点:

  • ConnectionManager: 负责管理所有活动的 WebSocket 连接。它提供 connect(), disconnect(), send_personal_message(), 和 broadcast() 方法。
  • WebSocketDisconnect: 用于捕获客户端断开连接的异常。
  • Broadcast: 向所有连接的客户端发送消息。
  • Random Client ID: 客户端生成一个随机ID,用于区分不同的客户端。

九、总结

FastAPI 提供了简单易用的 WebSocket 支持,可以方便地构建各种实时数据流应用。通过本文的学习,你应该能够:

  • 理解 WebSocket 的基本概念和工作原理。
  • 使用 FastAPI 创建 WebSocket 端点。
  • 发送和接收文本和二进制数据。
  • 处理 WebSocket 连接错误。
  • 对 WebSocket 连接进行身份验证。
  • 将 WebSocket 应用于实际场景。

在实际开发中,可能需要根据具体需求对这些示例进行修改和扩展。例如,可以使用更复杂的身份验证机制,可以实现更高级的数据协议,或者可以集成其他技术(例如消息队列)来提高系统的可扩展性和可靠性。希望本文能够帮助你入门 FastAPI WebSocket 开发,构建强大的实时数据流应用。

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部