FastAPI WebSocket 详解与实战 – wiki基地


FastAPI WebSocket 详解与实战:构建实时应用的基石

在现代 Web 应用开发中,实时交互已成为衡量用户体验的关键指标之一。传统的 HTTP 请求-响应模式虽然强大,但在需要服务器主动向客户端推送数据、或者客户端之间需要频繁、低延迟通信的场景下显得力不从心。此时,WebSocket 技术应运而生,它提供了客户端和服务器之间持久的双向通信通道,极大地提升了实时应用的开发效率和性能。

FastAPI,作为一款现代、快速(高性能)的 Python Web 框架,凭借其异步支持、自动文档生成和易用性,迅速成为构建 API 的热门选择。FastAPI 对 WebSocket 的原生支持使其成为构建实时后端服务的强大工具。

本文将深入探讨 FastAPI 中的 WebSocket,从基础概念到实际应用,为你提供一个全面、详细的指南,帮助你理解并掌握如何使用 FastAPI 构建强大的实时应用。

1. 理解 WebSocket:告别请求-响应,拥抱双向通信

在深入 FastAPI 的 WebSocket 实现之前,我们先回顾一下 WebSocket 的基本概念。

1.1 HTTP 的局限性

传统的 Web 通信主要依赖 HTTP 协议。HTTP 是一种无状态的请求-响应协议:客户端发起请求,服务器处理请求并返回响应,连接随即关闭(或在 HTTP/1.1 中短时间保持)。这种模式对于网页浏览、RESTful API 调用等场景非常有效。

然而,在需要服务器频繁更新客户端状态的应用中,例如:

  • 在线聊天室: 用户发送消息,需要所有在线用户立即收到。
  • 股票行情: 股价变动需要实时推送到交易者界面。
  • 游戏: 多人在线游戏中的玩家位置、动作需要快速同步。
  • 通知系统: 服务器有新事件发生时,需要立即通知用户。
  • 协作编辑: 多人同时编辑文档时,需要实时同步修改。

在这些场景下,如果继续使用 HTTP,只能采取以下低效的方式:

  • 轮询 (Polling): 客户端定时向服务器发送请求,询问是否有新数据。这会产生大量无效请求,浪费带宽和服务器资源,且实时性差。
  • 长轮询 (Long Polling): 客户端发起请求后,服务器 удержи连接直到有新数据或超时,然后再返回响应,客户端立即发起新的请求。这比轮询效率高,但仍然是基于请求-响应模式,且服务器需要维护大量等待中的连接。
  • 流 (Streaming): 服务器维持一个开放的 HTTP 连接,并持续向客户端发送数据。这可以实现单向的服务器到客户端推送,但客户端无法通过同一连接向服务器发送消息。

这些方式都无法完美解决双向、低延迟、持久连接的需求。

1.2 WebSocket 的诞生与优势

WebSocket 协议(RFC 6455)提供了一种在单个 TCP 连接上进行全双工(双向)通信的机制。与 HTTP 不同,WebSocket 连接建立后,客户端和服务器都可以随时向对方发送数据,而无需等待请求。

WebSocket 的工作流程:

  1. 握手 (Handshake): 客户端通过发送一个特殊的 HTTP 请求(带有 Upgrade 头部和特定的 Header)发起 WebSocket 连接请求。
  2. 协议升级: 如果服务器支持 WebSocket 协议,它会返回一个特殊的 HTTP 响应,表示同意升级协议。
  3. 建立连接: 握手成功后,底层的 TCP 连接不再使用 HTTP 协议,而是切换到 WebSocket 协议。这个连接会一直保持开放,直到客户端或服务器主动关闭。
  4. 数据传输: 连接建立后,客户端和服务器可以通过这个持久连接自由地发送和接收数据帧(Frames),数据可以是文本或二进制格式。

WebSocket 的优势显而易见:

  • 双向通信: 客户端和服务器都可以随时发起数据传输。
  • 低延迟: 无需反复建立连接,数据可以直接通过开放的连接发送。
  • 高效: 相比 HTTP,WebSocket 的协议开销更小,特别是在频繁传输小数据时。
  • 持久连接: 连接保持开放,适合实时数据推送和频繁交互。

2. FastAPI 对 WebSocket 的支持

FastAPI 基于 Starlette 构建,而 Starlette 天然支持 ASGI (Asynchronous Server Gateway Interface)。ASGI 是 Python 异步 Web 应用的一种标准接口,它允许异步框架(如 Starlette, FastAPI)与异步服务器(如 Uvicorn, Hypercorn)进行通信。WebSocket 是 ASGI 规范的一部分,因此 FastAPI 能够非常方便地支持 WebSocket。

FastAPI 通过简洁的装饰器和类型提示,将 ASGI 级别的 WebSocket 处理封装起来,提供了 Pythonic 的开发体验。你可以像定义 HTTP 路径操作一样,轻松定义 WebSocket 端点。

3. FastAPI WebSocket 基础:定义端点与处理连接

在 FastAPI 中定义 WebSocket 端点非常直观。

3.1 定义 WebSocket 端点

使用 @app.websocket() 装饰器来定义一个 WebSocket 端点,就像使用 @app.get()@app.post() 一样。路径可以是固定的,也可以包含路径参数。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 在这里处理 WebSocket 连接
pass
“`

这个函数是一个 async 函数,它接收一个类型为 WebSocket 的参数。这个 websocket 对象就是客户端与服务器之间建立的 WebSocket 连接的代表,我们将通过它来发送和接收数据。

3.2 接受连接

当客户端尝试连接到 WebSocket 端点时,服务器首先会收到一个握手请求。在 FastAPI 中,你需要在 WebSocket 路径操作函数的开头调用 await websocket.accept() 来接受这个连接。如果连接被接受,函数将继续执行;否则,连接会被拒绝。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# 连接已建立,可以开始通信
print(“Client connected”)
# … 后续处理 …
“`

await websocket.accept() 之后,客户端和服务器之间的 WebSocket 连接才真正建立起来。

3.3 发送和接收数据

WebSocket 对象提供了发送和接收数据的方法:

  • 发送数据:
    • await websocket.send_text(data: str): 发送文本数据。
    • await websocket.send_bytes(data: bytes): 发送二进制数据。
    • await websocket.send_json(data: Any): 发送 JSON 数据。FastAPI 会自动将 Python 对象序列化为 JSON 字符串。
  • 接收数据:
    • await websocket.receive_text() -> str: 接收文本数据。这是一个阻塞操作,会等待直到收到文本消息。
    • await websocket.receive_bytes() -> bytes: 接收二进制数据。阻塞操作,等待收到二进制消息。
    • await websocket.receive_json() -> Any: 接收 JSON 数据。阻塞操作,等待收到 JSON 消息并自动反序列化。
    • await websocket.receive_text_or_bytes() -> Union[str, bytes]: 接收文本或二进制数据。返回收到的原始数据。
    • await websocket.receive_text_or_json() -> Union[str, Any]: 接收文本或 JSON 数据。自动反序列化 JSON。

示例:一个简单的 Echo 服务器

最简单的 WebSocket 应用是 Echo 服务器,它接收到什么数据就原样返回什么数据。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text() # 接收文本数据
await websocket.send_text(f”Message text was: {data}”) # 发送文本数据
except WebSocketDisconnect:
print(“Client disconnected”)
except Exception as e:
print(f”Error: {e}”)

“`

在这个例子中:

  1. await websocket.accept() 接受连接。
  2. while True: 循环持续监听接收消息。
  3. data = await websocket.receive_text() 等待并接收客户端发送的文本消息。
  4. await websocket.send_text(...) 将收到的消息加上前缀后发送回客户端。
  5. try...except WebSocketDisconnect 块用于捕获客户端断开连接的事件。当客户端关闭连接时(正常关闭或意外断开),receive_text()(或其它 receive_* 方法)会抛出 WebSocketDisconnect 异常。
  6. except Exception 捕获其他可能的错误。

3.4 处理断开连接

如上所示,处理客户端断开连接是通过捕获 WebSocketDisconnect 异常来实现的。当此异常发生时,你应该执行清理操作,例如从活动连接列表中移除该连接(这在管理多个连接时非常重要)。

3.5 发送和接收 JSON 数据

WebSocket 经常用于发送结构化数据,JSON 是一个常见格式。FastAPI 提供了便捷的 send_jsonreceive_json 方法。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket(“/ws/json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json() # 接收 JSON 数据
print(f”Received JSON: {data}”)
# 假设收到 {“action”: “echo”, “message”: “hello”}
if isinstance(data, dict) and data.get(“action”) == “echo”:
await websocket.send_json({“status”: “received”, “echo”: data.get(“message”)}) # 发送 JSON 数据
else:
await websocket.send_json({“status”: “error”, “message”: “Invalid format”})
except WebSocketDisconnect:
print(“Client disconnected”)
except Exception as e:
print(f”Error: {e}”)
“`

使用 send_jsonreceive_json 可以方便地在 Python 字典/列表和 JSON 字符串之间进行转换,这对于构建基于 JSON 的实时 API 非常有用。

4. 管理多个 WebSocket 连接:构建聊天室基础

在大多数实际应用中,WebSocket 服务器需要同时处理来自多个客户端的连接。例如,在一个聊天室应用中,当一个用户发送消息时,服务器需要将这条消息广播给所有在线的用户。这就需要一个机制来跟踪和管理所有活动连接。

一个常见的模式是创建一个类(或模块级别的对象)来充当连接管理器,负责存储和操作连接列表。

4.1 创建 ConnectionManager

我们定义一个 ConnectionManager 类,它包含以下基本功能:

  • __init__: 初始化一个列表(或集合)来存储活动连接。
  • connect(websocket: WebSocket): 当新客户端连接时调用,将新的 websocket 对象添加到列表中,并 accept() 连接。
  • disconnect(websocket: WebSocket): 当客户端断开连接时调用,从列表中移除对应的 websocket 对象。
  • send_personal_message(message: str, websocket: WebSocket): 向单个客户端发送消息。
  • broadcast(message: str): 向所有活动客户端广播消息。

“`python
from typing import List
from fastapi import WebSocket

class ConnectionManager:
def init(self):
# 使用 set 来存储连接,因为 set 的查找和删除操作通常比 list 快
# 并且可以避免重复添加
self.active_connections: set[WebSocket] = set()

async def connect(self, websocket: WebSocket):
    await websocket.accept()
    self.active_connections.add(websocket)
    print(f"New connection: {websocket}. Total active connections: {len(self.active_connections)}")

def disconnect(self, websocket: WebSocket):
    if websocket in self.active_connections:
        self.active_connections.remove(websocket)
        print(f"Connection closed: {websocket}. Total active connections: {len(self.active_connections)}")

async def send_personal_message(self, message: str, websocket: WebSocket):
    try:
        await websocket.send_text(message)
    except Exception as e:
        print(f"Error sending personal message to {websocket}: {e}")
        # 发送失败可能意味着连接已不可用,考虑断开和移除
        # self.disconnect(websocket) # 根据实际情况决定是否在此处断开

async def broadcast(self, message: str):
    # 遍历所有连接,尝试发送消息
    # 使用 list() 复制集合,避免在遍历时修改集合(例如,如果发送失败导致断开)
    # 尽管在 async 函数中,通常不会在遍历时同时修改集合,但复制是更安全的做法
    disconnected_websockets = []
    for connection in list(self.active_connections):
        try:
            await connection.send_text(message)
        except Exception as e:
            print(f"Error broadcasting message to {connection}: {e}")
            disconnected_websockets.append(connection)

    # 移除发送失败的连接
    for connection in disconnected_websockets:
        self.disconnect(connection)

“`

注意并发问题: 在上面的 broadcast 方法中,为了在遍历集合时安全地移除元素(如果在发送过程中出现错误),我们复制了 self.active_connections。在高度并发的环境下,如果多个异步任务同时访问和修改 self.active_connections,可能会出现竞态条件。对于更健壮的应用,可能需要使用 asyncio.Lock 来保护对 self.active_connections 的访问。但是对于初学者或简单场景,上述实现通常是足够的。

4.2 在 FastAPI 中使用 ConnectionManager

现在,我们在 FastAPI 应用中创建 ConnectionManager 的实例,并在 WebSocket 路径操作中使用它。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from typing import List
import json

app = FastAPI()

class ConnectionManager:
def init(self):
self.active_connections: set[WebSocket] = set()

async def connect(self, websocket: WebSocket):
    await websocket.accept()
    self.active_connections.add(websocket)
    print(f"New connection established: {websocket}")

def disconnect(self, websocket: WebSocket):
    if websocket in self.active_connections:
        self.active_connections.remove(websocket)
        print(f"Connection {websocket} disconnected.")

async def send_personal_message(self, message: str, websocket: WebSocket):
    try:
        await websocket.send_text(message)
    except Exception as e:
        print(f"Error sending personal message to {websocket}: {e}")
        # 在实际应用中,这里可能需要更优雅地处理连接错误

async def broadcast(self, message: str):
    disconnected_websockets = []
    for connection in list(self.active_connections): # Iterate over a copy
        try:
            await connection.send_text(message)
        except Exception as e:
            print(f"Error broadcasting message to {connection}: {e}")
            disconnected_websockets.append(connection)

    for connection in disconnected_websockets:
        self.disconnect(connection)

创建 ConnectionManager 实例 (应用级别共享)

manager = ConnectionManager()

@app.get(“/”)
async def get():
return {“message”: “Hello World”}

WebSocket 聊天室端点

@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket) # 接受连接并添加到管理器
try:
# 通知所有用户有新用户加入
await manager.broadcast(f”Client #{client_id} joined the chat”)

    while True:
        # 接收客户端消息
        data = await websocket.receive_text()

        # 向发送者回复确认消息 (可选)
        # await manager.send_personal_message(f"You wrote: {data}", websocket)

        # 将消息广播给所有在线用户 (包括发送者)
        await manager.broadcast(f"Client #{client_id}: {data}")

except WebSocketDisconnect:
    # 客户端断开连接时触发
    manager.disconnect(websocket) # 从管理器移除连接
    # 通知所有用户有用户离开
    await manager.broadcast(f"Client #{client_id} left the chat")
except Exception as e:
     print(f"An error occurred with client #{client_id}: {e}")
     # 发生其他异常时,也进行断开处理
     manager.disconnect(websocket)
     await manager.broadcast(f"Client #{client_id} experienced an error and left.")

“`

在这个聊天室示例中:

  1. 我们创建了一个全局(或应用实例级别)的 manager = ConnectionManager()
  2. WebSocket 路径 "/ws/{client_id}" 包含一个路径参数 client_id,用于标识不同的客户端。
  3. websocket_endpoint 函数的开头,调用 await manager.connect(websocket) 来接受连接,并将其添加到 manageractive_connections 集合中。
  4. 连接成功后,我们广播一条消息通知其他用户。
  5. while True 循环持续监听来自当前客户端的消息。
  6. data = await websocket.receive_text() 接收客户端发送的文本消息。
  7. await manager.broadcast(...) 将收到的消息(带上客户端 ID)广播给所有在线用户。
  8. except WebSocketDisconnect: 块捕获断开连接事件,调用 manager.disconnect(websocket) 移除连接,并广播用户离开的消息。
  9. 额外的 except Exception 捕获其他未预料的错误,确保在出现问题时也能清理连接并通知。

运行此应用:

bash
uvicorn main:app --reload

你可以使用 WebSocket 客户端(如浏览器开发者工具的 WebSocket 标签、Postman、wscat 工具或编写简单的 HTML/JavaScript 页面)连接到 ws://127.0.0.1:8000/ws/1, ws://127.0.0.1:8000/ws/2 等,然后发送消息进行测试。

例如,使用 wscat:

“`bash

打开第一个客户端

wscat -c ws://127.0.0.1:8000/ws/1

打开第二个客户端

wscat -c ws://127.0.0.1:8000/ws/2
“`

在一个客户端输入消息,另一个客户端将实时收到。当一个客户端关闭连接时,其他客户端也会收到用户离开的通知。

5. 进阶话题

5.1 发送/接收 JSON 数据并结合 Pydantic

对于更复杂的结构化数据,建议使用 Pydantic 模型来定义 WebSocket 消息的格式。虽然 FastAPI 的 WebSocket 端点不像 HTTP 端点那样直接支持 Pydantic 自动解析请求体,但你可以在接收到 JSON 数据后手动进行解析和校验。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from typing import List, Dict, Any
import json

app = FastAPI()

class Message(BaseModel):
sender_id: int
text: str

class ConnectionManager:
# … (ConnectionManager class remains the same as above)
def init(self):
self.active_connections: set[WebSocket] = set() # Using set

async def connect(self, websocket: WebSocket):
    await websocket.accept()
    self.active_connections.add(websocket)
    print(f"New connection established: {websocket}")

def disconnect(self, websocket: WebSocket):
    if websocket in self.active_connections:
        self.active_connections.remove(websocket)
        print(f"Connection {websocket} disconnected.")

async def send_personal_json(self, data: Any, websocket: WebSocket):
     try:
        await websocket.send_json(data)
     except Exception as e:
        print(f"Error sending personal JSON to {websocket}: {e}")

async def broadcast_json(self, data: Any):
    disconnected_websockets = []
    for connection in list(self.active_connections): # Iterate over a copy
        try:
            await connection.send_json(data)
        except Exception as e:
            print(f"Error broadcasting JSON to {connection}: {e}")
            disconnected_websockets.append(connection)

    for connection in disconnected_websockets:
        self.disconnect(connection)

manager = ConnectionManager()

@app.websocket(“/ws/jsonchat/{client_id}”)
async def websocket_json_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
await manager.broadcast_json({“type”: “status”, “message”: f”Client #{client_id} joined the chat”})

    while True:
        # 接收 JSON 数据
        data = await websocket.receive_json()
        print(f"Received JSON from client #{client_id}: {data}")

        # 使用 Pydantic 校验数据
        try:
            message = Message(**data) # 尝试将接收到的 dict 解析为 Message Pydantic 模型
            print(f"Parsed message: {message.dict()}")

            # 广播解析后的消息 (可以包含更多信息)
            await manager.broadcast_json({
                "type": "message",
                "sender_id": client_id,
                "text": message.text,
                "original_data": data # 可以包含原始数据或其他元信息
            })

        except Exception as e:
            # 如果解析失败 (数据格式不符合 Pydantic 模型)
            error_message = f"Error parsing message from client #{client_id}: {e}"
            print(error_message)
            # 可以选择向客户端发送错误通知
            await manager.send_personal_json({"type": "error", "message": "Invalid message format"}, websocket)


except WebSocketDisconnect:
    manager.disconnect(websocket)
    await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} left the chat"})
except Exception as e:
     print(f"An unexpected error occurred with client #{client_id}: {e}")
     manager.disconnect(websocket)
     await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} experienced an error and left."})

“`

在这个例子中,我们定义了一个 Message Pydantic 模型。在接收到 JSON 数据后,我们手动尝试将数据转换为 Message 模型的实例。如果转换失败,意味着客户端发送的数据格式不正确。这提供了一种更严格的数据校验方式。

5.2 在 FastAPI 应用状态中存储 ConnectionManager

ConnectionManager 实例直接创建在模块级别是可行的,但如果你的应用结构更复杂,或者希望利用 FastAPI 的依赖注入系统来管理状态,可以考虑将其存储在 FastAPI 应用实例的 state 中。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

… (ConnectionManager and Message classes from above)

app = FastAPI()

在应用启动时创建 ConnectionManager 实例并存储到 state 中

@app.on_event(“startup”)
async def startup_event():
app.state.manager = ConnectionManager()
print(“ConnectionManager initialized in app state.”)

在需要使用 manager 的地方通过 Request 获取 app 实例,然后访问 state

或者使用依赖注入 (Depends) 如果你需要将 manager 作为参数传递给其他函数

Example using Request (simple for demonstration)

@app.websocket(“/ws/jsonchat_state/{client_id}”)
async def websocket_json_endpoint_state(websocket: WebSocket, client_id: int, request: Request):
manager: ConnectionManager = request.app.state.manager # Access manager from app state

await manager.connect(websocket)
try:
    await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} joined the chat"})

    while True:
        data = await websocket.receive_json()
        # ... (Pydantic parsing and broadcasting logic as before) ...
        try:
            message = Message(**data)
            await manager.broadcast_json({
                "type": "message",
                "sender_id": client_id,
                "text": message.text,
            })
        except Exception as e:
            print(f"Error parsing message: {e}")
            await manager.send_personal_json({"type": "error", "message": "Invalid message format"}, websocket)

except WebSocketDisconnect:
    manager.disconnect(websocket)
    await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} left the chat"})
except Exception as e:
     print(f"An unexpected error occurred: {e}")
     manager.disconnect(websocket)
     await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} experienced an error and left."})

“`

ConnectionManager 存储在 app.state 中,使其成为应用生命周期的一部分,并且可以在不同的请求/连接处理函数中方便地访问。

5.3 WebSocket 的认证与授权

WebSocket 连接的认证和授权通常在握手阶段进行。你可以通过检查请求的头部(如 Authorization)、Cookie、查询参数等来验证用户身份。一旦连接建立,你可以在 WebSocket 对象中存储用户身份信息,并在后续的消息处理中使用。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, status

… (ConnectionManager and Message classes)

app = FastAPI()
manager = ConnectionManager() # Or use app.state

WebSocket 端点带简单的 Token 认证 (示例)

@app.websocket(“/ws/auth/{client_id}”)
async def websocket_auth_endpoint(
websocket: WebSocket,
client_id: int,
# 通过查询参数传递 token (不安全,仅为示例)
# 更安全的做法是通过 HTTP 握手时的 Header 或 Cookie 传递
token: str = Query(…)
):
# 简单的 token 验证逻辑 (替换成实际的认证逻辑)
if token != “fake-super-secret-token”:
# 拒绝连接,发送关闭码和原因
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=”Invalid token”)
return # 验证失败则不再继续处理

# 认证成功,接受连接并添加到管理器
await manager.connect(websocket)
print(f"Client #{client_id} authenticated and connected.")

try:
    await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} joined (authenticated)"})

    while True:
        data = await websocket.receive_json()
        # ... (Process and broadcast messages as before) ...
        try:
            message = Message(**data)
            # 在广播消息中包含发送者的 ID
            await manager.broadcast_json({
                "type": "message",
                "sender_id": client_id, # Use the authenticated client_id
                "text": message.text,
            })
        except Exception as e:
            print(f"Error processing message from client #{client_id}: {e}")
            await manager.send_personal_json({"type": "error", "message": "Invalid message format"}, websocket)


except WebSocketDisconnect:
    manager.disconnect(websocket)
    await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} left"})
except Exception as e:
     print(f"An unexpected error occurred with client #{client_id}: {e}")
     manager.disconnect(websocket)
     await manager.broadcast_json({"type": "status", "message": f"Client #{client_id} experienced an error and left."})

“`

在此示例中,我们通过查询参数接收一个 token。在 await websocket.accept() 之前,我们检查 token 是否有效。如果无效,我们调用 await websocket.close() 并返回一个适当的关闭码和原因来拒绝连接。请注意,通过查询参数传递敏感信息是不安全的,实际应用应使用更安全的方式进行认证(如在 HTTP 握手阶段使用 Header 或 Cookie)。

5.4 使用 Pub/Sub 实现分布式 WebSocket 应用

如果你的应用部署在多个服务器实例上(例如,使用 Kubernetes 部署多个 Pod),每个实例都有自己的 ConnectionManager,那么简单的广播将无法覆盖所有客户端。此时,你需要一个 Pub/Sub 系统(如 Redis Pub/Sub, Kafka, RabbitMQ)来协调不同服务器实例之间的消息。

基本思路是:

  1. 每个 FastAPI 实例维护其本地的 ConnectionManager
  2. 当一个实例接收到消息时,它将消息发布到 Pub/Sub 系统的特定频道。
  3. 所有其他 FastAPI 实例都订阅这个频道。
  4. 当一个实例从 Pub/Sub 系统收到消息时,它将该消息通过其本地的 ConnectionManager 广播给它所管理的连接。

这超出了本文基础范围,但理解这一点对于构建可扩展的实时应用至关重要。

6. 安全性与部署考虑

6.1 安全性

  • 身份验证和授权: 如前所述,在握手阶段验证客户端身份,并确保客户端有权限连接到该 WebSocket 端点。
  • 输入验证: 始终校验从客户端收到的数据,无论它是文本、二进制还是 JSON。使用 Pydantic 是一个好方法。
  • Origin 检查: 检查 WebSocket 请求的 Origin 头部,只允许来自你信任的域的连接。
  • 速率限制: 防止单个客户端发送过多消息,避免滥用和 DoS 攻击。这可以在业务逻辑层面或通过反向代理实现。
  • 数据加密: 始终使用 wss:// 协议(基于 TLS/SSL 的 WebSocket),确保数据在传输过程中加密。这通常由反向代理处理。

6.2 部署

  • 服务器: FastAPI 应用需要一个 ASGI 服务器来运行,如 Uvicorn 或 Hypercorn。确保你使用的服务器支持 WebSocket。
  • 反向代理: 在生产环境中,通常会在 FastAPI 应用前面部署一个反向代理服务器(如 Nginx, Traefik)。反向代理负责处理 TLS 终止、负载均衡、静态文件服务等。配置反向代理时,需要特殊配置以支持 WebSocket 协议升级,并将 WebSocket 流量正确地转发到 FastAPI 应用实例。
  • 负载均衡: 如果部署多个 FastAPI 实例,需要负载均衡器。对于 WebSocket,如果客户端的连接需要维持状态(例如,在哪个实例上建立了连接),可能需要使用“粘性会话”(Sticky Sessions),确保同一客户端的后续请求(包括 WebSocket 消息)总是被路由到同一个后端实例。
  • 资源管理: WebSocket 连接是持久的,每个连接都会占用服务器资源(内存、文件句柄)。在高并发场景下,需要关注服务器的资源使用情况,并考虑水平扩展。

7. 总结

WebSocket 技术为构建实时、双向通信的 Web 应用提供了强大的支持。FastAPI 凭借其现代化的异步框架和对 ASGI 的良好集成,使得在 Python 中开发 WebSocket 服务变得前所未有的简单和高效。

本文详细介绍了 FastAPI 中 WebSocket 的基础知识,包括端点的定义、连接的接受、数据的发送和接收,以及如何优雅地处理断开连接。我们还通过构建一个简单的聊天室示例,演示了如何管理多个活动连接并实现消息广播。最后,我们探讨了一些进阶话题,如 JSON 和 Pydantic 的结合、连接状态管理、基本的认证考虑以及分布式部署的挑战。

掌握 FastAPI WebSocket,你将能够轻松构建出高性能、实时交互的 Web 应用,无论是聊天室、实时仪表盘、协作工具还是在线游戏,FastAPI 都能为你提供坚实的后端支持。

希望这篇详细的教程能够帮助你更好地理解和使用 FastAPI 中的 WebSocket。开始你的实时应用开发之旅吧!


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部