Python FastAPI WebSocket 编程终极指南 – wiki基地


Python FastAPI WebSocket 编程终极指南

在现代 Web 开发中,实时通信已成为不可或缺的一部分。从在线聊天、实时通知、股票行情更新到协同编辑工具,用户期望获得即时、动态的交互体验。HTTP 协议的请求-响应模式在这种场景下显得力不从心,而 WebSocket 则以其持久、双向的通信能力,成为了实现这一切的黄金标准。

FastAPI,作为近年来 Python Web 框架的翘楚,凭借其卓越的性能、现代化的异步支持以及建立在 Starlette 和 Pydantic 之上的强大功能,为 WebSocket 编程提供了无与伦比的简洁与高效。

本指南将带你从 WebSocket 的基础概念出发,深入探索 FastAPI 中 WebSocket 的使用方法,涵盖从简单的“Hello, World”到构建复杂的多人聊天室,再到身份验证、连接管理和生产环境部署等高级主题。无论你是初学者还是有经验的开发者,这篇终极指南都将为你提供全面而深刻的见解。

目录

  1. WebSocket 基础知识
    • WebSocket 是什么?为什么需要它?
    • 与 HTTP 的区别
    • 工作流程:握手与数据传输
  2. 环境准备与第一个 WebSocket 连接
    • 安装必要的库
    • 创建你的第一个 WebSocket 端点
    • 编写一个简单的 HTML/JavaScript 客户端进行测试
  3. 深入数据交换
    • 发送和接收文本数据
    • 发送和接收 JSON 数据 (结合 Pydantic)
    • 处理二进制数据
    • 优雅地处理连接断开
  4. 核心模式:连接管理与消息广播
    • 构建一个 ConnectionManager
    • 实现单用户消息发送
    • 实现向所有用户广播消息
    • 实战:构建一个多人在线聊天室
  5. 高级主题与最佳实践
    • WebSocket 中的身份验证与授权
    • 处理连接状态与关闭代码
    • 依赖注入 (Dependencies) 的妙用
    • 跨进程/服务器扩展 (Redis Pub/Sub)
  6. 客户端实现考量
    • 更健壮的 JavaScript 客户端
    • 使用 Python 客户端进行测试或后端通信
  7. 总结

1. WebSocket 基础知识

WebSocket 是什么?为什么需要它?

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。与传统的 HTTP 不同,WebSocket 允许服务器主动向客户端推送信息,也允许客户端随时向服务器发送信息,而无需每次都发起新的请求。

为什么需要它?

  • 低延迟:一旦连接建立,数据帧可以直接在客户端和服务器之间传输,减少了 HTTP 请求中包含的头部开销。
  • 实时性:服务器可以立即将更新推送给客户端,非常适合需要实时数据的应用,如在线游戏、金融交易平台和监控仪表盘。
  • 双向通信:客户端和服务器处于平等地位,都可以是通信的发起方。
  • 状态保持:连接是持久的,这使得服务器可以轻松地跟踪每个连接的状态。

与 HTTP 的区别

特性 HTTP WebSocket
连接模型 无状态,请求-响应 有状态,持久连接
通信方向 单向(客户端发起) 双向(全双工)
延迟 较高(每次请求都有头部开销) 极低(连接建立后开销小)
协议头 http://https:// ws://wss://
适用场景 传统的网页浏览、API 调用 实时聊天、通知、数据流

工作流程:握手与数据传输

WebSocket 连接的生命周期始于一个 HTTP 请求,这个请求被称为“握手”(Handshake)。客户端发送一个特殊的 HTTP GET 请求,其中包含 Upgrade: websocketConnection: Upgrade 等头部信息。

如果服务器支持 WebSocket,它会返回一个 101 Switching Protocols 的响应。此后,底层的 TCP 连接就不再用于 HTTP 通信,而是转变为 WebSocket 数据通道,双方可以通过这个通道自由地发送数据帧。


2. 环境准备与第一个 WebSocket 连接

现在,让我们卷起袖子,用 FastAPI 编写第一个 WebSocket 应用。

安装必要的库

你需要安装 FastAPI、一个 ASGI 服务器(如 Uvicorn),以及 python-websockets 库(FastAPI 在底层使用它来处理 WebSocket)。

bash
pip install fastapi "uvicorn[standard]" websockets

创建你的第一个 WebSocket 端点

创建一个名为 main.py 的文件,并写入以下代码:

“`python

main.py

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

html = “””




Chat

WebSocket Chat





“””

@app.get(“/”)
async def get():
return HTMLResponse(html)

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 接受客户端连接
await websocket.accept()
try:
while True:
# 等待接收客户端发来的消息
data = await websocket.receive_text()
# 向客户端发送消息
await websocket.send_text(f”Message text was: {data}”)
except Exception as e:
print(f”Error: {e}”)
finally:
print(“Client disconnected”)

“`

代码解析

  1. @app.websocket("/ws"):这是一个装饰器,它告诉 FastAPI /ws 路径是一个 WebSocket 端点,而不是一个普通的 HTTP 端点。
  2. async def websocket_endpoint(websocket: WebSocket):端点函数必须是异步的 (async def)。它接收一个 WebSocket 类的实例作为参数,这个对象包含了处理 WebSocket 连接的所有方法。
  3. await websocket.accept():这是建立连接的关键一步。在调用此方法之前,你无法与客户端进行数据交换。你可以在 accept() 之前执行身份验证等逻辑。
  4. while True::我们使用一个无限循环来持续监听来自客户端的消息。
  5. data = await websocket.receive_text():异步等待并接收一条文本消息。
  6. await websocket.send_text(...):异步向客户端发送一条文本消息。
  7. try...finally:这个结构确保了即使在连接异常断开(例如用户关闭浏览器)时,我们也能执行一些清理代码。

编写一个简单的 HTML/JavaScript 客户端进行测试

为了方便,上面的代码已经包含了一个简单的 HTML 页面。这个页面通过根路径 / 提供服务。

运行应用

在终端中运行以下命令:

bash
uvicorn main:app --reload

现在,在浏览器中打开 http://localhost:8000。你会看到一个简单的聊天界面。在输入框中输入任何内容并点击 “Send”,服务器会接收到你的消息,并返回一条格式化后的消息,显示在下方的列表中。你已经成功建立了第一个 FastAPI WebSocket 连接!


3. 深入数据交换

FastAPI 的 WebSocket 对象提供了处理不同数据类型的便捷方法。

发送和接收文本数据

这正是我们在第一个例子中做的:
* await websocket.receive_text()
* await websocket.send_text("some string")

发送和接收 JSON 数据 (结合 Pydantic)

在实际应用中,我们通常交换结构化的 JSON 数据。FastAPI 让这个过程变得异常简单。

  • await websocket.receive_json(): 接收 JSON 数据并将其自动解析为 Python 字典或列表。
  • await websocket.send_json({"key": "value"}): 将 Python 字典或列表序列化为 JSON 字符串并发送。

你可以结合 Pydantic 模型来验证接收到的数据,这体现了 FastAPI 的核心优势。

“`python
from pydantic import BaseModel
from fastapi import WebSocket, WebSocketDisconnect

class Message(BaseModel):
user: str
text: str

… 在你的 WebSocket 端点中 …

@app.websocket(“/ws_json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
try:
# 使用 Pydantic 模型进行验证
message = Message(**data)
response = {“user”: message.user, “text”: f”Echo: {message.text}”}
await websocket.send_json(response)
except Exception:
# 如果数据格式不正确
await websocket.send_json({“error”: “Invalid data format”})
except WebSocketDisconnect:
print(“Client disconnected”)
“`

处理二进制数据

如果你需要传输图片、音频或其他二进制文件,可以使用 bytes 方法:

  • await websocket.receive_bytes()
  • await websocket.send_bytes(b"some binary data")

优雅地处理连接断开

当客户端断开连接时,receive_* 方法会抛出一个 WebSocketDisconnect 异常。捕获这个异常是进行清理工作的最佳时机,比如将用户从活动连接列表中移除。

“`python
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f”Client #{client_id} says: {data}”)
except WebSocketDisconnect:
print(f”Client #{client_id} disconnected”)
# 在这里执行清理操作
“`


4. 核心模式:连接管理与消息广播

在真实世界的应用中,一个 WebSocket 端点通常需要服务多个客户端,并且需要能够将消息从一个客户端广播到其他所有(或部分)客户端。要实现这一点,我们需要一个集中的连接管理器。

构建一个 ConnectionManager

让我们创建一个单例类来管理所有活动的 WebSocket 连接。

“`python

main.py (扩展)

from typing import List

class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
    await websocket.accept()
    self.active_connections.append(websocket)

def disconnect(self, websocket: WebSocket):
    self.active_connections.remove(websocket)

async def send_personal_message(self, message: str, websocket: WebSocket):
    await websocket.send_text(message)

async def broadcast(self, message: str):
    for connection in self.active_connections:
        await connection.send_text(message)

manager = ConnectionManager()
“`

这个 ConnectionManager 类提供了以下功能:
* connect: 接受新连接并将其存储在列表中。
* disconnect: 从列表中移除断开的连接。
* send_personal_message: 向单个特定的 WebSocket 连接发送消息。
* broadcast: 向所有活动的连接广播消息。

我们创建了一个全局实例 manager,以便在整个应用中共享。

实战:构建一个多人在线聊天室

现在,让我们使用 ConnectionManager 来构建一个功能齐全的聊天室。

“`python

main.py (完整聊天室示例)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import List

app = FastAPI()

HTML 页面部分省略,与第一个示例相同,但 WebSocket URL 需要更新

html = “””… (同上, 但 WebSocket URL 改为 ws://localhost:8000/ws/CLIENT_ID) …”””

class ConnectionManager:
# … (同上) …

manager = ConnectionManager()

@app.get(“/”)
async def get():
# 简单的 HTML 响应,实际应用中会更复杂
return HTMLResponse(“

Visit /chat/{client_id} with a unique client_id

“)

@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
await manager.broadcast(f”Client #{client_id} has entered the chat”)
try:
while True:
data = await websocket.receive_text()
# 向发送者发送回执
# await manager.send_personal_message(f”You wrote: {data}”, websocket)
# 向所有其他人广播消息
await manager.broadcast(f”Client #{client_id} says: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f”Client #{client_id} has left the chat”)

“`

运行与测试

  1. 运行 uvicorn main:app --reload
  2. 在两个或多个不同的浏览器标签页中,分别打开 http://localhost:8000/ws/1http://localhost:8000/ws/2 等(你需要一个能与 WebSocket 交互的客户端,或修改 HTML 让用户输入 client_id)。
  3. 在一个标签页中发送消息,你会看到所有其他标签页都会收到这条消息。当一个标签页关闭时,其他页面会收到它离开的通知。

这个模式是构建几乎所有实时应用的基础。


5. 高级主题与最佳实践

WebSocket 中的身份验证与授权

保护 WebSocket 端点至关重要。由于 WebSocket 握手是基于 HTTP 的,我们可以重用 FastAPI 强大的依赖注入系统。

最常见的方法是通过查询参数传递令牌(token)。

“`python
from fastapi import Depends, Query, HTTPException, status

假设你已经有一个用于验证 token 的函数

async def get_current_user(token: str = Query(…)):
user = fake_users_db.get(token) # 伪代码:从数据库验证 token
if not user:
# 注意:在 WebSocket 中,我们不能直接返回 HTTP 错误
# 抛出异常或直接关闭连接是更好的选择
return None
return user

@app.websocket(“/ws_secure/{client_id}”)
async def websocket_secure_endpoint(
websocket: WebSocket,
client_id: int,
token: str = Query(…)
):
# 简单的 token 验证
if token != “secret-token”:
# 关闭连接并提供原因
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return

await manager.connect(websocket)
# ... (后续逻辑) ...

“`

更优雅的方式是使用 Depends,但这在 WebSocket 中有一点不同。依赖项不能直接返回一个响应。如果验证失败,它应该抛出 HTTPException(Starlette 会将其转换为 WebSocket 关闭帧),或者依赖项本身可以调用 websocket.close()

“`python
async def get_token(websocket: WebSocket, token: str = Query(None)):
if token is None:
await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
return token

@app.websocket(“/ws_secure_depends”)
async def websocket_endpoint_depends(
websocket: WebSocket,
token: str = Depends(get_token)
):
# 此时 token 已经被验证
await websocket.accept(headers=[(b’X-Token’, token.encode())])
# …
“`

处理连接状态与关闭代码

WebSocket 对象有一个 client_stateapplication_state 属性,可以查询其状态(CONNECTING, CONNECTED, DISCONNECTED)。

当你主动关闭连接时,可以使用 await websocket.close(code=1000, reason="Normal closure")。WebSocket 协议定义了一系列的关闭代码,使用它们可以向客户端传达更明确的关闭原因。

代码 名称 描述
1000 NORMAL_CLOSURE 正常关闭
1001 GOING_AWAY 端点正在离开(例如服务器关闭)
1002 PROTOCOL_ERROR 协议错误
1008 POLICY_VIOLATION 收到违反策略的消息

FastAPI 在 fastapi.status 模块中提供了这些代码的常量,例如 status.WS_1000_NORMAL_CLOSURE

跨进程/服务器扩展 (Redis Pub/Sub)

我们之前实现的 ConnectionManager 存在一个巨大限制:它只在单个 Python 进程中有效。当你使用 Gunicorn 或 Uvicorn 运行多个工作进程,或者将服务部署到多台服务器时,每个进程/服务器都有自己独立的连接列表。一个进程中的用户无法向另一个进程中的用户广播消息。

解决方案是使用一个外部消息代理(Message Broker),如 Redis、RabbitMQ 或 Kafka。Redis 的 Pub/Sub(发布/订阅)功能非常适合这个场景。

架构思路

  1. 连接:每个 FastAPI 实例正常接受 WebSocket 连接,并将连接对象保存在其本地的 ConnectionManager 中。
  2. 发布:当一个实例(例如,实例A)收到来自客户端的消息需要广播时,它不再直接遍历本地连接列表。相反,它将这条消息发布(Publish)到 Redis 的一个特定频道(Channel),例如 “chat_channel”。
  3. 订阅:所有 FastAPI 实例在启动时都会订阅(Subscribe)”chat_channel”。
  4. 接收与广播:所有实例(包括实例A自身)都会从 Redis 收到这条消息。收到消息后,每个实例会遍历其本地ConnectionManager 中的连接,并将消息发送给它所管理的所有客户端。

这样,通过 Redis 作为中间人,就实现了跨进程、跨服务器的消息广播。实现这个需要一个异步的 Redis 客户端库,如 redis-py (asyncio extra) 或 aioredis


6. 客户端实现考量

更健壮的 JavaScript 客户端

一个生产级的客户端需要处理更多情况:
* 连接生命周期事件onopen, onmessage, onclose, onerror
* 自动重连:当连接因网络问题意外关闭(onclose 事件),客户端应尝试在一段时间后自动重新连接。
* 心跳检测:为了防止代理或防火墙因长时间不活动而关闭连接,客户端和服务器可以约定定期发送“心跳”消息(ping/pong)。

“`javascript
function connect() {
const ws = new WebSocket(“ws://localhost:8000/ws/my-id”);

ws.onopen = function(event) {
    console.log("WebSocket connection established.");
    // 可以发送一条初始化消息
    ws.send(JSON.stringify({type: "auth", token: "my-secret-token"}));
};

ws.onmessage = function(event) {
    const data = JSON.parse(event.data);
    console.log("Received message:", data);
    // 更新 UI
};

ws.onclose = function(event) {
    console.log("WebSocket connection closed. Code:", event.code, "Reason:", event.reason);
    // 尝试 5 秒后重连
    setTimeout(connect, 5000);
};

ws.onerror = function(error) {
    console.error("WebSocket error:", error);
    ws.close(); // 发生错误时确保关闭
};

}

connect(); // 初始连接
“`

使用 Python 客户端进行测试或后端通信

websockets 库不仅是 FastAPI 的后端依赖,它本身也是一个强大的客户端库。这对于编写自动化测试或实现服务间的 WebSocket 通信非常有用。

“`python
import asyncio
import websockets

async def test_chat():
uri = “ws://localhost:8000/ws/python-client”
async with websockets.connect(uri) as websocket:
await websocket.send(“Hello from Python client!”)

    # 监听服务器消息
    while True:
        try:
            message = await websocket.recv()
            print(f"< Received: {message}")
        except websockets.ConnectionClosed:
            print("Connection closed.")
            break

if name == “main“:
asyncio.run(test_chat())
“`


7. 总结

FastAPI 为 Python 世界的 WebSocket 编程带来了前所未有的简洁与高效。通过本指南,我们学习了:

  • 基础:如何使用 @app.websocket 装饰器创建端点,并使用 accept, receive_*, send_* 方法进行基本通信。
  • 核心模式ConnectionManager 是管理多个连接并实现消息广播的关键设计模式。
  • 高级应用:我们探讨了如何利用 FastAPI 的依赖注入系统实现身份验证,如何优雅地处理连接关闭,以及最重要的——如何通过 Redis 等消息代理来扩展 WebSocket 应用以支持多进程/多服务器部署。
  • 健壮性:无论是前端还是后端,都需要考虑连接的生命周期管理、错误处理和自动重连机制。

WebSocket 是构建现代、高度互动 Web 应用的基石。FastAPI 以其原生的异步支持和优雅的 API 设计,无疑是 Python 开发者拥抱这一技术的最佳选择。现在,你已经掌握了从基础到高级的完整知识体系,是时候去构建你自己的实时应用了!

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部