如何在 FastAPI 中使用 WebSocket?代码示例与详细步骤 – wiki基地

FastAPI WebSocket 完全指南:从核心原理到生产级实战

在现代 Web 开发中,实时通信已成为许多应用程序的核心需求。无论是即时聊天、实时看板、在线协作工具还是金融交易系统,传统的 HTTP 轮询模式在效率和延迟上都难以满足要求。FastAPI 作为高性能的异步 Python 框架,原生提供了对 WebSocket 协议的卓越支持。

本文将深入探讨如何在 FastAPI 中构建稳定、高效且可扩展的 WebSocket 应用。我们将从基础概念出发,逐步深入到连接管理、认证授权以及复杂的广播系统设计。


一、 理解 WebSocket 协议与 FastAPI 的结合

1.1 为什么选择 WebSocket?

HTTP 是一个“请求-响应”协议,其特点是无状态且单向(客户端发起,服务器响应)。而 WebSocket 提供了一个在单个 TCP 连接上进行**全双工(Full-Duplex)**通信的通道。

  • 低延迟:建立连接后,数据帧可以直接发送,无需重复携带繁重的 HTTP Header。
  • 实时性:服务器可以主动向客户端推送数据。
  • 持久性:连接在一段时间内保持打开状态,减少了握手开销。

1.2 FastAPI 的优势

FastAPI 基于 Starlette 构建,完美兼容 ASGI(Asynchronous Server Gateway Interface)。这意味着它天生支持异步处理,能够轻松处理数以万计的并发 WebSocket 连接,而不会阻塞主线程。


二、 快速上手:创建一个基础 WebSocket 接口

在 FastAPI 中使用 WebSocket 非常直观。你只需要从 fastapi 导入 WebSocket 类型,并使用 @app.websocket 装饰器。

2.1 最小可行性代码

“`python
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse

app = FastAPI()

html = “””




FastAPI WebSocket

WebSocket Demo




    “””

    @app.get(“/”)
    async def get():
    return HTMLResponse(html)

    @app.websocket(“/ws”)
    async def websocket_endpoint(websocket: WebSocket):
    # 1. 接受连接
    await websocket.accept()
    try:
    while True:
    # 2. 等待接收数据
    data = await websocket.receive_text()
    # 3. 发送响应数据
    await websocket.send_text(f”服务器已收到消息: {data}”)
    except Exception as e:
    print(f”连接断开: {e}”)
    “`

    2.2 核心步骤解析

    1. websocket: WebSocket: 定义注入参数。
    2. await websocket.accept(): 显式接受握手。如果在此时检查 Header 发现非法,可以直接拒绝。
    3. while True: WebSocket 是持久连接,通常需要一个循环来维持监听状态。
    4. receive_text / send_text: 处理文本数据。FastAPI 还提供了 receive_jsonsend_json 用于处理结构化数据。

    三、 高级连接管理:ConnectionManager 设计模式

    在实际项目中,我们往往需要管理多个并发连接(例如聊天室),或者根据用户 ID 推送特定消息。手动维护这些逻辑会让代码变得混乱。推荐使用 ConnectionManager 模式。

    3.1 定义连接管理器

    “`python
    from typing import List, Dict

    class ConnectionManager:
    def init(self):
    # 存储激活的连接
    self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
    
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
    
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
    
    async def broadcast(self, message: str):
        """广播给所有在线用户"""
        for connection in self.active_connections:
            await connection.send_text(message)
    

    manager = ConnectionManager()
    “`

    3.2 在路由中使用管理器

    python
    @app.websocket("/ws/{client_id}")
    async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    await manager.broadcast(f"用户 {client_id} 进入了聊天室")
    try:
    while True:
    data = await websocket.receive_text()
    await manager.send_personal_message(f"你说: {data}", websocket)
    await manager.broadcast(f"用户 {client_id} 说: {data}")
    except Exception:
    manager.disconnect(websocket)
    await manager.broadcast(f"用户 {client_id} 离开了聊天室")


    四、 安全性:WebSocket 的认证与授权

    WebSocket 在握手阶段是通过 HTTP 发起的,因此我们可以利用这一点进行身份校验。常见的做法有两种:Query 参数校验Header/Cookie 校验

    4.1 使用 Query 参数校验

    由于浏览器标准的 WebSocket API 不支持自定义 Header,最简单的方法是将 Token 放入 URL。

    “`python
    from fastapi import Depends, HTTPException, status

    async def get_token(
    websocket: WebSocket,
    token: str | None = None,
    ):
    if token is None or token != “secret-token”:
    await websocket.close(code=status.WS_1008_POLICY_VIOLATION)
    return None
    return token

    @app.websocket(“/ws/secure”)
    async def secure_websocket(
    websocket: WebSocket,
    token: str = Depends(get_token)
    ):
    if token is None:
    return
    await websocket.accept()
    await websocket.send_text(“认证成功,连接已建立”)
    await websocket.close()
    “`

    4.2 注意事项

    • 跨域资源共享 (CORS):WebSocket 不受标准浏览器 CORS 策略限制,但 FastAPI/Starlette 会检查 Origin 头部。如果需要跨域,需确保配置正确。
    • 状态码:WebSocket 有专门的状态码(如 1000 正常关闭,1008 违反策略),使用 websocket.close(code=...) 能让前端更清晰地处理错误。

    五、 处理 JSON 与复杂数据结构

    现代应用很少只传输纯文本。FastAPI 的 receive_json 结合 Pydantic 模型可以实现强大的类型校验。

    5.1 数据模型定义

    “`python
    from pydantic import BaseModel, ValidationError

    class MessageModel(BaseModel):
    action: str
    payload: dict
    target_id: int | None = None

    @app.websocket(“/ws/json”)
    async def websocket_json_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
    try:
    # 直接接收并转换为字典
    data = await websocket.receive_json()
    # 使用 Pydantic 校验
    msg = MessageModel(**data)

            if msg.action == "ping":
                await websocket.send_json({"status": "pong"})
    
        except (ValidationError, ValueError):
            await websocket.send_json({"error": "无效的数据格式"})
        except Exception:
            break
    

    “`


    六、 进阶:处理并发任务与心跳检测

    WebSocket 连接通常是长生命周期的。如果服务器在处理一个耗时的计算任务,可能会导致该连接的消息积压,甚至超时。

    6.1 使用 asyncio.gather 处理并发

    如果你需要同时监听输入并执行后台输出(如实时推送系统监控指标),可以使用 asyncio.create_task

    “`python
    import asyncio

    async def send_time_updates(websocket: WebSocket):
    while True:
    await asyncio.sleep(5)
    await websocket.send_text(f”当前服务器时间: {asyncio.get_event_loop().time()}”)

    @app.websocket(“/ws/monitor”)
    async def monitor_endpoint(websocket: WebSocket):
    await websocket.accept()
    # 启动后台任务
    task = asyncio.create_task(send_time_updates(websocket))
    try:
    while True:
    # 主循环处理用户交互
    data = await websocket.receive_text()
    print(f”收到指令: {data}”)
    finally:
    # 确保连接断开时取消任务
    task.cancel()
    “`

    6.2 心跳检测(Heartbeat)

    网络波动可能导致连接已断开但服务器未感知(僵尸连接)。通常通过发送 Ping/Pong 帧解决。
    FastAPI 底层的 Uvicorn 默认支持协议级的心跳,但应用层心跳通常更可控:

    1. 客户端每隔 30s 发送一个 {"type": "ping"}
    2. 服务器记录最后活动时间,若超时未收到则主动剔除。

    七、 生产环境下的扩展性问题

    当你的应用需要从单台服务器扩展到集群(多实例)时,简单的内存 List(ConnectionManager)将不再起作用。

    7.1 使用 Redis Pub/Sub 实现分布式 WebSocket

    如果用户 A 连接在服务器 1,用户 B 连接在服务器 2,服务器 1 无法直接向服务器 2 的 WebSocket 发送数据。

    解决方案:

    1. 所有服务器实例订阅同一个 Redis 频道。
    2. 当需要广播时,服务器将消息发布到 Redis。
    3. 所有服务器收到 Redis 消息后,再通过本地的 ConnectionManager 推送给各自连接的客户端。

    7.2 负载均衡配置

    如果你在 FastAPI 前面使用了 Nginx,必须显式配置以支持 WebSocket 升级请求:

    nginx
    location /ws {
    proxy_pass http://fastapi_backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    }


    八、 最佳实践总结

    1. 异常处理:始终在 WebSocket 循环中使用 try...except 捕获 WebSocketDisconnect 异常,确保资源(如连接列表、数据库连接)能被正确释放。
    2. 异步非阻塞:在 WebSocket 逻辑中禁止使用任何同步阻塞 IO(如 requests.get 或同步数据库驱动),否则会拖累整个事件循环,导致所有连接延迟增加。
    3. 连接限制:为了防止 OOM(内存溢出),应对单个 IP 的最大连接数进行限制。
    4. 二进制支持:对于音视频流或高性能压缩数据,使用 receive_bytessend_bytes 处理二进制序列化。
    5. 解耦逻辑:将消息路由逻辑(如 if action == 'chat': ...)从 WebSocket 路由函数中抽离到专门的服务层中。

    九、 总结

    FastAPI 的 WebSocket 实现既简单又强大。通过 WebSocket 类提供的异步 API,我们可以轻松构建从基础聊天室到复杂的实时监控系统。在构建大型应用时,引入 ConnectionManager 进行解耦,并结合 Redis 实现水平扩展,是通往生产级实时系统的必经之路。

    掌握了这些技术,你就能充分利用 FastAPI 的异步特性,为用户提供丝滑、即时的交互体验。

    滚动至顶部