FastAPI 如何处理 WebSocket 连接:实战指南 – wiki基地


FastAPI 中的实时利器:WebSocket 连接处理深度指南

在现代 Web 应用中,实时通信的需求日益增长。从在线聊天、股票行情更新到协作文档编辑,传统的基于 HTTP 请求/响应模式已经无法满足这些场景。这时,WebSocket 应运而生,它提供了一种在客户端和服务器之间建立持久连接、进行双向通信的标准。

FastAPI,作为一个现代化、高性能的 Web 框架,基于 ASGI 标准构建,天生就支持异步操作,这使得它成为处理 WebSocket 连接的绝佳选择。本文将深入探讨如何在 FastAPI 应用中实现和管理 WebSocket 连接,从基础概念到高级技巧,并提供详细的实战代码示例。

1. 理解 WebSocket:超越 HTTP 的双向桥梁

在深入 FastAPI 的实现之前,让我们先回顾一下 WebSocket 的基本原理。

HTTP 是无状态的请求/响应协议:客户端发送请求,服务器处理并返回响应,然后连接通常关闭(或保持短暂活跃)。每次通信都需要重新建立或重用连接。

WebSocket 则不同。它始于一个特殊的 HTTP 请求(握手阶段)。客户端发送一个带有特定头的 HTTP 请求,请求将连接“升级”到 WebSocket 协议。如果服务器支持并同意,握手成功后,客户端和服务器之间就会建立一个持久的、低延迟的双向连接。一旦连接建立,双方都可以随时发送数据,而无需显式地发起新的请求。

这种持久连接和双向通信的能力,使得 WebSocket 非常适合需要实时数据推送和即时交互的应用。

2. FastAPI 对 WebSocket 的支持:为何如此契合?

FastAPI 构建于 ASGI (Asynchronous Server Gateway Interface) 标准之上,与传统的 WSGI (Web Server Gateway Interface) 不同,ASGI 是为异步 Python Web 应用设计的。WebSocket 本质上是一种异步、事件驱动的通信模式,它与 ASGI 的核心理念高度契合。

FastAPI 利用其底层的 ASGI 框架(如 Uvicorn)来处理 WebSocket 的握手、连接维护以及数据的异步发送和接收。通过简单的装饰器和类型提示,FastAPI 使得在 Python 中实现 WebSocket 服务器变得非常直观和高效。

3. 基础:在 FastAPI 中创建第一个 WebSocket 端点

在 FastAPI 中创建一个 WebSocket 端点非常简单,就像创建 HTTP 端点一样,使用特定的装饰器即可。

首先,确保你已经安装了 FastAPI 以及一个 ASGI 服务器(推荐 Uvicorn)和 WebSocket 库:

bash
pip install fastapi uvicorn websockets

现在,我们可以创建一个基本的 WebSocket “Echo” 服务器,它会将接收到的任何消息发送回客户端。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # 接受 WebSocket 连接
try:
while True:
data = await websocket.receive_text() # 接收文本消息
await websocket.send_text(f”Message text was: {data}”) # 发送文本消息
except WebSocketDisconnect:
print(“Client disconnected”)
# 连接断开时可以执行清理操作

if name == “main“:
import uvicorn
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`

代码解释:

  1. from fastapi import FastAPI, WebSocket, WebSocketDisconnect: 导入必要的类。WebSocket 类用于表示一个活动的 WebSocket 连接。WebSocketDisconnect 是一个异常,当客户端断开连接时会抛出。
  2. @app.websocket("/ws"): 这个装饰器将一个异步函数标记为一个 WebSocket 端点。路径 /ws 是客户端连接时使用的 URL。
  3. async def websocket_endpoint(websocket: WebSocket):: WebSocket 处理函数必须是异步的 (async def),并且接收一个类型为 WebSocket 的参数,这个参数代表了当前客户端的连接对象。
  4. await websocket.accept(): 在与客户端进行任何通信之前,服务器必须先接受连接。这是一个必要的步骤,它完成了 WebSocket 握手的最后阶段。
  5. try...except WebSocketDisconnect:: WebSocket 连接是持久的,处理函数会一直运行,直到连接中断。客户端断开连接时会抛出 WebSocketDisconnect 异常。我们使用 try...except 块来优雅地处理断开事件。
  6. while True:: 在连接被接受后,进入一个无限循环,持续监听客户端发送的消息。
  7. data = await websocket.receive_text(): 这是一个异步操作,等待客户端发送文本消息。当收到消息时,它返回消息内容。如果客户端发送非文本消息或连接关闭,会抛出异常。
  8. await websocket.send_text(...): 这是一个异步操作,用于向客户端发送文本消息。
  9. print("Client disconnected"): 在捕获到 WebSocketDisconnect 异常时打印一条消息,表示连接已断开。

如何运行和测试?

  1. 保存上述代码为 main.py
  2. 在终端运行 uvicorn main:app --reload
  3. 你需要一个 WebSocket 客户端来测试。可以使用在线 WebSocket 测试工具,或者一个简单的 HTML/JavaScript 页面,或者命令行工具(如 websocat)。

使用 websocat 测试:

bash
websocat ws://localhost:8000/ws

连接成功后,在终端输入文本,按回车发送。你应该会看到服务器将消息原样发回,并在前面加上 “Message text was: “。当你关闭 websocat 或停止服务器时,服务器端会打印 “Client disconnected”。

使用 HTML/JavaScript 测试:

创建一个 index.html 文件:

“`html




WebSocket Echo Test

WebSocket Echo Test




“`

打开 index.html 文件(可以直接在浏览器打开或通过一个简单的 HTTP 服务器提供),在输入框输入文本并点击发送,你会在页面上看到返回的消息。

4. 处理不同类型的数据:文本、JSON 和二进制

除了文本消息,WebSocket 也支持 JSON 和二进制数据。FastAPI 的 WebSocket 对象提供了相应的方法:

  • await websocket.receive_json(): 接收 JSON 消息并自动解析为 Python 字典或列表。
  • await websocket.send_json(data): 发送 Python 字典或列表,自动序列化为 JSON 字符串。
  • await websocket.receive_bytes(): 接收二进制消息。
  • await websocket.send_bytes(data): 发送二进制消息。

你可以通过 receive_text()receive_json() 的返回值类型来判断收到的数据格式,或者使用 receive_text(), receive_json(), receive_bytes() 都可以接收任何类型的数据,但你需要自己处理解析。通常,为了清晰起见,最好约定客户端和服务器之间使用特定格式(如文本或 JSON)。使用 receive_json()send_json() 是处理结构化数据的推荐方式。

JSON 示例:

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json # 虽然send_json/receive_json内置了,但导入json模块备用

app = FastAPI()

@app.websocket(“/ws/json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json() # 接收 JSON
print(f”Received JSON: {data}”)
# 假设客户端发送一个带有 “message” 键的 JSON 对象
message = data.get(“message”, “No message provided”)
await websocket.send_json({“response”: f”Server received: {message}”}) # 发送 JSON
except WebSocketDisconnect:
print(“Client disconnected from /ws/json”)
except Exception as e:
print(f”Error in /ws/json: {e}”)
await websocket.close(code=1011) # 发送一个错误关闭码

客户端可以使用类似以下 JavaScript 发送 JSON:

websocket.send(JSON.stringify({ “message”: “Hello from client!” }));

“`

5. 管理多个连接:构建一个简单的连接管理器

实际应用中,WebSocket 服务器通常需要处理多个同时在线的客户端连接。例如,在一个聊天应用中,服务器接收到一条消息后,需要将这条消息广播给所有连接到同一个聊天室的客户端。

FastAPI 的每个 @app.websocket() 处理函数实例只处理一个客户端连接。要实现广播或向特定客户端发送消息,我们需要在 FastAPI 应用的生命周期中维护一个所有活动连接的列表或字典。

我们可以创建一个简单的 ConnectionManager 类来管理连接:

“`python
from typing import List
from fastapi import WebSocket

class ConnectionManager:
def init(self):
# 存储活动的 WebSocket 连接列表
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
    # 接受连接并在列表中添加新的 WebSocket
    await websocket.accept()
    self.active_connections.append(websocket)
    print(f"Client connected. Total active connections: {len(self.active_connections)}")

def disconnect(self, websocket: WebSocket):
    # 从列表中移除断开的 WebSocket
    self.active_connections.remove(websocket)
    print(f"Client disconnected. Total active connections: {len(self.active_connections)}")

async def send_personal_message(self, message: str, websocket: WebSocket):
    # 向特定客户端发送消息
    await websocket.send_text(message)

async def broadcast(self, message: str):
    # 向所有活动客户端广播消息
    for connection in self.active_connections:
        try:
            await connection.send_text(message)
        except Exception as e:
            # 处理发送失败的情况 (例如,客户端在广播期间断开)
            print(f"Error sending message to a client: {e}")
            # 注意: 如果发送失败,连接可能已经失效,
            # 但WebSocketDisconnect异常会在 receive_* 或 send_* 之外抛出,
            # 更安全的做法是在 receive_* 循环中处理 disconnect 并移除。
            # 对于广播中的瞬时错误,这里捕获即可,失效的连接会在下次 receive 时被清理。

在应用启动时创建 ConnectionManager 实例

manager = ConnectionManager()

现在修改 WebSocket 端点来使用这个管理器

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket(“/ws/chat”)
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket) # 新连接到来时,添加到管理器
try:
while True:
data = await websocket.receive_text()
# 收到消息后,广播给所有连接
await manager.broadcast(f”Client says: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket) # 连接断开时,从管理器移除
await manager.broadcast(f”A client left the chat. Total active connections: {len(manager.active_connections)}”)
except Exception as e:
print(f”Unexpected error: {e}”)
manager.disconnect(websocket) # 异常断开时也移除
await manager.broadcast(f”A client disconnected due to an error. Total active connections: {len(manager.active_connections)}”)

if name == “main“:
import uvicorn
uvicorn.run(app, host=”0.0.0.0”, port=8000)

“`

使用这个管理器,你可以实现:

  • 广播: 例如,当一个用户发送消息时,调用 manager.broadcast() 将消息发送给所有用户(如上面的聊天示例)。
  • 私聊: 假设你有办法标识每个用户(例如,通过认证令牌获取用户 ID),你可以维护一个 dict[str, WebSocket] 来映射用户 ID 到 WebSocket 对象,然后使用 manager.send_personal_message(message, specific_websocket) 发送消息给特定用户。
  • 群组聊天: 维护一个 dict[str, List[WebSocket]] 来映射群组 ID 到该群组的连接列表,实现群组消息的发送。

重要注意事项:

  • 上述 ConnectionManager 是基于内存的。这意味着它只适用于单进程、单线程的 FastAPI 应用。如果你部署多个 Uvicorn worker 或在多台服务器上运行应用,每个 worker 或服务器都会有自己的 ConnectionManager 实例,它们之间无法共享连接信息。
  • 对于分布式或需要水平扩展的应用,你需要使用外部的 Pub/Sub 系统来管理消息和连接状态,例如 Redis Pub/Sub、RabbitMQ、Kafka 等。服务器端收到消息后,将消息发布到一个频道;所有订阅该频道的服务器实例收到消息后,再通过各自管理的 WebSocket 连接发送给相关的客户端。我们会在后面简要讨论这一点。

6. 处理 WebSocket 断开连接与错误

WebSocket 连接可以在任何时候由客户端或服务器关闭。FastAPI 通过抛出 WebSocketDisconnect 异常来通知你客户端连接已断开。如前面的示例所示,使用 try...except WebSocketDisconnect 块是处理这种情况的标准做法。

except WebSocketDisconnect: 块中,你应该执行清理工作,例如从连接管理器中移除该连接。

除了正常的断开,连接也可能因为网络问题或其他异常而中断。捕获更广泛的 Exception 并进行清理是良好的实践。在发生错误时,你也可以选择使用 await websocket.close(code=...) 方法以特定的状态码关闭连接,告知客户端关闭的原因。常用的关闭码包括:

  • 1000: 正常关闭。
  • 1001: 离开(例如,用户离开页面)。
  • 1006: 异常关闭(无状态码)。
  • 1008: 违反协议。
  • 1009: 消息太大。
  • 1011: 服务器遇到意外情况。

7. 高级主题与实战考量

构建健壮的 WebSocket 应用需要考虑更多因素:

7.1 身份验证与授权

WebSocket 连接通常需要知道是哪个用户连接的,以及该用户是否有权限进行特定操作。由于 WebSocket 握手是基于 HTTP 的,你可以在握手阶段利用 HTTP 的认证机制。

常用的方法:

  1. 通过查询参数传递 Token:
    客户端连接时,URL 中包含认证令牌,如 ws://localhost:8000/ws?token=your_auth_token
    在 WebSocket 端点函数中,你可以通过 Query 参数接收令牌:

    “`python
    from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query, HTTPException

    app = FastAPI()

    假设有一个函数来验证 token

    async def verify_token(token: str):
    if token == “valid-token”:
    return {“user_id”: “some_user”}
    else:
    return None

    @app.websocket(“/ws/authenticated”)
    async def websocket_authenticated_endpoint(
    websocket: WebSocket,
    token: str = Query(…) # 通过查询参数获取 token
    ):
    user = await verify_token(token)
    if not user:
    # 在接受连接之前拒绝
    await websocket.close(code=1008) # 违反协议码
    print(“Authentication failed for a client.”)
    return

    # 认证成功,接受连接
    await websocket.accept()
    print(f"Client connected (user: {user['user_id']})")
    
    # 可以将用户信息关联到 WebSocket 对象或在管理器中存储
    # websocket.user = user # WebSocket对象本身没有 user 属性,可以考虑一个字典映射
    # manager.connect(websocket, user_id=user['user_id']) # 在管理器中关联
    
    try:
        while True:
            data = await websocket.receive_text()
            # 处理消息,可能需要检查用户权限
            await websocket.send_text(f"Hello {user['user_id']}, received: {data}")
    except WebSocketDisconnect:
        print(f"Client disconnected (user: {user['user_id']})")
        # manager.disconnect(websocket)
    except Exception as e:
        print(f"Error for user {user['user_id']}: {e}")
        await websocket.close(code=1011)
    

    “`
    注意: 将敏感令牌直接放在 URL 查询参数中不是最安全的方式,因为它可能会被日志记录。

  2. 通过 HTTP 头传递 Token/Cookie:
    WebSocket 握手是 HTTP 请求。客户端可以在发起握手请求时包含 Authorization 头或相关的 Cookie。然而,通过 WebSocket 装饰器直接访问握手时的 HTTP 头并不像在标准的 HTTP 路由中那样直观。一种方法是在一个常规的 HTTP 路由中进行认证,然后返回一个临时令牌或一个可以用于 WebSocket 连接的标识符。或者,可以使用更底层的 ASGI 应用来实现更复杂的握手头处理,但这超出了 FastAPI 装饰器的范畴。

    更常见的做法是在 accept() 之前 检查查询参数或依赖注入的某些验证逻辑(如果能访问到请求对象的话,尽管在 @app.websocket 中直接访问原始请求对象需要一些技巧)。FastAPI 提供了 WebSocket 对象的 headers 属性,但它可能在 accept() 之后 才能完全访问,或者并不包含所有原始握手头。查询参数通常是最直接且在 FastAPI 装饰器内易于访问的方式。对于更严格的安全要求,结合短期令牌和定期重连可能是必要的。

7.2 后台任务发送消息

有时你可能需要在后台任务中(例如,处理完一个耗时操作后,或者响应另一个 HTTP 请求后)向某个或所有连接的客户端发送消息。

由于 WebSocket 连接对象 (websocket) 是在 WebSocket 端点函数中创建的,并且 await websocket.send_text() 等操作是异步的,你不能直接在同步函数或不相关的异步函数中调用它们。你需要获取到活动的 WebSocket 对象,并在一个异步上下文中使用 await 调用其发送方法。

如果使用前面的 ConnectionManager,可以在需要发送消息的地方(例如,另一个 HTTP 路由处理函数中)访问全局或依赖注入的 manager 实例,然后调用其 send_personal_messagebroadcast 方法。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.routing import APIRouter # 导入 APIRouter

假设 ConnectionManager 已经在别处定义并实例化为 manager

from .connections import manager # 或者根据你的文件结构导入

为了示例方便,这里重新定义 ConnectionManager

class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
self.user_connections: dict[str, WebSocket] = {} # 用户ID到连接的映射

async def connect(self, websocket: WebSocket, user_id: str):
    await websocket.accept()
    self.active_connections.append(websocket)
    self.user_connections[user_id] = websocket # 存储用户连接
    print(f"User {user_id} connected. Total: {len(self.active_connections)}")

def disconnect(self, websocket: WebSocket, user_id: str):
    if websocket in self.active_connections:
        self.active_connections.remove(websocket)
    if user_id in self.user_connections and self.user_connections[user_id] == websocket:
         del self.user_connections[user_id]
    print(f"User {user_id} disconnected. Total: {len(self.active_connections)}")

async def send_personal_message(self, message: str, user_id: str):
    connection = self.user_connections.get(user_id)
    if connection:
        try:
            await connection.send_text(message)
        except Exception as e:
             print(f"Failed to send message to user {user_id}: {e}")
             # Consider disconnecting the user if sending fails consistently

async def broadcast(self, message: str):
    for connection in list(self.active_connections): # Iterate over a copy in case list changes during iteration
        try:
            await connection.send_text(message)
        except Exception as e:
             print(f"Failed to send message to a client during broadcast: {e}")
             # The receive loop for this connection should handle actual disconnect

manager = ConnectionManager() # 假设这是一个单例管理器

app = FastAPI()

WebSocket endpoint (assuming authentication happens elsewhere or simpler)

@app.websocket(“/ws/background/{user_id}”)
async def websocket_background_endpoint(
websocket: WebSocket,
user_id: str # Assuming user_id comes from path or auth
):
await manager.connect(websocket, user_id)
try:
while True:
# This endpoint might primarily receive pings or control messages,
# or just keep the connection alive.
data = await websocket.receive_text()
print(f”Received message from {user_id}: {data}”)
# Optionally respond
await websocket.send_text(f”Ack from server: {data}”)
except WebSocketDisconnect:
manager.disconnect(websocket, user_id)
print(f”User {user_id} disconnected.”)
except Exception as e:
print(f”Error for user {user_id}: {e}”)
manager.disconnect(websocket, user_id)
await websocket.close(code=1011)

HTTP endpoint that triggers a WebSocket message

@app.get(“/send-to-user/{user_id}”)
async def send_message_http(user_id: str, message: str):
# In a real app, you’d likely perform auth here too
await manager.send_personal_message(f”Message from HTTP: {message}”, user_id)
return {“status”: “message sent (if user connected)”}

@app.get(“/broadcast-from-http”)
async def broadcast_message_http(message: str):
await manager.broadcast(f”Broadcast from HTTP: {message}”)
return {“status”: “broadcast sent”}

if name == “main“:
import uvicorn
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`

在这个例子中,HTTP GET 请求 /send-to-user/{user_id}?message=... 会调用 manager.send_personal_message(),尝试向指定的在线用户发送 WebSocket 消息。请注意,这需要在同一个进程中运行,以便 HTTP 端点和 WebSocket 端点共享同一个 manager 实例。

7.3 扩展性:多进程与分布式部署

如前所述,内存中的 ConnectionManager 无法跨进程或跨服务器工作。要构建可扩展的 WebSocket 应用,你需要一个共享的消息总线。

典型的架构是:

  1. 消息总线 (Pub/Sub): 使用 Redis Pub/Sub, RabbitMQ, Kafka 等作为中央消息发布/订阅系统。
  2. WebSocket 服务器实例: 每个 FastAPI/Uvicorn 实例都管理一部分客户端连接。
  3. 订阅消息: 每个 WebSocket 服务器实例都订阅消息总线上的相关频道(例如,一个全局广播频道,或者按聊天室、用户 ID 划分的频道)。
  4. 发送消息:
    • 当一个 WebSocket 客户端发送消息时,服务器实例收到消息后,不是直接处理(如广播给 本地 连接),而是将消息发布到消息总线上相应的频道。
    • 任何需要发送消息的服务(包括 WebSocket 服务器实例自己、独立的后台 worker、HTTP 处理程序等)都将消息发布到总线。
  5. 接收消息并转发: 订阅了该频道的 所有 WebSocket 服务器实例都会收到总线上的消息。它们检查消息是发给哪些客户端的,并使用 自己管理的 WebSocket 连接将消息发送给这些客户端。

这种架构确保了即使有多个服务器实例,消息也能正确地路由到目标客户端。实现这种架构需要引入额外的库来与消息总线交互(如 redis 库用于 Redis),并且需要更复杂的连接管理逻辑来处理用户与服务器实例之间的映射关系(尽管通常 Pub/Sub 模型可以简化这一点,通过让所有相关实例订阅同一个频道,然后实例自己检查哪些连接应该收到消息)。

这是一个更复杂的领域,超出了本文直接提供完整代码的范围,但理解其原理对于构建生产级的 WebSocket 应用至关重要。

7.4 WebSocket 客户端实现

虽然本文重点是 FastAPI 服务器,但一个完整的 WebSocket 应用需要客户端。客户端可以使用:

  • Web 浏览器: 使用 JavaScript 的 WebSocket API (如前面 HTML 示例所示)。这是最常见的 Web 应用场景。
  • 移动应用: 使用平台提供的 WebSocket 库。
  • 桌面应用: 使用桌面 GUI 框架或标准库/第三方库提供的 WebSocket 功能。
  • Python 脚本: 使用 websockets 库编写 Python 客户端,这对于测试或编写独立的客户端非常有用。

“`python

Python WebSocket Client Example (using websockets library)

import asyncio
import websockets

async def hello():
uri = “ws://localhost:8000/ws” # Change to your WebSocket URL
async with websockets.connect(uri) as websocket:
print(“Connected!”)

    # Send a message
    await websocket.send("Hello, FastAPI!")
    print("Sent: Hello, FastAPI!")

    # Receive a message
    greeting = await websocket.recv()
    print(f"Received: {greeting}")

    # Example of sending multiple messages
    await websocket.send("Another message")
    print("Sent another message")
    response = await websocket.recv()
    print(f"Received: {response}")

async def chat_client(user_id):
uri = f”ws://localhost:8000/ws/chat?user_id={user_id}” # Assuming chat uses query for user_id
async with websockets.connect(uri) as websocket:
print(f”User {user_id} connected to chat.”)

    # Start a task to receive messages
    async def receive_messages():
        try:
            while True:
                message = await websocket.recv()
                print(f"[{user_id}] Received: {message}")
        except websockets.exceptions.ConnectionClosedOK:
            print(f"[{user_id}] Connection closed normally.")
        except websockets.exceptions.ConnectionClosedError as e:
            print(f"[{user_id}] Connection closed with error: {e}")
        except Exception as e:
            print(f"[{user_id}] Unexpected error during receive: {e}")

    receive_task = asyncio.create_task(receive_messages())

    # Send messages from input
    while True:
        try:
            message = await asyncio.get_event_loop().run_in_executor(None, input, "")
            if message.lower() == 'exit':
                break
            await websocket.send(message)
        except websockets.exceptions.ConnectionClosed:
            print(f"[{user_id}] Connection lost.")
            break
        except EOFError: # Handle Ctrl+D
             print(f"[{user_id}] Exiting...")
             break
        except Exception as e:
             print(f"[{user_id}] Error sending message: {e}")
             break


    print(f"[{user_id}] Closing connection.")
    await websocket.close()
    await receive_task # Wait for receive task to finish

if name == “main“:
# asyncio.run(hello()) # Run the echo client example
user = input(“Enter your user ID for chat: “)
asyncio.run(chat_client(user)) # Run the chat client example
“`

7.5 测试 WebSocket 端点

测试 WebSocket 端点比测试 HTTP 端点稍微复杂。你需要一个 WebSocket 客户端库来连接并发送/接收消息。

  • 手动测试: 使用 websocat 或浏览器开发者工具。
  • 自动化测试: 在你的测试代码中,使用 websockets 库作为客户端,连接到 FastAPI 测试客户端启动的服务器(需要配置 FastAPI 的测试客户端来支持 ASGI 应用,httpx 库通常用于测试 FastAPI 的 HTTP 部分,但对于 WebSocket,你需要专门的 WebSocket 客户端)。或者,直接在测试环境中运行 Uvicorn 并在另一个线程/进程中连接。testclient from starlette.testclient supports websockets.

“`python

Example using Starlette’s TestClient (FastAPI built on Starlette)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.testclient import TestClient

app = FastAPI()

@app.websocket(“/ws/test”)
async def websocket_test_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f”Test response: {data}”)
except WebSocketDisconnect:
pass

client = TestClient(app)

def test_websocket_endpoint():
with client.websocket_connect(“/ws/test”) as websocket:
websocket.send_text(“Hello Test”)
data = websocket.receive_text()
assert data == “Test response: Hello Test”

    websocket.send_text("Another Test")
    data = websocket.receive_text()
    assert data == "Test response: Another Test"

# Connection should be closed after exiting the 'with' block

Run with pytest

pytest your_test_file.py

“`

8. 最佳实践总结

  • 总是 await websocket.accept(): 这是建立连接的第一步,必须在接收或发送任何消息之前调用。
  • 使用 try...except WebSocketDisconnect: 优雅地处理客户端断开连接,并在异常块中执行清理操作(如从管理器移除连接)。
  • 处理其他异常: 除了正常的断开,也要捕获其他可能的异常,防止服务器崩溃,并可能需要记录错误或以错误码关闭连接。
  • 使用连接管理器: 对于需要广播或定向发送的应用,使用一个类来集中管理所有活动的 WebSocket 连接。
  • 考虑数据格式: 根据应用需求选择文本、JSON 或二进制格式,并使用相应的方法 (send_text, receive_json, etc.)。JSON 是处理结构化数据的常见和推荐方式。
  • 身份验证:accept() 之前验证客户端身份,通常通过查询参数或 HTTP 头在握手阶段进行。拒绝未认证的连接。
  • 异步操作: WebSocket 方法 (accept, receive_*, send_*, close) 都是异步的,必须使用 await 调用。
  • 扩展性: 对于需要水平扩展的应用,规划使用外部 Pub/Sub 系统来管理跨进程/服务器的消息和连接状态。
  • 心跳机制 (Optional but Recommended): 对于长时间不发送数据的连接,客户端和服务器可以定期发送小的 ping/pong 帧来保持连接活跃和检测死连接。websockets 库(FastAPI 内部使用的)通常会为你处理基本的 ping/pong。
  • 日志记录: 记录连接的建立、断开、错误以及关键消息,有助于调试和监控。

9. 结论

FastAPI 凭借其异步架构和简洁的 API,使得在 Python 中构建高性能、实时的 WebSocket 应用变得轻松高效。通过理解 WebSocket 的工作原理,掌握 FastAPI 提供的 WebSocket 对象方法,并采用连接管理器等模式来处理多个客户端,你可以构建从简单的实时更新到复杂的在线协作平台等各种应用。

从单进程的内存管理到分布式系统的 Pub/Sub 集成,FastAPI 为 WebSocket 应用提供了坚实的基础。结合异步编程的最佳实践,你可以充分发挥 WebSocket 的潜力,为用户带来更流畅、更具交互性的体验。现在,拿起你的代码编辑器,开始在 FastAPI 中构建你的下一个实时应用吧!


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部