FastAPI Middleware 原理与应用 – wiki基地


深入理解 FastAPI Middleware:原理与应用

在构建现代 Web API 时,除了核心业务逻辑处理外,我们经常需要处理一些横切关注点(Cross-cutting Concerns),例如请求日志记录、身份认证、访问控制、请求头处理、CORS 策略、性能监控等。如果将这些逻辑分散到每一个路由处理函数中,代码会变得重复、臃肿且难以维护。这就是中间件(Middleware)发挥作用的地方。

FastAPI 作为一款高性能的现代 Python Web 框架,基于 Starlette 和 Pydantic 构建,它提供了强大且灵活的中间件机制,允许我们在请求到达路由处理函数之前或响应返回客户端之后插入自定义逻辑。本文将深入探讨 FastAPI 中间件的工作原理、类型以及如何在实际应用中有效地使用它。

一、什么是中间件?为什么需要它?

想象一下 Web 请求处理过程是一个流水线。客户端发送请求,服务器接收请求,然后根据请求路径匹配到相应的处理函数,处理函数执行业务逻辑,最后生成响应并返回给客户端。

中间件就像流水线上的一个或多个工人,它们位于请求进入和响应离开的关键节点上。每个中间件都有机会在请求到达下一个环节(可能是另一个中间件或最终的路由处理函数)之前进行预处理,并在响应从下一个环节返回后进行后处理。

Web Request Middleware Flow (Conceptual)

通过使用中间件,我们可以:

  1. 集中处理横切关注点: 将分散在各处的逻辑提取到一个独立模块中,提高代码的内聚性。
  2. 提高代码复用性: 一次编写,应用于所有(或部分)请求。
  3. 降低耦合度: 中间件与具体的业务逻辑解耦,业务代码更专注于自身职责。
  4. 构建可插拔的架构: 可以轻松地添加、移除或修改中间件,而无需改动核心业务代码。

在 FastAPI/Starlette 中,中间件被设计为 ASGI (Asynchronous Server Gateway Interface) 应用的一部分。ASGI 是一种异步 Python Web 标准,中间件本质上也是一个 ASGI 应用,它可以包装另一个 ASGI 应用(可以是下一个中间件或最终的 FastAPI/Starlette 应用本身)。当请求到达时,它会依次通过中间件链,每个中间件在调用链中的下一个应用之前执行逻辑,然后在下一个应用返回响应后执行逻辑。

二、FastAPI 中间件的工作原理

FastAPI(底层是 Starlette)的中间件机制是基于 ASGI 的异步调用链实现的。一个中间件本质上是一个 ASGI 应用,它接收三个参数:scope, receive, send。然而,FastAPI/Starlette 提供了一个更高级、更易用的接口来定义中间件,通常是以函数或类的形式。

核心原理概括如下:

  1. 添加中间件: 当你通过 app.add_middleware() 方法添加中间件时,FastAPI 会将这些中间件按照添加的顺序组成一个链条,并最终将你的主应用(包含路由处理函数)包裹在最内层。
  2. 请求流经链条: 当一个 HTTP 请求到来时,它首先进入最外层的中间件。
  3. 中间件执行逻辑: 每个中间件在接收到请求后,会执行一部分预处理逻辑(例如读取请求头、校验身份等)。
  4. 调用下一个: 预处理完成后,中间件会调用链条中的下一个应用(通过一个特定的异步函数实现,通常命名为 call_next)。这个调用会一直向下传递,直到到达最内层的 FastAPI 应用和最终的路由处理函数。
  5. 路由处理: 路由处理函数执行核心业务逻辑,生成一个响应对象。
  6. 响应回流: 响应对象从最内层开始,沿着中间件链条原路返回。
  7. 中间件执行后处理: 在响应经过每个中间件返回时,该中间件有机会执行后处理逻辑(例如修改响应头、记录响应状态、计算处理时间等)。
  8. 响应返回客户端: 经过所有中间件后处理的最终响应被发送回客户端。

这个过程可以用一个洋葱模型来类比:请求像穿透洋葱一样一层层进入,最终到达中心(路由处理函数),然后响应像从中心向外扩散一样一层层穿过洋葱皮返回。

1. 函数式中间件 (推荐)

FastAPI/Starlette 推荐使用基于函数的异步中间件。这种中间件是一个 async def 函数,它接收两个参数:

  • request: Request: 表示当前的 HTTP 请求对象。
  • call_next: 这是一个可调用的异步函数。调用 await call_next(request) 会将请求传递给下一个中间件或最终的路由处理函数,并等待获取响应。

函数签名的基本结构如下:

“`python
async def my_middleware(request: Request, call_next):
# 请求预处理逻辑 (在 await call_next 之前)
print(f”Request received: {request.method} {request.url}”)

# 调用下一个中间件或路由处理函数,并获取响应
response = await call_next(request)

# 响应后处理逻辑 (在 await call_next 之后)
print(f"Response status: {response.status_code}")
# 例如,可以修改响应头
response.headers["X-Processed-By-Middleware"] = "MyMiddleware"

# 返回响应
return response

“`

这种方式非常简洁直观,并且天然支持异步操作,与 FastAPI 的异步特性完美契合。

2. 类式中间件 (BaseHTTPMiddleware)

Starlette 也提供了一个 BaseHTTPMiddleware 类,你可以继承这个类来定义同步或异步的中间件。你需要实现一个 dispatch 方法:

“`python
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import Response

class MyClassMiddleware(BaseHTTPMiddleware):
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
# 请求预处理逻辑 (在 await call_next 之前)
print(f”Class Middleware Request received: {request.method} {request.url}”)

    # 调用下一个中间件或路由处理函数,并获取响应
    response = await call_next(request)

    # 响应后处理逻辑 (在 await call_next 之后)
    print(f"Class Middleware Response status: {response.status_code}")

    # 返回响应
    return response

“`

BaseHTTPMiddlewaredispatch 方法签名与函数式中间件类似,也接收 requestcall_next,并需要返回一个 Response 对象。

函数式 vs 类式:

  • 函数式: 更推荐,简洁,原生异步。适用于大多数需要异步操作的场景(例如调用数据库、外部服务)。
  • 类式 (BaseHTTPMiddleware): 适用于需要在中间件中存储状态(尽管不太常见)、或者你有一些同步逻辑想在中间件中执行(BaseHTTPMiddleware 会在一个线程池中运行其 dispatch 方法,以避免阻塞主事件循环,但这增加了开销)。对于纯异步逻辑,函数式更优。

3. 添加中间件到应用

使用 app.add_middleware() 方法将中间件添加到你的 FastAPI 应用实例:

“`python
from fastapi import FastAPI, Request, Response
import time

app = FastAPI()

添加函数式中间件

async def timing_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() – start_time
response.headers[“X-Process-Time”] = str(process_time)
print(f”Processed request in {process_time:.4f} seconds”)
return response

app.add_middleware(timing_middleware) # 直接传递函数

添加类式中间件

app.add_middleware(MyClassMiddleware) # 传递类名

@app.get(“/”)
async def read_root():
return {“Hello”: “World”}

@app.get(“/items/{item_id}”)
async def read_item(item_id: int):
time.sleep(0.1) # 模拟耗时操作
return {“item_id”: item_id}
“`

重要:中间件的添加顺序决定了其执行顺序。

  • 请求预处理阶段: 中间件按照添加的顺序从外到内依次执行(先添加的先执行预处理)。
  • 响应后处理阶段: 中间件按照添加顺序的逆序从内到外依次执行(后添加的先执行后处理)。

如果你添加了 MiddlewareA 然后是 MiddlewareB,请求流将是:
请求 -> MiddlewareA (预处理) -> MiddlewareB (预处理) -> 路由处理函数 -> MiddlewareB (后处理) -> MiddlewareA (后处理) -> 响应

理解这一点对于设计和调试中间件链至关重要。例如,一个认证中间件通常需要放在 CORS 中间件的 后面,因为 CORS 处理的是浏览器预检请求(OPTIONS),而认证通常只对实际请求(GET/POST等)有效。

三、FastAPI 中间件的常见应用场景

中间件在构建强大的 API 时扮演着核心角色。以下是一些常见的应用场景:

1. 请求/响应日志记录

中间件是记录所有请求和响应的理想位置。你可以在 await call_next(request) 之前记录请求信息,之后记录响应信息。

“`python
import logging
import time
from fastapi import FastAPI, Request, Response

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)

app = FastAPI()

@app.middleware(“http”) # 也可以使用装饰器语法
async def log_requests(request: Request, call_next):
start_time = time.time()

# 请求前日志
logger.info(f"Incoming Request: {request.method} {request.url} from {request.client.host}")

response = await call_next(request)

# 请求后日志
process_time = time.time() - start_time
logger.info(f"Outgoing Response: {request.method} {request.url} Status: {response.status_code} ProcessTime: {process_time:.4f}s")

return response

app.add_middleware(log_requests) # 使用装饰器或add_middleware均可

@app.get(“/”)
async def read_root():
return {“Hello”: “World”}
“`
这种方式提供了一个集中的请求监控点。

2. 添加或修改请求头/响应头

可以在请求预处理阶段添加或修改请求头(尽管修改入站请求头不常见且通常不推荐),在响应后处理阶段添加或修改响应头。

“`python
from fastapi import FastAPI, Request, Response

app = FastAPI()

async def add_custom_header_middleware(request: Request, call_next):
response = await call_next(request)
# 在响应中添加自定义头
response.headers[“X-App-Version”] = “1.0.0”
return response

app.add_middleware(add_custom_header_middleware)

@app.get(“/”)
async def read_root():
return {“message”: “This response has a custom header.”}
“`

3. 跨域资源共享 (CORS)

CORS 是 Web 开发中的常见问题。FastAPI 提供了内置的 CORSMiddleware,这是通过中间件实现的典型例子。它处理浏览器发送的预检请求(OPTIONS)以及在实际响应中添加必要的 CORS 头部。

“`python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

origins = [
“http://localhost”,
“http://localhost:8000”,
“https://my-production-app.com”,
]

app.add_middleware(
CORSMiddleware,
allow_origins=origins, # 允许的源列表
allow_credentials=True, # 允许发送 cookies/认证信息
allow_methods=[““], # 允许所有 HTTP 方法
allow_headers=[“
“], # 允许所有请求头
)

@app.get(“/”)
async def read_root():
return {“message”: “CORS is enabled for allowed origins.”}
“`
如果你需要非常定制化的 CORS 逻辑,也可以自己实现一个中间件,但这通常没有必要。

4. 身份认证与授权

在请求到达业务处理函数之前,中间件可以检查请求中是否包含有效的认证凭证(如 token、session cookie 等)。如果认证失败,中间件可以直接返回一个错误响应(例如 401 Unauthorized),从而阻止请求继续向下传递到业务逻辑。

“`python
from fastapi import FastAPI, Request, HTTPException, status

app = FastAPI()

async def auth_middleware(request: Request, call_next):
# 这是一个简化的示例,实际应用中需要更复杂的逻辑
auth_header = request.headers.get(“Authorization”)

# 检查是否需要跳过认证 (例如 /login 路径)
if request.url.path == "/login":
    response = await call_next(request)
    return response

if not auth_header:
    # 没有 Authorization 头,返回 401
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authorization header missing")

# 假设 token 格式是 "Bearer <token>"
parts = auth_header.split()
if parts[0].lower() != "bearer" or len(parts) != 2:
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Authorization header format")

token = parts[1]

# 在这里进行 token 验证 (例如查询数据库或验证 JWT)
# 假设 token == "valid_token" 表示验证成功
if token != "valid_token":
     raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

# 如果认证成功,可以将用户信息添加到请求 state 或其他地方,供后续使用 (可选)
# request.state.user_id = 123

# 认证成功,继续处理请求
response = await call_next(request)
return response

app.add_middleware(auth_middleware)

@app.post(“/login”)
async def login():
# 模拟登录,返回一个 token
return {“token”: “valid_token”}

@app.get(“/protected”)
async def protected_route():
# 只有通过 auth_middleware 认证的请求才能到达这里
return {“message”: “You accessed a protected resource!”}

运行此应用,访问 /protected 需要在请求头中带上 “Authorization: Bearer valid_token”

“`
这个例子展示了中间件如何用于守卫路由。请注意,更复杂的授权逻辑(例如基于角色的访问控制)可能需要更高级的实现,可能结合依赖注入或路由器级别的处理。

5. 异常处理

虽然 FastAPI 提供了 @app.exception_handler() 来处理特定类型的异常,但中间件也可以用于捕获未被路由处理函数内部或更深层逻辑捕获的异常,并统一格式化响应。

“`python
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.responses import JSONResponse
import traceback # 导入 traceback 模块

app = FastAPI()

@app.middleware(“http”)
async def exception_handling_middleware(request: Request, call_next):
try:
response = await call_next(request)
return response
except Exception as exc:
# 捕获任何未处理的异常
status_code = 500
detail = “Internal Server Error”
if isinstance(exc, HTTPException):
status_code = exc.status_code
detail = exc.detail

    # 记录异常信息 (生产环境不应该返回详细traceback给客户端)
    print(f"Caught exception: {type(exc).__name__} - {exc}")
    traceback.print_exc() # 打印详细 traceback 到服务器日志

    # 返回统一格式的错误响应
    return JSONResponse(
        status_code=status_code,
        content={"detail": detail, "error_type": type(exc).__name__} # 可以在非生产环境加入更多信息
    )

@app.get(“/normal”)
async def normal_route():
return {“message”: “This is a normal response.”}

@app.get(“/error”)
async def error_route():
# 模拟一个运行时错误
result = 1 / 0
return {“result”: result}

@app.get(“/http-error”)
async def http_error_route():
# 模拟一个 HTTPException
raise HTTPException(status_code=400, detail=”Bad Request from route”)
``
中间件中的异常处理通常作为最后的保障,捕获那些未被特定异常处理器或路由内部
try…except` 块处理的错误。

6. 请求 ID 追踪

为每个进入的请求生成一个唯一的 ID,并在整个请求生命周期中(包括日志、传递给下游服务等)使用它,有助于追踪和调试。

“`python
from fastapi import FastAPI, Request, Response
import uuid

app = FastAPI()

@app.middleware(“http”)
async def request_id_middleware(request: Request, call_next):
request_id = request.headers.get(“X-Request-ID”)
if not request_id:
request_id = str(uuid.uuid4())

# 将 request_id 存储在 request.state 中,以便在路由处理函数中访问
request.state.request_id = request_id

# 在响应头中也包含 request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id

print(f"Request ID: {request_id}") # 可以结合日志记录

return response

@app.get(“/”)
async def read_root(request: Request):
# 在路由处理函数中访问 request_id
req_id = request.state.request_id
return {“message”: “Hello World”, “request_id”: req_id}
“`

7. 限流 (Rate Limiting)

尽管有专门的库(如 fastapi-limiter)来处理限流,但其核心逻辑也可以在中间件中实现:在 await call_next(request) 之前检查请求频率,如果超过限制则直接返回 429 Too Many Requests 响应。

“`python
import time
from fastapi import FastAPI, Request, Response, HTTPException, status
from collections import defaultdict
from threading import Lock # 简单的线程锁,对于异步应用需要更高级的锁或分布式缓存

app = FastAPI()

极简的内存限流示例 (不适合生产环境,因为不支持分布式和持久化)

RATE_LIMIT = 5 # 每分钟最多5次
RATE_LIMIT_INTERVAL = 60 # 秒

request_counts = defaultdict(list)
lock = Lock() # 用于保护 request_counts

async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host # 使用客户端IP作为标识

with lock:
    # 清理过期记录
    current_time = time.time()
    request_counts[client_ip] = [
        t for t in request_counts[client_ip] if t > current_time - RATE_LIMIT_INTERVAL
    ]

    # 检查是否超过限流
    if len(request_counts[client_ip]) >= RATE_LIMIT:
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail=f"Rate limit exceeded. Try again in {RATE_LIMIT_INTERVAL} seconds."
        )

    # 记录本次请求时间
    request_counts[client_ip].append(current_time)

response = await call_next(request)
return response

app.add_middleware(rate_limit_middleware) # 注意:这个简陋的限流在真实的异步环境中可能会有问题

@app.get(“/”)
async def read_root():
return {“message”: “You can access this route.”}

生产环境请使用 fastapi-limiter 等成熟库

“`
这是一个非常基础的示例,实际生产中需要使用 Redis 等外部存储来实现分布式、持久化且线程安全的限流。

8. 内置中间件

FastAPI(或其底层 Starlette)提供了一些常用的内置中间件,可以直接使用:

  • CORSMiddleware: 处理跨域请求。
  • GZipMiddleware: 对响应进行 GZip 压缩。
  • HTTPSRedirectMiddleware: 将所有 HTTP 请求重定向到 HTTPS。
  • TrustedHostMiddleware: 防止 HTTP Host 头攻击。
  • StaticFiles: 虽然更多被视为一个特殊的 endpoint 或 mount,但它也类似中间件,用于提供静态文件服务。

使用这些内置中间件非常简单,只需导入并添加到应用即可:

“`python
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware

app = FastAPI()

app.add_middleware(GZipMiddleware, minimum_size=1000) # 只对大于1000字节的响应进行压缩

@app.get(“/”)
async def read_root():
# 这个响应很小,不会被压缩(如果小于 minimum_size)
return {“Hello”: “World”}

@app.get(“/large_data”)
async def get_large_data():
# 这个响应可能较大,会被压缩
return {“data”: “A” * 2000}
“`

四、编写自定义中间件的最佳实践与注意事项

  1. 顺序至关重要: 仔细考虑中间件的添加顺序。例如,需要在认证 之后 进行授权,日志记录通常放在最外层以记录整个请求生命周期。
  2. 保持简洁和专注: 每个中间件应该只负责一项特定的功能。不要将多种不相关的逻辑揉合在一个中间件中。
  3. 避免阻塞: 函数式中间件是异步的 (async def),这意味着在 await call_next(request) 之前或之后执行的任何 I/O 操作(如数据库查询、外部 API 调用)都应该是异步的,以避免阻塞事件循环。如果必须执行同步阻塞操作,可以考虑使用 BaseHTTPMiddleware(它内部会使用线程池),或者在异步中间件中显式地使用 run_in_threadpool
  4. 优雅处理异常: 如果中间件可能抛出特定异常,考虑在中间件内部捕获并处理,或者确保有全局异常处理器能够捕获它并返回合适的错误响应。如果中间件内部抛出未捕获的异常,请求链会被中断,并可能导致服务器返回默认的 500 错误。
  5. 性能考虑: 每个中间件都会增加请求处理的开销。只添加你真正需要的中间件,并确保它们的执行效率尽可能高。
  6. 调试: 在中间件中添加日志输出是调试执行流程和排查问题的好方法。
  7. 访问请求/响应体: 在中间件中访问请求体(如 JSON 或表单数据)需要注意。请求体是流式的,一旦被读取就不能再次读取。如果你在中间件中读取了请求体,那么后续的中间件或路由处理函数可能无法再次读取它。解决方案是读取请求体后将其存储在 request.state 中,或者使用 Starlette 提供的工具(如 request.json(), request.form(), request.body()),它们会在内部处理好缓存,允许被多次调用(但要注意内存消耗)。类似地,读取响应体(response.body())通常需要你替换掉原始响应对象,因为响应体也是流式的。
  8. 测试: 编写测试用例来验证中间件的行为,确保它们按照预期工作,特别是当中间件之间存在依赖关系时。

五、总结

FastAPI 的中间件机制是其强大和灵活性的重要体现。通过将横切关注点从核心业务逻辑中分离出来,并以可插拔的方式组织,我们可以构建出更加模块化、可维护和可扩展的 API 应用。

理解中间件的工作原理——请求流经一个异步调用链,在 call_next 之前进行预处理,之后进行后处理——是正确使用它的关键。无论是使用简洁的函数式中间件还是基于类的 BaseHTTPMiddleware,都可以有效地处理日志、认证、CORS、异常等常见任务。结合 FastAPI 提供的内置中间件和自定义实现,开发者能够轻松地为 API 添加各种功能层,而无需污染核心业务代码,从而专注于构建高质量的应用程序。

在实践中,合理规划中间件链的顺序,保持每个中间件的单一职责,并注意性能和异常处理,将帮助你充分发挥 FastAPI 中间件的威力。


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部