使用FastAPI与WebSocket构建高性能实时应用
在现代Web应用的开发中,实时性变得越来越重要。聊天应用、在线游戏、股票行情显示、协作工具、实时通知系统等等,都依赖于服务器能够主动向客户端推送数据,而不需要客户端不断地发起请求(轮询)。传统的HTTP协议是无状态的请求-响应模式,虽然可以通过长轮询等技术模拟实时性,但这通常效率低下且资源消耗较大。WebSocket协议的出现,彻底改变了这一局面。
WebSocket协议提供了一个在单个TCP连接上进行全双工、双向通信的通道。一旦WebSocket连接建立,服务器和客户端都可以随时向对方发送数据,无需额外的HTTP开销。这使得构建实时应用变得高效且直观。
FastAPI是一个现代、快速(高性能)的Web框架,用于构建API。它基于标准的Python类型提示,并使用Starlette作为其底层异步框架。Starlette原生支持WebSocket,这意味着FastAPI也天然地支持WebSocket,并且能够充分利用Python的异步能力(async
/await
)来高效地处理大量的并发WebSocket连接。
本文将深入探讨如何在FastAPI中使用WebSocket构建实时应用,从基础概念到更高级的主题,并提供详细的代码示例。
文章目录
- 理解实时应用与WebSocket
- 为什么需要实时性?
- 传统HTTP的局限性
- WebSocket协议简介
- WebSocket与HTTP的对比
- FastAPI对WebSocket的支持
- FastAPI基于Starlette
WebSocket
类@app.websocket()
装饰器
- 构建一个基础WebSocket Echo服务器
- 项目设置与依赖
- 编写FastAPI应用
- 解释代码:连接的建立、数据的接收与发送
- 运行应用
- 客户端测试方法(浏览器、Python)
- 处理多个WebSocket客户端:构建一个简单的聊天室
- 管理多个连接的需求
- 引入
ConnectionManager
- 实现连接管理器:连接、断开、发送个人消息、广播消息
- 修改WebSocket端点使用连接管理器
- 编写一个简单的HTML/JavaScript客户端
- 运行和测试聊天室
- WebSocket的数据类型:文本与字节
- 发送和接收文本消息 (
send_text
,receive_text
) - 发送和接收字节消息 (
send_bytes
,receive_bytes
) - 发送和接收JSON消息 (
send_json
,receive_json
)
- 发送和接收文本消息 (
- WebSocket的连接状态与错误处理
WebSocketDisconnect
异常- 优雅地处理客户端断开连接
- 其他潜在的异常处理
- WebSocket的认证与授权
- 如何在WebSocket连接中识别用户
- 通过查询参数或Header进行认证(握手阶段)
- 在连接管理器中关联用户ID与WebSocket连接
-
WebSocket的高级主题
- 状态管理: 共享数据(例如:在线用户列表、聊天记录)
- Subprotocols: 使用子协议
- 性能与扩展:
- FastAPI的异步能力如何处理并发
- 单机扩展:增加Worker数量
- 分布式扩展:使用消息队列/发布订阅系统(如Redis Pub/Sub)处理跨进程/服务器广播
- 部署考虑: 代理服务器(Nginx/Traefik)、SSL/TLS
-
总结
- 进一步学习资源
1. 理解实时应用与WebSocket
为什么需要实时性?
在许多现代应用场景中,用户期望获得即时反馈和最新信息。例如:
- 在线聊天/协作: 消息需要立即送达所有参与者。
- 实时仪表盘: 股票价格、系统指标、传感器数据需要持续更新。
- 在线游戏: 玩家的动作需要实时同步给其他玩家。
- 实时通知: 新邮件、点赞、评论等需要立即通知用户。
传统HTTP的局限性
HTTP协议是客户端发起请求、服务器响应的模式。服务器无法主动向客户端发送数据。为了模拟实时性,开发者通常使用以下技术:
- 短轮询 (Short Polling): 客户端每隔一小段时间就向服务器发送一个请求,询问是否有新数据。这会导致大量的HTTP请求,浪费带宽和服务器资源,而且延迟较高。
- 长轮询 (Long Polling): 客户端发送请求后,服务器如果没有新数据会 hold 住连接,直到有新数据或者超时才响应。客户端收到响应后立即发起新的请求。这减少了请求数量,但仍然是请求-响应模式,且对服务器资源(保持连接)有一定压力。
- HTTP Streaming: 服务器发送响应后不关闭连接,持续向客户端推送数据。这是单向的(服务器到客户端),且实现和解析相对复杂。
这些技术都是在无状态的HTTP协议之上进行的妥协。
WebSocket协议简介
WebSocket(WS)协议在OSI模型的应用层运行,通过HTTP协议的101 Switching Protocols状态码进行握手升级,从而从HTTP连接转变为WebSocket连接。一旦握手成功,客户端和服务器之间就会建立一个持久的、双向的、全双工的通信通道。
全双工: 客户端和服务器可以在同一时间相互发送数据。
双向: 任意一方都可以主动发送数据给对方。
持久连接: 连接一旦建立,就可以保持开启,用于后续的多次数据传输,无需重复建立连接的开销。
这使得WebSocket非常适合需要低延迟、高频率双向通信的应用场景。
WebSocket与HTTP的对比
特性 | HTTP | WebSocket |
---|---|---|
通信模式 | 请求-响应(无状态) | 全双工双向(有状态,连接保持) |
连接开销 | 每个请求都需要建立/关闭连接(或复用) | 只需要一次握手建立连接,后续数据传输开销小 |
协议头 | 每次请求都包含完整的HTTP头 | 握手后数据帧开销很小 |
服务器推送 | 不支持(需依赖轮询或流) | 原生支持 |
适用场景 | 常规网页浏览、API调用 | 实时聊天、游戏、通知、实时数据推送 |
2. FastAPI对WebSocket的支持
FastAPI基于Starlette构建,而Starlette从设计之初就考虑了异步和实时应用的需求,提供了原生的WebSocket支持。
FastAPI基于Starlette
理解FastAPI的WebSocket支持,首先要知道它是Starlette之上的一层封装。Starlette提供了底层的ASGI接口实现,包括对HTTP和WebSocket连接的处理。FastAPI则在其上提供了路由、依赖注入、数据验证/序列化等高级功能。
WebSocket
类
在FastAPI的WebSocket端点中,你将主要与 starlette.websockets.WebSocket
类打交道。这个类的实例代表了一个活动的WebSocket连接,它提供了一系列异步方法用于发送和接收数据:
accept(subprotocol: Optional[str] = None, headers: Optional[Sequence[Tuple[bytes, bytes]]] = None) -> None
: 接受来自客户端的WebSocket连接请求。在处理任何消息之前必须调用此方法。receive_text() -> str
: 异步接收客户端发送的文本消息。receive_bytes() -> bytes
: 异步接收客户端发送的字节消息。receive_json(mode: str = 'text') -> Any
: 异步接收客户端发送的JSON消息(可以是文本或字节格式)。send_text(data: str) -> None
: 异步向客户端发送文本消息。send_bytes(data: bytes) -> None
: 异步向客户端发送字节消息。send_json(data: Any, mode: str = 'text') -> None
: 异步向客户端发送JSON消息。close(code: int = 1000) -> None
: 异步关闭WebSocket连接。code
是关闭状态码,1000表示正常关闭。
@app.websocket()
装饰器
FastAPI提供了一个便捷的装饰器 @app.websocket("/ws/{some_parameter}")
来定义WebSocket端点。这个装饰器下的异步函数将处理来自指定路径的WebSocket连接请求。函数签名通常包含一个 websocket: WebSocket
参数,你可以通过它与客户端进行交互。路径参数 ({some_parameter}
) 的使用方式与HTTP路径参数类似。
“`python
from fastapi import FastAPI, WebSocket
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
# 在这里处理WebSocket连接
pass
“`
3. 构建一个基础WebSocket Echo服务器
我们先创建一个最简单的例子:一个Echo服务器。客户端发送什么消息,服务器就原样返回什么消息。
项目设置与依赖
确保你已经安装了FastAPI和Uvicorn(ASGI服务器):
bash
pip install fastapi uvicorn websockets
websockets
库是 Starlette/FastAPI 内部使用的底层 WebSocket 实现之一。虽然你直接使用的是 Starlette/FastAPI 的 WebSocket
类,但安装 websockets
是必要的。
编写FastAPI应用
创建一个Python文件(例如 main.py
):
“`python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # 1. 接受连接
try:
while True:
data = await websocket.receive_text() # 2. 接收文本消息
print(f”Received message: {data}”)
await websocket.send_text(f”Message text was: {data}”) # 3. 发送文本消息
except WebSocketDisconnect: # 4. 处理断开连接
print(“Client disconnected”)
await websocket.close() # 可选,但通常在异常时会自动关闭
except Exception as e: # 处理其他可能的错误
print(f”An error occurred: {e}”)
# 可以在这里发送错误信息给客户端,然后关闭连接
# await websocket.send_text(f”Error: {e}”)
await websocket.close(code=1011) # 1011表示内部错误
“`
解释代码:连接的建立、数据的接收与发送
await websocket.accept()
: 当一个客户端尝试连接到/ws
路径时,FastAPI/Starlette 会调用websocket_endpoint
函数。在处理任何接收或发送操作之前,你需要调用accept()
来完成WebSocket握手并正式建立连接。如果不调用accept()
,连接最终会超时关闭。while True:
: 建立连接后,我们进入一个无限循环,持续监听客户端发送的消息。data = await websocket.receive_text()
: 这行代码异步地等待从客户端接收到的下一条文本消息。当消息到达时,它会被赋值给data
变量。如果客户端发送的是其他类型(如字节)或连接断开,这里可能会抛出异常。await websocket.send_text(f"Message text was: {data}")
: 收到消息后,我们使用send_text()
方法将一个文本消息发送回客户端。except WebSocketDisconnect:
: 这是处理客户端正常断开连接的关键。当客户端关闭WebSocket连接时(例如,关闭浏览器标签页,或者在JavaScript中调用websocket.close()
),receive_text()
或receive_bytes()
等方法会抛出WebSocketDisconnect
异常。我们捕获这个异常,打印一条消息,并可以执行清理工作。FastAPI/Starlette 在这个异常发生时通常会自动尝试发送一个关闭帧并关闭底层TCP连接,所以显式调用websocket.close()
在这里可能是冗余的,但捕获异常本身是重要的。except Exception as e:
: 捕获其他未预期的异常,以防止服务器崩溃。在实际应用中,你可能需要更精细的错误处理。
运行应用
保存文件后,在终端中运行Uvicorn服务器:
bash
uvicorn main:app --reload
你应该会看到Uvicorn启动的输出,表示服务器正在监听。
客户端测试方法
方法1:使用浏览器开发者工具
这是最便捷的测试方法之一。
1. 打开任何一个现代浏览器(如Chrome, Firefox)。
2. 打开开发者工具 (通常按 F12)。
3. 切换到 “Console” 标签页。
4. 输入以下JavaScript代码并执行:
“`javascript
// 创建一个WebSocket连接
var ws = new WebSocket(“ws://localhost:8000/ws”);
// 监听连接建立事件
ws.onopen = function(event) {
console.log(“WebSocket connection opened:”, event);
// 连接建立后,发送一条消息
ws.send(“Hello FastAPI WebSocket!”);
};
// 监听收到消息事件
ws.onmessage = function(event) {
console.log(“Received message:”, event.data);
};
// 监听连接关闭事件
ws.onclose = function(event) {
if (event.wasClean) {
console.log(Connection closed cleanly, code=${event.code} reason=${event.reason}
);
} else {
console.error(“Connection died”);
}
};
// 监听错误事件
ws.onerror = function(error) {
console.error(“WebSocket error:”, error);
};
// 发送更多消息 (在连接打开后)
// ws.send(“Another message”);
// 关闭连接 (如果需要)
// ws.close();
``
new WebSocket(…)
执行后,如果服务器正常运行,你会看到
WebSocket connection opened的日志,然后是服务器返回的
Received message: Message text was: Hello FastAPI WebSocket!。你可以在控制台中输入
ws.send(“Your message here”)发送更多消息。关闭浏览器标签页会触发
onclose` 事件,服务器端会打印 “Client disconnected”。
方法2:使用Python客户端
你也可以用Python编写一个客户端来测试。需要安装 websockets
库。
bash
pip install websockets
创建一个新的Python文件(例如 client.py
):
“`python
client.py
import asyncio
import websockets
async def send_message():
uri = “ws://localhost:8000/ws”
async with websockets.connect(uri) as websocket:
await websocket.send(“Hello from Python client!”)
print(f”> Hello from Python client!”)
response = await websocket.recv()
print(f"< {response}")
await websocket.send("Another message from Python!")
print(f"> Another message from Python!")
response2 = await websocket.recv()
print(f"< {response2}")
asyncio.get_event_loop().run_until_complete(send_message())
``
python client.py`,你会在客户端和服务端控制台看到消息的发送和接收记录。
运行
这个基础示例展示了FastAPI处理单个WebSocket连接的基本流程:接受连接、进入循环监听消息、处理消息、处理断开连接。
4. 处理多个WebSocket客户端:构建一个简单的聊天室
Echo服务器只处理单个连接,而实际应用通常需要管理多个并发连接,并在它们之间协调数据(例如,广播消息给所有用户)。我们将构建一个简单的聊天室,演示如何管理多个WebSocket连接。
管理多个连接的需求
在一个聊天室中,当一个用户发送消息时,服务器需要将这条消息发送给所有其他在线用户。这意味着服务器需要知道当前所有活动的WebSocket连接,并能够遍历它们进行广播。
引入ConnectionManager
一个常见的模式是创建一个 ConnectionManager
类来封装连接的管理逻辑。它可以维护一个活动连接的列表,并提供添加连接、移除连接、发送消息给特定连接以及广播消息给所有连接的方法。
实现连接管理器
“`python
main.py (在之前的代码基础上添加)
… (前面的import和app = FastAPI()保留)
from typing import List
class ConnectionManager:
def init(self):
# 使用列表存储所有活动的WebSocket连接
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
"""处理新连接"""
await websocket.accept()
self.active_connections.append(websocket)
print(f"Client connected: {websocket.client.host}:{websocket.client.port}. Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
"""处理连接断开"""
try:
self.active_connections.remove(websocket)
print(f"Client disconnected: {websocket.client.host}:{websocket.client.port}. Total connections: {len(self.active_connections)}")
except ValueError:
# 如果连接不在列表中(可能因为异常或其他原因没有成功添加到列表)
print(f"Disconnected client {websocket.client.host}:{websocket.client.port} was not in the active connections list.")
async def send_personal_message(self, message: str, websocket: WebSocket):
"""向单个客户端发送消息"""
try:
await websocket.send_text(message)
except WebSocketDisconnect:
print(f"Attempted to send message to disconnected client: {websocket.client.host}:{websocket.client.port}")
# 可选:如果发送失败是因为断开连接,可以在这里调用 disconnect(websocket)
# 但是通常 receive_text/bytes 失败会先抛出 WebSocketDisconnect 并由端点函数处理
# self.disconnect(websocket)
except Exception as e:
print(f"Error sending message to client {websocket.client.host}:{websocket.client.port}: {e}")
# 可以在这里决定是否断开连接
# await websocket.close(code=1011)
async def broadcast(self, message: str):
"""向所有连接的客户端广播消息"""
# 创建一个列表来存储无效的连接,以便在遍历后移除
invalid_connections = []
for connection in self.active_connections:
try:
await connection.send_text(message)
except WebSocketDisconnect:
# 如果在广播时发现连接已断开,标记为无效
print(f"Client disconnected during broadcast: {connection.client.host}:{connection.client.port}")
invalid_connections.append(connection)
except Exception as e:
# 处理其他发送错误
print(f"Error broadcasting to client {connection.client.host}:{connection.client.port}: {e}")
# 可以在这里决定是否将这个连接标记为无效并移除
invalid_connections.append(connection) # 将出错的连接也标记为无效
# 移除所有无效的连接
for invalid_conn in invalid_connections:
# 使用 try-except block, 因为在多线程/多进程环境中,
# active_connections 列表可能在迭代和修改之间发生变化
try:
self.active_connections.remove(invalid_conn)
except ValueError:
# 如果连接已经被移除了,忽略
pass
if invalid_connections:
print(f"Removed {len(invalid_connections)} invalid connections. Total connections: {len(self.active_connections)}")
创建一个ConnectionManager实例
manager = ConnectionManager()
… (基础Echo服务器的 @app.websocket(“/ws”) 可以注释掉或删除)
“`
修改WebSocket端点使用连接管理器
现在我们将 /ws
端点修改为聊天室逻辑,使用 ConnectionManager
。
“`python
main.py (继续在上面代码的基础上添加)
@app.websocket(“/ws/{client_id}”)
async def websocket_endpoint(websocket: WebSocket, client_id: int):
“””
聊天室WebSocket端点,每个连接通过client_id区分
“””
await manager.connect(websocket) # 1. 使用管理器处理连接建立
try:
# 2. 收到消息后,向所有客户端广播,包括发送者自己
while True:
data = await websocket.receive_text()
message = f”Client #{client_id} says: {data}”
print(f”Received message from client #{client_id}: {data}”)
await manager.broadcast(message) # 3. 使用管理器广播消息
except WebSocketDisconnect: # 4. 处理断开连接
manager.disconnect(websocket) # 使用管理器处理断开
except Exception as e: # 处理其他错误
print(f"An error occurred with client #{client_id}: {e}")
manager.disconnect(websocket) # 发生错误时也断开连接
# 可以在这里考虑通知其他客户端某个用户由于错误断开
``
/ws/{client_id}
注意:我们将路径改为了。这意味着每个连接都需要在URL中提供一个唯一的
client_id`。在实际应用中,这个ID可能来自用户认证,而不是简单地从URL获取。
编写一个简单的HTML/JavaScript客户端
为了测试聊天室,我们需要一个客户端界面。创建一个 index.html
文件:
“`html
FastAPI WebSocket Chat
```
运行和测试聊天室
- 确保
main.py
包含上面完整的代码(包括ConnectionManager
和/ws/{client_id}
端点)。 - 运行Uvicorn服务器:
uvicorn main:app --reload
- 将
index.html
文件保存在项目的同一目录下。 - 在浏览器中打开
index.html
文件(可以直接双击打开)。 - 在第一个浏览器窗口中,输入一个客户端ID(例如
1
),点击 "Connect"。你应该会看到控制台和页面上的连接成功消息,并且服务器控制台会打印连接信息。 - 打开第二个浏览器窗口,同样打开
index.html
。输入另一个客户端ID(例如2
),点击 "Connect"。服务器控制台会打印第二个连接信息。 - 在任一浏览器窗口的输入框中输入消息,点击 "Send"。消息应该会出现在所有已连接的浏览器窗口的聊天框中,并且服务器控制台会打印收到的消息和广播信息。
- 尝试关闭一个浏览器窗口,服务器控制台会打印断开连接信息。在其他窗口发送消息,断开的客户端将不再收到。
这个聊天室示例很好地演示了如何使用 ConnectionManager
来管理多个并发的WebSocket连接,并实现服务器向多个客户端广播消息的功能。
5. WebSocket的数据类型:文本与字节
WebSocket协议本身支持发送文本数据(UTF-8编码)和二进制数据。FastAPI/Starlette的 WebSocket
类提供了相应的方法:
send_text(data: str)
和receive_text() -> str
用于处理文本。send_bytes(data: bytes)
和receive_bytes() -> bytes
用于处理字节。
此外,FastAPI/Starlette还提供了方便的方法来发送和接收JSON数据:
send_json(data: Any, mode: str = 'text')
: 发送Python对象,会自动序列化为JSON字符串(默认为文本模式,也可以指定mode='bytes'
发送二进制JSON)。receive_json(mode: str = 'text') -> Any
: 接收JSON数据(可以是文本或字节),会自动反序列化为Python对象。
这在发送结构化数据时非常有用,例如,在聊天应用中发送包含发送者、消息内容、时间戳的对象,而不是简单的纯文本。
示例:发送和接收JSON消息
```python
在上面的ConnectionManager中添加发送/接收JSON的方法
... (ConnectionManager类的其他方法)
async def send_personal_json(self, data: Any, websocket: WebSocket):
"""向单个客户端发送JSON消息"""
try:
await websocket.send_json(data)
except WebSocketDisconnect:
print(f"Attempted to send JSON to disconnected client: {websocket.client.host}:{websocket.client.port}")
except Exception as e:
print(f"Error sending JSON to client {websocket.client.host}:{websocket.client.port}: {e}")
async def broadcast_json(self, data: Any):
"""向所有连接的客户端广播JSON消息"""
invalid_connections = []
for connection in self.active_connections:
try:
await connection.send_json(data)
except WebSocketDisconnect:
invalid_connections.append(connection)
except Exception as e:
print(f"Error broadcasting JSON to client: {e}")
invalid_connections.append(connection)
for invalid_conn in invalid_connections:
try:
self.active_connections.remove(invalid_conn)
except ValueError:
pass
if invalid_connections:
print(f"Removed {len(invalid_connections)} invalid connections after JSON broadcast.")
修改WebSocket端点以处理JSON
@app.websocket("/ws/json/{client_id}") # 新增一个JSON专用的端点
async def websocket_json_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket) # manager的connect方法可以复用
try:
while True:
# 接收JSON数据
data = await websocket.receive_json()
print(f"Received JSON from client #{client_id}: {data}")
# 假设客户端发送 { "message": "hello" }
message_content = data.get("message", "Empty message")
# 构建要广播的JSON数据
response_data = {
"sender": client_id,
"content": message_content,
"timestamp": datetime.datetime.now().isoformat()
}
# 广播JSON数据
await manager.broadcast_json(response_data)
except WebSocketDisconnect:
manager.disconnect(websocket)
print(f"Client #{client_id} disconnected from JSON endpoint")
# 可选:通知其他用户某个用户断开
await manager.broadcast_json({"sender": "system", "content": f"Client #{client_id} has left the chat.", "timestamp": datetime.datetime.now().isoformat()})
except Exception as e:
print(f"An error occurred with JSON client #{client_id}: {e}")
manager.disconnect(websocket)
# 可选:通知其他用户某个用户因错误断开
await manager.broadcast_json({"sender": "system", "content": f"Client #{client_id} disconnected due to an error.", "timestamp": datetime.datetime.now().isoformat()})
``
websocket.send(JSON.stringify(yourObject))
在客户端也需要修改JavaScript代码来发送和接收JSON(使用和
JSON.parse(event.data)在
onmessage` 中)。
6. WebSocket的连接状态与错误处理
WebSocket连接是有状态的,这意味着连接可能在任何时候因为各种原因断开。优雅地处理这些情况对于构建健壮的实时应用至关重要。
WebSocketDisconnect
异常
如前所述,当客户端关闭连接时(无论是正常关闭还是异常中断如网络断开、浏览器崩溃),服务器尝试 receive_text()
、receive_bytes()
、receive_json()
时会抛出 WebSocketDisconnect
异常。这是处理客户端断开连接的主要机制。
在WebSocket端点函数中使用 try...except WebSocketDisconnect:
块是标准做法,以便在连接断开时执行清理任务(例如,从 ConnectionManager
中移除连接)。
优雅地处理客户端断开连接
在 ConnectionManager
的 broadcast
或 send_personal_message
方法中,如果尝试向一个已经断开的连接发送数据,Starlette/FastAPI 底层也会抛出 WebSocketDisconnect
。在 broadcast
方法中捕获这个异常并清理无效连接是最佳实践,可以防止向死连接不断发送数据。
```python
ConnectionManager 类的 broadcast 方法中已经包含了对 WebSocketDisconnect 的处理和无效连接的移除。
这是确保 active_connections 列表保持“干净”的重要部分。
```
其他潜在的异常处理
除了 WebSocketDisconnect
,在处理 WebSocket 消息时还可能遇到其他异常,例如:
- 解码错误: 如果使用
receive_json
接收到了非法的JSON字符串。 - 业务逻辑错误: 在处理收到的消息时,你的应用代码可能因为各种原因抛出异常(例如,数据库错误)。
为了防止这些异常导致整个WebSocket端点函数崩溃并意外断开连接,应该使用更通用的 except Exception as e:
块来捕获它们。在捕获到其他异常时,通常的处理方式是:
1. 记录错误。
2. 可以选择性地向客户端发送一个错误消息(如果连接还可用)。
3. 调用 websocket.close()
带有合适的错误码(例如1011表示服务器错误)。
4. 从 ConnectionManager
中移除连接。
7. WebSocket的认证与授权
与HTTP端点类似,WebSocket连接也常常需要知道连接的用户身份,并根据用户身份进行授权。由于WebSocket连接是通过HTTP握手建立的,你可以在握手阶段进行认证。
如何在WebSocket连接中识别用户
几种常见的方法:
- 通过URL查询参数传递Token或Session ID: 这是最简单的方法,客户端在连接URL中包含认证信息,如
ws://localhost:8000/ws/{client_id}?token=your_token
。服务器端可以在WebSocket端点函数中通过依赖注入获取查询参数。 - 通过HTTP Headers传递Token: 客户端可以在WebSocket握手请求的Header中包含认证信息(例如
Authorization: Bearer your_token
)。然而,标准的浏览器WebSocket
API 不允许自定义Header,所以这种方法通常只适用于非浏览器客户端(如移动应用、桌面应用或Python客户端)。 - 先建立HTTP连接,获取认证信息,再建立WebSocket连接: 客户端先通过标准的HTTP API使用用户名/密码登录,获取一个Token或Session ID。然后使用这个Token/ID通过上述方法1或2建立WebSocket连接。这是最常见的模式。
- 在WebSocket连接建立后发送认证消息: 连接建立后,客户端立即发送一条特殊的认证消息包含凭据。服务器验证后才允许后续操作。这种方法稍微复杂,但允许使用WebSocket协议本身进行认证。
通过查询参数进行认证示例
假设你有一个 /login
HTTP端点,成功登录后返回一个简单的 token。然后客户端使用这个 token 连接到 WebSocket。
```python
main.py (继续添加)
from fastapi import Query, Depends, HTTPException, status
from typing import Optional
import uuid # 用于生成简单token
模拟用户数据库和token存储
fake_users_db = {
"john_doe": {"username": "john_doe", "password": "secretpassword"},
"jane_smith": {"username": "jane_smith", "password": "anothersecret"}
}
fake_tokens_db = {} # token: username 映射
HTTP 登录端点 (简化版)
@app.post("/login")
async def login(username: str, password: str):
user = fake_users_db.get(username)
if not user or user["password"] != password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials"
)
token = str(uuid.uuid4()) # 生成一个简单的token
fake_tokens_db[token] = username
return {"token": token}
依赖注入函数,用于验证WebSocket连接的token
async def get_user_from_token(token: str = Query(..., description="Authentication token")):
username = fake_tokens_db.get(token)
if not username:
# 可以在这里抛出 HTTPException,虽然 WebSocket 握手时
# HTTPException 不会像 HTTP 请求那样返回标准错误响应,
# 但会阻止连接建立。客户端会收到连接失败(状态码通常是 1011)。
# 或者,你可以让依赖返回 None,然后在 WebSocket 端点内部检查。
print(f"Invalid token received: {token}")
# 直接返回 None,然后在端点中处理
return None
# 或者更严格地阻止连接: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
print(f"Token {token} validated for user: {username}")
return username
修改 WebSocket 端点使用依赖注入进行认证
@app.websocket("/ws/auth")
async def websocket_auth_endpoint(
websocket: WebSocket,
username: str = Depends(get_user_from_token) # 使用依赖注入获取并验证用户
):
# 如果依赖注入返回 None (即token无效),我们不接受连接
if username is None:
# FastAPI/Starlette 会在依赖失败时阻止 accept() 调用,
# 但为了明确和日志记录,我们可以在这里加个检查
print("Connection attempt with invalid token rejected.")
# await websocket.close(code=1008) # 1008表示策略违规,或1011表示服务器错误
# 注意:在依赖注入失败时,accept() 未被调用,直接调用 close() 可能会出错
# 依赖失败通常会导致 Starlette 自动拒绝连接,客户端会看到握手失败。
return # 直接返回,不接受连接
# 如果token有效,接受连接并继续
print(f"Client '{username}' connected via authenticated WebSocket.")
await manager.connect(websocket) # 使用管理器处理连接建立
try:
# 现在你可以使用 username 变量来识别用户
await manager.send_personal_message(f"Welcome, {username}!", websocket)
await manager.broadcast(f"User '{username}' has joined the chat.")
while True:
data = await websocket.receive_text()
message = f"'{username}' says: {data}"
print(f"Received message from '{username}': {data}")
await manager.broadcast(message)
except WebSocketDisconnect:
manager.disconnect(websocket)
print(f"User '{username}' disconnected.")
await manager.broadcast(f"User '{username}' has left the chat.")
except Exception as e:
print(f"An error occurred with user '{username}': {e}")
manager.disconnect(websocket)
await manager.broadcast(f"User '{username}' disconnected due to an error.")
```
在这个例子中,用户需要先通过 /login
获取 token,然后通过 /ws/auth?token=YOUR_TOKEN
连接WebSocket。get_user_from_token
依赖函数会在WebSocket握手阶段执行,验证 token。如果 token 无效,依赖会失败,FastAPI 会拒绝接受 WebSocket 连接。如果 token 有效,端点函数会接收到用户名,并可以使用这个用户名来标识连接。
在连接管理器中关联用户ID与WebSocket连接
为了发送消息给特定用户(不是通过 client_id
,而是通过他们的用户名或用户ID),ConnectionManager
可以使用一个字典来存储映射关系,例如 username: WebSocket
。
```python
main.py (修改 ConnectionManager)
from typing import List, Dict
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
# 添加一个字典来按用户名查找连接
self.user_connections: Dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, username: Optional[str] = None):
"""处理新连接,可选地关联用户名"""
await websocket.accept()
self.active_connections.append(websocket)
if username:
self.user_connections[username] = websocket # 关联用户名
print(f"Client '{username}' connected. Total connections: {len(self.active_connections)}")
else:
print(f"Client connected (anonymous). Total connections: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
"""处理连接断开,同时移除用户关联"""
try:
self.active_connections.remove(websocket)
# 查找并移除用户关联
username_to_remove = None
for username, conn in self.user_connections.items():
if conn == websocket:
username_to_remove = username
break
if username_to_remove:
del self.user_connections[username_to_remove]
print(f"Client '{username_to_remove}' disconnected. Total connections: {len(self.active_connections)}")
else:
print(f"Client disconnected (anonymous or not found in user map). Total connections: {len(self.active_connections)}")
except ValueError:
print("Disconnected client not found in active connections list.")
async def send_personal_message(self, message: str, websocket: WebSocket):
# ... (方法体与之前类似,已处理异常)
pass
async def send_message_to_user(self, message: str, username: str):
"""向特定用户发送消息"""
connection = self.user_connections.get(username)
if connection:
await self.send_personal_message(message, connection)
else:
print(f"User '{username}' not found among active connections.")
async def broadcast(self, message: str):
# ... (方法体与之前类似,已处理异常和清理无效连接)
pass
async def broadcast_json(self, data: Any):
# ... (方法体与之前类似,已处理异常和清理无效连接)
pass
... (继续使用修改后的 manager 实例)
修改 websocket_auth_endpoint 的 connect/disconnect 调用
@app.websocket("/ws/auth")
async def websocket_auth_endpoint(
websocket: WebSocket,
username: str = Depends(get_user_from_token) # 确保 get_user_from_token 返回 username 或 None
):
if username is None:
print("Connection attempt with invalid token rejected.")
return
# 使用带有用户名的 connect 方法
await manager.connect(websocket, username)
try:
# ... (其余逻辑与之前类似,使用 username 变量)
await manager.send_personal_message(f"Welcome, {username}!", websocket)
await manager.broadcast(f"User '{username}' has joined the chat.")
while True:
data = await websocket.receive_text()
# 可以发送消息给特定用户,例如私聊命令 /pm user message
if data.startswith("/pm "):
parts = data.split(" ", 2)
if len(parts) == 3:
target_user = parts[1]
private_message_content = parts[2]
await manager.send_message_to_user(f"(Private from {username}): {private_message_content}", target_user)
await manager.send_personal_message(f"(Private to {target_user}): {private_message_content}", websocket) # 给发送者自己一个确认
else:
await manager.send_personal_message("Invalid /pm command format. Use '/pm username message'", websocket)
else:
# 广播普通消息
message = f"'{username}' says: {data}"
print(f"Received message from '{username}': {data}")
await manager.broadcast(message)
except WebSocketDisconnect:
manager.disconnect(websocket)
print(f"User '{username}' disconnected.")
await manager.broadcast(f"User '{username}' has left the chat.")
except Exception as e:
print(f"An error occurred with user '{username}': {e}")
# 发生错误时,在断开前尝试向该用户发送错误信息
try:
await manager.send_personal_message(f"An internal server error occurred: {e}", websocket)
except:
pass # 尝试发送失败也没关系
manager.disconnect(websocket)
await manager.broadcast(f"User '{username}' disconnected due to an error.")
``
ConnectionManager` 可以通过用户名查找特定的WebSocket连接,从而实现私聊等功能。
现在
8. WebSocket的高级主题
状态管理
在实时应用中,服务器经常需要维护一些共享状态,例如所有在线用户列表、最近的聊天消息历史、游戏房间状态等。在单进程的FastAPI应用中,你可以将这些状态存储在全局变量或 ConnectionManager
实例中。
- 在线用户列表:
ConnectionManager
中的user_connections
字典就是一个简单的在线用户列表。 - 聊天记录: 可以在
ConnectionManager
中维护一个列表来存储最近 N 条消息,新连接的用户可以先获取这些历史消息。
```python
main.py (在 ConnectionManager 中添加消息历史)
class ConnectionManager:
# ... (其他方法)
def __init__(self):
self.active_connections: List[WebSocket] = []
self.user_connections: Dict[str, WebSocket] = {}
self.message_history: List[Dict[str, Any]] = [] # 存储消息历史,例如 JSON 格式
self.max_history_size = 100 # 最大存储消息数量
async def connect(self, websocket: WebSocket, username: Optional[str] = None):
await websocket.accept()
self.active_connections.append(websocket)
if username:
self.user_connections[username] = websocket
print(f"Client '{username}' connected. Total connections: {len(self.active_connections)}")
else:
print(f"Client connected (anonymous). Total connections: {len(self.active_connections)}")
# 向新连接的用户发送历史消息
if self.message_history:
# 注意:直接发送 JSON 列表,或者逐条发送
# 逐条发送更通用,即使客户端不支持接收 JSON 数组
await self.send_personal_json({"type": "history", "messages": self.message_history}, websocket)
# 或者
# for msg in self.message_history:
# await self.send_personal_json({"type": "history_item", "message": msg}, websocket)
async def add_message_to_history(self, message_data: Dict[str, Any]):
"""添加消息到历史记录并裁剪"""
self.message_history.append(message_data)
# 保持历史记录在指定大小内
if len(self.message_history) > self.max_history_size:
self.message_history.pop(0) # 移除最旧的消息
async def broadcast_json(self, data: Any):
"""广播 JSON 消息,并添加到历史记录(如果需要)"""
# 可以在这里判断消息类型,只将聊天消息添加到历史记录
if data.get("type") == "chat_message":
await self.add_message_to_history(data)
invalid_connections = []
for connection in self.active_connections:
# ... (发送逻辑,与之前类似)
try:
await connection.send_json(data)
except (WebSocketDisconnect, Exception):
invalid_connections.append(connection)
# ... (移除无效连接逻辑)
修改 websocket_auth_endpoint 中的广播调用,发送包含 type 的 JSON 消息
@app.websocket("/ws/auth")
async def websocket_auth_endpoint(
websocket: WebSocket,
username: str = Depends(get_user_from_token)
):
# ... (连接和错误处理逻辑)
try:
await manager.connect(websocket, username)
await manager.broadcast_json({"type": "system", "content": f"User '{username}' has joined the chat.", "timestamp": datetime.datetime.now().isoformat()})
while True:
data = await websocket.receive_json() # 假设客户端发送 JSON
# 假设客户端发送 { "type": "chat", "message": "..." }
message_type = data.get("type")
if message_type == "chat":
message_content = data.get("message", "")
if message_content.strip(): # 忽略空消息
chat_message_data = {
"type": "chat_message", # 区分消息类型
"sender": username,
"content": message_content,
"timestamp": datetime.datetime.now().isoformat()
}
print(f"Received chat message from '{username}': {message_content}")
await manager.broadcast_json(chat_message_data) # 广播聊天消息 (会添加到历史)
elif message_type == "private_chat":
target_user = data.get("target_user")
private_message_content = data.get("message", "")
if target_user and private_message_content.strip():
private_message_data = {
"type": "private_chat_message",
"sender": username,
"target": target_user,
"content": private_message_content,
"timestamp": datetime.datetime.now().isoformat()
}
print(f"Received private message from '{username}' to '{target_user}': {private_message_content}")
await manager.send_message_to_user(private_message_data, target_user)
# 可选:给发送者自己一个确认
await manager.send_personal_json({"type": "system", "content": f"Private message sent to {target_user}.", "timestamp": datetime.datetime.now().isoformat()}, websocket)
# 可以处理其他消息类型,例如加入房间、离开房间等
except WebSocketDisconnect:
manager.disconnect(websocket)
print(f"User '{username}' disconnected.")
await manager.broadcast_json({"type": "system", "content": f"User '{username}' has left the chat.", "timestamp": datetime.datetime.now().isoformat()})
except Exception as e:
print(f"An error occurred with user '{username}': {e}")
manager.disconnect(websocket)
await manager.broadcast_json({"type": "system", "content": f"User '{username}' disconnected due to an error.", "timestamp": datetime.datetime.now().isoformat()})
```
Subprotocols
WebSocket协议允许指定和使用"子协议" (subprotocols)。子协议定义了在WebSocket连接上交换消息的格式和语义。例如,你可以定义一个用于聊天应用的子协议,规定消息必须是JSON格式,并且包含 type
, sender
, content
等字段。客户端和服务器在握手阶段协商使用的子协议。
在FastAPI中,可以在 app.websocket()
装饰器中指定支持的子协议列表:
```python
@app.websocket("/ws/subprotocol", subprotocols=["chat", "notifications"])
async def websocket_subprotocol_endpoint(websocket: WebSocket):
# 在 accept() 中可以指定实际使用的子协议
# chosen_subprotocol = ... 客户端会在握手请求头 Sec-WebSocket-Protocol 中发送其支持的子协议
# Starlette/FastAPI 会自动处理协商,如果客户端支持其中一个,会在 websocket.scope['subprotocol'] 中可用
await websocket.accept() # 可以可选地传入 chosen_subprotocol 参数
# 检查协商后的子协议 (如果需要)
# negotiated_protocol = websocket.scope.get('subprotocol')
# print(f"Negotiated subprotocol: {negotiated_protocol}")
# ... 处理消息 ...
``
WebSocket
客户端在创建对象时指定子协议列表:
new WebSocket("ws://...", ["chat", "notifications"])`。
性能与扩展
FastAPI基于Starlette和Uvicorn(一个高性能的ASGI服务器),本身具有很好的并发处理能力,尤其是在使用异步代码时。单个FastAPI进程可以同时处理大量的并发连接(包括HTTP和WebSocket),因为当一个连接在等待I/O(如接收消息、发送消息)时,事件循环可以切换去处理其他连接。
- 单机扩展: Uvicorn可以通过
--workers
参数启动多个worker进程。每个worker都是独立的Python进程,有自己的内存空间。这有助于利用多核CPU,并且可以在一个worker崩溃时不影响其他worker。- 问题: 当使用多个worker时,前面实现的
ConnectionManager
存在于每个worker进程的内存中,它们之间不共享状态。这意味着一个worker收到的消息无法通过其本地的ConnectionManager
广播给连接到其他worker的客户端。
- 问题: 当使用多个worker时,前面实现的
- 分布式扩展: 为了解决多进程/多服务器部署时的状态共享问题,需要使用外部服务来协调消息分发。最常见的模式是使用发布/订阅系统 (Pub/Sub)。
- Redis Pub/Sub: Redis是一个流行的内存数据结构存储,它也提供Pub/Sub功能。
- 工作原理: 当一个worker收到一条消息需要广播时,它不是直接遍历本地连接,而是将消息发布 (Publish) 到一个特定的Redis频道。其他所有worker都在订阅 (Subscribe) 这个频道。当消息发布到频道时,所有订阅了这个频道的worker都会收到消息。每个worker收到消息后,再通过其本地的
ConnectionManager
将消息发送给连接到自己的客户端。
- 工作原理: 当一个worker收到一条消息需要广播时,它不是直接遍历本地连接,而是将消息发布 (Publish) 到一个特定的Redis频道。其他所有worker都在订阅 (Subscribe) 这个频道。当消息发布到频道时,所有订阅了这个频道的worker都会收到消息。每个worker收到消息后,再通过其本地的
- 实现: 这需要额外的代码来集成Redis客户端(如
redis-py
),在ConnectionManager
中添加订阅和发布逻辑,并在单独的协程中运行订阅循环。
- Redis Pub/Sub: Redis是一个流行的内存数据结构存储,它也提供Pub/Sub功能。
```python
概念示例 (不完整,仅展示 Redis Pub/Sub 思路)
import asyncio
import aioredis # 需要安装 pip install redis hiredis
假设 ConnectionManager 有一个 Redis 客户端连接池
class ConnectionManager:
# ... init, connect, disconnect, send_personal_message, send_message_to_user ...
async def setup_redis(self, redis_url="redis://localhost"):
self._redis = await aioredis.from_url(redis_url, decode_responses=True)
self._pubsub = self._redis.pubsub()
await self._pubsub.subscribe("chat_channel") # 订阅聊天频道
asyncio.create_task(self.listen_redis_pubsub()) # 启动一个协程监听消息
async def listen_redis_pubsub(self):
"""在协程中循环监听 Redis Pub/Sub 频道"""
print("Started listening to Redis Pub/Sub...")
while True:
try:
message = await self._pubsub.get_message(ignore_subscribe_messages=True)
if message and message['type'] == 'message':
data = message['data'] # 这是收到的广播消息 (字符串或JSON)
print(f"Received message from Redis: {data}")
# 将收到的消息广播给本 worker 的所有客户端 (不包括发送者,如果发送者是本worker的客户端)
# 可以在消息数据中包含发送者的 worker ID 或 connection ID 来避免回发
await self.broadcast(data) # 或者 broadcast_json(json.loads(data))
await asyncio.sleep(0.01) # 防止CPU空转
except Exception as e:
print(f"Error in Redis Pub/Sub listener: {e}")
await asyncio.sleep(1) # 出错时稍等重试
async def publish_message(self, message: str):
"""将消息发布到 Redis 频道"""
if self._redis:
await self._redis.publish("chat_channel", message)
# 在 WebSocket 端点收到消息时,不再直接 broadcast(),而是 publish_message()
@app.websocket("/ws/auth")
async def websocket_auth_endpoint(...):
# ... connect 逻辑 ...
await manager.setup_redis() # 在连接建立后设置Redis (可能需要更早设置,例如在应用启动时)
try:
while True:
data = await websocket.receive_text()
message_to_publish = f"'{username}' says: {data}"
await manager.publish_message(message_to_publish) # 发布到Redis
# ... disconnect 和 异常处理 ...
# 应用启动时初始化 Redis (或者在 ConnectionManager 创建时)
@app.on_event("startup")
async def startup_event():
await manager.setup_redis() # 或者其他初始化方式
@app.on_event("shutdown")
async def shutdown_event():
if manager._redis:
await manager._redis.close()
```
使用Redis Pub/Sub增加了复杂度,但对于需要水平扩展的实时应用来说是必要的。
部署考虑
- 代理服务器 (Nginx/Traefik): 在生产环境中,通常会在FastAPI应用前面部署一个反向代理服务器。这个代理服务器需要配置来正确处理WebSocket连接。Nginx和Traefik都支持WebSocket代理,核心配置是升级连接头 (
Upgrade
和Connection
头)。 - SSL/TLS (WSS): 为了安全,生产环境中的WebSocket连接应该使用加密的WebSocket Secure (
wss://
) 协议,它运行在TLS/SSL之上。代理服务器负责处理SSL证书和加密,并将流量转发到后端的FastAPI应用(通常是未加密的ws://
)。 - Uvicorn Worker数量: 根据服务器的CPU核心数量和应用特性,配置Uvicorn的worker数量。
9. 总结
FastAPI凭借其高性能、易用性和对异步编程的良好支持,成为了构建实时应用的优秀选择。通过结合Starlette强大的WebSocket实现,我们可以轻松地创建双向、实时的通信功能。
本文从理解WebSocket协议的优势开始,逐步深入,展示了如何在FastAPI中:
* 定义基础WebSocket端点。
* 处理连接的建立、消息的收发和断开。
* 构建 ConnectionManager
来有效地管理多个并发连接并实现广播功能。
* 处理不同类型的数据(文本、字节、JSON)。
* 实现WebSocket连接的认证。
* 探讨了状态管理和水平扩展等高级主题。
通过本文提供的代码示例和解释,你应该已经掌握了使用FastAPI和WebSocket构建功能丰富实时应用的基础知识和核心技巧。
10. 进一步学习资源
- FastAPI官方文档 - WebSockets: https://fastapi.tiangolo.com/advanced/websockets/
- Starlette官方文档 - WebSockets: https://www.starlette.io/websockets/
- MDN Web Docs - WebSocket API (Client-side JavaScript): https://developer.mozilla.org/en-US/docs/Web/API/WebSocket
- Redis Pub/Sub 文档: https://redis.io/topics/pubsub
websockets
Python 库文档: https://websockets.readthedocs.io/en/stable/
希望这篇文章对你有所帮助!祝你在使用FastAPI构建实时应用的旅程中一切顺利!