FastAPI WebSocket Tutorial: Build Real-time Applications – wiki基地


FastAPI WebSocket 教程:构建实时应用的核心指南

在当今数字化的世界中,实时(Real-time)应用已经成为用户体验的关键要素。从即时聊天、在线协作文档、实时股票行情到多人在线游戏,对即时数据传输的需求无处不在。传统 HTTP 请求-响应模型在处理这些场景时显得力不从心,因为它本质上是无状态、短连接的。WebSockets 正是为了解决这一挑战而生,它提供了一个全双工、持久化的通信通道,允许服务器和客户端之间进行双向、低延迟的消息交换。

而当我们谈论使用 Python 构建高性能、可扩展的 Web 应用程序时,FastAPI 无疑是当前最炙手可热的框架之一。它凭借其现代化的异步支持、Pydantic 模型带来的数据验证与序列化、以及自动生成 OpenAPI 文档等特性,迅速赢得了开发者的青睐。将 FastAPI 与 WebSockets 结合,我们可以构建出既高效又易于维护的实时应用程序。

本教程将深入探讨如何在 FastAPI 中利用 WebSockets 构建实时应用。我们将从基础概念讲起,逐步深入到构建一个功能完善的聊天室应用,并讨论高级特性、连接管理、身份验证、状态同步以及部署策略。

第一章:理解 WebSockets 与 FastAPI 的优势

1.1 什么是 WebSocket?为何选择它?

WebSocket 是一种网络通信协议,它在单个 TCP 连接上提供全双工通信通道。这意味着一旦连接建立,服务器和客户端可以独立地发送和接收数据,而无需每次通信都建立新的连接。

WebSocket 相对于 HTTP 的优势:

  • 全双工通信: 客户端和服务器可以同时发送和接收数据。
  • 持久连接: 连接一旦建立就保持打开状态,避免了 HTTP 每次请求的连接建立和关闭开销。
  • 低延迟: 减少了握手次数,数据传输更加迅速。
  • 更少的数据开销: 协议头信息远少于 HTTP,尤其是在大量小数据包传输时。

应用场景:

  • 即时聊天: 最经典的场景,实时消息推送。
  • 在线游戏: 实时同步玩家状态和游戏事件。
  • 实时数据仪表盘: 股票行情、IoT 设备数据等。
  • 协作工具: 多个用户同时编辑文档。
  • 通知系统: 即时推送用户通知。

1.2 FastAPI 为 WebSocket 提供了什么?

FastAPI 是一个现代、快速(高性能)的 Web 框架,用于使用 Python 3.7+ 构建 API。它基于 Starlette 和 Pydantic,充分利用了 Python 的类型提示特性。

FastAPI 处理 WebSocket 的优势:

  • 异步支持: FastAPI 是一个异步框架,完美契合 WebSockets 的异步 I/O 特性。asyncawait 使得编写并发的 WebSocket 逻辑变得自然和高效。
  • 集成简单: Starlette (FastAPI 的底层框架) 原生支持 WebSockets,FastAPI 直接继承了这一能力,通过简单的装饰器即可定义 WebSocket 路由。
  • 类型提示和 Pydantic: 虽然 WebSocket 消息通常是字符串(文本或二进制),但可以通过 receive_json()send_json() 结合 Pydantic 模型来结构化和验证消息内容,提升代码质量和可维护性。
  • 依赖注入: 允许你像处理 HTTP 路由一样,将依赖注入到 WebSocket 端点中,方便集成身份验证、数据库连接等。

第二章:基础 WebSocket 应用:Hello WebSocket!

让我们从一个最简单的例子开始,构建一个能回应 “Hello WebSocket!” 的服务端和对应的客户端。

2.1 安装必要的库

在开始之前,请确保你已经安装了 FastAPI 和 Uvicorn(一个 ASGI 服务器,用于运行 FastAPI 应用)。

bash
pip install fastapi uvicorn websockets

websockets 库虽然不是 FastAPI 直接依赖,但它提供了强大的客户端实现,方便我们在后续的测试中作为参考。

2.2 服务端代码 (main.py)

“`python

main.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.responses import HTMLResponse # 用于提供简单的HTML客户端

app = FastAPI()

简单的HTML页面,作为WebSocket客户端

html = “””




FastAPI WebSocket

FastAPI WebSocket Test





“””

@app.get(“/”)
async def get():
“””提供一个简单的HTML页面作为WebSocket客户端。”””
return HTMLResponse(html)

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
“””
WebSocket端点,处理客户端连接、消息接收和发送。
“””
await websocket.accept() # 接受客户端的WebSocket连接
try:
while True:
# 循环接收客户端发送的文本消息
data = await websocket.receive_text()
# 向客户端发送一个回应
await websocket.send_text(f”Message text was: {data}”)
except WebSocketDisconnect:
# 客户端断开连接时触发此异常
print(“Client disconnected”)
except Exception as e:
# 处理其他可能的异常
print(f”An error occurred: {e}”)
“`

2.3 运行服务端

在终端中,进入 main.py 文件所在的目录,然后运行:

bash
uvicorn main:app --reload

打开浏览器,访问 http://localhost:8000。你将看到一个简单的 HTML 页面。在输入框中输入消息并点击 “Send”,你会在页面上看到你的消息以及服务器的回应。

2.4 代码解析

  • @app.websocket("/ws"): 这个装饰器将 websocket_endpoint 函数注册为一个 WebSocket 路由,客户端可以通过 ws://localhost:8000/ws 来连接。
  • websocket: WebSocket: FastAPI 会自动注入一个 WebSocket 对象,通过它我们可以与客户端进行通信。
  • await websocket.accept(): 这是建立 WebSocket 连接的第一步。它会完成 WebSocket 握手过程,将底层的 HTTP 连接升级为 WebSocket 连接。如果此步骤不执行,客户端将无法成功连接。
  • await websocket.receive_text(): 异步地等待并接收客户端发送的文本消息。如果客户端发送的是二进制数据,可以使用 receive_bytes()
  • await websocket.send_text(f"Message text was: {data}"): 异步地向客户端发送文本消息。同样,send_bytes() 用于发送二进制数据。
  • try...except WebSocketDisconnect: 这是处理 WebSocket 连接中断的关键。当客户端主动关闭连接(或由于网络问题等原因导致连接断开)时,receive_text()send_text() 等操作会抛出 WebSocketDisconnect 异常。捕获此异常可以让你在连接断开时执行清理工作(例如,从活跃连接列表中移除该客户端)。
  • HTMLResponse: 为了方便测试,我们直接在 FastAPI 中提供了一个简单的 HTML 页面作为客户端。在实际应用中,客户端通常是独立的 HTML/JavaScript 文件或移动应用程序。
  • JavaScript 客户端:
    • new WebSocket("ws://localhost:8000/ws"): 创建一个 WebSocket 连接。注意 ws:// 协议。如果是 HTTPS,则应使用 wss://
    • ws.onopen, ws.onmessage, ws.onclose, ws.onerror: 这些是 WebSocket 客户端事件处理函数,分别在连接打开、接收到消息、连接关闭、发生错误时触发。
    • ws.send(input.value): 向服务器发送消息。

第三章:构建一个简单的实时聊天室

现在,我们来构建一个更实用的应用:一个简单的多人聊天室。这将引入“连接管理”的概念,因为服务器需要能够向所有连接的客户端广播消息。

3.1 核心挑战:连接管理

在聊天室中,当一个用户发送消息时,服务器需要将这条消息转发给所有(或部分)其他在线用户。这意味着服务器需要维护一个当前所有活跃 WebSocket 连接的列表。

我们将创建一个 ConnectionManager 类来封装这些连接管理逻辑。

3.2 服务端代码 (main.py)

“`python

main.py – 聊天室版本

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from starlette.responses import HTMLResponse
from typing import List, Dict

app = FastAPI()

class ConnectionManager:
“””
负责管理所有活跃的WebSocket连接。
“””
def init(self):
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
    """
    接受新的WebSocket连接,并将其添加到活跃连接列表中。
    """
    await websocket.accept()
    self.active_connections.append(websocket)
    print(f"New client connected. Total connections: {len(self.active_connections)}")

def disconnect(self, websocket: WebSocket):
    """
    从活跃连接列表中移除断开连接的客户端。
    """
    self.active_connections.remove(websocket)
    print(f"Client disconnected. Total connections: {len(self.active_connections)}")

async def send_personal_message(self, message: str, websocket: WebSocket):
    """
    向单个客户端发送消息。
    """
    await websocket.send_text(message)

async def broadcast(self, message: str):
    """
    向所有活跃的客户端广播消息。
    """
    # 注意:这里需要并发地发送消息,以避免一个慢速连接阻塞所有其他连接。
    # 简单循环await可能导致阻塞,但在小规模应用中可以接受。
    # 更优方案可以使用 asyncio.gather 或 asyncio.create_task。
    print(f"Broadcasting: {message} to {len(self.active_connections)} clients")
    disconnected_clients = []
    for connection in self.active_connections:
        try:
            await connection.send_text(message)
        except WebSocketDisconnect:
            # 如果在广播时发现连接已断开,则标记移除
            disconnected_clients.append(connection)
            print(f"Client {connection} disconnected during broadcast.")
        except Exception as e:
            print(f"Error sending to client {connection}: {e}")
            disconnected_clients.append(connection)

    # 移除在广播过程中发现断开的客户端
    for client in disconnected_clients:
        self.disconnect(client)

manager = ConnectionManager()

客户端HTML代码

html = “””




FastAPI Chat


FastAPI WebSocket Chat

Connecting…





    “””

    @app.get(“/”)
    async def get():
    return HTMLResponse(html)

    @app.websocket(“/ws/{client_id}”)
    async def websocket_endpoint(websocket: WebSocket, client_id: str):
    “””
    WebSocket端点,处理聊天室的客户端连接。
    client_id 用于标识不同的用户。
    “””
    await manager.connect(websocket)
    try:
    # 向所有客户端广播有新用户加入
    await manager.broadcast(f”User {client_id} joined the chat.“)

        while True:
            # 接收客户端消息
            data = await websocket.receive_text()
            # 广播接收到的消息给所有客户端
            await manager.broadcast(f"**`{client_id}`:** {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        # 向所有客户端广播用户离开
        await manager.broadcast(f"**User `{client_id}` left the chat.**")
    except Exception as e:
        print(f"An unexpected error occurred with client {client_id}: {e}")
        manager.disconnect(websocket)
        await manager.broadcast(f"**User `{client_id}` was disconnected due to an error.**")
    

    “`

    3.3 运行与测试

    1. 运行服务端:uvicorn main:app --reload
    2. 打开多个浏览器标签页(例如 Chrome、Firefox),访问 http://localhost:8000
    3. 在每个标签页中输入不同的用户名,然后发送消息。你会看到消息在所有连接的客户端之间实时同步。

    3.4 代码解析与改进点

    • ConnectionManager: 这是核心。它维护了一个 active_connections 列表,并提供了 connectdisconnectbroadcast 方法。
    • @app.websocket("/ws/{client_id}"): 我们通过路径参数 client_id 来识别不同的用户。这是一种简单的身份标识方式。在实际应用中,可能会通过查询参数携带 JWT token 进行更安全的认证。
    • await manager.broadcast(...): 当有用户连接、断开或发送消息时,服务器会调用 manager.broadcast 将相应的消息发送给所有在线用户。
    • 并发广播的改进: 在 broadcast 方法中,我们目前的实现是顺序 await send_text。如果某个客户端网络状况不佳,send_text 可能会阻塞,从而影响所有其他客户端的消息发送。更健壮的方案是使用 asyncio.create_task 来并发发送:

      “`python
      import asyncio # 需要导入 asyncio

      class ConnectionManager:
      # … (其他方法不变)

      async def broadcast(self, message: str):
          print(f"Broadcasting: {message} to {len(self.active_connections)} clients")
          disconnected_clients = []
      
          # 使用 asyncio.gather 并发发送消息
          tasks = [
              self._send_message_safe(connection, message, disconnected_clients)
              for connection in self.active_connections
          ]
          await asyncio.gather(*tasks) # 等待所有发送任务完成
      
          # 移除在广播过程中发现断开的客户端
          for client in disconnected_clients:
              self.disconnect(client)
      
      async def _send_message_safe(self, connection: WebSocket, message: str, disconnected_clients: List[WebSocket]):
          """私有辅助方法,用于安全地发送消息并捕获断开连接的异常。"""
          try:
              await connection.send_text(message)
          except WebSocketDisconnect:
              disconnected_clients.append(connection)
              print(f"Client {connection} disconnected during broadcast.")
          except Exception as e:
              print(f"Error sending to client {connection}: {e}")
              disconnected_clients.append(connection)
      

      “`

      这种改进确保了即使某个客户端发送失败,也不会阻塞其他客户端的消息。

    第四章:高级 WebSocket 特性与最佳实践

    4.1 消息结构化与 Pydantic

    直接发送纯文本消息对于简单聊天室来说足够,但对于更复杂的实时应用,我们需要结构化的消息。FastAPI 结合 Pydantic 可以很好地处理这个问题。

    服务端 Pydantic 模型:

    “`python
    from pydantic import BaseModel
    from typing import Literal, Optional

    class ChatMessage(BaseModel):
    sender: str
    message: str
    type: Literal[“chat”, “system”] = “chat” # 消息类型:聊天消息或系统消息
    timestamp: Optional[float] = None # 可选的时间戳
    “`

    在 WebSocket 端点中使用:

    “`python

    … ConnectionManager 和其他导入 …

    import json
    import time

    修改 Manager.broadcast 方法以发送JSON

    class ConnectionManager:
    # … (其他方法不变)
    async def broadcast(self, message_obj: ChatMessage):
    message_obj.timestamp = time.time() # 服务器端添加时间戳
    json_message = message_obj.json() # 将Pydantic模型转换为JSON字符串

        print(f"Broadcasting: {json_message}")
        disconnected_clients = []
        for connection in self.active_connections:
            try:
                await connection.send_text(json_message) # 发送JSON字符串
            except WebSocketDisconnect:
                disconnected_clients.append(connection)
            except Exception as e:
                print(f"Error sending to client {connection}: {e}")
                disconnected_clients.append(connection)
        for client in disconnected_clients:
            self.disconnect(client)
    

    … 省略 HTML …

    @app.websocket(“/ws/{client_id}”)
    async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    try:
    # 发送系统消息:用户加入
    await manager.broadcast(ChatMessage(sender=”System”, message=f”User {client_id} joined the chat.”, type=”system”))

        while True:
            # 接收客户端发送的JSON消息
            data = await websocket.receive_json() # 接收JSON数据
    
            # 使用Pydantic模型验证和解析接收到的数据
            # 假设客户端发送的消息包含 'message' 字段
            try:
                # 客户端发送的数据可能不是完整的 ChatMessage,这里只处理 'message' 部分
                client_message_content = data.get("message", "Empty message")
                # 创建一个完整的 ChatMessage 对象用于广播
                chat_message = ChatMessage(sender=client_id, message=client_message_content, type="chat")
                await manager.broadcast(chat_message)
            except Exception as e:
                print(f"Invalid message format from {client_id}: {e}")
                await manager.send_personal_message(f"Error: Invalid message format.", websocket)
    
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(ChatMessage(sender="System", message=f"User `{client_id}` left the chat.", type="system"))
    except Exception as e:
        print(f"An unexpected error occurred with client {client_id}: {e}")
        manager.disconnect(websocket)
        await manager.broadcast(ChatMessage(sender="System", message=f"User `{client_id}` was disconnected due to an error.", type="system"))
    

    “`

    客户端 JavaScript 接收和发送 JSON:

    “`javascript
    // … (其他JS代码)

    // 发送JSON
    function sendMessage(event) {
    var input = document.getElementById(“messageInput”);
    if (input.value.trim() !== ”) {
    const messagePayload = {
    message: input.value,
    // sender: username, // 可以在客户端加入,但服务器端最终会覆盖或验证
    // type: “chat”, // 可以在客户端加入
    // timestamp: Date.now() / 1000 // 客户端时间戳
    };
    ws.send(JSON.stringify(messagePayload)); // 发送JSON字符串
    input.value = ”;
    }
    event.preventDefault();
    }

    // 接收JSON
    ws.onmessage = function(event) {
    try {
    const data = JSON.parse(event.data); // 解析JSON
    let msgHtml;
    if (data.type === “system”) {
    msgHtml = <li style="color: gray;"><em>${data.message}</em></li>;
    } else {
    const date = data.timestamp ? new Date(data.timestamp * 1000).toLocaleTimeString() : ”;
    msgHtml = <li><b>${data.sender}</b> (${date}): ${data.message}</li>;
    }
    addMessage(msgHtml, data.type); // 修改addMessage函数以接受HTML字符串
    } catch (e) {
    addMessage(<li>Error parsing message: ${event.data}</li>, “error”);
    console.error(“Error parsing JSON:”, e);
    }
    };

    // … (addMessage函数可能需要修改以接受HTML内容而不是纯文本)
    function addMessage(htmlContent, type = “user”) {
    var messages = document.getElementById(‘messages’);
    var message = document.createElement(‘li’);
    message.innerHTML = htmlContent;
    messages.appendChild(message);
    messages.scrollTop = messages.scrollHeight;
    }
    “`

    4.2 身份验证与授权

    WebSocket 连接的身份验证与 HTTP 类似,但有其特殊性。由于 WebSocket 握手是基于 HTTP 的,你可以在握手阶段利用 HTTP 的认证机制。

    常见策略:

    1. URL 查询参数 (Query Parameters):

      • 客户端在连接时传递 token:ws://localhost:8000/ws?token=your_jwt_token
      • 服务端在 websocket_endpoint 中通过 Query 参数获取并验证。

      “`python
      from fastapi import WebSocket, Query, HTTPException, status

      async def get_current_user_from_token(token: str = Query(…)):
      # 这是一个模拟的验证函数
      if token == “valid-jwt-token”:
      return “authenticated_user”
      raise HTTPException(
      status_code=status.HTTP_401_UNAUTHORIZED,
      detail=”Invalid authentication credentials for WebSocket”,
      )

      @app.websocket(“/ws”)
      async def websocket_auth_endpoint(
      websocket: WebSocket,
      user: str = Depends(get_current_user_from_token) # 在连接建立前验证
      ):
      await manager.connect(websocket)
      try:
      await manager.broadcast(f”User {user} connected.”)
      while True:
      data = await websocket.receive_text()
      await manager.broadcast(f”User {user} says: {data}”)
      except WebSocketDisconnect:
      manager.disconnect(websocket)
      await manager.broadcast(f”User {user} disconnected.”)
      “`

      注意: 这种方式会将 token 暴露在 URL 中,不适合敏感信息,且在浏览器历史记录中可见。

    2. Cookie (更安全):

      • 如果客户端是通过浏览器访问,并且之前通过 HTTP 登录获得了带有身份验证信息的 Cookie,那么这个 Cookie 会自动随 WebSocket 握手请求发送。
      • 服务端可以通过 websocket.cookies 访问这些 Cookie。

      “`python
      from fastapi import WebSocket, Cookie, HTTPException, status

      async def get_current_user_from_cookie(session_id: str = Cookie(None)):
      if session_id == “my_secret_session_id”:
      return “cookie_authenticated_user”
      raise HTTPException(
      status_code=status.HTTP_401_UNAUTHORIZED,
      detail=”Invalid session ID”,
      )

      @app.websocket(“/ws”)
      async def websocket_cookie_auth_endpoint(
      websocket: WebSocket,
      user: str = Depends(get_current_user_from_cookie)
      ):
      # … 逻辑与上同 …
      pass
      “`

    3. 自定义 Header (适合非浏览器客户端):

      • 在 WebSocket 握手请求中添加自定义 HTTP 头。
      • FastAPI 可以通过 websocket.headers 访问这些头。

      “`python
      from fastapi import WebSocket, Header, HTTPException, status

      async def get_current_user_from_header(x_auth_token: str = Header(None)):
      if x_auth_token == “my-custom-auth-token”:
      return “header_authenticated_user”
      raise HTTPException(
      status_code=status.HTTP_401_UNAUTHORIZED,
      detail=”Invalid X-Auth-Token”,
      )

      @app.websocket(“/ws”)
      async def websocket_header_auth_endpoint(
      websocket: WebSocket,
      user: str = Depends(get_current_user_from_header)
      ):
      # … 逻辑与上同 …
      pass
      “`

    4.3 状态同步与 Pub/Sub (Redis)

    我们当前的 ConnectionManager 是单机内存级的。这意味着如果你的 FastAPI 应用部署了多个 Uvicorn 工作进程或多个服务器实例,它们各自拥有独立的 ConnectionManager 实例,无法互相通信。一个用户连接到实例 A,发送消息,实例 A 无法将消息广播到连接到实例 B 的用户。

    为了解决这个问题,我们需要一个跨进程/跨服务器的状态同步机制。Redis 的 Pub/Sub(发布/订阅)模式是完美的解决方案。

    工作原理:

    1. 每个 FastAPI 实例都订阅一个公共的 Redis 频道(例如 chat_messages)。
    2. 当任何一个 FastAPI 实例接收到客户端消息时,它会将消息发布到 chat_messages 频道。
    3. 所有订阅了 chat_messages 频道的 FastAPI 实例都会收到这条消息。
    4. 每个实例收到消息后,再将其广播给 自己 连接的客户端。

    实现步骤:

    1. 安装 redis 客户端库:
      bash
      pip install redis asyncio_redis

      (注意:asyncio_redis 可能略旧,更推荐使用 redis 库配合 aioredis 模式或 aredis,但 redis 库本身也支持 async。) 我们可以直接用 redis 库的 StrictRedis 异步模式。

    2. 修改 ConnectionManagerwebsocket_endpoint:

      “`python

      main.py – Redis Pub/Sub 版本

      from fastapi import FastAPI, WebSocket, WebSocketDisconnect
      from starlette.responses import HTMLResponse
      from typing import List, Dict
      from pydantic import BaseModel
      import json
      import redis.asyncio as aioredis # 推荐使用 redis 库的异步接口
      import asyncio
      import time

      app = FastAPI()

      Pydantic 消息模型

      class ChatMessage(BaseModel):
      sender: str
      message: str
      type: str = “chat”
      timestamp: float

      Redis 配置

      REDIS_URL = “redis://localhost:6379”
      CHAT_CHANNEL = “chat_messages”

      class ConnectionManager:
      def init(self):
      self.active_connections: List[WebSocket] = []
      self.redis_client = None # 将在应用启动时初始化

      async def connect(self, websocket: WebSocket):
          await websocket.accept()
          self.active_connections.append(websocket)
          print(f"New client connected. Total connections: {len(self.active_connections)}")
      
      def disconnect(self, websocket: WebSocket):
          self.active_connections.remove(websocket)
          print(f"Client disconnected. Total connections: {len(self.active_connections)}")
      
      async def send_personal_message(self, message: str, websocket: WebSocket):
          try:
              await websocket.send_text(message)
          except WebSocketDisconnect:
              self.disconnect(websocket)
          except Exception as e:
              print(f"Error sending personal message to client: {e}")
              self.disconnect(websocket)
      
      async def broadcast_to_local_clients(self, message: str):
          """向当前实例连接的客户端广播消息"""
          disconnected_clients = []
          tasks = [
              self._send_message_safe(connection, message, disconnected_clients)
              for connection in self.active_connections
          ]
          await asyncio.gather(*tasks)
      
          for client in disconnected_clients:
              self.disconnect(client)
      
      async def _send_message_safe(self, connection: WebSocket, message: str, disconnected_clients: List[WebSocket]):
          try:
              await connection.send_text(message)
          except WebSocketDisconnect:
              disconnected_clients.append(connection)
          except Exception as e:
              print(f"Error sending to client {connection}: {e}")
              disconnected_clients.append(connection)
      
      async def publish_message(self, message_obj: ChatMessage):
          """将消息发布到Redis频道"""
          if self.redis_client:
              await self.redis_client.publish(CHAT_CHANNEL, message_obj.json())
              print(f"Published to Redis channel '{CHAT_CHANNEL}': {message_obj.json()}")
          else:
              print("Redis client not initialized.")
      
      async def listen_for_redis_messages(self):
          """监听Redis频道并将收到的消息广播给本地客户端"""
          if not self.redis_client:
              print("Redis client not initialized for listening.")
              return
      
          pubsub = self.redis_client.pubsub()
          await pubsub.subscribe(CHAT_CHANNEL)
          print(f"Subscribed to Redis channel '{CHAT_CHANNEL}'")
      
          try:
              while True:
                  message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
                  if message:
                      decoded_message = message['data'].decode('utf-8')
                      print(f"Received from Redis channel '{CHAT_CHANNEL}': {decoded_message}")
                      # 将收到的消息广播给本地连接的所有客户端
                      await self.broadcast_to_local_clients(decoded_message)
                  await asyncio.sleep(0.01) # 短暂休眠以避免CPU空转
          except asyncio.CancelledError:
              print("Redis listener task cancelled.")
          except Exception as e:
              print(f"Error in Redis listener: {e}")
          finally:
              await pubsub.unsubscribe(CHAT_CHANNEL)
              print(f"Unsubscribed from Redis channel '{CHAT_CHANNEL}'")
      

      manager = ConnectionManager()

      应用启动和关闭事件

      @app.on_event(“startup”)
      async def startup_event():
      manager.redis_client = aioredis.from_url(REDIS_URL, encoding=”utf-8″, decode_responses=True)
      # 启动一个后台任务来监听Redis消息
      app.state.redis_listener_task = asyncio.create_task(manager.listen_for_redis_messages())
      print(“FastAPI application startup.”)

      @app.on_event(“shutdown”)
      async def shutdown_event():
      if manager.redis_client:
      await manager.redis_client.close()
      print(“Redis client closed.”)
      if hasattr(app.state, ‘redis_listener_task’) and not app.state.redis_listener_task.done():
      app.state.redis_listener_task.cancel()
      try:
      await app.state.redis_listener_task # 等待任务真正结束
      except asyncio.CancelledError:
      pass
      print(“FastAPI application shutdown.”)

      … (HTML 代码与之前相同,但要注意客户端现在会发送和接收JSON) …

      @app.get(“/”)
      async def get():
      return HTMLResponse(html)

      @app.websocket(“/ws/{client_id}”)
      async def websocket_endpoint(websocket: WebSocket, client_id: str):
      await manager.connect(websocket)
      try:
      # 发送系统消息:用户加入
      system_message = ChatMessage(sender=”System”, message=f”User {client_id} joined the chat.”, type=”system”, timestamp=time.time())
      await manager.publish_message(system_message) # 发布到Redis

          while True:
              data = await websocket.receive_json() # 接收客户端的JSON消息
      
              client_message_content = data.get("message", "Empty message")
              chat_message = ChatMessage(sender=client_id, message=client_message_content, type="chat", timestamp=time.time())
              await manager.publish_message(chat_message) # 发布到Redis
      
      except WebSocketDisconnect:
          manager.disconnect(websocket)
          system_message = ChatMessage(sender="System", message=f"User `{client_id}` left the chat.", type="system", timestamp=time.time())
          await manager.publish_message(system_message) # 发布到Redis
      except Exception as e:
          print(f"An unexpected error occurred with client {client_id}: {e}")
          manager.disconnect(websocket)
          system_message = ChatMessage(sender="System", message=f"User `{client_id}` was disconnected due to an error.", type="system", timestamp=time.time())
          await manager.publish_message(system_message) # 发布到Redis
      

      “`

      运行此版本:

      1. 确保你的机器上运行着 Redis 服务器。
      2. pip install "redis[hiredis]" (hiredis 是 C 扩展,提供更好的性能)
      3. uvicorn main:app --reload --workers 2 (启动两个 Uvicorn worker 来模拟多实例环境)
      4. 打开浏览器,访问 http://localhost:8000。你会发现即使有多个 worker,消息也能在所有客户端之间同步。

    4.4 心跳机制 (Heartbeats/Keepalives)

    长时间不活动(Idle)的 WebSocket 连接可能会因为防火墙、代理服务器或负载均衡器的超时设置而被意外关闭。为了防止这种情况,可以实现心跳机制:

    • 客户端定时发送 ping 消息
    • 服务端在收到 ping 后回应 pong 消息
    • 服务端也可以定时发送 ping,客户端回应 pong
    • 如果一段时间内没有收到回应,则认为连接已断开,可以主动关闭。

    FastAPI 的 WebSocket 对象底层基于 Starlette,而 Starlette 又使用了 websockets 库,这个库本身有内置的 ping/pong 机制,通常不需要手动实现。但如果你遇到意外断开,可以考虑在应用层手动发送:

    “`python

    示例:服务器端定时发送 ping

    async def send_heartbeat(websocket: WebSocket, interval: int = 30):
    try:
    while True:
    await asyncio.sleep(interval)
    await websocket.send_text(“PING”) # 或者使用 websocket.send_ping()
    print(“Sent PING”)
    except WebSocketDisconnect:
    print(“Heartbeat task: Client disconnected.”)
    except asyncio.CancelledError:
    print(“Heartbeat task cancelled.”)
    except Exception as e:
    print(f”Heartbeat task error: {e}”)

    @app.websocket(“/ws/{client_id}”)
    async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    heartbeat_task = asyncio.create_task(send_heartbeat(websocket)) # 启动心跳任务
    try:
    # … 正常消息处理逻辑 …
    while True:
    data = await websocket.receive_text()
    if data == “PONG”: # 客户端回应 PONG
    print(“Received PONG”)
    continue # 不广播 PONG 消息
    # … 处理其他消息 …
    except WebSocketDisconnect:
    heartbeat_task.cancel() # 客户端断开,取消心跳任务
    # …
    finally:
    heartbeat_task.cancel() # 确保任务被取消
    “`

    第五章:部署考虑事项

    将 FastAPI WebSocket 应用投入生产环境需要考虑一些关键因素:

    1. ASGI 服务器:

      • Uvicorn: 最常用的 ASGI 服务器,高性能且易于使用。
      • Gunicorn + Uvicorn Worker: 在生产环境中,通常会使用 Gunicorn 作为进程管理器,再让 Gunicorn 运行多个 Uvicorn Worker。这样可以更好地利用多核 CPU,提高吞吐量和稳定性。
        bash
        gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000

        -w 4 表示启动 4 个 worker。
    2. 反向代理 (Nginx/Caddy):

      • SSL/TLS (WSS): 生产环境必须使用 wss:// (WebSocket Secure) 连接,这意味着你需要配置 SSL 证书。Nginx 或 Caddy 可以帮你处理 SSL 终止。
      • 负载均衡: 如果有多个 FastAPI 实例,反向代理可以分发请求。
      • WebSocket 代理配置: Nginx 需要特殊的配置来正确代理 WebSocket 连接,需要设置 UpgradeConnection 头。

      Nginx 示例配置片段:
      “`nginx
      http {
      map $http_upgrade $connection_upgrade {
      default upgrade;
      ” close;
      }

      server {
          listen 80;
          listen 443 ssl;
          server_name your_domain.com;
      
          # SSL 配置 ...
      
          location / {
              proxy_pass http://localhost:8000; # 你的 FastAPI 应用地址
              proxy_http_version 1.1;
              proxy_set_header Upgrade $http_upgrade;
              proxy_set_header Connection $connection_upgrade;
              proxy_set_header Host $host;
              proxy_read_timeout 86400s; # 可以设置长一点,保持WS连接
          }
          # ... 其他配置 ...
      }
      

      }
      “`

    3. 横向扩展与 Redis Pub/Sub:

      • 如前所述,如果你运行了多个 FastAPI 实例(无论是在同一台机器上使用多个 worker 还是部署在多台机器上),必须 使用 Redis Pub/Sub 或其他消息队列(如 Kafka、RabbitMQ)来同步实时消息。否则,不同实例上的客户端将无法互相通信。
    4. Sticky Sessions (粘性会话):

      • 如果你没有使用 Redis Pub/Sub 这种机制,而是依赖于单个 FastAPI 实例管理其所有 WebSocket 连接(例如,在 ConnectionManager 中只保存本地连接),那么在负载均衡器后面,你需要配置“粘性会话”。
      • 目的: 确保同一个客户端的 WebSocket 连接始终路由到同一个 FastAPI 实例。
      • 缺点: 增加了负载均衡器的复杂性,且如果某个实例宕机,其所有连接的客户端都需要重新连接。
      • 最佳实践: 尽量使用 Redis Pub/Sub,这样每个客户端可以连接到任何实例,实现更好的弹性。
    5. 资源限制与监控:

      • WebSocket 连接会消耗内存和 CPU。监控服务器资源使用情况,根据负载调整 worker 数量。
      • 操作系统文件描述符限制:每个 WebSocket 连接都会占用一个文件描述符。在高并发场景下,可能需要增加操作系统的文件描述符限制 (ulimit -n)。

    第六章:总结与展望

    FastAPI 结合 WebSockets 提供了一个强大且高效的框架,用于构建各种实时应用程序。通过本教程,你已经掌握了:

    • WebSocket 的基本概念及其相对于 HTTP 的优势。
    • 如何在 FastAPI 中定义和处理 WebSocket 端点。
    • 构建一个基础的实时聊天室,并学会了连接管理。
    • 利用 Pydantic 结构化 WebSocket 消息。
    • 通过查询参数、Cookie 和 Header 实现 WebSocket 身份验证。
    • 使用 Redis Pub/Sub 解决多实例部署下的状态同步和横向扩展问题。
    • 理解部署 FastAPI WebSocket 应用的关键考虑事项。

    实时应用的开发是一个充满挑战但也极具回报的领域。从这里开始,你可以进一步探索:

    • 更复杂的聊天功能: 私聊、群组聊天、消息历史记录。
    • 实时数据推送: 结合数据库变更监听或外部数据源,实现数据的实时更新。
    • WebRTC: 用于点对点音视频通信。
    • 离线处理: 用户断开连接后,如何处理未发送的消息。
    • 更精细的错误处理和日志记录: 确保应用在生产环境中的稳定性。

    FastAPI 的现代化设计和出色的性能使得它成为构建下一代实时应用的首选工具。希望这篇教程能为你深入探索 FastAPI WebSocket 的世界提供坚实的基础!

    发表评论

    您的邮箱地址不会被公开。 必填项已用 * 标注

    滚动至顶部