FastAPI WebSockets:从入门到实战 – wiki基地


FastAPI WebSockets:从入门到实战

在现代 Web 应用中,实时通信已经从一种高级特性转变为许多核心功能的需求,例如在线聊天、实时数据仪表盘、多人协作应用、游戏以及即时通知等。传统的 HTTP 请求-响应模式在这种场景下效率低下,因为它需要客户端不断地进行轮询才能获取最新数据。WebSockets 提供了一种更高效、更低延迟的双向通信方式。

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建基于标准 Python 类型提示的 APIs。其内置的对异步编程(asyncio)的良好支持,使其成为构建高性能 WebSocket 应用的理想选择。

本文将带你从零开始,深入理解 FastAPI 中的 WebSockets,从基本概念到构建一个简单的实时聊天应用,最终探讨一些高级话题和最佳实践。

第一部分:理解 WebSockets

1.1 什么是 WebSockets?

WebSockets 是一种在单个 TCP 连接上进行全双工通信的协议。这意味着服务器和客户端可以随时向对方发送数据,而无需像 HTTP 那样每次都发起新的请求。

核心特点:

  • 持久连接: 一旦建立,连接会保持打开状态,直到一方主动关闭或发生错误。
  • 全双工: 数据可以同时在客户端和服务器之间流动。
  • 低延迟: 避免了 HTTP 的连接建立、请求头发送等开销,数据传输更快。
  • 协议升级: WebSocket 连接通常是通过 HTTP/1.1 的 Upgrade 机制从一个标准的 HTTP 连接升级而来的。

1.2 为什么在 FastAPI 中使用 WebSockets?

  • 异步支持: FastAPI 基于 Starlette 和 Pydantic,天然支持 Python 的 asyncio。WebSockets 的操作(连接、发送、接收)本质上是异步的,FastAPI 的设计与之完美契合,可以高效地处理大量并发连接。
  • 高性能: 得益于 Starlette 和 Uvicorn(一个 ASGI 服务器),FastAPI 提供了出色的性能,这对于需要处理高并发 WebSocket 连接的应用至关重要。
  • 易于使用: FastAPI 提供了简洁的 API 来定义 WebSocket 终端,使用类型提示使得代码易于阅读和维护。
  • 集成性: 可以轻松地将 WebSockets 与 FastAPI 的其他功能(如依赖注入、安全性、数据验证等)结合使用。

1.3 WebSockets 与 HTTP 的区别

特性 HTTP/1.1 WebSockets
通信方式 请求-响应(半双工) 全双工
连接 通常是短连接(除非使用 Keep-Alive),每次请求可能需要重新建立连接 长连接,一旦建立,保持打开直到关闭
开销 每次请求都有头部开销 初始连接建立开销,之后数据帧开销小
数据传输 客户端发起请求,服务器响应 客户端和服务器都可以随时发起数据传输
用途 无状态请求、资源获取、API 调用 实时数据传输、通知、聊天、多人应用等
状态 无状态(或使用 Session/Cookies 模拟状态) 有状态(连接本身代表一个状态)

对于需要实时、低延迟双向通信的场景,WebSockets 是更优的选择。

第二部分:FastAPI WebSocket 入门

2.1 安装必要的库

你需要安装 FastAPI、一个 ASGI 服务器(如 uvicorn)以及 websockets 库(FastAPI 内部使用它来处理 WebSocket 连接)。

bash
pip install fastapi uvicorn websockets

2.2 创建一个基本的 WebSocket 终端

创建一个 Python 文件(例如 main.py),编写以下代码:

“`python

main.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

定义一个 WebSocket 终端

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 接受客户端的连接
await websocket.accept()
print(“WebSocket connection accepted.”)

try:
    # 循环接收客户端发送的消息
    while True:
        # 接收文本消息
        data = await websocket.receive_text()
        print(f"Received message: {data}")

        # 向客户端发送一个回复
        await websocket.send_text(f"Message text was: {data}")

except WebSocketDisconnect as e:
    # 处理 WebSocket 断开连接事件
    print(f"WebSocket connection closed: {e.code}, {e.reason}")
except Exception as e:
    # 处理其他可能的错误
    print(f"An error occurred: {e}")
finally:
    # 可选:连接断开后执行清理工作
    print("WebSocket connection terminated.")

“`

代码解释:

  • from fastapi import FastAPI, WebSocket, WebSocketDisconnect: 导入 FastAPI 应用类、WebSocket 类型以及 WebSocketDisconnect 异常。
  • app = FastAPI(): 创建一个 FastAPI 应用实例。
  • @app.websocket("/ws"): 这是一个装饰器,将下面的异步函数 websocket_endpoint 注册为处理 /ws 路径的 WebSocket 连接。
  • async def websocket_endpoint(websocket: WebSocket):: 定义一个异步函数来处理 WebSocket 连接。FastAPI 会将代表当前连接的 WebSocket 对象作为参数传递给它。
  • await websocket.accept(): 这是一个关键步骤。当客户端尝试连接时,服务器需要通过调用 accept() 方法来接受连接。这标志着 WebSocket 握手完成,连接正式建立。
  • while True:: 进入一个无限循环,以便持续接收客户端发送的消息。
  • data = await websocket.receive_text(): 异步地等待并接收客户端发送的文本消息。如果接收到的是其他类型的帧(如二进制),或者连接关闭,此方法可能会抛出异常。
  • await websocket.send_text(f"Message text was: {data}"): 异步地向当前客户端发送一个文本消息。
  • except WebSocketDisconnect as e:: 这是一个重要的异常处理块。当客户端正常或异常断开连接时,receive_text()(或其他 receive_* 方法)会抛出 WebSocketDisconnect 异常。捕获此异常可以让你优雅地处理连接关闭事件。异常对象 e 包含关闭的代码和原因。
  • except Exception as e:: 捕获其他潜在的错误,例如发送/接收数据时发生的网络错误等。
  • finally:: 无论是否发生异常,这个块的代码都会执行,可以用来执行一些清理操作。

2.3 运行服务器

打开终端,进入 main.py 所在的目录,运行 Uvicorn 服务器:

bash
uvicorn main:app --reload

Uvicorn 会启动服务器,默认监听 http://127.0.0.1:8000

2.4 创建一个简单的 WebSocket 客户端(HTML/JavaScript)

为了测试服务器,你需要一个 WebSocket 客户端。最简单的方法是使用浏览器中的 JavaScript。创建一个 HTML 文件(例如 client.html):

“`html





FastAPI WebSocket Client

FastAPI WebSocket Test


Messages:



“`

代码解释:

  • new WebSocket("ws://127.0.0.1:8000/ws"): 创建一个 WebSocket 对象,尝试连接到指定的 URL。ws:// 是 WebSocket 的标准协议头,对应 HTTP 的 http://
  • ws.onopen: 连接成功建立时执行的函数。
  • ws.onmessage: 收到服务器消息时执行的函数。event.data 包含收到的数据。
  • ws.onerror: 连接发生错误时执行的函数。
  • ws.onclose: 连接关闭时执行的函数。event.wasClean 表示是否为正常关闭,event.codeevent.reason 提供关闭信息。
  • ws.send(message): 向服务器发送数据。
  • window.onbeforeunload: 在页面关闭或刷新前尝试关闭 WebSocket 连接,这是一种良好的实践。

2.5 测试连接

  1. 确保 Uvicorn 服务器正在运行。
  2. 用浏览器打开 client.html 文件。
  3. 打开浏览器的开发者工具(通常按 F12),查看控制台输出。
  4. 在页面输入框中输入消息,点击“Send”。
  5. 观察浏览器控制台、页面消息区域以及服务器终端的输出。你会看到消息被发送、接收并回复的过程。

恭喜!你已经成功建立并测试了一个基本的 FastAPI WebSocket 连接。

第三部分:处理多种消息类型和多个连接

前面的例子只能处理文本消息,并且只能与单个客户端通信。在实际应用中,你需要处理 JSON 数据并管理多个并发连接。

3.1 发送和接收 JSON 数据

在实时应用中,数据通常是结构化的,JSON 是常用的格式。FastAPI 的 WebSocket 对象提供了方便的方法来处理 JSON。

“`python

main.py (修改部分)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict, Any

app = FastAPI()

@app.websocket(“/ws/json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
print(“JSON WebSocket connection accepted.”)

try:
    while True:
        # 接收 JSON 消息
        data: Dict[Any, Any] = await websocket.receive_json()
        print(f"Received JSON message: {data}")

        # 验证接收到的数据(可选,但推荐)
        if isinstance(data, dict) and "message" in data:
            message_text = data["message"]
            # 向客户端发送 JSON 回复
            await websocket.send_json({"status": "success", "received_message": message_text})
        else:
             await websocket.send_json({"status": "error", "message": "Invalid JSON format, expected {'message': '...'}"})

except WebSocketDisconnect as e:
    print(f"JSON WebSocket connection closed: {e.code}, {e.reason}")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    print("JSON WebSocket connection terminated.")

“`

客户端 (HTML/JavaScript 修改):

你需要修改客户端来发送和接收 JSON。

“`html





FastAPI WebSocket JSON Client

FastAPI WebSocket JSON Test


Messages:



“`

要点:

  • 服务器端使用 await websocket.receive_json() 接收 JSON,使用 await websocket.send_json(data) 发送 JSON。FastAPI/Starlette 会自动处理 JSON 的解析和序列化。
  • 客户端 JavaScript 需要手动使用 JSON.stringify() 将 JavaScript 对象转换为 JSON 字符串发送,并使用 JSON.parse() 将收到的 JSON 字符串解析回 JavaScript 对象。

3.2 管理多个连接和广播消息

一个 WebSocket 服务器通常需要同时处理来自多个客户端的连接,并可能需要向所有(或部分)客户端广播消息。你需要一种机制来跟踪所有活跃的连接。

一个简单的方法是使用一个列表来存储所有的 WebSocket 对象。为了更好地组织代码,可以创建一个 ConnectionManager 类。

“`python

main.py (再次修改)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

ConnectionManager 类用于管理所有活跃的 WebSocket 连接

class ConnectionManager:
def init(self):
# 存储所有活跃的 WebSocket 连接
self.active_connections: List[WebSocket] = []

# 添加新的连接
async def connect(self, websocket: WebSocket):
    await websocket.accept()
    self.active_connections.append(websocket)
    print(f"New connection added. Total active connections: {len(self.active_connections)}")

# 移除断开的连接
def disconnect(self, websocket: WebSocket):
    try:
        self.active_connections.remove(websocket)
        print(f"Connection removed. Total active connections: {len(self.active_connections)}")
    except ValueError:
        # 如果连接不在列表中,说明可能已经被移除(例如在另一个地方处理了断开)
        print("Attempted to remove a connection that was not in the active list.")


# 向所有活跃连接广播文本消息
async def broadcast(self, message: str):
    # 注意:如果在循环发送消息时某个连接断开,可能会抛出异常。
    # 更健壮的做法是捕获异常并在异常处理中移除连接。
    # 这里的简单实现假设发送过程中不会频繁断开。
    # 稍后在聊天室例子中会展示更健壮的处理方式。
    for connection in self.active_connections:
        try:
            await connection.send_text(message)
        except Exception as e:
            # 如果发送失败,说明连接有问题,移除它
            print(f"Error sending to a connection: {e}. Removing connection.")
            # 这里的移除是同步的,如果在 async for 循环中修改列表,可能会有问题
            # 更好的方法是收集要移除的连接列表,然后在循环外部移除
            # 为了简单,这里暂时不处理并发修改列表的问题,后续例子会改进
            pass # 在简单广播中先忽略错误,或在更复杂场景下处理

# 向所有活跃连接广播 JSON 消息
async def broadcast_json(self, data: dict):
     for connection in self.active_connections:
        try:
            await connection.send_json(data)
        except Exception as e:
            print(f"Error sending JSON to a connection: {e}. Removing connection.")
            pass # 同上,简单处理

创建一个全局的连接管理器实例

manager = ConnectionManager()

修改 /ws 终端使用连接管理器

@app.websocket(“/ws/broadcast”)
async def websocket_broadcast_endpoint(websocket: WebSocket):
# 使用管理器来接受并存储连接
await manager.connect(websocket)
try:
while True:
# 接收客户端发送的消息
data = await websocket.receive_text()

        # 将收到的消息广播给所有连接
        await manager.broadcast(f"Client says: {data}")

except WebSocketDisconnect as e:
    # 连接断开时,从管理器中移除
    manager.disconnect(websocket)
    # 可选:广播一条消息通知大家有人离开了
    await manager.broadcast(f"A client left the chat. Code: {e.code}")
except Exception as e:
     print(f"An error occurred in broadcast endpoint: {e}")
     # 如果是发送错误导致断开,尝试移除连接
     try:
          manager.disconnect(websocket)
     except:
          pass # 忽略移除失败
     await manager.broadcast(f"A client encountered an error and disconnected.")

“`

代码解释:

  • ConnectionManager 类:
    • active_connections: 一个列表,用来保存所有连接到服务器的 WebSocket 对象。
    • connect(websocket): 接受一个新的 WebSocket 连接,并将其添加到 active_connections 列表中。
    • disconnect(websocket): 从 active_connections 列表中移除一个断开的连接。
    • broadcast(message) / broadcast_json(data): 遍历 active_connections 列表,并向列表中的每个 WebSocket 对象发送消息。
  • manager = ConnectionManager(): 在应用启动时创建一个 ConnectionManager 的单例实例。
  • @app.websocket("/ws/broadcast") 终端函数中:
    • 连接建立时,调用 await manager.connect(websocket)
    • 在消息接收循环中,接收到消息后,不再是回复给发送者,而是调用 await manager.broadcast(...) 将消息发送给所有连接。
    • WebSocketDisconnect 异常块中,调用 manager.disconnect(websocket) 移除断开的连接,并可选地广播一条通知消息。

客户端 (HTML/JavaScript 修改):

你需要修改客户端来连接新的终端,并且不再期望收到发送消息后的直接回复,而是接收来自服务器的广播消息。

“`html





FastAPI WebSocket Broadcast Client

FastAPI WebSocket Broadcast Test


Messages:



“`

测试广播:

  1. 确保服务器运行新的代码 (uvicorn main:app --reload)。
  2. 打开两个或多个浏览器窗口,都打开 client_broadcast.html
  3. 在一个窗口输入消息并发送。
  4. 观察所有打开的窗口,它们都应该收到并显示这条消息(由服务器广播)。

这个例子展示了如何使用一个简单的 ConnectionManager 来管理多个连接并实现广播功能。

第四部分:构建一个简单的聊天室应用

现在,我们将把前面学到的概念结合起来,构建一个更实用的简单聊天室应用。每个用户进入聊天室时指定一个用户名,发送的消息会带上用户名广播给其他用户。

“`python

main.py (聊天室应用)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse
from typing import List, Dict
import uuid # 用于生成唯一的客户端ID

app = FastAPI()

稍微改进的 ConnectionManager,存储客户端ID和对应的 WebSocket 对象

class ConnectionManager:
def init(self):
# 使用字典存储连接,key为客户端ID,value为 WebSocket 对象
self.active_connections: Dict[str, WebSocket] = {}

async def connect(self, client_id: str, websocket: WebSocket):
    await websocket.accept()
    self.active_connections[client_id] = websocket
    print(f"Client {client_id} connected. Total: {len(self.active_connections)}")

def disconnect(self, client_id: str):
    if client_id in self.active_connections:
        del self.active_connections[client_id]
        print(f"Client {client_id} disconnected. Total: {len(self.active_connections)}")
    else:
        print(f"Attempted to disconnect client {client_id} not found in active list.")

# 向所有活跃连接广播文本消息
async def broadcast(self, message: str):
    disconnected_clients = []
    for client_id, connection in self.active_connections.items():
        try:
            await connection.send_text(message)
        except WebSocketDisconnect:
             # 捕获断开连接异常
             print(f"Client {client_id} disconnected during broadcast.")
             disconnected_clients.append(client_id)
        except Exception as e:
            # 捕获其他发送错误
            print(f"Error sending to client {client_id}: {e}")
            disconnected_clients.append(client_id)

    # 在广播循环外部移除断开的连接
    for client_id in disconnected_clients:
        self.disconnect(client_id)

manager = ConnectionManager()

提供一个简单的 HTML 页面作为聊天室客户端

html = “””




FastAPI Chat


FastAPI Simple Chat







“””

提供 HTML 页面

@app.get(“/”)
async def get():
return HTMLResponse(html)

聊天室 WebSocket 终端

@app.websocket(“/ws/chat/{client_id}”)
async def websocket_chat_endpoint(websocket: WebSocket, client_id: str):
# 接受连接并添加到管理器
await manager.connect(client_id, websocket)
# 广播用户加入消息
await manager.broadcast(f”System: Client {client_id} joined the chat.”)
try:
while True:
# 接收消息
data = await websocket.receive_text()
# 广播聊天消息 (带上发送者ID)
await manager.broadcast(f”{client_id}: {data}”)

except WebSocketDisconnect:
    # 连接断开,从管理器移除
    manager.disconnect(client_id)
    # 广播用户离开消息
    await manager.broadcast(f"System: Client {client_id} left the chat.")
except Exception as e:
     print(f"An error occurred with client {client_id}: {e}")
     # 如果发生其他错误,也尝试移除连接并广播离开消息
     manager.disconnect(client_id)
     await manager.broadcast(f"System: Client {client_id} encountered an error and left the chat.")

“`

代码解释:

  • 服务器端 (main.py):
    • ConnectionManager 现在使用字典 active_connections: Dict[str, WebSocket] 来存储连接,以客户端 ID(在此例中是用户名)作为键,方便查找和管理特定用户的连接。
    • connect 方法接受 client_id 参数,并在字典中存储 client_id: websocket
    • disconnect 方法根据 client_id 移除字典中的连接。
    • broadcast 方法现在增加了对 WebSocketDisconnect 和其他异常的捕获,并在循环外部批量移除断开的连接,使其更健壮。
    • @app.get("/"): 添加了一个标准的 HTTP GET 终端,用于提供聊天室的 HTML 页面。
    • @app.websocket("/ws/chat/{client_id}"): WebSocket 终端现在包含一个路径参数 {client_id},客户端连接时会提供自己的 ID(用户名)。
    • 在 WebSocket 终端函数中,连接成功后广播 “joined” 消息,接收到消息后广播带有用户名的消息,断开时广播 “left” 消息。异常处理确保连接断开后能正确地从管理器中移除。
  • 客户端端 (内置 HTML 的 JavaScript):
    • 增加了用户名输入框和连接/断开按钮。
    • 用户输入用户名后,点击“Connect”按钮,调用 connectWebSocket() 函数,该函数获取用户名并以此连接到 /ws/chat/{client_id} 终端。
    • updateButtonStates 函数用于根据连接状态启用/禁用输入框和按钮。
    • log 函数用于向聊天框添加消息,并实现自动滚动。
    • ws.onmessage 直接将收到的消息添加到聊天框。
    • sendMessage 函数获取输入框内容并通过 WebSocket 发送。
    • 增加了对回车键发送消息的支持。

运行和测试聊天室:

  1. 确保服务器运行最新的代码 (uvicorn main:app --reload)。
  2. 打开浏览器访问 http://127.0.0.1:8000/
  3. 打开两个或多个浏览器窗口/标签页,访问同一个地址。
  4. 在每个窗口中输入不同的用户名,然后点击“Connect”。
  5. 连接成功后,在一个窗口输入消息并发送。
  6. 观察所有窗口,消息应该以 “[用户名]: [消息内容]” 的格式出现在所有聊天框中。
  7. 尝试关闭一个窗口,其他窗口应该会收到 “[用户名] left the chat.” 的系统消息。

这个聊天室例子虽然简单,但包含了 WebSocket 应用中的核心概念:连接管理、多客户端通信和广播。

第五部分:高级话题与实战考量

构建生产级别的 WebSocket 应用还需要考虑更多因素。

5.1 身份验证和授权

WebSocket 连接本身不像 HTTP 请求那样容易携带 Header(通常是在连接升级时携带一次)。如何在 WebSocket 连接上实现身份验证和授权?

  • 基于查询参数或路径参数: 在连接 URL 中携带 token 或用户 ID(如聊天室例子)。简单但不安全,因为这些信息可能被记录在日志中。
  • 发送初始认证消息: 连接建立后,客户端发送一条包含 token 或凭证的认证消息。服务器验证通过后,才将该连接视为已认证用户,并允许其发送/接收受限消息。这是更推荐的方式。在 ConnectionManager 中可以维护一个已认证连接的列表或字典。
  • 使用 Cookie: 如果 WebSocket 连接与 HTTP 站点在同一域下,浏览器可能会自动发送相关的 Cookie,服务器端可以读取这些 Cookie 进行认证(例如,读取 session ID)。
  • 在 HTTP 握手阶段认证: 利用 HTTP Upgrade 请求携带 Header,服务器在接受 WebSocket 连接前进行认证。Starlette/FastAPI 可以访问到发起连接的 Request 对象,可以在接受连接前检查 Header 或 Cookie。

“`python

示例:在连接前检查 header 中的 token

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Header, Depends, status
from fastapi.exceptions import HTTPException
from typing import Optional

app = FastAPI()

简单的 token 验证函数

async def verify_token(x_token: str = Header(…)):
# 实际应用中,这里会查询数据库或缓存来验证 token 的有效性
if x_token != “fake-super-secret-token”:
# 注意:直接在 websocket endpoint 依赖中抛出 HTTPException 不会像 HTTP endpoint 那样返回标准错误响应
# 连接会直接失败,客户端会收到 1006 错误码
# 更友好的方式是在 accept() 之前手动检查并拒绝
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=”Invalid token”)
return x_token

在 WebSocket 终端中使用依赖(会在 accept() 之前运行)

@app.websocket(“/ws/auth”)
async def websocket_authenticated_endpoint(
websocket: WebSocket,
token: str = Depends(verify_token) # 在连接建立前调用 verify_token
):
# 如果 verify_token 成功,token 变量会被赋值
print(f”Authenticated connection with token: {token}”)
await websocket.accept()
print(“Authenticated WebSocket connection accepted.”)

try:
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Received authorized message: {data}")
except WebSocketDisconnect as e:
    print(f"Authenticated connection closed: {e.code}, {e.reason}")
except Exception as e:
    print(f"An error occurred: {e}")

更直接的在 accept() 前手动检查和拒绝

@app.websocket(“/ws/auth_manual”)
async def websocket_auth_manual_endpoint(websocket: WebSocket):
# 在 accept() 之前获取 headers
headers = websocket.headers
token = headers.get(“x-token”)

if token != "fake-super-secret-token":
    # 如果认证失败,拒绝连接并发送关闭码
    # 1008 (Policy Violation) 是一个合适的关闭码
    await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Authentication failed")
    print("Authentication failed. Connection rejected.")
    return # 结束函数执行,不接受连接

# 如果认证成功,接受连接
print("Authentication successful. Accepting connection.")
await websocket.accept()
print("WebSocket connection accepted.")

try:
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Received authorized message: {data}")
except WebSocketDisconnect as e:
    print(f"Authenticated connection closed: {e.code}, {e.reason}")
except Exception as e:
    print(f"An error occurred: {e}")

``
**注意:** 在
websocket_authenticated_endpoint中使用Depends并抛出HTTPException会导致连接直接中断,客户端收到 1006 错误码(Abnormal Closure),原因通常不明确。而websocket_auth_manual_endpoint中在accept()之前手动检查并调用await websocket.close(…)` 可以发送更具体的关闭码和原因(如 1008 Policy Violation),这通常对客户端调试更有帮助。

5.2 扩展 ConnectionManager

简单列表或字典适用于单进程应用。但对于生产环境,你可能需要:

  • 分组: 按房间、频道或用户组管理连接,以便向特定组广播消息。
  • 持久化/共享状态: 如果你的应用部署在多个服务器实例后面,每个实例都有自己的 ConnectionManager 实例,它们之间无法直接通信。这意味着一个实例收到的消息无法广播给连接到其他实例的客户端。
  • 心跳机制: 客户端和服务器之间可以定期发送小的数据包(心跳)来检测连接是否仍然活跃,即使没有实际数据传输。如果一段时间内没有收到心跳,可以认为连接已断开并进行清理。

5.3 WebSocket 扩展和子协议

WebSockets 协议支持扩展和子协议。子协议用于标识应用层协议(例如,聊天协议、股票行情协议),以便客户端和服务器都能理解消息的格式和语义。扩展可以添加额外的功能,如消息压缩。

在 FastAPI 中,你可以在 accept() 方法中指定子协议:

python
@app.websocket("/ws/subprotocol")
async def websocket_subprotocol_endpoint(websocket: WebSocket):
# 接受连接,并指定支持的子协议
# 客户端请求的子协议必须在服务器支持的列表中
await websocket.accept(subprotocol="myprotocol")
print(f"Accepted with subprotocol: {websocket.subprotocol}")
# ...

客户端也需要在连接时指定子协议:

javascript
// client.html
var ws = new WebSocket("ws://127.0.0.1:8000/ws/subprotocol", "myprotocol");

如果服务器接受了客户端指定的子协议,websocket.subprotocol 会是该协议的名称;否则,websocket.subprotocol 会是 None

5.4 异步发送和接收

while True 循环中,你可能需要同时处理接收消息和在后台发送消息(例如,服务器推送通知)。这需要更复杂的异步协程管理,例如使用 asyncio.gather 或管理多个任务。

“`python
import asyncio

@app.websocket(“/ws/advanced”)
async def websocket_advanced_endpoint(websocket: WebSocket):
await websocket.accept()

async def receive_messages():
    try:
        while True:
            data = await websocket.receive_text()
            print(f"Received: {data}")
            await manager.broadcast(f"Received from a client: {data}") # 广播接收到的消息
    except WebSocketDisconnect:
        print("Receive task detected disconnect.")
    except Exception as e:
        print(f"Receive task error: {e}")

async def send_periodic_messages():
    # 示例:每隔5秒向这个特定客户端发送消息
    try:
        while True:
            await asyncio.sleep(5)
            # 假设你想发送给这个特定的 websocket 实例
            await websocket.send_text("Server periodic update!")
            print("Sent periodic update.")
    except WebSocketDisconnect:
         print("Send periodic task detected disconnect.")
    except Exception as e:
        print(f"Send periodic task error: {e}")

# 创建并运行两个异步任务:一个接收消息,一个发送周期消息
# 当其中一个任务因为连接断开(WebSocketDisconnect)或其他错误结束时,取消另一个任务
receive_task = asyncio.create_task(receive_messages())
send_task = asyncio.create_task(send_periodic_messages())

try:
    # 等待两个任务完成(通常是因为连接断开)
    await asyncio.gather(receive_task, send_task)
except Exception as e:
     print(f"Gather task failed: {e}")
finally:
    # 确保任务被取消
    if not receive_task.done():
        receive_task.cancel()
    if not send_task.done():
        send_task.cancel()
    # 在任务结束后处理连接断开(如果尚未处理)
    # 这里的示例 manager 没有根据 task 结束自动 disconnect
    # 实际应用中,receive_messages 捕获 WebSocketDisconnect 后应该通知 manager 移除连接
    print("WebSocket connection closed by gather.")

``
这个例子展示了如何在一个 WebSocket 终端函数中使用
asyncio.create_taskasyncio.gather` 同时处理接收消息和执行其他异步操作(如定时发送)。在实际应用中,你需要更精细地管理这些任务和连接状态。

5.5 扩展到分布式系统(水平伸缩)

前面提到的 ConnectionManager 只在单个进程中有效。如果部署多个 FastAPI 实例,连接会分散到不同的实例上。要实现跨实例的广播,你需要一个发布/订阅(Pub/Sub)系统。

  • Redis Pub/Sub: 一个常用的选择。当一个实例收到消息需要广播时,它不是直接遍历本地连接列表,而是将消息发布到一个 Redis 频道。其他所有实例都订阅这个频道,收到消息后,再向它们 本地 的连接广播。
  • Kafka, RabbitMQ 等消息队列: 也可以用于构建更复杂的消息路由和处理系统。

“`python

概念示例 (需要安装 redis-py):

pip install redis websockets fastapi uvicorn

main.py (伪代码,需要补充 Redis 连接和 Pub/Sub 逻辑)

import asyncio
import redis
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Dict

… (ConnectionManager 和其他 FastAPI setup) …

假设 Redis 连接已建立

r = redis.asyncio.from_url(“redis://localhost”) # 对于 asyncio

p = r.pubsub()

await p.subscribe(“chat_channel”)

修改 ConnectionManager 以集成 Redis

class DistributedConnectionManager:
def init(self, redis_client):
self.active_connections: Dict[str, WebSocket] = {}
self.redis = redis_client
self.pubsub_task = None # 用于运行 Redis pubsub 监听任务

async def connect(self, client_id: str, websocket: WebSocket):
    await websocket.accept()
    self.active_connections[client_id] = websocket
    print(f"[Instance] Client {client_id} connected.")
    # 可选:在连接时发布一个事件到 Redis
    # await self.redis.publish("chat_events", f"user_joined:{client_id}")

def disconnect(self, client_id: str):
     if client_id in self.active_connections:
        del self.active_connections[client_id]
        print(f"[Instance] Client {client_id} disconnected.")
        # 可选:发布一个事件到 Redis
        # asyncio.create_task(self.redis.publish("chat_events", f"user_left:{client_id}")) # 在 async def 外部使用 create_task


# 这个方法在本实例收到消息时调用
async def process_incoming_message(self, client_id: str, message: str):
     # 收到消息后,发布到 Redis 频道
     redis_message = f"chat_message:{client_id}:{message}"
     print(f"[Instance] Publishing to Redis: {redis_message}")
     await self.redis.publish("chat_channel", redis_message)

# 这个方法由 Redis Pub/Sub 监听任务调用
async def handle_redis_message(self, message):
     if message and message['type'] == 'message':
         data = message['data'].decode('utf-8')
         print(f"[Instance] Received from Redis: {data}")
         # 假设收到的是广播消息,向所有本地连接广播
         await self.broadcast_local(f"[Broadcast from other instance] {data}")


# 只向本实例的连接广播
async def broadcast_local(self, message: str):
    disconnected_clients = []
    for client_id, connection in list(self.active_connections.items()): # 迭代副本防止在循环中修改
        try:
            await connection.send_text(message)
        except (WebSocketDisconnect, RuntimeError, Exception) as e: # 捕获更多发送错误
            print(f"[Instance] Error sending to client {client_id} locally: {e}")
            disconnected_clients.append(client_id)

    for client_id in disconnected_clients:
         self.disconnect(client_id)

async def run_pubsub_listener(self):
     # 运行 Redis Pub/Sub 监听循环
     pubsub = self.redis.pubsub()
     await pubsub.subscribe("chat_channel", "chat_events") # 订阅相关频道
     print("[Instance] Starting Redis Pub/Sub listener.")
     async for message in pubsub.listen():
         # await self.handle_redis_message(message) # 处理收到的消息
         # 为了避免阻塞 pubsub 监听,最好在单独任务中处理
         asyncio.create_task(self.handle_redis_message(message))

在应用启动时连接 Redis 并启动监听任务

@app.on_event(“startup”)
async def startup_event():
# r = redis.asyncio.from_url(“redis://localhost”) # 实际连接
# app.state.redis = r # 将 Redis 连接存储在 app.state 中
# app.state.manager = DistributedConnectionManager(app.state.redis)
# app.state.manager.pubsub_task = asyncio.create_task(app.state.manager.run_pubsub_listener())
print(“App startup: Redis connection and Pub/Sub listener should be started.”)
# 这里仅作示例,实际需要完整的 Redis 集成

在应用关闭时关闭 Redis 连接并取消监听任务

@app.on_event(“shutdown”)
async def shutdown_event():
# if hasattr(app.state, ‘redis’) and app.state.redis:
# await app.state.redis.close()
# if hasattr(app.state, ‘manager’) and app.state.manager.pubsub_task:
# app.state.manager.pubsub_task.cancel()
# try:
# await app.state.manager.pubsub_task # 等待任务取消
# except asyncio.CancelledError:
# print(“Pub/Sub listener task cancelled.”)
print(“App shutdown: Redis connection and Pub/Sub listener should be closed.”)

聊天室终端 (使用分布式管理器)

@app.websocket(“/ws/chat/{client_id}”)
async def websocket_chat_endpoint_distributed(websocket: WebSocket, client_id: str):
# manager = app.state.manager # 获取管理器实例
await manager.connect(client_id, websocket)
# 广播用户加入消息 (通过 Redis)
await manager.process_incoming_message(“System”, f”Client {client_id} joined the chat.”) # 将系统消息也通过广播流程发送

try:
    while True:
        data = await websocket.receive_text()
        # 收到消息后,通过管理器发布到 Redis
        await manager.process_incoming_message(client_id, data)

except WebSocketDisconnect:
    manager.disconnect(client_id)
    # 广播用户离开消息 (通过 Redis)
    await manager.process_incoming_message("System", f"Client {client_id} left the chat.")
except Exception as e:
     print(f"An error occurred with client {client_id}: {e}")
     manager.disconnect(client_id)
     await manager.process_incoming_message("System", f"Client {client_id} encountered an error and left the chat.")

``
这个分布式概念示例展示了如何将 Redis Pub/Sub 集成到
ConnectionManager中。每个 FastAPI 实例都会:
1. 维护自己的本地连接列表。
2. 订阅一个 Redis 频道(例如
chat_channel`)。
3. 当从本地连接收到消息时,将消息发布到 Redis 频道。
4. 当从 Redis 频道收到消息时,向 本地 的所有连接广播这条消息。

这样,即使客户端连接到不同的 FastAPI 实例,它们也能通过 Redis 实现消息的互通。

第六部分:总结与展望

WebSockets 为现代 Web 应用提供了强大的实时通信能力。FastAPI 凭借其异步特性、高性能和简洁的语法,是构建 WebSocket 服务器的优秀选择。

本文从 WebSocket 的基本概念出发,逐步深入到在 FastAPI 中如何建立连接、处理不同类型的消息、管理多个连接以及实现消息广播。通过一个简单的聊天室例子,我们展示了这些概念如何在实际应用中结合。最后,我们探讨了身份验证、连接管理的高级方面、以及如何通过 Pub/Sub 系统实现分布式部署下的 WebSocket 水平伸缩。

要构建一个健壮、高性能、可伸缩的生产级 WebSocket 应用,你还需要深入研究:

  • 更复杂的连接管理策略: 如何按频道、主题或用户角色对连接进行分组。
  • 错误处理和重连: 如何在客户端和服务器端优雅地处理网络错误、连接中断,并实现自动重连。
  • 心跳和 Keep-Alive: 实现机制以检测不活跃或断开的连接。
  • 安全加固: 防止拒绝服务攻击、消息注入等。使用 WSS (WebSocket Secure,基于 TLS/SSL) 是必须的。
  • 性能调优和监控: 使用工具监控连接数、消息吞吐量、延迟等指标。
  • 选择合适的 Pub/Sub 或消息队列: 根据应用规模、吞吐量和可靠性需求选择合适的技术栈(Redis, Kafka, RabbitMQ, NATS 等)。

FastAPI 提供了坚实的基础,但构建复杂的实时系统本身是一个具有挑战性的任务,需要仔细设计和持续优化。希望本文能为你开启 FastAPI WebSocket 开发之旅提供详实的指导。


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部