FastAPI WebSocket 开发指南 – wiki基地


FastAPI WebSocket 开发指南:从基础到实践

随着 Web 应用的日益复杂,用户对实时交互的需求也越来越高。传统的 HTTP 请求/响应模式在这种场景下显得力不从心,因为它本质上是无状态的、单向的(客户端发起,服务器响应)。为了实现服务器向客户端“推送”数据,或者客户端与服务器进行持续的双向通信,我们需要一种新的技术——WebSocket。

FastAPI,作为一款现代、快速(高性能)的 Python Web 框架,提供了对 WebSocket 的优秀支持,使其成为构建实时应用的理想选择。本文将深入探讨如何在 FastAPI 中进行 WebSocket 开发,从基本概念到高级应用,助你掌握这一强大的通信技术。

1. 理解 WebSocket:与 HTTP 的区别

在深入 FastAPI 实现之前,先快速回顾一下 WebSocket 的核心概念以及它与 HTTP 的不同之处:

  • HTTP (HyperText Transfer Protocol):

    • 无状态 (Stateless): 每个请求都是独立的,服务器不保留客户端的上下文信息(除非使用 cookies 或会话机制)。
    • 单向 (Unidirectional): 通信由客户端发起请求,服务器返回响应。服务器无法主动向客户端发送数据,除非客户端先发送请求(如长轮询、服务器发送事件 SSE,但它们各有局限性)。
    • 短连接 (Short-lived): 通常每个请求/响应周期完成后,连接就会关闭(尽管现代 HTTP/1.1 引入了持久连接,但仍然是请求-响应模式)。
    • 头部开销大: 每个请求/响应都包含冗余的头部信息。
  • WebSocket:

    • 有状态 (Stateful): 一旦连接建立,客户端和服务器之间会保持一个持久的连接,直到一方主动关闭。
    • 双向 (Full-duplex): 数据可以在任意时刻由客户端或服务器独立地发送,实现了真正的双向通信。
    • 长连接 (Long-lived): 连接在整个会话期间保持开放,减少了连接建立和断开的开销。
    • 开销小: 握手后,数据帧头部很小,传输效率高。

WebSocket 通过 HTTP 的 101 Switching Protocols 状态码发起握手,成功后,底层协议从 HTTP 升级为 WebSocket 协议。

2. FastAPI 中的 WebSocket 支持

FastAPI 通过其优雅的依赖注入系统和对 ASGI(Asynchronous Server Gateway Interface)标准的支持,使得处理 WebSocket 连接变得非常直观。它底层依赖于像 websocketsstarlette 这样的库来实现 WebSocket 协议。因此,你需要安装 FastAPI、一个 ASGI 服务器(如 Uvicorn)以及 WebSocket 库:

bash
pip install fastapi uvicorn websockets

  • fastapi: 框架本身。
  • uvicorn: 一个快速的 ASGI 服务器,能够运行 FastAPI 应用并支持 WebSocket。
  • websockets: 一个流行的 Python WebSocket 库,FastAPI/Starlette 内部可能会使用它或兼容它的接口。

3. 创建第一个 WebSocket 端点 (Echo Server)

最简单的 WebSocket 应用是“Echo Server”,它接收到任何消息后原样发送回去。这有助于理解基本的连接建立、接收和发送过程。

创建一个 Python 文件,例如 main.py

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn

app = FastAPI()

存储活跃的 WebSocket 连接,尽管这不是生产环境的最佳实践(后面会讨论)

active_websockets: list[WebSocket] = []

定义一个 WebSocket 端点

使用 @app.websocket(“/ws”) 装饰器

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 接受客户端的连接
await websocket.accept()
# 将新的连接添加到列表中
active_websockets.append(websocket)
print(f”客户端已连接: {websocket.client}”)

try:
    # 循环接收客户端发送的消息
    while True:
        # 接收文本消息
        data = await websocket.receive_text()
        print(f"收到消息: {data}")
        # 将收到的消息原样发送回客户端 (Echo)
        await websocket.send_text(f"消息已收到: {data}")

except WebSocketDisconnect:
    # 处理客户端断开连接
    active_websockets.remove(websocket)
    print(f"客户端已断开连接: {websocket.client}")
except Exception as e:
    # 处理其他可能的异常
    print(f"发生错误: {e}")
    # 可选:关闭连接
    # await websocket.close() # 如果发生非正常断开,通常连接已经不可用或会自动关闭

@app.get(“/”)
async def read_root():
return {“message”: “FastAPI WebSocket Example”}

你可以在文件末尾添加这段,方便直接运行

if name == “main“:
# 注意这里的 ws 路径与上面的 @app.websocket(“/ws”) 对应
# 在浏览器中访问 http://127.0.0.1:8000/ 进行测试
# WebSocket 客户端连接到 ws://127.0.0.1:8000/ws
uvicorn.run(app, host=”127.0.0.1″, port=8000)

“`

运行服务器:

打开终端,进入 main.py 所在的目录,运行:

bash
uvicorn main:app --reload

服务器将在 http://127.0.0.1:8000 启动。

测试 WebSocket 连接:

你可以使用多种方式测试 WebSocket 连接:

  1. 浏览器控制台 (JavaScript):
    打开任意一个网页(比如访问 http://127.0.0.1:8000/,然后打开开发者工具的控制台),输入以下 JavaScript 代码:

    “`javascript
    const ws = new WebSocket(“ws://127.0.0.1:8000/ws”);

    ws.onopen = function(event) {
    console.log(“WebSocket 连接成功”);
    ws.send(“Hello Server!”); // 发送消息
    };

    ws.onmessage = function(event) {
    console.log(“收到服务器消息: “, event.data);
    };

    ws.onerror = function(event) {
    console.error(“WebSocket 错误: “, event);
    };

    ws.onclose = function(event) {
    if (event.wasClean) {
    console.log(WebSocket 连接关闭,代码=${event.code} 原因=${event.reason});
    } else {
    console.error(‘WebSocket 连接意外关闭’);
    }
    };

    // 关闭连接
    // ws.close();
    “`
    执行这段代码,你将在控制台看到连接建立、发送消息和接收回显消息的过程,同时在服务器端的终端也能看到相应的打印输出。

  2. websocat 命令行工具:
    安装 websocat (cargo install websocat 或下载预编译版本),然后在终端运行:

    bash
    websocat ws://127.0.0.1:8000/ws

    连接成功后,你输入的任何文本都会被服务器回显。按 Ctrl+C 断开连接。

  3. Postman 或 Insomnia:
    这些 API 测试工具通常也支持 WebSocket 连接,你可以创建一个 WebSocket 请求来测试。

代码解释:

  • @app.websocket("/ws"): 这个装饰器将 /ws 路径注册为一个 WebSocket 端点,而不是一个普通的 HTTP 端点。
  • async def websocket_endpoint(websocket: WebSocket):: 定义一个异步函数来处理 WebSocket 连接。FastAPI 会自动将表示当前连接的 WebSocket 对象作为参数注入到函数中。
  • await websocket.accept(): 在开始接收和发送消息之前,必须先调用 accept() 方法来完成 WebSocket 握手,正式建立连接。
  • while True:: 一个无限循环,用于持续监听客户端发送的消息。
  • data = await websocket.receive_text(): 异步等待并接收客户端发送的下一条文本消息。还有 receive_bytes()receive_json() 方法用于接收字节和 JSON 数据。
  • await websocket.send_text(...): 异步发送文本消息给客户端。还有 send_bytes()send_json() 方法用于发送字节和 JSON 数据。
  • except WebSocketDisconnect:: 这是处理客户端正常或非正常断开连接的推荐方式。当客户端关闭连接时(例如关闭浏览器标签页或调用 ws.close()),会抛出 WebSocketDisconnect 异常。
  • active_websockets.append(websocket) / active_websockets.remove(websocket): 这是最简单的连接管理方式,用一个列表来存储所有活跃的连接对象。注意: 在生产环境中使用全局列表可能存在问题,尤其是在多进程或分布式部署时,因为不同进程/机器上的列表是独立的,无法实现跨进程/机器广播。后面会讨论更高级的解决方案。

4. 发送和接收不同类型的数据

WebSocket 协议本身支持发送文本和二进制数据。FastAPI 封装了这些操作,并额外提供了方便的 JSON 数据处理方法。

  • 文本数据:

    • 接收: await websocket.receive_text()
    • 发送: await websocket.send_text("你的文本消息")
  • 二进制数据:

    • 接收: await websocket.receive_bytes() (返回 bytes 类型)
    • 发送: await websocket.send_bytes(b"你的二进制数据")
  • JSON 数据:

    • 接收: await websocket.receive_json() (自动解析 JSON 字符串为 Python 字典/列表/基本类型)
    • 发送: await websocket.send_json({"key": "value", "number": 123}) (自动将 Python 对象序列化为 JSON 字符串)

示例代码(部分修改 Echo Server):

“`python

… (前面的 imports 和 app = FastAPI()) …

@app.websocket(“/ws/json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
print(f”JSON 客户端已连接: {websocket.client}”)

try:
    while True:
        # 接收 JSON 数据
        data = await websocket.receive_json()
        print(f"收到 JSON 消息: {data}, 类型: {type(data)}")

        # 发送 JSON 数据作为响应
        response_data = {
            "status": "success",
            "received_data": data,
            "message": "JSON 消息已收到并处理"
        }
        await websocket.send_json(response_data)

except WebSocketDisconnect:
    print(f"JSON 客户端已断开连接: {websocket.client}")
except Exception as e:
    print(f"JSON 发生错误: {e}")

… (可以在文件末尾添加 uvicorn.run) …

“`

客户端 JavaScript 示例(针对 /ws/json):

“`javascript
const wsJson = new WebSocket(“ws://127.0.0.1:8000/ws/json”);

wsJson.onopen = function(event) {
console.log(“JSON WebSocket 连接成功”);
// 发送 JSON 数据 (注意需要stringify)
wsJson.send(JSON.stringify({ “action”: “echo”, “payload”: “test json” }));
};

wsJson.onmessage = function(event) {
console.log(“收到服务器 JSON 消息: “, JSON.parse(event.data)); // 需要解析
};

// … (其他 error 和 close 处理同上) …
``
使用
send_json()receive_json()` 是处理结构化数据(如聊天消息、游戏状态更新等)的便捷方式。FastAPI 内部使用 Pydantic 进行 JSON 解析,如果需要更严格的数据验证,可以在接收后使用 Pydantic 模型进行解析。

5. 处理多个客户端 (广播)

在实际应用中,通常会有多个客户端同时连接到服务器。聊天室、实时仪表盘等应用都需要将消息从一个客户端发送给其他(或所有)客户端。

之前我们使用了一个全局列表 active_websockets 来存储连接。现在我们用它来实现一个简单的广播功能。

修改 main.py

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn

app = FastAPI()

使用 set 更合适,因为它能自动处理重复并支持高效的添加/删除

active_websockets: set[WebSocket] = set()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_websockets.add(websocket) # 添加到 set
print(f”客户端已连接: {websocket.client}”)

# 可选:通知所有其他客户端有新用户加入
welcome_message = f"新用户加入!当前在线人数: {len(active_websockets)}"
# 将新连接排除在外发送消息
for conn in active_websockets:
     if conn != websocket:
          # 使用 try...except 避免向已失效的连接发送消息导致整个广播中断
          try:
              await conn.send_text(welcome_message)
          except RuntimeError: # 例如,连接在循环中途断开
              print(f"广播时发现失效连接: {conn.client}")


try:
    while True:
        data = await websocket.receive_text()
        print(f"收到消息 {websocket.client}: {data}")

        # 广播消息给所有连接的客户端
        message_to_send = f"客户端 {websocket.client} 说: {data}"
        # 遍历所有活跃连接并发送消息
        # 注意:在遍历集合时修改集合(删除元素)可能会导致问题,更安全的方式是复制一份集合或使用列表推导式
        # 这里为了简单起见,我们直接遍历并处理发送错误
        disconnected_clients = []
        for conn in list(active_websockets): # 遍历 active_websockets 的一个副本
            try:
                await conn.send_text(message_to_send)
            except RuntimeError: # 或者更具体的异常如 websockets.exceptions.ConnectionClosedOK/Error
                # 如果发送失败,说明连接已失效
                disconnected_clients.append(conn)
                print(f"广播消息失败,客户端 {conn.client} 可能已断开")

        # 移除失效的连接
        for conn in disconnected_clients:
            active_websockets.discard(conn) # 使用 discard 即使元素不存在也不会报错

except WebSocketDisconnect:
    active_websockets.discard(websocket) # 使用 discard 安全移除当前连接
    print(f"客户端 {websocket.client} 已断开连接")
    # 通知所有其他客户端有用户离开
    disconnect_message = f"用户离开。当前在线人数: {len(active_websockets)}"
    disconnected_clients = []
    for conn in list(active_websockets):
         try:
              await conn.send_text(disconnect_message)
         except RuntimeError:
              disconnected_clients.append(conn)
              print(f"发送离开消息时发现失效连接: {conn.client}")
    for conn in disconnected_clients:
         active_websockets.discard(conn)


except Exception as e:
    print(f"发生错误处理客户端 {websocket.client}: {e}")
    active_websockets.discard(websocket) # 发生其他错误也尝试移除
    # 尝试关闭连接(如果还开着的话)
    try:
        await websocket.close()
    except RuntimeError:
        pass # 连接可能已经关闭

… (可以在文件末尾添加 uvicorn.run) …

“`

代码解释与注意事项:

  • active_websockets: set[WebSocket] = set(): 改用 set 来存储连接。setadddiscard 操作比列表的 appendremove 在处理大量元素时通常更高效,并且 add 重复元素不会有问题。
  • 广播逻辑: 当收到一条消息时,我们遍历 active_websockets 集合,并尝试向每个连接发送消息。
  • 错误处理 (广播时): 在遍历并发送消息的循环中,必须使用 try...except 块来捕获发送时可能出现的异常(例如,客户端在发送消息的瞬间断开了连接)。如果发送失败,说明该连接已经失效,应该将其从 active_websockets 中移除。
  • 集合遍历与修改: 直接在遍历 active_websockets 的同时调用 active_websockets.discard(conn) 是危险的,可能导致迭代器失效或跳过元素。安全的做法是遍历集合的副本 (list(active_websockets)),然后将需要移除的元素收集起来,在遍历完成后再统一移除。示例中采用了这个策略。
  • WebSocketDisconnect 中的移除:except WebSocketDisconnect: 块中,直接使用 active_websockets.discard(websocket) 来移除当前已断开的连接是安全的。
  • 通知新用户加入/离开: 示例中添加了简单的逻辑,在新连接建立和断开时向其他客户端发送通知。

生产环境中的连接管理和广播:

上述使用全局 set 的方法适用于简单的单进程应用。但在生产环境中,你可能会部署多个 Uvicorn worker 进程,或者在多台服务器上运行应用。在这种情况下:

  • 全局变量无效: 每个进程都有自己独立的 active_websockets 集合,进程之间无法直接通信和共享连接列表。
  • 广播问题: 一个进程收到的消息只能广播给连接到同一个进程的客户端,无法广播给连接到其他进程或机器上的客户端。

解决这个问题需要使用外部的消息队列系统 (Message Queue) 或 发布/订阅 (Pub/Sub) 系统,例如:

  • Redis Pub/Sub: 当一个进程收到消息后,不是直接广播,而是将消息发布到一个 Redis 频道。其他所有进程都订阅这个频道,收到消息后,再将消息发送给它们自己所管理的连接。
  • Kafka, RabbitMQ 等: 也可以用作更强大的消息中间件。

使用这些工具可以实现跨进程/跨机器的 WebSocket 消息广播和协调。FastAPI/Starlette 生态中有一些库可以帮助集成,例如 fastapi-pubsub 或直接使用 Redis 客户端库。

6. 通过路径参数和查询参数传递信息

与 HTTP 端点类似,WebSocket 端点也可以接收路径参数和查询参数。这对于传递连接相关的标识符(如房间 ID、用户 ID)非常有用。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
import uvicorn

app = FastAPI()

假设我们有一个字典来按房间 ID 存储连接

connections_by_room: dict[str, set[WebSocket]] = {}

@app.websocket(“/ws/room/{room_id}”)
async def websocket_room_endpoint(
websocket: WebSocket,
room_id: str, # 从路径参数获取房间 ID
user_id: str = Query(…) # 从查询参数获取用户 ID,Query(…) 表示这是必需的查询参数
):
await websocket.accept()

# 初始化房间连接集合(如果不存在)
if room_id not in connections_by_room:
    connections_by_room[room_id] = set()

connections_by_room[room_id].add(websocket)
print(f"用户 {user_id} 已连接到房间 {room_id}")

# 向房间内所有人(包括新连接)发送加入通知
join_message = {"type": "join", "user": user_id, "message": f"用户 {user_id} 加入了房间"}
await broadcast_to_room(room_id, join_message)

try:
    while True:
        # 接收 JSON 消息(假设聊天消息是 JSON 格式)
        message = await websocket.receive_json()
        print(f"从 {user_id} ({room_id}) 收到消息: {message}")

        # 验证消息格式等 (省略详细验证)
        if "text" in message:
             chat_message = {
                 "type": "chat",
                 "user": user_id,
                 "text": message["text"]
             }
             # 将消息广播到房间内所有客户端
             await broadcast_to_room(room_id, chat_message)
        else:
             # 处理其他类型的消息或无效消息
             print(f"收到无效消息格式从 {user_id}: {message}")


except WebSocketDisconnect:
    connections_by_room[room_id].discard(websocket)
    print(f"用户 {user_id} 已断开连接从房间 {room_id}")

    # 清理空的房间连接集合 (可选)
    if not connections_by_room[room_id]:
        del connections_by_room[room_id]
        print(f"房间 {room_id} 已空,集合已移除")

    # 向房间内剩余用户发送离开通知
    leave_message = {"type": "leave", "user": user_id, "message": f"用户 {user_id} 离开了房间"}
    # 注意:此时当前 websocket 已经从集合中移除,不会收到自己的离开通知
    await broadcast_to_room(room_id, leave_message)

except Exception as e:
    print(f"处理用户 {user_id} ({room_id}) 时发生错误: {e}")
    # 发生其他错误也尝试移除连接
    connections_by_room[room_id].discard(websocket)
    # 尝试关闭连接
    try:
        await websocket.close()
    except RuntimeError:
        pass

辅助函数:向指定房间广播消息

async def broadcast_to_room(room_id: str, message: dict):
if room_id in connections_by_room:
disconnected_clients = []
# 复制集合进行遍历
for conn in list(connections_by_room[room_id]):
try:
# 发送 JSON 数据
await conn.send_json(message)
except RuntimeError: # 或 ConnectionClosedOK/Error
disconnected_clients.append(conn)
print(f”向客户端发送消息失败 {conn.client} 在房间 {room_id}”)

    # 移除失效连接
    for conn in disconnected_clients:
         connections_by_room[room_id].discard(conn)
         print(f"移除失效连接 {conn.client} 从房间 {room_id}")

… (可以在文件末尾添加 uvicorn.run) …

“`

测试:

使用 JavaScript 客户端连接到 ws://127.0.0.1:8000/ws/room/chat_room_1?user_id=Alicews://127.0.0.1:8000/ws/room/chat_room_1?user_id=Bob。他们将在同一个房间中。再连接 ws://127.0.0.1:8000/ws/room/room_b?user_id=Charlie,他将在另一个房间。在 Alice 或 Bob 的客户端发送 JSON 消息 {"text": "你好!"},消息只会广播给房间 chat_room_1 内的用户。

代码解释:

  • @app.websocket("/ws/room/{room_id}"): 定义带有路径参数 room_id 的 WebSocket 端点。
  • websocket_room_endpoint(websocket: WebSocket, room_id: str, user_id: str = Query(...)): 函数签名中接收 WebSocket 对象,以及路径参数 room_id 和查询参数 user_idQuery(...) 使 user_id 成为必需的查询参数。
  • connections_by_room: dict[str, set[WebSocket]]: 使用字典来按房间 ID 组织连接集合。
  • 连接建立时,将连接添加到对应房间的集合中。
  • 收到消息时,调用 broadcast_to_room 辅助函数,向指定房间内的所有连接发送消息。
  • 断开连接时,从房间集合中移除连接。如果房间变空,可选地移除该房间的 entry。
  • broadcast_to_room 函数包含了向集合副本遍历发送消息以及处理发送失败后移除失效连接的健壮逻辑。

7. WebSocket 的依赖注入

FastAPI 强大的依赖注入系统也适用于 WebSocket 端点。你可以像 HTTP 端点一样使用 Depends

“`python
from fastapi import FastAPI, WebSocket, Depends, WebSocketDisconnect, Query
import uvicorn

app = FastAPI()

简单的依赖,例如获取当前连接的用户信息 (实际应用中会更复杂)

async def get_current_user_ws(websocket: WebSocket, user_id: str = Query(…)):
# 实际中会根据 user_id 或其他认证信息查询数据库等
# 这里简化为直接返回用户 ID
if not user_id:
await websocket.close(code=1008) # 1008 表示策略违反,这里用于示例
# raise WebSocketDisconnect(code=1008, reason=”User ID is required”) # 或者抛出异常
return None # 依赖失败

# 可以在这里执行认证/授权逻辑
# 例如:检查 user_id 是否合法,是否有权限连接等
print(f"依赖注入获取到用户ID: {user_id}")
return {"user_id": user_id, "status": "authenticated"} # 返回用户信息字典

存储活跃连接, keyed by user_id for simplicity in this example

active_connections: dict[str, WebSocket] = {}

@app.websocket(“/ws/authenticated”)
async def websocket_authenticated_endpoint(
websocket: WebSocket,
user: dict = Depends(get_current_user_ws) # 使用依赖注入获取用户信息
):
if user is None:
# get_current_user_ws 已经关闭连接,这里直接返回即可
return

user_id = user["user_id"]

# 拒绝重复连接 (同一个 user_id 只允许一个连接)
if user_id in active_connections:
    print(f"拒绝用户 {user_id} 的重复连接")
    await websocket.close(code=1008, reason="Already connected")
    return

await websocket.accept()
active_connections[user_id] = websocket
print(f"用户 {user_id} 已通过认证并连接")

# 通知其他在线用户新用户加入
join_message = {"type": "user_joined", "user_id": user_id}
for uid, conn in list(active_connections.items()):
    if uid != user_id:
        try:
            await conn.send_json(join_message)
        except RuntimeError:
            print(f"发送加入通知失败给 {uid}")


try:
    while True:
        data = await websocket.receive_json()
        print(f"收到用户 {user_id} 的消息: {data}")

        # 处理消息,例如广播给所有其他用户
        broadcast_message = {"type": "message", "from": user_id, "text": data.get("text", "无内容")}
        disconnected_users = []
        for uid, conn in list(active_connections.items()):
             if uid != user_id: # 不发给自己
                 try:
                      await conn.send_json(broadcast_message)
                 except RuntimeError:
                      disconnected_users.append(uid)
                      print(f"广播消息失败给用户 {uid}")

        # 移除失效用户
        for uid in disconnected_users:
             if uid in active_connections:
                 del active_connections[uid]
                 print(f"移除失效用户 {uid}")


except WebSocketDisconnect as e:
    if user_id in active_connections:
         del active_connections[user_id]
    print(f"用户 {user_id} 已断开连接 (代码: {e.code})")

    # 通知其他用户用户离开
    leave_message = {"type": "user_left", "user_id": user_id}
    disconnected_users = []
    for uid, conn in list(active_connections.items()):
        try:
            await conn.send_json(leave_message)
        except RuntimeError:
            disconnected_users.append(uid)
            print(f"发送离开通知失败给 {uid}")

    for uid in disconnected_users:
        if uid in active_connections:
            del active_connections[uid]
            print(f"移除失效用户 {uid}")


except Exception as e:
    print(f"处理用户 {user_id} 时发生错误: {e}")
    if user_id in active_connections:
        del active_connections[user_id]
    # 尝试关闭连接
    try:
        await websocket.close()
    except RuntimeError:
        pass

… (可以在文件末尾添加 uvicorn.run) …

“`
代码解释:

  • get_current_user_ws: 这是一个异步函数,它接收 WebSocket 对象和查询参数 user_id。它模拟了根据连接信息进行认证的过程。如果认证失败(例如 user_id 为空),它可以调用 websocket.close() 并返回 None,或者抛出 WebSocketDisconnect 异常。
  • user: dict = Depends(get_current_user_ws): 在 WebSocket 端点函数中,使用 Dependsget_current_user_ws 的返回值注入到 user 参数中。如果在依赖函数中抛出了异常或关闭了连接,FastAPI 会阻止进入 WebSocket 端点函数体。
  • 通过依赖注入,你可以将认证、授权、获取用户特定信息等逻辑从 WebSocket 端点的主要处理逻辑中分离出来,提高代码的可读性和可维护性。

重要提示:

  • 在依赖函数中,如果因为认证失败需要立即断开连接,应该调用 await websocket.close(code=...) 并可能抛出 WebSocketDisconnect。如果依赖函数返回 None 或其他指示失败的值,你需要确保主处理函数能正确处理这种情况(如示例中的 if user is None: return)。
  • 依赖注入主要发生在连接接受之前(在调用 await websocket.accept() 之前),可以基于握手阶段的信息(如查询参数、Header)进行判断。一旦连接接受并进入 while True 循环,依赖函数通常不会再次运行。

8. WebSocket 连接关闭代码

WebSocket 协议定义了一系列状态码用于表示连接关闭的原因。了解这些代码有助于更好地调试和处理连接断开事件。在 FastAPI 中,WebSocketDisconnect 异常有一个 code 属性,表示关闭码。

一些常见的 WebSocket 关闭码:

  • 1000: Normal Closure (正常关闭) – 客户端或服务器正常关闭连接。
  • 1001: Going Away – 终端设备离开(例如,浏览器标签页关闭,或导航到新页面)。
  • 1002: Protocol error – 端点接收到一个它不能解释的未知帧。
  • 1003: Unsupported Data – 端点接收到了不支持的数据类型(例如,期望文本却收到二进制)。
  • 1005: No Status Rcvd – 未收到关闭状态码(保留,不应由应用程序发送)。
  • 1006: Abnormal Closure – 连接非正常关闭(例如,网络断开,没有干净的握手)。
  • 1007: Invalid frame payload data – 端点收到一个帧,其有效载荷不符合消息类型。
  • 1008: Policy Violation – 策略违反。
  • 1009: Message Too Big – 收到消息过大,无法处理。
  • 1010: Mandatory Ext. – 客户端期望服务器协商一个或多个扩展,但服务器没有。
  • 1011: Internal Error – 服务器遇到意外情况,阻止其完成请求。
  • 1012: Service Restart – 服务器正在重启。
  • 1013: Try Again Later – 服务器负载过高,客户端应稍后再试。
  • 1014: Bad Gateway – 网关或代理收到了无效的响应。

except WebSocketDisconnect as e: 块中,你可以访问 e.codee.reason 来获取关闭的详细信息。在调用 websocket.close() 时,也可以指定关闭码和原因:await websocket.close(code=1000, reason="Goodbye!")

9. 最佳实践和注意事项

  • 异步性: WebSocket 操作(accept, receive_*, send_*, close)都是异步的,务必使用 async def 定义 WebSocket 端点函数,并在调用这些方法时使用 await
  • 错误处理: 始终使用 try...except WebSocketDisconnect 来优雅地处理客户端断开连接。在循环内的发送/接收操作中也应考虑其他潜在异常(如 RuntimeError 或底层的 websockets.exceptions),防止一个客户端的异常影响其他连接。
  • 连接管理: 对于需要向多个客户端发送消息的应用,你需要一个有效的机制来跟踪活跃连接。全局列表/集合适用于入门示例,但在生产环境中应考虑使用 Redis Pub/Sub 等方案来支持多进程/分布式部署。
  • 心跳 (Heartbeat): WebSocket 协议本身支持心跳机制,但有时应用层也需要实现自己的心跳,以检测由于网络原因导致的“假死”连接。服务器可以定期发送一个特殊的“ping”消息,要求客户端回发“pong”。
  • 安全性:
    • 认证与授权:accept() 之前或之后立即进行认证和授权检查,确保只有合法用户才能建立或维持连接。使用依赖注入是个好方法。
    • 输入验证: 接收到客户端数据后,特别是 JSON 数据,进行严格的格式和内容验证,防止恶意或畸形数据导致服务器错误或安全漏洞。可以结合 Pydantic 模型进行验证。
    • 防止拒绝服务 (DoS): 限制单个客户端可以发送消息的频率、消息大小、连接数量等,防止恶意客户端消耗过多服务器资源。
  • 日志记录: 记录连接建立、断开、收发消息以及错误,这对于调试和监控非常重要。
  • 前端集成: 在前端使用原生 JavaScript WebSocket API 或流行的 WebSocket 库(如 Socket.IO,虽然它有自己的协议,但也有基于标准 WebSocket 的实现)来连接和交互。
  • 可伸缩性: 随着用户数量的增加,单个服务器实例可能无法处理。考虑使用负载均衡器分发连接,并通过 Redis Pub/Sub 等协调跨实例的消息。

10. 总结

FastAPI 凭借其基于 ASGI 的设计,为 WebSocket 开发提供了强大且易用的支持。通过 @app.websocket() 装饰器和 WebSocket 对象,你可以轻松地构建实时的、双向通信的 Web 应用。从简单的 Echo Server 到复杂的聊天室或实时数据推送服务,FastAPI 都能很好地胜任。

理解 WebSocket 与 HTTP 的区别、掌握 FastAPI 提供的 accept(), receive_*(), send_*(), close() 方法,并学会如何处理连接管理、广播以及利用依赖注入进行认证,是成功进行 FastAPI WebSocket 开发的关键。

虽然本文提供了详细的指南和示例,但 WebSocket 开发还有更多可以深入探索的领域,例如:

  • 更复杂的房间管理和用户状态。
  • 集成 Redis Pub/Sub 实现分布式广播。
  • 结合 Pydantic 模型进行严格的数据收发验证。
  • WebSocket 压力测试和性能优化。
  • 与特定前端框架(如 React, Vue, Angular)的 WebSocket 库集成。

希望这篇指南能帮助你快速上手 FastAPI WebSocket 开发,并为构建下一代实时 Web 应用打下坚实基础!


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部