如何在FastAPI中使用WebSocket构建实时应用 – wiki基地


使用FastAPI与WebSocket构建高性能实时应用

在现代Web应用的开发中,实时性变得越来越重要。聊天应用、在线游戏、股票行情显示、协作工具、实时通知系统等等,都依赖于服务器能够主动向客户端推送数据,而不需要客户端不断地发起请求(轮询)。传统的HTTP协议是无状态的请求-响应模式,虽然可以通过长轮询等技术模拟实时性,但这通常效率低下且资源消耗较大。WebSocket协议的出现,彻底改变了这一局面。

WebSocket协议提供了一个在单个TCP连接上进行全双工、双向通信的通道。一旦WebSocket连接建立,服务器和客户端都可以随时向对方发送数据,无需额外的HTTP开销。这使得构建实时应用变得高效且直观。

FastAPI是一个现代、快速(高性能)的Web框架,用于构建API。它基于标准的Python类型提示,并使用Starlette作为其底层异步框架。Starlette原生支持WebSocket,这意味着FastAPI也天然地支持WebSocket,并且能够充分利用Python的异步能力(async/await)来高效地处理大量的并发WebSocket连接。

本文将深入探讨如何在FastAPI中使用WebSocket构建实时应用,从基础概念到更高级的主题,并提供详细的代码示例。

文章目录

  1. 理解实时应用与WebSocket
    • 为什么需要实时性?
    • 传统HTTP的局限性
    • WebSocket协议简介
    • WebSocket与HTTP的对比
  2. FastAPI对WebSocket的支持
    • FastAPI基于Starlette
    • WebSocket
    • @app.websocket()装饰器
  3. 构建一个基础WebSocket Echo服务器
    • 项目设置与依赖
    • 编写FastAPI应用
    • 解释代码:连接的建立、数据的接收与发送
    • 运行应用
    • 客户端测试方法(浏览器、Python)
  4. 处理多个WebSocket客户端:构建一个简单的聊天室
    • 管理多个连接的需求
    • 引入ConnectionManager
    • 实现连接管理器:连接、断开、发送个人消息、广播消息
    • 修改WebSocket端点使用连接管理器
    • 编写一个简单的HTML/JavaScript客户端
    • 运行和测试聊天室
  5. WebSocket的数据类型:文本与字节
    • 发送和接收文本消息 (send_text, receive_text)
    • 发送和接收字节消息 (send_bytes, receive_bytes)
    • 发送和接收JSON消息 (send_json, receive_json)
  6. WebSocket的连接状态与错误处理
    • WebSocketDisconnect异常
    • 优雅地处理客户端断开连接
    • 其他潜在的异常处理
  7. WebSocket的认证与授权
    • 如何在WebSocket连接中识别用户
    • 通过查询参数或Header进行认证(握手阶段)
    • 在连接管理器中关联用户ID与WebSocket连接
  8. WebSocket的高级主题

    • 状态管理: 共享数据(例如:在线用户列表、聊天记录)
    • Subprotocols: 使用子协议
    • 性能与扩展:
      • FastAPI的异步能力如何处理并发
      • 单机扩展:增加Worker数量
      • 分布式扩展:使用消息队列/发布订阅系统(如Redis Pub/Sub)处理跨进程/服务器广播
    • 部署考虑: 代理服务器(Nginx/Traefik)、SSL/TLS
  9. 总结

  10. 进一步学习资源

1. 理解实时应用与WebSocket

为什么需要实时性?

在许多现代应用场景中,用户期望获得即时反馈和最新信息。例如:

  • 在线聊天/协作: 消息需要立即送达所有参与者。
  • 实时仪表盘: 股票价格、系统指标、传感器数据需要持续更新。
  • 在线游戏: 玩家的动作需要实时同步给其他玩家。
  • 实时通知: 新邮件、点赞、评论等需要立即通知用户。

传统HTTP的局限性

HTTP协议是客户端发起请求、服务器响应的模式。服务器无法主动向客户端发送数据。为了模拟实时性,开发者通常使用以下技术:

  • 短轮询 (Short Polling): 客户端每隔一小段时间就向服务器发送一个请求,询问是否有新数据。这会导致大量的HTTP请求,浪费带宽和服务器资源,而且延迟较高。
  • 长轮询 (Long Polling): 客户端发送请求后,服务器如果没有新数据会 hold 住连接,直到有新数据或者超时才响应。客户端收到响应后立即发起新的请求。这减少了请求数量,但仍然是请求-响应模式,且对服务器资源(保持连接)有一定压力。
  • HTTP Streaming: 服务器发送响应后不关闭连接,持续向客户端推送数据。这是单向的(服务器到客户端),且实现和解析相对复杂。

这些技术都是在无状态的HTTP协议之上进行的妥协。

WebSocket协议简介

WebSocket(WS)协议在OSI模型的应用层运行,通过HTTP协议的101 Switching Protocols状态码进行握手升级,从而从HTTP连接转变为WebSocket连接。一旦握手成功,客户端和服务器之间就会建立一个持久的、双向的、全双工的通信通道。

全双工: 客户端和服务器可以在同一时间相互发送数据。
双向: 任意一方都可以主动发送数据给对方。
持久连接: 连接一旦建立,就可以保持开启,用于后续的多次数据传输,无需重复建立连接的开销。

这使得WebSocket非常适合需要低延迟、高频率双向通信的应用场景。

WebSocket与HTTP的对比

特性 HTTP WebSocket
通信模式 请求-响应(无状态) 全双工双向(有状态,连接保持)
连接开销 每个请求都需要建立/关闭连接(或复用) 只需要一次握手建立连接,后续数据传输开销小
协议头 每次请求都包含完整的HTTP头 握手后数据帧开销很小
服务器推送 不支持(需依赖轮询或流) 原生支持
适用场景 常规网页浏览、API调用 实时聊天、游戏、通知、实时数据推送

2. FastAPI对WebSocket的支持

FastAPI基于Starlette构建,而Starlette从设计之初就考虑了异步和实时应用的需求,提供了原生的WebSocket支持。

FastAPI基于Starlette

理解FastAPI的WebSocket支持,首先要知道它是Starlette之上的一层封装。Starlette提供了底层的ASGI接口实现,包括对HTTP和WebSocket连接的处理。FastAPI则在其上提供了路由、依赖注入、数据验证/序列化等高级功能。

WebSocket

在FastAPI的WebSocket端点中,你将主要与 starlette.websockets.WebSocket 类打交道。这个类的实例代表了一个活动的WebSocket连接,它提供了一系列异步方法用于发送和接收数据:

  • accept(subprotocol: Optional[str] = None, headers: Optional[Sequence[Tuple[bytes, bytes]]] = None) -> None: 接受来自客户端的WebSocket连接请求。在处理任何消息之前必须调用此方法。
  • receive_text() -> str: 异步接收客户端发送的文本消息。
  • receive_bytes() -> bytes: 异步接收客户端发送的字节消息。
  • receive_json(mode: str = 'text') -> Any: 异步接收客户端发送的JSON消息(可以是文本或字节格式)。
  • send_text(data: str) -> None: 异步向客户端发送文本消息。
  • send_bytes(data: bytes) -> None: 异步向客户端发送字节消息。
  • send_json(data: Any, mode: str = 'text') -> None: 异步向客户端发送JSON消息。
  • close(code: int = 1000) -> None: 异步关闭WebSocket连接。code 是关闭状态码,1000表示正常关闭。

@app.websocket()装饰器

FastAPI提供了一个便捷的装饰器 @app.websocket("/ws/{some_parameter}") 来定义WebSocket端点。这个装饰器下的异步函数将处理来自指定路径的WebSocket连接请求。函数签名通常包含一个 websocket: WebSocket 参数,你可以通过它与客户端进行交互。路径参数 ({some_parameter}) 的使用方式与HTTP路径参数类似。

“`python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 在这里处理WebSocket连接
pass
“`

3. 构建一个基础WebSocket Echo服务器

我们先创建一个最简单的例子:一个Echo服务器。客户端发送什么消息,服务器就原样返回什么消息。

项目设置与依赖

确保你已经安装了FastAPI和Uvicorn(ASGI服务器):

bash
pip install fastapi uvicorn websockets

websockets 库是 Starlette/FastAPI 内部使用的底层 WebSocket 实现之一。虽然你直接使用的是 Starlette/FastAPI 的 WebSocket 类,但安装 websockets 是必要的。

编写FastAPI应用

创建一个Python文件(例如 main.py):

“`python

main.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # 1. 接受连接
try:
while True:
data = await websocket.receive_text() # 2. 接收文本消息
print(f”Received message: {data}”)
await websocket.send_text(f”Message text was: {data}”) # 3. 发送文本消息
except WebSocketDisconnect: # 4. 处理断开连接
print(“Client disconnected”)
await websocket.close() # 可选,但通常在异常时会自动关闭
except Exception as e: # 处理其他可能的错误
print(f”An error occurred: {e}”)
# 可以在这里发送错误信息给客户端,然后关闭连接
# await websocket.send_text(f”Error: {e}”)
await websocket.close(code=1011) # 1011表示内部错误

“`

解释代码:连接的建立、数据的接收与发送

  1. await websocket.accept(): 当一个客户端尝试连接到 /ws 路径时,FastAPI/Starlette 会调用 websocket_endpoint 函数。在处理任何接收或发送操作之前,你需要调用 accept() 来完成WebSocket握手并正式建立连接。如果不调用 accept(),连接最终会超时关闭。
  2. while True:: 建立连接后,我们进入一个无限循环,持续监听客户端发送的消息。
  3. data = await websocket.receive_text(): 这行代码异步地等待从客户端接收到的下一条文本消息。当消息到达时,它会被赋值给 data 变量。如果客户端发送的是其他类型(如字节)或连接断开,这里可能会抛出异常。
  4. await websocket.send_text(f"Message text was: {data}"): 收到消息后,我们使用 send_text() 方法将一个文本消息发送回客户端。
  5. except WebSocketDisconnect:: 这是处理客户端正常断开连接的关键。当客户端关闭WebSocket连接时(例如,关闭浏览器标签页,或者在JavaScript中调用 websocket.close()),receive_text()receive_bytes() 等方法会抛出 WebSocketDisconnect 异常。我们捕获这个异常,打印一条消息,并可以执行清理工作。FastAPI/Starlette 在这个异常发生时通常会自动尝试发送一个关闭帧并关闭底层TCP连接,所以显式调用 websocket.close() 在这里可能是冗余的,但捕获异常本身是重要的。
  6. except Exception as e:: 捕获其他未预期的异常,以防止服务器崩溃。在实际应用中,你可能需要更精细的错误处理。

运行应用

保存文件后,在终端中运行Uvicorn服务器:

bash
uvicorn main:app --reload

你应该会看到Uvicorn启动的输出,表示服务器正在监听。

客户端测试方法

方法1:使用浏览器开发者工具

这是最便捷的测试方法之一。
1. 打开任何一个现代浏览器(如Chrome, Firefox)。
2. 打开开发者工具 (通常按 F12)。
3. 切换到 “Console” 标签页。
4. 输入以下JavaScript代码并执行:

“`javascript
// 创建一个WebSocket连接
var ws = new WebSocket(“ws://localhost:8000/ws”);

// 监听连接建立事件
ws.onopen = function(event) {
console.log(“WebSocket connection opened:”, event);
// 连接建立后,发送一条消息
ws.send(“Hello FastAPI WebSocket!”);
};

// 监听收到消息事件
ws.onmessage = function(event) {
console.log(“Received message:”, event.data);
};

// 监听连接关闭事件
ws.onclose = function(event) {
if (event.wasClean) {
console.log(Connection closed cleanly, code=${event.code} reason=${event.reason});
} else {
console.error(“Connection died”);
}
};

// 监听错误事件
ws.onerror = function(error) {
console.error(“WebSocket error:”, error);
};

// 发送更多消息 (在连接打开后)
// ws.send(“Another message”);

// 关闭连接 (如果需要)
// ws.close();

``
执行
new WebSocket(…)后,如果服务器正常运行,你会看到WebSocket connection opened的日志,然后是服务器返回的Received message: Message text was: Hello FastAPI WebSocket!。你可以在控制台中输入ws.send(“Your message here”)发送更多消息。关闭浏览器标签页会触发onclose` 事件,服务器端会打印 “Client disconnected”。

方法2:使用Python客户端

你也可以用Python编写一个客户端来测试。需要安装 websockets 库。

bash
pip install websockets

创建一个新的Python文件(例如 client.py):

“`python

client.py

import asyncio
import websockets

async def send_message():
uri = “ws://localhost:8000/ws”
async with websockets.connect(uri) as websocket:
await websocket.send(“Hello from Python client!”)
print(f”> Hello from Python client!”)

    response = await websocket.recv()
    print(f"< {response}")

    await websocket.send("Another message from Python!")
    print(f"> Another message from Python!")

    response2 = await websocket.recv()
    print(f"< {response2}")

asyncio.get_event_loop().run_until_complete(send_message())

``
运行
python client.py`,你会在客户端和服务端控制台看到消息的发送和接收记录。

这个基础示例展示了FastAPI处理单个WebSocket连接的基本流程:接受连接、进入循环监听消息、处理消息、处理断开连接。

4. 处理多个WebSocket客户端:构建一个简单的聊天室

Echo服务器只处理单个连接,而实际应用通常需要管理多个并发连接,并在它们之间协调数据(例如,广播消息给所有用户)。我们将构建一个简单的聊天室,演示如何管理多个WebSocket连接。

管理多个连接的需求

在一个聊天室中,当一个用户发送消息时,服务器需要将这条消息发送给所有其他在线用户。这意味着服务器需要知道当前所有活动的WebSocket连接,并能够遍历它们进行广播。

引入ConnectionManager

一个常见的模式是创建一个 ConnectionManager 类来封装连接的管理逻辑。它可以维护一个活动连接的列表,并提供添加连接、移除连接、发送消息给特定连接以及广播消息给所有连接的方法。

实现连接管理器

“`python

main.py (在之前的代码基础上添加)

… (前面的import和app = FastAPI()保留)

from typing import List

class ConnectionManager:
def init(self):
# 使用列表存储所有活动的WebSocket连接
self.active_connections: List[WebSocket] = []

async def connect(self, websocket: WebSocket):
    """处理新连接"""
    await websocket.accept()
    self.active_connections.append(websocket)
    print(f"Client connected: {websocket.client.host}:{websocket.client.port}. Total connections: {len(self.active_connections)}")

def disconnect(self, websocket: WebSocket):
    """处理连接断开"""
    try:
        self.active_connections.remove(websocket)
        print(f"Client disconnected: {websocket.client.host}:{websocket.client.port}. Total connections: {len(self.active_connections)}")
    except ValueError:
        # 如果连接不在列表中(可能因为异常或其他原因没有成功添加到列表)
        print(f"Disconnected client {websocket.client.host}:{websocket.client.port} was not in the active connections list.")


async def send_personal_message(self, message: str, websocket: WebSocket):
    """向单个客户端发送消息"""
    try:
        await websocket.send_text(message)
    except WebSocketDisconnect:
         print(f"Attempted to send message to disconnected client: {websocket.client.host}:{websocket.client.port}")
         # 可选:如果发送失败是因为断开连接,可以在这里调用 disconnect(websocket)
         # 但是通常 receive_text/bytes 失败会先抛出 WebSocketDisconnect 并由端点函数处理
         # self.disconnect(websocket)
    except Exception as e:
         print(f"Error sending message to client {websocket.client.host}:{websocket.client.port}: {e}")
         # 可以在这里决定是否断开连接
         # await websocket.close(code=1011)


async def broadcast(self, message: str):
    """向所有连接的客户端广播消息"""
    # 创建一个列表来存储无效的连接,以便在遍历后移除
    invalid_connections = []
    for connection in self.active_connections:
        try:
            await connection.send_text(message)
        except WebSocketDisconnect:
             # 如果在广播时发现连接已断开,标记为无效
             print(f"Client disconnected during broadcast: {connection.client.host}:{connection.client.port}")
             invalid_connections.append(connection)
        except Exception as e:
             # 处理其他发送错误
             print(f"Error broadcasting to client {connection.client.host}:{connection.client.port}: {e}")
             # 可以在这里决定是否将这个连接标记为无效并移除
             invalid_connections.append(connection) # 将出错的连接也标记为无效

    # 移除所有无效的连接
    for invalid_conn in invalid_connections:
        # 使用 try-except block, 因为在多线程/多进程环境中,
        # active_connections 列表可能在迭代和修改之间发生变化
        try:
             self.active_connections.remove(invalid_conn)
        except ValueError:
             # 如果连接已经被移除了,忽略
             pass

    if invalid_connections:
         print(f"Removed {len(invalid_connections)} invalid connections. Total connections: {len(self.active_connections)}")

创建一个ConnectionManager实例

manager = ConnectionManager()

… (基础Echo服务器的 @app.websocket(“/ws”) 可以注释掉或删除)

“`

修改WebSocket端点使用连接管理器

现在我们将 /ws 端点修改为聊天室逻辑,使用 ConnectionManager

“`python

main.py (继续在上面代码的基础上添加)

@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
“””
聊天室WebSocket端点,每个连接通过client_id区分
“””
await manager.connect(websocket) # 1. 使用管理器处理连接建立
try:
# 2. 收到消息后,向所有客户端广播,包括发送者自己
while True:
data = await websocket.receive_text()
message = f”Client #{client_id} says: {data}”
print(f”Received message from client #{client_id}: {data}”)
await manager.broadcast(message) # 3. 使用管理器广播消息

except WebSocketDisconnect: # 4. 处理断开连接
    manager.disconnect(websocket) # 使用管理器处理断开

except Exception as e: # 处理其他错误
    print(f"An error occurred with client #{client_id}: {e}")
    manager.disconnect(websocket) # 发生错误时也断开连接
    # 可以在这里考虑通知其他客户端某个用户由于错误断开

``
注意:我们将路径改为了
/ws/{client_id}。这意味着每个连接都需要在URL中提供一个唯一的client_id`。在实际应用中,这个ID可能来自用户认证,而不是简单地从URL获取。

编写一个简单的HTML/JavaScript客户端

为了测试聊天室,我们需要一个客户端界面。创建一个 index.html 文件:

“`html




FastAPI WebSocket Chat


FastAPI WebSocket Chat







```

运行和测试聊天室

  1. 确保 main.py 包含上面完整的代码(包括 ConnectionManager/ws/{client_id} 端点)。
  2. 运行Uvicorn服务器:uvicorn main:app --reload
  3. index.html 文件保存在项目的同一目录下。
  4. 在浏览器中打开 index.html 文件(可以直接双击打开)。
  5. 在第一个浏览器窗口中,输入一个客户端ID(例如 1),点击 "Connect"。你应该会看到控制台和页面上的连接成功消息,并且服务器控制台会打印连接信息。
  6. 打开第二个浏览器窗口,同样打开 index.html。输入另一个客户端ID(例如 2),点击 "Connect"。服务器控制台会打印第二个连接信息。
  7. 在任一浏览器窗口的输入框中输入消息,点击 "Send"。消息应该会出现在所有已连接的浏览器窗口的聊天框中,并且服务器控制台会打印收到的消息和广播信息。
  8. 尝试关闭一个浏览器窗口,服务器控制台会打印断开连接信息。在其他窗口发送消息,断开的客户端将不再收到。

这个聊天室示例很好地演示了如何使用 ConnectionManager 来管理多个并发的WebSocket连接,并实现服务器向多个客户端广播消息的功能。

5. WebSocket的数据类型:文本与字节

WebSocket协议本身支持发送文本数据(UTF-8编码)和二进制数据。FastAPI/Starlette的 WebSocket 类提供了相应的方法:

  • send_text(data: str)receive_text() -> str 用于处理文本。
  • send_bytes(data: bytes)receive_bytes() -> bytes 用于处理字节。

此外,FastAPI/Starlette还提供了方便的方法来发送和接收JSON数据:

  • send_json(data: Any, mode: str = 'text'): 发送Python对象,会自动序列化为JSON字符串(默认为文本模式,也可以指定 mode='bytes' 发送二进制JSON)。
  • receive_json(mode: str = 'text') -> Any: 接收JSON数据(可以是文本或字节),会自动反序列化为Python对象。

这在发送结构化数据时非常有用,例如,在聊天应用中发送包含发送者、消息内容、时间戳的对象,而不是简单的纯文本。

示例:发送和接收JSON消息

```python

在上面的ConnectionManager中添加发送/接收JSON的方法

... (ConnectionManager类的其他方法)

async def send_personal_json(self, data: Any, websocket: WebSocket):
    """向单个客户端发送JSON消息"""
    try:
        await websocket.send_json(data)
    except WebSocketDisconnect:
        print(f"Attempted to send JSON to disconnected client: {websocket.client.host}:{websocket.client.port}")
    except Exception as e:
         print(f"Error sending JSON to client {websocket.client.host}:{websocket.client.port}: {e}")

async def broadcast_json(self, data: Any):
    """向所有连接的客户端广播JSON消息"""
    invalid_connections = []
    for connection in self.active_connections:
        try:
            await connection.send_json(data)
        except WebSocketDisconnect:
             invalid_connections.append(connection)
        except Exception as e:
             print(f"Error broadcasting JSON to client: {e}")
             invalid_connections.append(connection)

    for invalid_conn in invalid_connections:
        try:
             self.active_connections.remove(invalid_conn)
        except ValueError:
             pass
    if invalid_connections:
         print(f"Removed {len(invalid_connections)} invalid connections after JSON broadcast.")

修改WebSocket端点以处理JSON

@app.websocket("/ws/json/{client_id}") # 新增一个JSON专用的端点
async def websocket_json_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket) # manager的connect方法可以复用
try:
while True:
# 接收JSON数据
data = await websocket.receive_json()
print(f"Received JSON from client #{client_id}: {data}")

         # 假设客户端发送 { "message": "hello" }
         message_content = data.get("message", "Empty message")

         # 构建要广播的JSON数据
         response_data = {
             "sender": client_id,
             "content": message_content,
             "timestamp": datetime.datetime.now().isoformat()
         }

         # 广播JSON数据
         await manager.broadcast_json(response_data)

 except WebSocketDisconnect:
     manager.disconnect(websocket)
     print(f"Client #{client_id} disconnected from JSON endpoint")
     # 可选:通知其他用户某个用户断开
     await manager.broadcast_json({"sender": "system", "content": f"Client #{client_id} has left the chat.", "timestamp": datetime.datetime.now().isoformat()})
 except Exception as e:
     print(f"An error occurred with JSON client #{client_id}: {e}")
     manager.disconnect(websocket)
     # 可选:通知其他用户某个用户因错误断开
     await manager.broadcast_json({"sender": "system", "content": f"Client #{client_id} disconnected due to an error.", "timestamp": datetime.datetime.now().isoformat()})

``
在客户端也需要修改JavaScript代码来发送和接收JSON(使用
websocket.send(JSON.stringify(yourObject))JSON.parse(event.data)onmessage` 中)。

6. WebSocket的连接状态与错误处理

WebSocket连接是有状态的,这意味着连接可能在任何时候因为各种原因断开。优雅地处理这些情况对于构建健壮的实时应用至关重要。

WebSocketDisconnect 异常

如前所述,当客户端关闭连接时(无论是正常关闭还是异常中断如网络断开、浏览器崩溃),服务器尝试 receive_text()receive_bytes()receive_json() 时会抛出 WebSocketDisconnect 异常。这是处理客户端断开连接的主要机制。

在WebSocket端点函数中使用 try...except WebSocketDisconnect: 块是标准做法,以便在连接断开时执行清理任务(例如,从 ConnectionManager 中移除连接)。

优雅地处理客户端断开连接

ConnectionManagerbroadcastsend_personal_message 方法中,如果尝试向一个已经断开的连接发送数据,Starlette/FastAPI 底层也会抛出 WebSocketDisconnect。在 broadcast 方法中捕获这个异常并清理无效连接是最佳实践,可以防止向死连接不断发送数据。

```python

ConnectionManager 类的 broadcast 方法中已经包含了对 WebSocketDisconnect 的处理和无效连接的移除。

这是确保 active_connections 列表保持“干净”的重要部分。

```

其他潜在的异常处理

除了 WebSocketDisconnect,在处理 WebSocket 消息时还可能遇到其他异常,例如:

  • 解码错误: 如果使用 receive_json 接收到了非法的JSON字符串。
  • 业务逻辑错误: 在处理收到的消息时,你的应用代码可能因为各种原因抛出异常(例如,数据库错误)。

为了防止这些异常导致整个WebSocket端点函数崩溃并意外断开连接,应该使用更通用的 except Exception as e: 块来捕获它们。在捕获到其他异常时,通常的处理方式是:
1. 记录错误。
2. 可以选择性地向客户端发送一个错误消息(如果连接还可用)。
3. 调用 websocket.close() 带有合适的错误码(例如1011表示服务器错误)。
4. 从 ConnectionManager 中移除连接。

7. WebSocket的认证与授权

与HTTP端点类似,WebSocket连接也常常需要知道连接的用户身份,并根据用户身份进行授权。由于WebSocket连接是通过HTTP握手建立的,你可以在握手阶段进行认证。

如何在WebSocket连接中识别用户

几种常见的方法:

  1. 通过URL查询参数传递Token或Session ID: 这是最简单的方法,客户端在连接URL中包含认证信息,如 ws://localhost:8000/ws/{client_id}?token=your_token。服务器端可以在WebSocket端点函数中通过依赖注入获取查询参数。
  2. 通过HTTP Headers传递Token: 客户端可以在WebSocket握手请求的Header中包含认证信息(例如 Authorization: Bearer your_token)。然而,标准的浏览器 WebSocket API 不允许自定义Header,所以这种方法通常只适用于非浏览器客户端(如移动应用、桌面应用或Python客户端)。
  3. 先建立HTTP连接,获取认证信息,再建立WebSocket连接: 客户端先通过标准的HTTP API使用用户名/密码登录,获取一个Token或Session ID。然后使用这个Token/ID通过上述方法1或2建立WebSocket连接。这是最常见的模式。
  4. 在WebSocket连接建立后发送认证消息: 连接建立后,客户端立即发送一条特殊的认证消息包含凭据。服务器验证后才允许后续操作。这种方法稍微复杂,但允许使用WebSocket协议本身进行认证。

通过查询参数进行认证示例

假设你有一个 /login HTTP端点,成功登录后返回一个简单的 token。然后客户端使用这个 token 连接到 WebSocket。

```python

main.py (继续添加)

from fastapi import Query, Depends, HTTPException, status
from typing import Optional
import uuid # 用于生成简单token

模拟用户数据库和token存储

fake_users_db = {
"john_doe": {"username": "john_doe", "password": "secretpassword"},
"jane_smith": {"username": "jane_smith", "password": "anothersecret"}
}

fake_tokens_db = {} # token: username 映射

HTTP 登录端点 (简化版)

@app.post("/login")
async def login(username: str, password: str):
user = fake_users_db.get(username)
if not user or user["password"] != password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
token = str(uuid.uuid4()) # 生成一个简单的token
fake_tokens_db[token] = username
return {"token": token}

依赖注入函数,用于验证WebSocket连接的token

async def get_user_from_token(token: str = Query(..., description="Authentication token")):
username = fake_tokens_db.get(token)
if not username:
# 可以在这里抛出 HTTPException,虽然 WebSocket 握手时
# HTTPException 不会像 HTTP 请求那样返回标准错误响应,
# 但会阻止连接建立。客户端会收到连接失败(状态码通常是 1011)。
# 或者,你可以让依赖返回 None,然后在 WebSocket 端点内部检查。
print(f"Invalid token received: {token}")
# 直接返回 None,然后在端点中处理
return None
# 或者更严格地阻止连接: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
print(f"Token {token} validated for user: {username}")
return username

修改 WebSocket 端点使用依赖注入进行认证

@app.websocket("/ws/auth")
async def websocket_auth_endpoint(
websocket: WebSocket,
username: str = Depends(get_user_from_token) # 使用依赖注入获取并验证用户
):
# 如果依赖注入返回 None (即token无效),我们不接受连接
if username is None:
# FastAPI/Starlette 会在依赖失败时阻止 accept() 调用,
# 但为了明确和日志记录,我们可以在这里加个检查
print("Connection attempt with invalid token rejected.")
# await websocket.close(code=1008) # 1008表示策略违规,或1011表示服务器错误
# 注意:在依赖注入失败时,accept() 未被调用,直接调用 close() 可能会出错
# 依赖失败通常会导致 Starlette 自动拒绝连接,客户端会看到握手失败。
return # 直接返回,不接受连接

# 如果token有效,接受连接并继续
print(f"Client '{username}' connected via authenticated WebSocket.")
await manager.connect(websocket) # 使用管理器处理连接建立

try:
    # 现在你可以使用 username 变量来识别用户
    await manager.send_personal_message(f"Welcome, {username}!", websocket)
    await manager.broadcast(f"User '{username}' has joined the chat.")

    while True:
        data = await websocket.receive_text()
        message = f"'{username}' says: {data}"
        print(f"Received message from '{username}': {data}")
        await manager.broadcast(message)

except WebSocketDisconnect:
    manager.disconnect(websocket)
    print(f"User '{username}' disconnected.")
    await manager.broadcast(f"User '{username}' has left the chat.")
except Exception as e:
    print(f"An error occurred with user '{username}': {e}")
    manager.disconnect(websocket)
    await manager.broadcast(f"User '{username}' disconnected due to an error.")

```

在这个例子中,用户需要先通过 /login 获取 token,然后通过 /ws/auth?token=YOUR_TOKEN 连接WebSocket。get_user_from_token 依赖函数会在WebSocket握手阶段执行,验证 token。如果 token 无效,依赖会失败,FastAPI 会拒绝接受 WebSocket 连接。如果 token 有效,端点函数会接收到用户名,并可以使用这个用户名来标识连接。

在连接管理器中关联用户ID与WebSocket连接

为了发送消息给特定用户(不是通过 client_id,而是通过他们的用户名或用户ID),ConnectionManager 可以使用一个字典来存储映射关系,例如 username: WebSocket

```python

main.py (修改 ConnectionManager)

from typing import List, Dict

class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
# 添加一个字典来按用户名查找连接
self.user_connections: Dict[str, WebSocket] = {}

async def connect(self, websocket: WebSocket, username: Optional[str] = None):
    """处理新连接,可选地关联用户名"""
    await websocket.accept()
    self.active_connections.append(websocket)
    if username:
         self.user_connections[username] = websocket # 关联用户名
         print(f"Client '{username}' connected. Total connections: {len(self.active_connections)}")
    else:
         print(f"Client connected (anonymous). Total connections: {len(self.active_connections)}")


def disconnect(self, websocket: WebSocket):
    """处理连接断开,同时移除用户关联"""
    try:
        self.active_connections.remove(websocket)
        # 查找并移除用户关联
        username_to_remove = None
        for username, conn in self.user_connections.items():
            if conn == websocket:
                username_to_remove = username
                break
        if username_to_remove:
            del self.user_connections[username_to_remove]
            print(f"Client '{username_to_remove}' disconnected. Total connections: {len(self.active_connections)}")
        else:
             print(f"Client disconnected (anonymous or not found in user map). Total connections: {len(self.active_connections)}")
    except ValueError:
        print("Disconnected client not found in active connections list.")

async def send_personal_message(self, message: str, websocket: WebSocket):
    # ... (方法体与之前类似,已处理异常)
    pass

async def send_message_to_user(self, message: str, username: str):
     """向特定用户发送消息"""
     connection = self.user_connections.get(username)
     if connection:
          await self.send_personal_message(message, connection)
     else:
          print(f"User '{username}' not found among active connections.")


async def broadcast(self, message: str):
   # ... (方法体与之前类似,已处理异常和清理无效连接)
   pass

async def broadcast_json(self, data: Any):
   # ... (方法体与之前类似,已处理异常和清理无效连接)
   pass

... (继续使用修改后的 manager 实例)

修改 websocket_auth_endpoint 的 connect/disconnect 调用

@app.websocket("/ws/auth")
async def websocket_auth_endpoint(
websocket: WebSocket,
username: str = Depends(get_user_from_token) # 确保 get_user_from_token 返回 username 或 None
):
if username is None:
print("Connection attempt with invalid token rejected.")
return

# 使用带有用户名的 connect 方法
await manager.connect(websocket, username)

try:
   # ... (其余逻辑与之前类似,使用 username 变量)
   await manager.send_personal_message(f"Welcome, {username}!", websocket)
   await manager.broadcast(f"User '{username}' has joined the chat.")

   while True:
       data = await websocket.receive_text()
       # 可以发送消息给特定用户,例如私聊命令 /pm user message
       if data.startswith("/pm "):
           parts = data.split(" ", 2)
           if len(parts) == 3:
               target_user = parts[1]
               private_message_content = parts[2]
               await manager.send_message_to_user(f"(Private from {username}): {private_message_content}", target_user)
               await manager.send_personal_message(f"(Private to {target_user}): {private_message_content}", websocket) # 给发送者自己一个确认
           else:
                await manager.send_personal_message("Invalid /pm command format. Use '/pm username message'", websocket)
       else:
           # 广播普通消息
           message = f"'{username}' says: {data}"
           print(f"Received message from '{username}': {data}")
           await manager.broadcast(message)


except WebSocketDisconnect:
    manager.disconnect(websocket)
    print(f"User '{username}' disconnected.")
    await manager.broadcast(f"User '{username}' has left the chat.")
except Exception as e:
    print(f"An error occurred with user '{username}': {e}")
    # 发生错误时,在断开前尝试向该用户发送错误信息
    try:
        await manager.send_personal_message(f"An internal server error occurred: {e}", websocket)
    except:
        pass # 尝试发送失败也没关系
    manager.disconnect(websocket)
    await manager.broadcast(f"User '{username}' disconnected due to an error.")

``
现在
ConnectionManager` 可以通过用户名查找特定的WebSocket连接,从而实现私聊等功能。

8. WebSocket的高级主题

状态管理

在实时应用中,服务器经常需要维护一些共享状态,例如所有在线用户列表、最近的聊天消息历史、游戏房间状态等。在单进程的FastAPI应用中,你可以将这些状态存储在全局变量或 ConnectionManager 实例中。

  • 在线用户列表: ConnectionManager 中的 user_connections 字典就是一个简单的在线用户列表。
  • 聊天记录: 可以在 ConnectionManager 中维护一个列表来存储最近 N 条消息,新连接的用户可以先获取这些历史消息。

```python

main.py (在 ConnectionManager 中添加消息历史)

class ConnectionManager:
# ... (其他方法)

def __init__(self):
    self.active_connections: List[WebSocket] = []
    self.user_connections: Dict[str, WebSocket] = {}
    self.message_history: List[Dict[str, Any]] = [] # 存储消息历史,例如 JSON 格式
    self.max_history_size = 100 # 最大存储消息数量

async def connect(self, websocket: WebSocket, username: Optional[str] = None):
     await websocket.accept()
     self.active_connections.append(websocket)
     if username:
          self.user_connections[username] = websocket
          print(f"Client '{username}' connected. Total connections: {len(self.active_connections)}")
     else:
          print(f"Client connected (anonymous). Total connections: {len(self.active_connections)}")

     # 向新连接的用户发送历史消息
     if self.message_history:
         # 注意:直接发送 JSON 列表,或者逐条发送
         # 逐条发送更通用,即使客户端不支持接收 JSON 数组
         await self.send_personal_json({"type": "history", "messages": self.message_history}, websocket)
         # 或者
         # for msg in self.message_history:
         #     await self.send_personal_json({"type": "history_item", "message": msg}, websocket)


async def add_message_to_history(self, message_data: Dict[str, Any]):
    """添加消息到历史记录并裁剪"""
    self.message_history.append(message_data)
    # 保持历史记录在指定大小内
    if len(self.message_history) > self.max_history_size:
        self.message_history.pop(0) # 移除最旧的消息

async def broadcast_json(self, data: Any):
    """广播 JSON 消息,并添加到历史记录(如果需要)"""
    # 可以在这里判断消息类型,只将聊天消息添加到历史记录
    if data.get("type") == "chat_message":
         await self.add_message_to_history(data)

    invalid_connections = []
    for connection in self.active_connections:
       # ... (发送逻辑,与之前类似)
       try:
           await connection.send_json(data)
       except (WebSocketDisconnect, Exception):
            invalid_connections.append(connection)

    # ... (移除无效连接逻辑)

修改 websocket_auth_endpoint 中的广播调用,发送包含 type 的 JSON 消息

@app.websocket("/ws/auth")
async def websocket_auth_endpoint(
websocket: WebSocket,
username: str = Depends(get_user_from_token)
):
# ... (连接和错误处理逻辑)

try:
   await manager.connect(websocket, username)
   await manager.broadcast_json({"type": "system", "content": f"User '{username}' has joined the chat.", "timestamp": datetime.datetime.now().isoformat()})

   while True:
       data = await websocket.receive_json() # 假设客户端发送 JSON
       # 假设客户端发送 { "type": "chat", "message": "..." }
       message_type = data.get("type")

       if message_type == "chat":
           message_content = data.get("message", "")
           if message_content.strip(): # 忽略空消息
               chat_message_data = {
                   "type": "chat_message", # 区分消息类型
                   "sender": username,
                   "content": message_content,
                   "timestamp": datetime.datetime.now().isoformat()
               }
               print(f"Received chat message from '{username}': {message_content}")
               await manager.broadcast_json(chat_message_data) # 广播聊天消息 (会添加到历史)

       elif message_type == "private_chat":
           target_user = data.get("target_user")
           private_message_content = data.get("message", "")
           if target_user and private_message_content.strip():
               private_message_data = {
                   "type": "private_chat_message",
                   "sender": username,
                   "target": target_user,
                   "content": private_message_content,
                   "timestamp": datetime.datetime.now().isoformat()
               }
               print(f"Received private message from '{username}' to '{target_user}': {private_message_content}")
               await manager.send_message_to_user(private_message_data, target_user)
               # 可选:给发送者自己一个确认
               await manager.send_personal_json({"type": "system", "content": f"Private message sent to {target_user}.", "timestamp": datetime.datetime.now().isoformat()}, websocket)

       # 可以处理其他消息类型,例如加入房间、离开房间等

except WebSocketDisconnect:
    manager.disconnect(websocket)
    print(f"User '{username}' disconnected.")
    await manager.broadcast_json({"type": "system", "content": f"User '{username}' has left the chat.", "timestamp": datetime.datetime.now().isoformat()})
except Exception as e:
    print(f"An error occurred with user '{username}': {e}")
    manager.disconnect(websocket)
    await manager.broadcast_json({"type": "system", "content": f"User '{username}' disconnected due to an error.", "timestamp": datetime.datetime.now().isoformat()})

```

Subprotocols

WebSocket协议允许指定和使用"子协议" (subprotocols)。子协议定义了在WebSocket连接上交换消息的格式和语义。例如,你可以定义一个用于聊天应用的子协议,规定消息必须是JSON格式,并且包含 type, sender, content 等字段。客户端和服务器在握手阶段协商使用的子协议。

在FastAPI中,可以在 app.websocket() 装饰器中指定支持的子协议列表:

```python
@app.websocket("/ws/subprotocol", subprotocols=["chat", "notifications"])
async def websocket_subprotocol_endpoint(websocket: WebSocket):
# 在 accept() 中可以指定实际使用的子协议
# chosen_subprotocol = ... 客户端会在握手请求头 Sec-WebSocket-Protocol 中发送其支持的子协议
# Starlette/FastAPI 会自动处理协商,如果客户端支持其中一个,会在 websocket.scope['subprotocol'] 中可用
await websocket.accept() # 可以可选地传入 chosen_subprotocol 参数

# 检查协商后的子协议 (如果需要)
# negotiated_protocol = websocket.scope.get('subprotocol')
# print(f"Negotiated subprotocol: {negotiated_protocol}")

# ... 处理消息 ...

``
客户端在创建
WebSocket对象时指定子协议列表:new WebSocket("ws://...", ["chat", "notifications"])`。

性能与扩展

FastAPI基于Starlette和Uvicorn(一个高性能的ASGI服务器),本身具有很好的并发处理能力,尤其是在使用异步代码时。单个FastAPI进程可以同时处理大量的并发连接(包括HTTP和WebSocket),因为当一个连接在等待I/O(如接收消息、发送消息)时,事件循环可以切换去处理其他连接。

  • 单机扩展: Uvicorn可以通过 --workers 参数启动多个worker进程。每个worker都是独立的Python进程,有自己的内存空间。这有助于利用多核CPU,并且可以在一个worker崩溃时不影响其他worker。
    • 问题: 当使用多个worker时,前面实现的 ConnectionManager 存在于每个worker进程的内存中,它们之间不共享状态。这意味着一个worker收到的消息无法通过其本地的 ConnectionManager 广播给连接到其他worker的客户端。
  • 分布式扩展: 为了解决多进程/多服务器部署时的状态共享问题,需要使用外部服务来协调消息分发。最常见的模式是使用发布/订阅系统 (Pub/Sub)
    • Redis Pub/Sub: Redis是一个流行的内存数据结构存储,它也提供Pub/Sub功能。
      • 工作原理: 当一个worker收到一条消息需要广播时,它不是直接遍历本地连接,而是将消息发布 (Publish) 到一个特定的Redis频道。其他所有worker都在订阅 (Subscribe) 这个频道。当消息发布到频道时,所有订阅了这个频道的worker都会收到消息。每个worker收到消息后,再通过其本地的 ConnectionManager 将消息发送给连接到自己的客户端。
    • 实现: 这需要额外的代码来集成Redis客户端(如 redis-py),在 ConnectionManager 中添加订阅和发布逻辑,并在单独的协程中运行订阅循环。

```python

概念示例 (不完整,仅展示 Redis Pub/Sub 思路)

import asyncio
import aioredis # 需要安装 pip install redis hiredis

假设 ConnectionManager 有一个 Redis 客户端连接池

class ConnectionManager:

# ... init, connect, disconnect, send_personal_message, send_message_to_user ...

async def setup_redis(self, redis_url="redis://localhost"):

self._redis = await aioredis.from_url(redis_url, decode_responses=True)

self._pubsub = self._redis.pubsub()

await self._pubsub.subscribe("chat_channel") # 订阅聊天频道

asyncio.create_task(self.listen_redis_pubsub()) # 启动一个协程监听消息

async def listen_redis_pubsub(self):

"""在协程中循环监听 Redis Pub/Sub 频道"""

print("Started listening to Redis Pub/Sub...")

while True:

try:

message = await self._pubsub.get_message(ignore_subscribe_messages=True)

if message and message['type'] == 'message':

data = message['data'] # 这是收到的广播消息 (字符串或JSON)

print(f"Received message from Redis: {data}")

# 将收到的消息广播给本 worker 的所有客户端 (不包括发送者,如果发送者是本worker的客户端)

# 可以在消息数据中包含发送者的 worker ID 或 connection ID 来避免回发

await self.broadcast(data) # 或者 broadcast_json(json.loads(data))

await asyncio.sleep(0.01) # 防止CPU空转

except Exception as e:

print(f"Error in Redis Pub/Sub listener: {e}")

await asyncio.sleep(1) # 出错时稍等重试

async def publish_message(self, message: str):

"""将消息发布到 Redis 频道"""

if self._redis:

await self._redis.publish("chat_channel", message)

# 在 WebSocket 端点收到消息时,不再直接 broadcast(),而是 publish_message()

@app.websocket("/ws/auth")

async def websocket_auth_endpoint(...):

# ... connect 逻辑 ...

await manager.setup_redis() # 在连接建立后设置Redis (可能需要更早设置,例如在应用启动时)

try:

while True:

data = await websocket.receive_text()

message_to_publish = f"'{username}' says: {data}"

await manager.publish_message(message_to_publish) # 发布到Redis

# ... disconnect 和 异常处理 ...

# 应用启动时初始化 Redis (或者在 ConnectionManager 创建时)

@app.on_event("startup")

async def startup_event():

await manager.setup_redis() # 或者其他初始化方式

@app.on_event("shutdown")

async def shutdown_event():

if manager._redis:

await manager._redis.close()

```
使用Redis Pub/Sub增加了复杂度,但对于需要水平扩展的实时应用来说是必要的。

部署考虑

  • 代理服务器 (Nginx/Traefik): 在生产环境中,通常会在FastAPI应用前面部署一个反向代理服务器。这个代理服务器需要配置来正确处理WebSocket连接。Nginx和Traefik都支持WebSocket代理,核心配置是升级连接头 (UpgradeConnection 头)。
  • SSL/TLS (WSS): 为了安全,生产环境中的WebSocket连接应该使用加密的WebSocket Secure (wss://) 协议,它运行在TLS/SSL之上。代理服务器负责处理SSL证书和加密,并将流量转发到后端的FastAPI应用(通常是未加密的 ws://)。
  • Uvicorn Worker数量: 根据服务器的CPU核心数量和应用特性,配置Uvicorn的worker数量。

9. 总结

FastAPI凭借其高性能、易用性和对异步编程的良好支持,成为了构建实时应用的优秀选择。通过结合Starlette强大的WebSocket实现,我们可以轻松地创建双向、实时的通信功能。

本文从理解WebSocket协议的优势开始,逐步深入,展示了如何在FastAPI中:
* 定义基础WebSocket端点。
* 处理连接的建立、消息的收发和断开。
* 构建 ConnectionManager 来有效地管理多个并发连接并实现广播功能。
* 处理不同类型的数据(文本、字节、JSON)。
* 实现WebSocket连接的认证。
* 探讨了状态管理和水平扩展等高级主题。

通过本文提供的代码示例和解释,你应该已经掌握了使用FastAPI和WebSocket构建功能丰富实时应用的基础知识和核心技巧。

10. 进一步学习资源

希望这篇文章对你有所帮助!祝你在使用FastAPI构建实时应用的旅程中一切顺利!

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部