FastAPI WebSocket 教程 – wiki基地


深入理解 FastAPI WebSocket:构建实时应用的基石

在现代 Web 应用中,实时性变得越来越重要。传统的 HTTP 请求/响应模式适用于一次性的数据交换,但对于聊天应用、实时数据仪表盘、游戏或协作工具等需要服务器主动推送数据给客户端的场景,效率较低且实现复杂(通常需要轮询)。这时,WebSocket 技术应运而生,它提供了一种在客户端和服务器之间建立持久性、双向通信通道的能力。

FastAPI,作为一款现代、快速(高性能)的 Web 框架,天然支持异步编程,这使得它与 WebSockets 的异步特性完美契合。本篇文章将带你深入了解如何在 FastAPI 中构建 WebSocket 应用程序。

一、什么是 WebSocket?为什么在 FastAPI 中使用它?

WebSocket 是一种网络传输协议,它在客户端和服务器之间提供全双工(即双向)通信通道。一旦连接建立,服务器和客户端都可以随时向对方发送数据,而无需像 HTTP 那样每次通信都建立新的连接。这大大减少了延迟和开销,特别适合需要频繁、低延迟数据交换的场景。

为什么在 FastAPI 中使用 WebSocket?

  1. 异步原生支持: FastAPI 基于 Starlette 构建,而 Starlette 是为异步框架设计的。WebSocket 操作(如接受连接、接收消息、发送消息)本质上是 I/O 密集型的,异步编程可以让你在等待一个连接的 I/O 操作时,同时处理其他连接或任务,从而提高服务器的并发能力和整体性能。
  2. 简洁的API: FastAPI 提供了非常直观和易用的 API 来处理 WebSocket 连接和消息。
  3. 高性能: 结合 Uvicorn、Hypercorn 等 ASGI 服务器,FastAPI 的异步特性使得 WebSocket 应用能够处理大量并发连接。
  4. 集成性: FastAPI 强大的依赖注入系统、数据验证(Pydantic)、安全性特性等都可以与 WebSocket endpoint 结合使用。

二、准备工作

在开始之前,请确保你已经安装了 Python 3.7+ 和 FastAPI 以及一个 ASGI 服务器(如 Uvicorn)。

bash
pip install fastapi uvicorn websockets

注意:websockets 库是 FastAPI 处理 WebSocket 连接的底层依赖,尽管你直接使用 fastapiuvicorn 即可启动应用,但显式安装 websockets 是一个好习惯。

三、构建一个基本的 WebSocket Endpoint

我们将从一个最简单的例子开始:一个 WebSocket endpoint,它接受连接,然后循环接收来自客户端的消息,并将收到的消息原样发回给客户端(一个简单的 echo 服务器)。

创建一个 Python 文件,例如 main.py

“`python

main.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
“””
处理 WebSocket 连接的 endpoint
“””
await websocket.accept() # 接受客户端的连接
print(“WebSocket 连接已建立”)
try:
while True:
# 循环接收客户端发送的文本消息
data = await websocket.receive_text()
print(f”收到消息: {data}”)
# 将收到的消息发送回客户端
await websocket.send_text(f”消息内容是: {data}”)
except WebSocketDisconnect:
# 捕获连接断开异常
print(“WebSocket 连接已断开”)
# 可以在这里执行一些清理工作,例如从连接列表中移除该连接
except Exception as e:
# 捕获其他可能的异常
print(f”WebSocket 发生错误: {e}”)
# 可以选择关闭连接
# await websocket.close() # 根据需要是否显式关闭
“`

代码解释:

  1. from fastapi import FastAPI, WebSocket, WebSocketDisconnect: 导入必要的类和异常。
  2. app = FastAPI(): 创建 FastAPI 应用实例。
  3. @app.websocket("/ws"): 这是一个新的 FastAPI 装饰器,用于注册一个 WebSocket endpoint。它指定了客户端连接时使用的路径。
  4. async def websocket_endpoint(websocket: WebSocket):: 定义一个异步函数来处理 WebSocket 连接。FastAPI 会自动将代表当前连接的 WebSocket 对象作为参数传递给这个函数。使用类型提示 websocket: WebSocket 是推荐的做法。
  5. await websocket.accept(): 这是建立 WebSocket 连接的关键步骤。服务器必须调用 accept() 方法来完成握手,客户端才会认为连接成功建立。
  6. while True:: 进入一个无限循环,持续监听来自客户端的消息。
  7. data = await websocket.receive_text(): 异步地等待并接收客户端发送的下一条文本消息。这个方法会阻塞,直到收到消息或连接断开。FastAPI 的 WebSocket 对象提供了多种接收方法,如 receive_text() (接收文本)、receive_bytes() (接收二进制数据)、receive_json() (接收 JSON 数据并自动解析)。
  8. await websocket.send_text(f"消息内容是: {data}"): 异步地向客户端发送一条文本消息。同样,也有 send_bytes()send_json() 方法。
  9. except WebSocketDisconnect:: 当客户端正常或非正常断开连接时,receive_text()(或其他接收方法)会抛出 WebSocketDisconnect 异常。我们捕获这个异常,可以在这里记录日志或执行清理操作。
  10. except Exception as e:: 捕获处理过程中可能发生的其他异常,例如消息解析错误等。

四、运行应用程序

使用 Uvicorn 运行这个应用:

bash
uvicorn main:app --reload

这会启动一个服务器,监听默认端口 8000。

五、测试 WebSocket 连接

你不能直接在浏览器地址栏访问 http://127.0.0.1:8000/ws 来测试 WebSocket,因为这是不同的协议。你需要一个 WebSocket 客户端。

以下是几种测试方法:

1. 使用浏览器开发者工具 (Console)

打开一个空白页面,按 F12 打开开发者工具,切换到 Console 标签页。输入以下 JavaScript 代码:

“`javascript
// 创建 WebSocket 连接
var ws = new WebSocket(“ws://127.0.0.1:8000/ws”);

// 连接成功时触发
ws.onopen = function(event) {
console.log(“WebSocket 连接成功”);
// 连接成功后发送消息
ws.send(“Hello FastAPI!”);
};

// 收到消息时触发
ws.onmessage = function(event) {
console.log(“收到服务器消息: “, event.data);
};

// 连接错误时触发
ws.onerror = function(event) {
console.error(“WebSocket 错误: “, event);
};

// 连接关闭时触发
ws.onclose = function(event) {
console.log(“WebSocket 连接已关闭: “, event.code, event.reason);
};

// 手动关闭连接 (可选)
// ws.close();
“`

执行这些代码,你会在控制台看到连接建立的日志,发送的消息,以及收到服务器回显的消息。在运行 Uvicorn 的终端也会看到相应的日志输出。

2. 使用 Python 客户端库

你可以编写一个简单的 Python 脚本作为客户端:

“`python

client.py

import asyncio
import websockets

async def connect_and_send():
uri = “ws://127.0.0.1:8000/ws”
try:
async with websockets.connect(uri) as websocket:
print(“WebSocket 连接成功”)

        # 发送消息
        message = "你好,服务器!"
        await websocket.send(message)
        print(f"已发送: {message}")

        # 接收回显消息
        response = await websocket.recv()
        print(f"收到回显: {response}")

        # 可以继续发送和接收
        message2 = "第二条消息"
        await websocket.send(message2)
        print(f"已发送: {message2}")
        response2 = await websocket.recv()
        print(f"收到回显: {response2}")


except websockets.exceptions.ConnectionClosedOK:
    print("WebSocket 连接正常关闭")
except websockets.exceptions.ConnectionClosedError as e:
     print(f"WebSocket 连接异常关闭: {e}")
except Exception as e:
    print(f"发生其他错误: {e}")

if name == “main“:
asyncio.run(connect_and_send())
“`

运行 python client.py,同样可以看到与服务器的交互过程。

3. 使用命令行工具

例如,可以使用 wscat (需要 Node.js 环境安装 npm install -g wscat):

bash
wscat -c ws://127.0.0.1:8000/ws

连接成功后,你可以在终端输入文本,按回车发送,服务器的回显消息也会显示在终端。

六、处理多个客户端连接

上面的例子只能处理一个连接。在实际应用中,服务器需要同时处理多个 WebSocket 连接,并且可能需要实现广播消息(发给所有连接的客户端)或向特定客户端发送消息的功能(私聊)。

为了管理这些连接,我们可以创建一个 ConnectionManager 类来跟踪所有活跃的连接。

修改 main.py

“`python

main.py (处理多个客户端)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

class ConnectionManager:
“””
用于管理 WebSocket 连接的类
“””
def init(self):
# 使用列表存储所有活跃的 WebSocket 连接
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
    """
    处理新连接的建立
    """
    await websocket.accept() # 接受连接
    self.active_connections.append(websocket) # 将新连接添加到列表

def disconnect(self, websocket: WebSocket):
    """
    处理连接断开
    """
    try:
        self.active_connections.remove(websocket) # 从列表中移除断开的连接
    except ValueError:
        # 如果连接不在列表中,说明已经被移除了(可能在广播时因断开移除)
        pass

async def send_personal_message(self, message: str, websocket: WebSocket):
    """
    向单个客户端发送消息
    """
    await websocket.send_text(message)

async def broadcast(self, message: str):
    """
    向所有活跃客户端广播消息
    """
    # 遍历所有活跃连接并发送消息
    # 注意:在遍历时如果连接断开,send_text() 会抛出异常
    # 为了安全,我们可以在发送前检查,或捕获异常后移除
    disconnected_connections = []
    for connection in self.active_connections:
        try:
            await connection.send_text(message)
        except RuntimeError as e:
             # 如果在异步操作中遇到连接已关闭的错误
             # print(f"发送消息失败,连接可能已关闭: {e}")
             disconnected_connections.append(connection)
        except Exception as e:
            print(f"向连接 {connection} 发送消息时发生错误: {e}")
            disconnected_connections.append(connection)

    # 移除发送失败的连接
    for connection in disconnected_connections:
         self.disconnect(connection)

创建一个全局或应用级别的连接管理器实例

manager = ConnectionManager()

@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
“””
处理带有客户端ID的 WebSocket 连接
“””
await manager.connect(websocket) # 使用管理器建立连接
print(f”客户端 {client_id} 连接已建立”)
try:
# 通知所有客户端有新用户加入 (广播)
await manager.broadcast(f”客户端 {client_id} 加入了聊天”)

    while True:
        # 接收客户端消息
        data = await websocket.receive_text()
        print(f"收到来自客户端 {client_id} 的消息: {data}")

        # 将消息广播给所有其他客户端 (模拟聊天室)
        await manager.broadcast(f"客户端 {client_id} 说: {data}")

        # 也可以向特定客户端发送消息 (如果知道其连接对象或ID映射)
        # await manager.send_personal_message(f"你发送了: {data}", websocket)

except WebSocketDisconnect:
    # 处理连接断开
    manager.disconnect(websocket) # 使用管理器移除连接
    print(f"客户端 {client_id} 连接已断开")
    # 通知所有客户端有用户离开 (广播)
    await manager.broadcast(f"客户端 {client_id} 离开了聊天")
except Exception as e:
    print(f"客户端 {client_id} WebSocket 发生错误: {e}")
    # 发生其他错误时也断开连接
    manager.disconnect(websocket)
    await manager.broadcast(f"客户端 {client_id} 因错误退出")

除了 WebSocket endpoint,你还可以有标准的 HTTP endpoint

@app.get(“/”)
async def get():
return {“message”: “Hello HTTP”}

或者一个简单的 HTML 页面,包含 JavaScript WebSocket 客户端代码,方便测试

from fastapi.responses import HTMLResponse

html = “””

<!DOCTYPE html>

Chat

WebSocket Chat

Client ID:

Message:

    “””

    @app.get(“/chat”, response_class=HTMLResponse)

    async def get_chat_page():

    return html

    “`

    代码解释:

    1. ConnectionManager 类:
      • active_connections: List[WebSocket]: 存储所有当前活跃的 WebSocket 对象。
      • connect(websocket): 将新的连接添加到列表。
      • disconnect(websocket): 从列表中移除断开的连接。
      • send_personal_message(message, websocket): 向指定的单个连接发送消息。
      • broadcast(message): 遍历列表,向所有连接发送消息。这里增加了简单的错误处理,如果在发送过程中发现连接已失效,会将其从列表中移除。
    2. manager = ConnectionManager(): 创建一个 ConnectionManager 实例。在实际应用中,如果有多 worker 或分布式部署,需要使用 Redis Pub/Sub 或其他消息队列来实现跨进程/机器的广播。这里简化为单进程内存管理。
    3. @app.websocket("/ws/{client_id}"): WebSocket endpoint 现在可以接收路径参数,例如用于标识不同的客户端或房间。这里我们接收一个 client_id
    4. websocket_endpoint 函数内部:
      • await manager.connect(websocket): 使用管理器接受连接并将其添加到管理列表。
      • manager.disconnect(websocket): 在 WebSocketDisconnect 异常中,使用管理器移除连接。
      • await manager.broadcast(...): 使用管理器的广播方法发送消息给所有连接。

    现在你可以启动服务器,并使用多个客户端连接 ws://127.0.0.1:8000/ws/1ws://127.0.0.1:8000/ws/2 等。你会发现一个客户端发送的消息会被广播给所有连接的客户端。

    七、发送和接收不同类型的数据

    除了文本 (send_text, receive_text),FastAPI 的 WebSocket 对象还支持:

    • 二进制数据: 使用 send_bytes()receive_bytes()
    • JSON 数据: 使用 send_json()receive_json()。这非常方便,FastAPI 会自动处理 JSON 的序列化和反序列化。

    示例:发送和接收 JSON

    “`python
    from fastapi import FastAPI, WebSocket, WebSocketDisconnect

    app = FastAPI()

    @app.websocket(“/ws_json”)
    async def websocket_json_endpoint(websocket: WebSocket):
    await websocket.accept()
    print(“WebSocket JSON 连接已建立”)
    try:
    while True:
    # 接收 JSON 数据
    data = await websocket.receive_json()
    print(f”收到 JSON 数据: {data}”)

            # 构建 JSON 响应
            response = {"received": data, "status": "success"}
    
            # 发送 JSON 数据
            await websocket.send_json(response)
    
    except WebSocketDisconnect:
        print("WebSocket JSON 连接已断开")
    except Exception as e:
        print(f"WebSocket JSON 发生错误: {e}")
    

    “`

    客户端可以使用相应的 JSON 发送/接收方法或手动序列化/反序列化。例如,在 JavaScript 客户端中使用 JSON.stringify()JSON.parse()

    八、错误处理与连接状态

    • WebSocketDisconnect 是处理连接正常或异常关闭的主要异常。始终建议在 try...except WebSocketDisconnect 块中包含 WebSocket 的接收循环。
    • 在异步发送消息 (send_text, send_json, etc.) 时,如果连接在发送过程中断开,可能会抛出 RuntimeError 或其他与连接状态相关的异常。在处理多个连接并广播时,遍历列表发送消息时需要小心处理这些异常,以避免一个断开的连接导致整个广播失败,并在捕获异常后及时将该连接从管理列表中移除。
    • 可以使用 websocket.close() 方法从服务器端主动关闭连接。

    九、进阶话题(简述)

    • 身份验证与授权: 在 WebSocket 握手阶段或接收消息后,可以像处理 HTTP 请求一样进行用户身份验证。例如,可以在连接建立前检查请求头、Cookie 或 URL 参数中的 token。一旦连接建立,可以基于用户的身份控制其行为(例如,只允许发送给特定房间)。
    • 使用 Pydantic 进行数据验证: 结合 receive_json(),可以利用 Pydantic 模型自动验证收到的 JSON 数据格式。
    • 后台任务: 对于需要服务器定期向客户端推送数据的场景(例如实时股票报价),可以使用 FastAPI 的后台任务(@app.on_event("startup") 启动一个 loop)或 asyncio 任务来完成,这些任务可以访问 ConnectionManager 并向客户端发送数据。
    • 跨进程/机器扩展: 如前所述,简单的内存 ConnectionManager 不适用于多 worker 或分布式部署。需要使用外部消息代理(如 Redis Pub/Sub, RabbitMQ, Kafka)来实现跨多个服务器实例的广播和消息路由。Starlette (FastAPI 的底层) 有一些社区库(如 starlette-redis-pubsub)可以帮助实现这一点。

    十、总结

    FastAPI 为构建高性能、易于维护的 WebSocket 应用程序提供了强大的支持。通过其简洁的 API 和异步特性,你可以轻松地实现实时数据传输、构建聊天室等功能。理解 WebSocket 的全双工通信模式、如何在 FastAPI 中定义 endpoint、处理连接生命周期(接受、接收、发送、断开)以及如何管理多个连接是构建复杂实时应用的关键。

    希望本教程能帮助你迈出使用 FastAPI 构建 WebSocket 应用的第一步!


    发表评论

    您的邮箱地址不会被公开。 必填项已用 * 标注

    滚动至顶部