使用 FastAPI 构建 WebSocket 应用 – wiki基地


使用 FastAPI 构建高性能 WebSocket 应用:从入门到实战

在现代 Web 应用中,实时通信变得越来越重要。无论是聊天应用、实时股票行情、多人协作文档还是在线游戏,都离不开服务器与客户端之间的双向、低延迟数据交换。传统的 HTTP 协议是基于请求-响应模式的,无法满足这种实时推送的需求。这时,WebSocket 协议应运而生,它提供了一种在单个 TCP 连接上进行全双工通信的通道。

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于构建 API。它基于标准的 Python 类型提示,能够自动生成 API 文档,并充分利用了 ASGI (Asynchronous Server Gateway Interface) 的异步能力。FastAPI 对 WebSocket 的支持非常友好,使得构建实时应用变得高效且相对简单。

本文将深入探讨如何在 FastAPI 中使用 WebSocket,从基本概念、环境搭建,到构建简单的 Echo 服务器,再到实现一个基础的多用户聊天室,并触及一些高级话题,帮助你全面掌握 FastAPI WebSocket 开发。

1. 理解 WebSocket

在深入 FastAPI 的实现之前,我们先回顾一下 WebSocket 的基本工作原理:

  1. 握手阶段 (Handshake): WebSocket 连接建立的第一步是一个 HTTP 请求。客户端向服务器发送一个带有特定头的 HTTP GET 请求,表明希望将连接升级到 WebSocket 协议。如果服务器支持并同意,它会返回一个特殊的 HTTP 响应,表示握手成功,连接从此切换到 WebSocket 协议。
  2. 数据传输阶段: 握手成功后,客户端和服务器之间就可以通过这个持久化的 TCP 连接自由地、双向地发送数据帧了。数据帧可以是文本格式 (UTF-8 编码) 或二进制格式。
  3. 关闭阶段: 任何一方都可以发送一个特殊的控制帧来发起连接关闭请求。

相比于传统的轮询 (Polling) 或长轮询 (Long Polling),WebSocket 显著减少了通信开销(无需重复建立和关闭连接,头部信息更小),降低了延迟,并且支持真正的服务器推送。

2. 为什么选择 FastAPI 构建 WebSocket 应用?

FastAPI 基于 ASGI 规范,天生支持异步编程。WebSocket 操作(如接收、发送数据)通常是 I/O 密集型的,异步非阻塞的特性使得 FastAPI 在处理大量并发 WebSocket 连接时具有出色的性能。

此外,FastAPI 继承了 Starlette 对 WebSocket 的良好支持,提供简洁的 API 来处理 WebSocket 连接、接收和发送不同格式的数据,以及处理连接断开等事件。结合 FastAPI 强大的依赖注入系统和自动文档生成能力(虽然 WebSocket 端点目前不会自动出现在 OpenAPI 文档中,但其核心机制与 HTTP 端点一致),开发体验非常流畅。

3. 环境准备

在开始之前,确保你的开发环境中安装了 Python (推荐 3.7+)。然后安装 FastAPI 和一个 ASGI 服务器,例如 Uvicorn。为了方便 WebSocket 测试,我们可能还需要一个 WebSocket 客户端库或者直接使用浏览器的开发者工具。

bash
pip install fastapi uvicorn websockets python-multipart

  • fastapi: FastAPI 框架核心。
  • uvicorn: 一个高性能的 ASGI 服务器,用于运行 FastAPI 应用。
  • websockets: 一个 Python WebSocket 库,FastAPI/Starlette 在底层使用了它,并且它本身也可以作为一个独立的客户端或服务器用于测试。
  • python-multipart: 处理文件上传等,虽然不是 WebSocket 直接必需,但 FastAPI 常用依赖。

4. FastAPI WebSocket 基础

在 FastAPI 中创建一个 WebSocket 端点与创建 HTTP 端点类似,使用 @app.websocket() 装饰器即可。

“`python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # 接受 WebSocket 连接
# 连接建立后,就可以开始收发数据了
while True:
data = await websocket.receive_text() # 接收文本数据
await websocket.send_text(f”Message text was: {data}”) # 发送文本数据
“`

这段代码定义了一个 /ws 路径的 WebSocket 端点。

  • @app.websocket("/ws"): 装饰器,标记此异步函数为一个 WebSocket 端点,监听 /ws 路径的 WebSocket 请求。
  • async def websocket_endpoint(websocket: WebSocket):: 定义一个异步函数来处理连接。FastAPI 会自动将一个 WebSocket 对象作为参数注入进来,代表了当前的 WebSocket 连接。
  • await websocket.accept(): 这是处理 WebSocket 连接的第一步,必须调用它来接受客户端的连接请求。如果服务器端因为某些原因(如认证失败)不想建立连接,可以不调用 accept(),连接将会被关闭。
  • while True:: 进入一个无限循环,持续监听客户端发送的数据。
  • data = await websocket.receive_text(): 异步地接收客户端发送的文本数据。这是一个阻塞操作,会暂停协程直到接收到数据。receive_text() 专门用于接收文本数据,还有 receive_bytes() 用于接收二进制数据,receive_json() 用于接收 JSON 数据(底层是接收文本/二进制再解析)。
  • await websocket.send_text(...): 异步地向客户端发送文本数据。同样有 send_bytes()send_json() 方法。

5. 处理连接断开

WebSocket 连接可能因为多种原因断开:客户端主动关闭、网络问题、服务器关闭等。当连接断开时,receive_* 方法会抛出 starlette.websockets.WebSocketDisconnect 异常。我们应该捕获这个异常来执行清理工作。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
print(f”Received: {data}”)
await websocket.send_text(f”Echo: {data}”)
except WebSocketDisconnect as e:
# 连接断开时的处理
print(f”WebSocket disconnected: {e.code}, {e.reason}”)
except Exception as e:
# 处理其他可能的异常
print(f”An error occurred: {e}”)
finally:
# 无论连接是否正常断开,都会执行此处的清理工作
print(“Connection closed”)
# 在更复杂的应用中,这里可能需要从连接列表中移除该 websocket
“`

将接收数据的逻辑放在 try...except WebSocketDisconnect 块中,可以优雅地处理连接断开事件。WebSocketDisconnect 异常包含 codereason 属性,提供了断开的原因信息。一个通用的做法是在 finally 块中执行资源清理,例如从活跃连接列表中移除当前 websocket 对象。

6. 示例 1:简单的 Echo 服务器

这是一个完整的 Echo 服务器示例,它接收客户端发送的任何文本消息,然后将相同的消息前面加上 “Echo: ” 发送回去。

main.py:

“`python
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.get(“/”)
async def get():
return {“message”: “Go to /index.html to see the WebSocket example.”}

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
print(“WebSocket connection accepted.”)
try:
while True:
data = await websocket.receive_text()
print(f”Received message: {data}”)
await websocket.send_text(f”Echo: {data}”)
except WebSocketDisconnect as e:
print(f”WebSocket disconnected (Code: {e.code}, Reason: {e.reason})”)
except Exception as e:
print(f”An error occurred: {e}”)
finally:
print(“WebSocket connection closed.”)

if name == “main“:
# 运行应用
# 请确保安装了 uvicorn: pip install uvicorn
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`

客户端测试方法 (使用 Python websockets 库):

安装 websockets 库 (pip install websockets),然后运行以下 Python 脚本:

“`python
import asyncio
import websockets

async def test_websocket():
uri = “ws://localhost:8000/ws”
async with websockets.connect(uri) as websocket:
await websocket.send(“Hello, FastAPI WebSocket!”)
print(f”> Hello, FastAPI WebSocket!”)

    response = await websocket.recv()
    print(f"< {response}")

    await websocket.send("Another message.")
    print(f"> Another message.")

    response = await websocket.recv()
    print(f"< {response}")

asyncio.get_event_loop().run_until_complete(test_websocket())
“`

客户端测试方法 (使用浏览器开发者工具):

  1. 打开浏览器的开发者工具 (通常按 F12)。
  2. 切换到 “Console” 或 “控制台” 选项卡。
  3. 输入以下 JavaScript 代码并执行:

    “`javascript
    var ws = new WebSocket(“ws://localhost:8000/ws”);

    ws.onopen = function(event) {
    console.log(“WebSocket connection opened:”, event);
    ws.send(“Hello from browser!”); // 发送消息
    };

    ws.onmessage = function(event) {
    console.log(“Message from server:”, event.data); // 接收消息
    };

    ws.onerror = function(event) {
    console.error(“WebSocket error observed:”, event);
    };

    ws.onclose = function(event) {
    if (event.wasClean) {
    console.log(WebSocket connection closed cleanly, code=${event.code} reason=${event.reason});
    } else {
    console.error(‘WebSocket connection died’);
    }
    };

    // 可以随时通过 ws.send(“你的消息”) 发送更多消息
    // 可以通过 ws.close() 关闭连接
    “`

运行 main.py (uvicorn main:app --reload),然后运行 Python 客户端脚本或在浏览器控制台执行 JavaScript 代码,你就能看到服务器和客户端之间的消息交互了。

7. 处理多个客户端和广播消息

Echo 服务器只处理单个连接,但实际应用通常需要服务器能够与多个客户端通信,甚至向所有连接的客户端广播消息(例如聊天室)。由于每个 WebSocket 连接都会在服务器端对应一个独立的协程实例来处理,这些协程之间默认是隔离的。为了实现广播或向特定客户端发送消息,我们需要一个机制来跟踪所有活跃的连接,并在需要时遍历它们。

一个常见的模式是创建一个 ConnectionManager 类来管理所有活跃的 WebSocket 连接。

“`python
from typing import List
from fastapi import WebSocket

class ConnectionManager:
def init(self):
# 使用 Set 存储活跃连接,Set 的查找和删除操作通常比 List 更高效
self.active_connections: set[WebSocket] = set()

async def connect(self, websocket: WebSocket):
    """接受新连接并添加到列表中"""
    await websocket.accept()
    self.active_connections.add(websocket)
    print(f"New connection accepted. Total connections: {len(self.active_connections)}")

def disconnect(self, websocket: WebSocket):
    """从列表中移除断开的连接"""
    if websocket in self.active_connections:
         self.active_connections.remove(websocket)
         print(f"Connection disconnected. Total connections: {len(self.active_connections)}")

async def send_personal_message(self, message: str, websocket: WebSocket):
    """向单个客户端发送消息"""
    try:
        await websocket.send_text(message)
    except Exception as e:
        print(f"Error sending message to a connection: {e}")
        # 发送失败可能意味着连接已无效,考虑断开处理
        # self.disconnect(websocket) # 根据实际情况决定是否在此处断开

async def broadcast(self, message: str):
    """向所有活跃客户端广播消息"""
    # 创建一个列表副本,防止在迭代过程中集合被修改(例如,连接断开被移除)
    inactive_connections = []
    for connection in list(self.active_connections):
        try:
            await connection.send_text(message)
        except Exception as e:
            print(f"Error broadcasting to a connection, marking for removal: {e}")
            inactive_connections.append(connection) # 标记发送失败的连接

    # 移除所有发送失败的连接
    for connection in inactive_connections:
         self.disconnect(connection)

“`

这个 ConnectionManager 类维护了一个 set 来存储活跃的 WebSocket 对象。它提供了三个核心方法:
* connect(): 接受新的 WebSocket 连接并将其添加到集合中。
* disconnect(): 从集合中移除一个断开的连接。
* send_personal_message(): 向特定的 WebSocket 对象发送消息。
* broadcast(): 遍历所有连接,向每个连接发送相同的消息。广播时需要注意异常处理,如果向某个连接发送失败,说明该连接可能已经无效,应将其移除。我们通过在一个临时列表中收集无效连接,然后在迭代完成后再移除它们,避免在迭代时修改原集合。

8. 示例 2:基础聊天室应用

使用 ConnectionManager,我们可以构建一个简单的多用户聊天室。

main.py:

“`python
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
import logging # 添加日志模块

配置日志

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)

app = FastAPI()

挂载静态文件目录,用于提供 index.html

app.mount(“/static”, StaticFiles(directory=”static”), name=”static”)

全局 ConnectionManager 实例

class ConnectionManager:
def init(self):
self.active_connections: set[WebSocket] = set()

async def connect(self, websocket: WebSocket):
    await websocket.accept()
    self.active_connections.add(websocket)
    logger.info(f"New connection accepted. Total connections: {len(self.active_connections)}")

def disconnect(self, websocket: WebSocket):
    if websocket in self.active_connections:
         self.active_connections.remove(websocket)
         logger.info(f"Connection disconnected. Total connections: {len(self.active_connections)}")

async def send_personal_message(self, message: str, websocket: WebSocket):
    try:
        await websocket.send_text(message)
    except Exception as e:
        logger.error(f"Error sending personal message: {e}")
        # 在 send_personal_message 中通常不立即断开,因为可能只是瞬时问题
        # 断开处理应主要由 receive_* 抛出 WebSocketDisconnect 触发

async def broadcast(self, message: str):
    inactive_connections = []
    for connection in list(self.active_connections): # Iterate over a copy
        try:
            await connection.send_text(message)
        except Exception as e:
            logger.error(f"Error broadcasting to a connection, marking for removal: {e}")
            inactive_connections.append(connection)

    for connection in inactive_connections:
         self.disconnect(connection)

manager = ConnectionManager() # 创建 ConnectionManager 实例

提供一个根路径,用于重定向或显示指引

@app.get(“/”)
async def get(request: Request):
# 返回一个简单的 HTML 页面,或者重定向到 /static/index.html
# return HTMLResponse(content=”

Simple Chat App

Go to static/index.html to access the chat.

“)
# 或者直接提供 HTML 内容
html_content = “””
<!DOCTYPE html>


Simple Chat


FastAPI WebSocket Chat






    <script>
        var client_id = document.getElementById("clientId").value;
        var ws = new WebSocket(`ws://localhost:8000/ws/${client_id}`); // Pass client_id in path or query

        ws.onopen = function(event) {
            console.log("WebSocket connection opened:", event);
            document.getElementById("chatbox").innerHTML += "<p><em>Connected to chat.</em></p>";
        };

        ws.onmessage = function(event) {
            var messages = document.getElementById('chatbox');
            messages.innerHTML += `<p>${event.data}</p>`;
            // 滚动到底部
            messages.scrollTop = messages.scrollHeight;
        };

        ws.onerror = function(event) {
             console.error("WebSocket error observed:", event);
             document.getElementById("chatbox").innerHTML += "<p><em>Connection error!</em></p>";
        };

        ws.onclose = function(event) {
            if (event.wasClean) {
              console.log(`WebSocket connection closed cleanly, code=${event.code} reason=${event.reason}`);
              document.getElementById("chatbox").innerHTML += "<p><em>Disconnected from chat.</em></p>";
            } else {
              console.error('WebSocket connection died');
              document.getElementById("chatbox").innerHTML += "<p><em>Connection died!</em></p>";
            }
        };

        function sendMessage() {
            var input = document.getElementById("messageText");
            if (input.value.trim()) {
                ws.send(input.value); // 发送消息到服务器
                input.value = ''; // 清空输入框
            }
        }

        // 支持回车发送
        document.getElementById("messageText").addEventListener("keypress", function(event) {
            if (event.key === "Enter") {
                event.preventDefault(); // 防止默认的回车换行行为
                sendMessage();
            }
        });

    </script>
</body>
</html>
"""
return HTMLResponse(content=html_content)

Chat WebSocket endpoint

@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket) # 接受连接并添加到管理器
# 通知所有用户有人加入了
await manager.broadcast(f”Client #{client_id} has joined the chat.”)
logger.info(f”Client #{client_id} connected.”)

try:
    while True:
        # 接收客户端发送的消息
        data = await websocket.receive_text()
        # 构建要广播的消息格式
        message = f"Client #{client_id}: {data}"
        logger.info(f"Received from #{client_id}: {data}")
        # 向所有活跃客户端广播消息
        await manager.broadcast(message)

except WebSocketDisconnect as e:
    # 处理连接断开
    manager.disconnect(websocket) # 从管理器中移除连接
    # 通知所有用户有人离开了
    await manager.broadcast(f"Client #{client_id} has left the chat.")
    logger.info(f"Client #{client_id} disconnected (Code: {e.code}, Reason: {e.reason}).")
except Exception as e:
    # 处理其他异常
    logger.error(f"An error occurred with client #{client_id}: {e}")
    # 确保连接从管理器中移除
    manager.disconnect(websocket)
    await manager.broadcast(f"Client #{client_id} encountered an error and left.")

if name == “main“:
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`

客户端 HTML/JavaScript (index.html 或直接内嵌在根路径的 HTMLResponse 中):

上面的代码直接将 HTML/JavaScript 内嵌在根路径的 HTMLResponse 中了,方便直接通过访问 http://localhost:8000/ 来测试。

运行步骤:

  1. 确保所有依赖已安装 (pip install fastapi uvicorn websockets python-multipart).
  2. 保存上面的 Python 代码为 main.py
  3. 运行服务器:uvicorn main:app --reload
  4. 打开多个浏览器窗口或标签页,访问 http://localhost:8000/
  5. 在每个窗口的输入框中输入消息并发送,观察消息是否会出现在所有连接的聊天框中。

代码解释:

  • 我们创建了一个 ConnectionManager 的全局实例 manager
  • WebSocket 端点 @app.websocket("/ws/{client_id}") 现在接受一个路径参数 client_id,这可以用来标识不同的客户端(虽然这里只是简单地显示,实际应用中需要更安全的身份验证)。
  • 当一个新的 WebSocket 连接到来时,首先调用 await manager.connect(websocket) 接受连接并将其添加到管理器。
  • 然后向所有客户端广播一条用户加入的消息。
  • 进入一个 while True 循环,持续接收来自当前客户端的消息。
  • 收到消息后,格式化消息内容,并调用 await manager.broadcast(message) 将消息发送给管理器中的所有连接。
  • try...except WebSocketDisconnect 块捕获断开事件,调用 manager.disconnect(websocket) 移除连接,并广播一条用户离开的消息。
  • 添加了简单的日志输出,帮助跟踪连接和消息流动。
  • 根路径 / 返回一个内嵌了 HTML 和 JavaScript 的响应,方便快速测试。JavaScript 部分创建 WebSocket 连接,处理 onopen, onmessage, onerror, onclose 事件,并提供发送消息的函数。

这个聊天室示例非常基础,仅处理文本消息,没有用户名管理、历史记录、房间概念、消息序列化(发送和接收的都是纯文本,实际应用可能需要 JSON)等。但这展示了使用 ConnectionManager 处理多客户端连接的核心模式。

9. 高级话题与注意事项

9.1 认证和授权

WebSocket 握手阶段是基于 HTTP 的,因此可以在握手请求中利用 HTTP 的认证机制(如 Cookie、Header 中的 Token、查询参数)。FastAPI 的依赖注入系统可以在 WebSocket 端点函数中访问请求对象 (Request),从而获取认证信息。

“`python
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request, Depends
from starlette.exceptions import HTTPException

app = FastAPI()

async def get_user_from_token(request: Request, token: str = Depends(…)): # Depends logic here
# Verify token, fetch user data
user = {“id”: “user123”, “name”: “Alice”} # Example user
if not user:
raise HTTPException(status_code=401, detail=”Invalid token”)
return user

@app.websocket(“/ws”)
async def websocket_endpoint(
websocket: WebSocket,
# 可以通过 query 参数获取 token
token: str = None # 或者从 request.headers[‘Authorization’] 获取
# user: dict = Depends(get_user_from_token) # 更复杂的认证逻辑
):
# 在 accept() 之前进行认证检查
if token != “mysecrettoken”: # 简单示例
await websocket.close(code=1008) # 1008: Policy Violation
return

await websocket.accept()
# 连接成功后,可以认为用户已认证
# 将用户信息与 websocket 关联起来,例如存在 ConnectionManager 中
print(f"Authenticated connection accepted with token: {token}")

try:
    while True:
        data = await websocket.receive_text()
        await websocket.send_text(f"Authenticated Echo: {data}")
except WebSocketDisconnect:
    print("Connection disconnected.")

“`

请注意,await websocket.accept() 必须在发送或接收任何数据之前调用。认证检查应该在 accept() 之前完成。如果认证失败,应使用 await websocket.close(...) 关闭连接,并使用合适的 WebSocket 关闭状态码(例如 1008 表示策略违规)。

9.2 扩展到分布式系统 (多服务器实例)

上面使用的 ConnectionManager 将活跃连接存储在内存中。这对于单服务器实例是有效的。但如果部署多个 FastAPI 实例,每个实例都有自己的 ConnectionManager,它们之间无法直接通信。这意味着一个用户连接到实例 A,发送一条消息,这条消息只能通过实例 A 广播给连接到实例 A 的其他用户,而连接到实例 B 的用户将收不到这条消息。

解决这个问题通常需要引入一个外部的消息队列或发布/订阅系统,如 Redis Pub/Sub、RabbitMQ、Kafka 等。

架构变为:
1. 每个 FastAPI 实例维护自己本地的活跃连接集合。
2. 当一个实例收到消息时,它将消息发布到消息队列的一个特定主题。
3. 所有 FastAPI 实例都订阅了这个主题。
4. 当一个实例从消息队列收到一条消息时,它会将这条消息广播给它本地维护的所有活跃连接。

这样,消息就能够在所有服务器实例之间传递,最终到达所有连接的客户端。

9.3 使用 JSON 进行数据交换

虽然 WebSocket 支持发送任意文本或二进制数据,但在 Web 开发中,使用 JSON 格式交换结构化数据非常常见。FastAPI 的 WebSocket 对象提供了 send_json()receive_json() 方法,非常方便。

“`python
from fastapi import WebSocket

@app.websocket(“/ws_json”)
async def websocket_json_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
# 接收 JSON 数据,自动解析为 Python 字典/列表
data = await websocket.receive_json()
print(f”Received JSON: {data}”)

        # 构建 JSON 响应
        response = {"status": "success", "received": data}
        # 发送 JSON 数据
        await websocket.send_json(response)

except WebSocketDisconnect:
    print("JSON connection disconnected.")
except Exception as e:
    print(f"JSON error: {e}")

“`
客户端 JavaScript 示例(发送 JSON):

javascript
ws.send(JSON.stringify({ type: "chat", message: "Hello JSON!" }));

客户端 JavaScript 示例(接收 JSON):

javascript
ws.onmessage = function(event) {
try {
const data = JSON.parse(event.data);
console.log("Received JSON:", data);
// 根据 data 的结构处理不同类型的消息
if (data.type === "chat") {
// 显示聊天消息
}
} catch (e) {
console.error("Failed to parse JSON:", event.data, e);
// 处理非 JSON 消息或解析错误
}
};

9.4 后台任务发送消息

在某些情况下,你可能需要在不阻塞 WebSocket 主接收循环的情况下发送消息(例如,由一个外部事件触发的推送)。可以使用 Starlette/FastAPI 的 BackgroundTasks,或者直接在接收循环外部创建另一个协程来发送消息。不过,对于简单的广播,在接收到消息后立即广播通常是可行的,因为发送操作本身是异步的。

如果发送操作非常耗时或容易失败(例如,需要查询数据库),并且你不希望它影响其他客户端的消息接收,可以考虑将其放入后台任务。但更常见的是,消息推送是由 其他 HTTP 请求或外部系统触发的,这时你需要从 ConnectionManager 的实例中调用 broadcastsend_personal_message 方法。这要求你在应用的可访问范围内持有 ConnectionManager 的实例(例如,作为全局变量或通过某种服务定位器模式)。

9.5 性能考虑

  • 异步是关键: WebSocket 操作天生适合异步。FastAPI 和 Uvicorn 已经利用了 ASGI 的异步能力。确保你的 WebSocket 处理函数是 async def
  • 高效的连接管理: 使用 set 来存储活跃连接通常比 list 更高效,尤其是在需要频繁地添加、删除和查找连接时。
  • 广播效率: 在广播时,迭代连接并逐个发送是必要的。上面 ConnectionManager 中的广播方法已经考虑了在迭代时处理断开连接的情况。
  • 消息序列化/反序列化: 使用 send_json/receive_json 会增加一点点 CPU 开销,但通常比手动处理字符串和解析更方便且安全。选择适合你数据结构的格式。
  • 负载均衡与扩展: 如前所述,多实例部署需要外部消息队列。负载均衡器应配置为粘性会话 (Sticky Sessions),尽量将同一用户的连接路由到同一个实例,但这对于 WebSocket 不是强制性的,因为消息队列解决了跨实例通信问题。

10. 结论

FastAPI 结合其底层的 Starlette 和 ASGI 服务器 (如 Uvicorn),为构建高性能的 WebSocket 应用提供了强大而易用的平台。通过简单的 @app.websocket() 装饰器,你可以轻松定义 WebSocket 端点,使用 await websocket.accept() 接受连接,await websocket.receive_*() 接收数据,await websocket.send_*() 发送数据,并通过捕获 WebSocketDisconnect 异常来处理连接断开。

对于需要处理多个客户端并实现广播等功能的复杂应用,引入 ConnectionManager 模式来集中管理活跃连接是有效且常见的做法。随着应用规模的增长,理解如何将应用扩展到多个服务器实例,并引入消息队列等技术将是必不可少的。

掌握了 FastAPI 对 WebSocket 的支持,你就能充分利用其异步高性能的优势,轻松构建各种实时交互式 Web 应用。希望本文提供的详细解释和示例代码能帮助你快速入门并深入实践 FastAPI WebSocket 开发。


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部