FastAPI WebSocket 教学:从零开始实现实时功能
前言:步入实时通信的世界
在当今高度互联的数字时代,用户对于应用体验的要求越来越高。传统的请求-响应(Request-Response)模式,如HTTP,虽然强大,但在需要即时更新、双向通信的场景下显得力不从心。想象一下在线聊天、多人协作文档、实时股票报价、游戏排行榜或物联网数据流,这些功能的核心都离不开“实时”二字。
这时,WebSocket 技术应运而生。它突破了HTTP的单向通信限制,允许客户端和服务器之间建立一个持久的、全双工的通信通道。一旦握手成功,双方可以随时发送和接收数据,从而实现真正意义上的实时交互。
而Python作为一门在Web开发领域日益重要的语言,其异步框架FastAPI更是为构建高性能、高并发的API服务提供了得天独厚的优势。FastAPI基于Starlette和Pydantic,充分利用了Python的async/await语法,使其在处理异步任务(包括WebSocket)时表现卓越。
本篇文章将带你从零开始,深入学习如何使用FastAPI构建WebSocket应用。我们将从基础概念入手,逐步实现一个功能完善的多用户实时聊天室,并探讨一些高级主题和最佳实践。
1. 理解 WebSocket:与 HTTP 的异同
在深入代码之前,我们有必要理解WebSocket与我们日常接触的HTTP协议有哪些本质区别:
1.1 HTTP:单向、短连接
- 工作模式:客户端发起请求(Request),服务器返回响应(Response)。
- 连接状态:无状态(Stateless)。每次请求都是独立的,服务器不保留客户端的连接状态(尽管可以通过Cookie/Session模拟状态)。
- 连接生命周期:通常是短连接。完成一次请求-响应后,连接即可关闭(尽管HTTP/1.1引入了Keep-Alive,但本质仍是请求-响应)。
- 头部开销:每次请求都包含完整的HTTP头部,开销较大。
- 应用场景:网页浏览、API调用等。
1.2 WebSocket:双向、长连接
- 工作模式:客户端和服务器建立连接后,可以在任何时候互相发送数据,是真正的双向通信。
- 连接状态:有状态(Stateful)。一旦建立连接,服务器会保持这个连接,并知道这个客户端的存在。
- 连接生命周期:长连接。连接一旦建立,除非一方主动断开或出现异常,否则会一直保持开启状态。
- 头部开销:连接建立后,后续数据帧的头部开销极小,大大减少了网络传输的冗余。
- 握手过程:WebSocket连接的建立,实际上是基于HTTP协议的一次特殊握手(Upgrade请求)。客户端发送一个带有
Upgrade: websocket和Connection: Upgrade等特定头部的HTTP请求,服务器如果支持WebSocket,则会返回一个特殊的HTTP响应,表示协议升级成功,从此连接就从HTTP升级为WebSocket。
1.3 为什么选择 WebSocket?
- 实时性:无需频繁轮询,数据可以即时推送。
- 效率高:连接建立后开销小,减少了重复的TCP握手和HTTP头部传输。
- 资源消耗低:对于服务器和客户端来说,长连接的维护成本低于大量短连接的频繁建立和关闭。
2. 准备工作:环境搭建
在开始编码之前,我们需要搭建必要的开发环境。
2.1 创建虚拟环境(推荐)
为了避免包冲突,我们总是建议为每个项目创建一个独立的虚拟环境。
“`bash
创建虚拟环境
python -m venv venv
激活虚拟环境
在 macOS/Linux 上
source venv/bin/activate
在 Windows 上
venv\Scripts\activate
“`
2.2 安装 FastAPI 及 Uvicorn
FastAPI 是我们的Web框架,而Uvicorn是一个高性能的ASGI服务器,用于运行FastAPI应用。
bash
pip install fastapi uvicorn websockets # websockets库是FastAPI处理WebSocket的底层依赖
3. 初探 FastAPI WebSocket:构建一个 Echo 服务器
我们从最简单的Echo服务器开始。Echo服务器的功能是:客户端发送什么消息,服务器就原封不动地返回什么消息。
3.1 FastAPI 服务器端代码 (main.py)
“`python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse # 用于返回一个简单的HTML页面作为客户端
import uvicorn
app = FastAPI()
简单的HTML页面,用于提供WebSocket客户端界面
html = “””
FastAPI WebSocket Echo Server
“””
HTTP路由:提供上面定义的HTML页面
@app.get(“/”)
async def get():
return HTMLResponse(html)
WebSocket路由:处理WebSocket连接
@app.websocket(“/ws”)
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # 接受WebSocket连接
try:
while True:
# 持续监听客户端发送过来的文本消息
data = await websocket.receive_text()
# 将收到的消息原封不动地发回给客户端
await websocket.send_text(f”Message text was: {data}”)
except WebSocketDisconnect:
# 当客户端断开连接时,会抛出WebSocketDisconnect异常
print(“Client disconnected”)
# 可以在这里进行一些清理工作,例如从在线用户列表中移除该用户
if name == “main“:
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`
3.2 代码解析
from fastapi import FastAPI, WebSocket, WebSocketDisconnect: 导入必要的模块。WebSocket是FastAPI提供的用于处理WebSocket连接的类型,WebSocketDisconnect是当客户端断开连接时FastAPI会抛出的异常。@app.get("/"): 这是一个标准的FastAPI HTTP GET路由,用于在浏览器访问根路径时返回一个包含JavaScript客户端代码的HTML页面。@app.websocket("/ws"): 这是FastAPI定义WebSocket路由的方式。当客户端尝试通过WebSocket协议连接/ws路径时,这个异步函数就会被调用。async def websocket_endpoint(websocket: WebSocket)::websocket: WebSocket:FastAPI会自动将一个表示当前WebSocket连接的对象注入到这个参数中。await websocket.accept():这是WebSocket连接建立的关键一步。服务器必须显式地accept()客户端的连接请求,连接才能正式建立。在此之前,任何send()或receive()操作都会失败。while True::一旦连接建立,我们进入一个无限循环,持续监听和处理来自客户端的消息。data = await websocket.receive_text():这是一个异步操作,它会阻塞当前协程,直到从客户端接收到一条文本消息。FastAPI也支持receive_bytes()用于二进制数据,以及receive_json()用于JSON数据。await websocket.send_text(f"Message text was: {data}"):将收到的消息通过WebSocket连接发送回客户端。同样,也有send_bytes()和send_json()。except WebSocketDisconnect::这是一个非常重要的错误处理。当客户端主动关闭连接(例如关闭浏览器标签页)或者网络异常导致连接断开时,receive_text()或send_text()操作会抛出WebSocketDisconnect异常。捕获这个异常可以让我们优雅地处理客户端断开连接的情况,例如清理其会话数据。
3.3 运行与测试
- 保存上述代码为
main.py。 - 在命令行中运行:
uvicorn main:app --reload(--reload选项让Uvicorn在文件修改时自动重启服务器,方便开发)。 - 打开浏览器,访问
http://localhost:8000。 - 在页面的输入框中输入消息并点击“Send”,你会看到消息被发送到服务器,服务器再将其原封不动地发回。检查浏览器的开发者工具(Console),可以看到WebSocket连接的状态信息。
恭喜你!你已经成功创建并运行了第一个FastAPI WebSocket Echo服务器。
4. 进阶:实现多用户实时聊天室
Echo服务器虽然简单,但对于多用户通信场景还不够。真正的聊天室需要服务器能够将一个客户端发来的消息广播给所有连接着的其他客户端。
这涉及到如何管理多个WebSocket连接,以及如何安全地进行广播。我们将引入一个ConnectionManager类来帮助我们管理连接。
4.1 服务器端代码 (main.py – 更新)
我们将修改 main.py 来实现聊天室功能。
“`python
main.py (更新版本)
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from typing import List
import uvicorn
import json # 用于处理JSON格式的消息
app = FastAPI()
聊天室的HTML客户端页面
html_chat = “””
FastAPI WebSocket 聊天室
“””
连接管理器类
class ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
print(f"新连接: {websocket.client.host}:{websocket.client.port}. 当前连接数: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
print(f"连接断开: {websocket.client.host}:{websocket.client.port}. 当前连接数: {len(self.active_connections)}")
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
# 遍历所有活跃连接,向每个连接发送消息
for connection in self.active_connections:
try:
await connection.send_text(message)
except RuntimeError as e:
# 某些情况下,连接可能在遍历过程中断开,导致发送失败
# 这种错误可以通过WebsocketDisconnect处理,但为了健壮性,这里也捕获
print(f"发送消息到断开连接时发生错误: {e}")
# 考虑在这里将问题连接从列表中移除,但通常WebSocketDisconnect会处理
实例化连接管理器,全局共享
manager = ConnectionManager()
HTTP路由:提供聊天室HTML页面
@app.get(“/”)
async def get():
return HTMLResponse(html_chat)
WebSocket路由:处理聊天室连接
@app.websocket(“/ws”)
async def websocket_chat_endpoint(websocket: WebSocket):
# 将新连接添加到管理器
await manager.connect(websocket)
try:
# 通知所有客户端有新用户加入
await manager.broadcast(json.dumps({“sender”: “系统”, “message”: “新用户加入聊天室!”}))
while True:
# 持续监听客户端发送过来的JSON消息
data = await websocket.receive_text()
message_data = json.loads(data) # 解析JSON字符串
sender = message_data.get("sender", "匿名用户")
message_content = message_data.get("message", "空消息")
# 构造要广播的JSON消息
broadcast_message = json.dumps({"sender": sender, "message": message_content})
# 将收到的消息广播给所有连接的客户端
await manager.broadcast(broadcast_message)
except WebSocketDisconnect:
# 客户端断开连接时,从管理器中移除
manager.disconnect(websocket)
# 通知所有客户端有用户离开
await manager.broadcast(json.dumps({"sender": "系统", "message": "用户离开了聊天室!"}))
except Exception as e:
print(f"WebSocket 错误: {e}")
manager.disconnect(websocket)
await manager.broadcast(json.dumps({"sender": "系统", "message": "有用户因错误离开了聊天室!"}))
if name == “main“:
uvicorn.run(app, host=”0.0.0.0”, port=8000)
“`
4.2 代码解析与增强
-
ConnectionManager类:active_connections: List[WebSocket]:一个列表,用于存储所有当前活跃的WebSocket连接对象。connect(websocket: WebSocket):当有新连接建立时调用。它会先accept()连接,然后将websocket对象添加到active_connections列表中。disconnect(websocket: WebSocket):当客户端断开连接时调用。它会将对应的websocket对象从active_connections列表中移除。send_personal_message(message: str, websocket: WebSocket):发送消息给单个特定客户端(例如私聊功能)。broadcast(message: str):遍历active_connections列表中的所有websocket对象,并向每个对象发送相同的消息。这是实现群聊的关键。-
并发安全(Concurrency Safety):
在多线程或多协程环境下,对共享资源(如active_connections列表)进行修改(添加或删除)时,可能会出现竞态条件(Race Condition)。例如,一个协程正在遍历列表进行广播,而另一个协程同时在尝试从列表中移除一个断开的连接,这可能导致IndexError或其他不可预测的行为。
在Python的asyncio环境中,我们可以使用asyncio.Lock来保护共享资源。对于ConnectionManager,一个更健壮的实现会是:
“`python
import asyncioclass ConnectionManager:
def init(self):
self.active_connections: List[WebSocket] = []
self.lock = asyncio.Lock() # 添加一个异步锁async def connect(self, websocket: WebSocket): await websocket.accept() async with self.lock: # 使用锁保护列表操作 self.active_connections.append(websocket) print(f"新连接: {websocket.client.host}:{websocket.client.port}. 当前连接数: {len(self.active_connections)}") async def disconnect(self, websocket: WebSocket): # disconnect也应是异步的 async with self.lock: # 使用锁保护列表操作 if websocket in self.active_connections: # 确保连接存在才移除 self.active_connections.remove(websocket) print(f"连接断开: {websocket.client.host}:{websocket.client.port}. 当前连接数: {len(self.active_connections)}") async def broadcast(self, message: str): # 遍历时,可以先复制一份列表,或者在发送失败时移除 to_remove = [] async with self.lock: # 在遍历前获取锁,防止在遍历时列表被修改 connections_copy = list(self.active_connections) # 复制一份,防止在遍历时修改原列表 for connection in connections_copy: try: await connection.send_text(message) except RuntimeError as e: # 捕获发送失败,可能连接已无效 print(f"发送消息到断开连接时发生错误: {e}") # 延迟移除,或者在disconnect中统一移除 to_remove.append(connection) except WebSocketDisconnect: # 如果在send时才断开 to_remove.append(connection) if to_remove: async with self.lock: for connection in to_remove: if connection in self.active_connections: self.active_connections.remove(connection) print(f"清理已断开连接: {connection.client.host}:{connection.client.port}")``asyncio.Lock`,但在实际生产环境中,强烈建议在所有修改共享数据结构的操作中引入锁。
为了本教程的简洁性,在主代码中暂时省略了
-
json模块:- 在实际应用中,WebSocket传输的往往是结构化的数据,JSON是最佳选择。
- 客户端通过
JSON.stringify()将JavaScript对象转换为JSON字符串发送。 - 服务器端通过
json.loads()将收到的JSON字符串解析为Python字典。 - 服务器端通过
json.dumps()将Python字典转换为JSON字符串发送。
-
websocket_chat_endpoint函数:await manager.connect(websocket):新连接一进来,就通过管理器添加到活跃连接列表。await manager.broadcast(...):当有新用户加入或离开时,向所有客户端广播系统消息。data = await websocket.receive_text():接收客户端发送的JSON字符串消息。message_data = json.loads(data):解析消息体,获取发送者和消息内容。await manager.broadcast(broadcast_message):将格式化后的消息广播给所有客户端。except WebSocketDisconnect::捕获断开连接异常,通过管理器移除连接,并广播用户离开信息。except Exception as e::捕获其他潜在错误,例如JSON解析失败、网络异常等,进行日志记录并清理连接。
4.3 运行与测试
- 保存上述更新后的代码为
main.py。 - 在命令行中运行:
uvicorn main:app --reload。 - 打开多个浏览器标签页或窗口,访问
http://localhost:8000。 - 在每个窗口中输入不同的用户名,然后输入消息发送。你会发现所有连接的客户端都能实时收到消息,包括系统消息(用户加入/离开)。
- 尝试关闭某个浏览器标签页,观察其他标签页是否收到用户离开的系统消息。
5. 深入探讨与最佳实践
实现了基本的聊天室功能后,我们来探讨一些在实际开发中会遇到的更高级的场景和最佳实践。
5.1 数据序列化与反序列化
- FastAPI 内置 JSON 支持:除了手动使用
json.dumps()和json.loads(),FastAPI 的WebSocket对象也提供了方便的send_json()和receive_json()方法。await websocket.send_json({"sender": "系统", "message": "欢迎!"})received_data = await websocket.receive_json()
这简化了代码,并且 FastAPI 内部会处理好JSON的编解码。
5.2 认证与授权 (Authentication & Authorization)
WebSocket 连接也需要安全保障。通常做法有:
- 基于Token的HTTP握手:在WebSocket握手请求的HTTP头部中包含认证Token(例如
Authorization: Bearer <token>)。FastAPI的WebSocket路由函数可以访问websocket.headers来获取这些信息,并在await websocket.accept()之前进行验证。
“`python
@app.websocket(“/ws”)
async def websocket_chat_endpoint(websocket: WebSocket, token: str = Query(…)): # 可以从URL查询参数获取
# 也可以从 headers 获取:
# auth_header = websocket.headers.get(“Authorization”)
# if not auth_header or not auth_header.startswith(“Bearer “):
# await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason=”Unauthorized”)
# return
# token = auth_header.split(” “)[1]# 验证 token... if not is_valid_token(token): # 假设有一个验证函数 await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Unauthorized") return await manager.connect(websocket) # ... 后续逻辑“`
2. 首次消息验证:连接建立后,客户端发送的第一个消息包含身份凭证。服务器验证后才允许后续通信。
5.3 错误处理与心跳机制
WebSocketDisconnect:我们已经看到它用于处理客户端断开连接。- 客户端自动重连:在客户端JavaScript中实现
onclose事件的重连逻辑,可以提高用户体验。例如,setTimeout(connectWebSocket, 3000)。 - 心跳机制 (Ping/Pong):长时间没有数据传输时,防火墙或代理可能会切断连接。WebSocket协议自带Ping/Pong帧,可以由客户端或服务器发送,用来检查连接是否仍然活跃。
- FastAPI/Starlette 底层通常会处理自动心跳,但在某些特定场景下,你可能需要手动实现应用层的心跳机制。例如,每隔一段时间向客户端发送一个特定类型的“ping”消息,并期待收到“pong”消息。
5.4 后台任务与非阻塞操作
在WebSocket处理函数内部,任何耗时的同步操作都会阻塞整个协程,甚至可能影响其他连接。对于需要执行长时间任务的情况,可以使用FastAPI的 BackgroundTasks 或 asyncio.create_task() 来将其放到后台执行,避免阻塞主循环。
“`python
from fastapi import BackgroundTasks
async def heavy_computation_task(data):
# 模拟耗时操作
await asyncio.sleep(5)
print(f”后台任务完成,处理了: {data}”)
@app.websocket(“/ws_background”)
async def websocket_background_endpoint(websocket: WebSocket, background_tasks: BackgroundTasks):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
# 将耗时任务添加到后台
background_tasks.add_task(heavy_computation_task, data)
await websocket.send_text(f”已收到消息并启动后台任务处理: {data}”)
except WebSocketDisconnect:
print(“Client disconnected from background task endpoint”)
“`
5.5 状态管理与持久化
对于更复杂的应用,聊天记录、用户在线状态等数据不可能只存在于内存中。
- 数据库:将聊天消息、用户会话等持久化到数据库(如PostgreSQL、MongoDB)。
- 缓存/消息队列 (Redis Pub/Sub):
- 对于分布式部署的FastAPI应用(即有多个FastAPI实例),每个实例都有自己的
ConnectionManager。这时,一个实例收到的消息无法直接广播给连接到另一个实例的客户端。 - 解决方案是使用一个中央消息队列系统,如Redis的Pub/Sub(发布/订阅)模式。当一个FastAPI实例收到消息时,它不是直接广播,而是将消息发布到一个Redis频道。所有FastAPI实例都订阅这个频道,收到消息后,再各自向其管理的客户端广播。
- 这是一个非常重要的概念,用于扩展WebSocket应用。
- 对于分布式部署的FastAPI应用(即有多个FastAPI实例),每个实例都有自己的
5.6 客户端框架集成
本教程使用了原生的JavaScript WebSocket API。在实际项目中,你可能会使用更高级的JavaScript框架(如Vue.js, React, Angular)或专门的WebSocket客户端库(如Socket.IO Client,但请注意Socket.IO协议与标准WebSocket协议不同),它们提供了更强大的抽象和状态管理能力。
6. 总结与展望
通过本篇文章的学习,我们:
- 理解了WebSocket协议的核心概念及其与HTTP的区别。
- 掌握了FastAPI中WebSocket路由的定义和基本用法。
- 从零开始构建了一个FastAPI WebSocket Echo服务器。
- 进一步实现了一个多用户实时聊天室,学习了连接管理和消息广播机制。
- 探讨了认证、错误处理、数据序列化、后台任务、状态管理及分布式扩展等高级主题。
WebSocket技术结合FastAPI的异步优势,为构建高性能、实时的Web应用提供了坚实的基础。无论是即时通讯、协同编辑、实时数据可视化还是物联网控制,WebSocket都能发挥其独特的作用。
实时通信的世界充满无限可能,希望这篇详细的教程能为你打开一扇新的大门,激发你创造更多激动人心的实时应用!现在,是时候将这些知识付诸实践,去构建你自己的实时应用了!