FastAPI ORM 教程:快速入门指南 – wiki基地


FastAPI ORM 教程:快速入门指南 (详细版)

引言

FastAPI 以其卓越的性能、易用性和强大的类型提示支持,迅速成为现代 Web API 开发的热门选择。它基于 Starlette 和 Pydantic 构建,提供了异步支持、自动数据验证和交互式 API 文档等开箱即用的功能。然而,任何复杂的 Web 应用程序通常都需要与数据库进行交互。直接编写 SQL 查询语句虽然灵活,但往往冗长、容易出错,且难以维护。

这就是对象关系映射器(Object-Relational Mapper, ORM)发挥作用的地方。ORM 允许开发者使用面向对象的方式来操作数据库,将数据库表映射为 Python 类(模型),将表记录映射为类的实例。这极大地提高了开发效率,减少了重复代码,并增强了代码的可读性和可维护性。

在 Python 生态系统中,SQLAlchemy 是最成熟、功能最强大、社区最活跃的 ORM 之一。它与 FastAPI 结合使用,能够构建出既高性能又易于维护的数据库驱动的 API。

本教程将详细指导你如何在 FastAPI 项目中集成 SQLAlchemy,实现基本的 CRUD(创建、读取、更新、删除)操作,并理解其核心概念。我们将从项目设置开始,逐步构建一个简单的待办事项(Todo)API 作为示例。虽然我们将重点关注 SQLAlchemy,但其核心概念也适用于其他与 FastAPI 兼容的 ORM(如 Tortoise ORM 或 SQLModel)。

为什么选择 FastAPI + ORM (SQLAlchemy)?

  1. 开发效率: ORM 让你用 Python 代码操作数据库,无需编写繁琐的 SQL 语句。模型定义清晰,业务逻辑更集中。
  2. 类型安全: FastAPI 强制使用 Pydantic 进行数据验证和序列化,结合 SQLAlchemy 的模型,可以在多个层面确保数据类型的一致性,减少运行时错误。
  3. 代码可维护性: ORM 抽象了数据库细节,使得更换数据库(如从 SQLite 切换到 PostgreSQL)变得相对容易。代码结构更清晰,更易于团队协作和长期维护。
  4. 性能: FastAPI 本身是高性能的异步框架。SQLAlchemy 也提供了对异步操作的支持(通过 asyncio 扩展),允许你在 I/O 密集型的数据库操作中充分利用异步优势,避免阻塞事件循环。
  5. 生态系统: FastAPI 和 SQLAlchemy 都拥有庞大而活跃的社区,提供了丰富的文档、教程和第三方扩展。

前提条件

  • 熟悉 Python 基础语法。
  • 了解基本的 Web API 概念 (HTTP 方法, 请求/响应)。
  • 安装了 Python 3.7+。
  • 熟悉命令行/终端操作。

一、项目设置与环境准备

首先,我们需要创建一个项目目录,并设置一个虚拟环境来隔离项目依赖。

  1. 创建项目目录:
    bash
    mkdir fastapi_orm_tutorial
    cd fastapi_orm_tutorial

  2. 创建并激活虚拟环境:

    • Linux/macOS:
      bash
      python3 -m venv venv
      source venv/bin/activate
    • Windows:
      bash
      python -m venv venv
      .\venv\Scripts\activate

      激活后,你的终端提示符前应该会显示 (venv)
  3. 安装必要的库:
    我们将安装 FastAPI、Uvicorn(ASGI 服务器)、SQLAlchemy 以及用于与数据库交互的驱动程序。为了简单起见,我们将首先使用 SQLite 数据库,它不需要单独的数据库服务器。对于异步操作,我们将需要 aiosqlite。对于生产环境或需要更强大功能的场景,通常会使用 PostgreSQL 或 MySQL,对应的异步驱动分别是 asyncpgaiomysql

    bash
    pip install fastapi "uvicorn[standard]" sqlalchemy "pydantic[email]" aiosqlite asyncpg psycopg2-binary

    * fastapi: 核心框架。
    * uvicorn[standard]: ASGI 服务器,用于运行 FastAPI 应用。[standard] 包含 httptoolsuvloop (在 Linux/macOS 上) 以获得更好的性能。
    * sqlalchemy: 核心 ORM 库。
    * pydantic[email]: FastAPI 用于数据验证的基础库。[email] 是可选的,用于 Pydantic 的 Email 类型验证。
    * aiosqlite: SQLite 的异步驱动。
    * asyncpg: PostgreSQL 的异步驱动(如果计划使用 PostgreSQL)。
    * psycopg2-binary: PostgreSQL 的同步驱动(某些 SQLAlchemy 功能可能仍需)。

二、数据库配置 (database.py)

我们需要配置 SQLAlchemy 与数据库的连接。创建一个名为 database.py 的文件,用于存放数据库相关的设置和会话管理。

我们将同时展示同步和异步的设置方式,但在初级示例中,先侧重异步方式,因为它与 FastAPI 的异步特性更契合。

“`python

database.py

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
import os
from dotenv import load_dotenv

优先从 .env 文件加载环境变量(如果存在)

load_dotenv()

— 异步配置 —

使用环境变量或默认值配置数据库 URL

对于 SQLite (异步): “sqlite+aiosqlite:///./test.db”

对于 PostgreSQL (异步): “postgresql+asyncpg://user:password@host:port/dbname”

ASYNC_DATABASE_URL = os.getenv(“ASYNC_DATABASE_URL”, “sqlite+aiosqlite:///./test_async.db”)

创建异步引擎

echo=True 会打印执行的 SQL 语句,便于调试

async_engine = create_async_engine(ASYNC_DATABASE_URL, echo=True)

创建异步会话工厂

expire_on_commit=False 防止在提交后访问已提交的对象时需要重新查询

AsyncSessionLocal = sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False
)

— 同步配置 (可选,用于对比或特定场景) —

SYNC_DATABASE_URL = os.getenv(“SYNC_DATABASE_URL”, “sqlite:///./test_sync.db”)

sync_engine = create_engine(SYNC_DATABASE_URL, echo=True)

SyncSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=sync_engine)

— 数据库模型基类 —

所有 ORM 模型都需要继承这个 Base 类

Base = declarative_base()

— 异步数据库会话依赖 —

async def get_async_db():
“””
FastAPI 依赖项,用于获取异步数据库会话。
使用 async with 语句确保会话在使用后正确关闭。
“””
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit() # 尝试提交事务
except Exception as e:
await session.rollback() # 发生错误时回滚
raise e
finally:
await session.close() # 确保会话关闭

— 同步数据库会话依赖 (可选) —

def get_sync_db():

db = SyncSessionLocal()

try:

yield db

finally:

db.close()

print(f”使用异步数据库: {ASYNC_DATABASE_URL}”)

print(f”使用同步数据库: {SYNC_DATABASE_URL}”) # 如果使用同步

“`

说明:

  • ASYNC_DATABASE_URL: 定义了数据库连接字符串。格式为 driver+dialect://user:password@host:port/database。我们默认使用 sqlite+aiosqlite 指向当前目录下的 test_async.db 文件。
  • create_async_engine: 创建 SQLAlchemy 的异步引擎实例。echo=True 用于调试,它会打印所有执行的 SQL 语句。
  • AsyncSessionLocal: 创建一个异步会话工厂 (sessionmaker)。AsyncSession 是 SQLAlchemy 提供的异步会话类。expire_on_commit=False 是一个常用设置,防止在事务提交后 ORM 对象过期。
  • Base: declarative_base() 返回一个类,我们的所有数据库模型都需要继承它。SQLAlchemy 会通过这个基类来发现和管理所有的表模型。
  • get_async_db: 这是一个异步生成器函数,设计为 FastAPI 的依赖项 (Dependency)。它创建一个新的 AsyncSession,通过 yield 提供给路径操作函数,并在请求处理完成后(无论成功或失败)确保会话被正确关闭(通过 async withfinally)。添加了 try...except...finally 块来处理事务:成功时提交,出错时回滚。

三、定义 ORM 模型 (models.py)

模型是数据库表的 Python 表示。创建一个名为 models.py 的文件,定义我们的数据模型。我们将创建一个简单的 Todo 模型。

“`python

models.py

from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func # 用于数据库级别的默认值,如时间戳
from .database import Base # 从 database.py 导入 Base

class User(Base):
tablename = “users”

id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())

# 定义关系:一个用户可以有多个 Todo 项
todos = relationship("Todo", back_populates="owner")

class Todo(Base):
tablename = “todos” # 表名

# 定义列
id = Column(Integer, primary_key=True, index=True) # 主键,自动增长,加索引
title = Column(String, index=True, nullable=False) # 标题,加索引,不允许为空
description = Column(String, index=True, nullable=True) # 描述,加索引,允许为空
completed = Column(Boolean, default=False) # 完成状态,默认为 False
created_at = Column(DateTime(timezone=True), server_default=func.now()) # 创建时间,数据库自动设置
updated_at = Column(DateTime(timezone=True), onupdate=func.now()) # 更新时间,数据库自动更新

# 外键,关联到 users 表的 id 列
owner_id = Column(Integer, ForeignKey("users.id"))

# 定义关系:这个 Todo 项属于哪个 User
# back_populates 与 User 模型中的 'todos' 关系相对应,建立双向关系
owner = relationship("User", back_populates="todos")

“`

说明:

  • 每个类都继承自 database.Base
  • __tablename__: 指定了数据库中对应的表名。
  • Column: 定义表的列。参数包括:
    • 类型 (Integer, String, Boolean, DateTime 等)。
    • primary_key=True: 指定为主键。
    • index=True: 在该列上创建索引,加快查询速度。
    • unique=True: 该列的值必须唯一。
    • nullable=False: 该列不允许为空。
    • default: Python 级别的默认值。
    • server_default: 数据库级别的默认值(如 func.now() 获取当前时间戳)。
    • onupdate: 数据库级别的更新触发器(如 func.now() 在更新时设置时间戳)。
  • ForeignKey: 定义外键约束,链接到 users 表的 id 列。
  • relationship: 定义模型之间的关系。
    • Todo 中的 owner = relationship("User", back_populates="todos") 表示一个 Todo 属于一个 User,并且通过 User 模型中名为 todos 的关系属性进行反向关联。
    • User 中的 todos = relationship("Todo", back_populates="owner") 表示一个 User 可以拥有多个 Todo 项,通过 Todo 模型中名为 owner 的关系属性进行反向关联。back_populates 建立了这种双向链接。

四、定义 Pydantic Schemas (schemas.py)

虽然 SQLAlchemy 模型定义了数据库结构,但 API 的输入和输出通常需要不同的结构或字段子集。例如,创建用户时不应传入 id,响应中可能不包含密码。Pydantic 模型(在 FastAPI 中称为 Schemas)用于定义 API 的数据接口。

创建一个名为 schemas.py 的文件。

“`python

schemas.py

from pydantic import BaseModel, EmailStr, ConfigDict
from typing import Optional, List
from datetime import datetime

— Todo Schemas —

基础 Schema,包含共享字段

class TodoBase(BaseModel):
title: str
description: Optional[str] = None # description 是可选的

创建 Todo 时使用的 Schema (输入)

继承自 TodoBase,不需要 id, created_at, updated_at, owner_id, completed (有默认值)

class TodoCreate(TodoBase):
pass # 目前与 TodoBase 相同,但可以添加特定于创建的字段

更新 Todo 时使用的 Schema (输入)

所有字段都是可选的,因为是部分更新

class TodoUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
completed: Optional[bool] = None

从数据库读取 Todo 时使用的 Schema (输出)

继承自 TodoBase,并包含数据库生成的字段

class Todo(TodoBase):
id: int
completed: bool
created_at: datetime
updated_at: Optional[datetime] = None # 可能还没有更新过
owner_id: int

# Pydantic V2 配置: 允许从 ORM 对象属性读取数据
model_config = ConfigDict(from_attributes=True)
# Pydantic V1 写法:
# class Config:
#     orm_mode = True

— User Schemas —

class UserBase(BaseModel):
email: EmailStr # 使用 Pydantic 的 EmailStr 进行邮箱格式验证

class UserCreate(UserBase):
password: str # 创建用户时需要密码

class User(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: Optional[datetime] = None
todos: List[Todo] = [] # 包含关联的 Todo 列表

model_config = ConfigDict(from_attributes=True)
# Pydantic V1 写法:
# class Config:
#     orm_mode = True

“`

说明:

  • 我们为 TodoUser 定义了多个 Pydantic 模型:
    • Base: 包含所有模型共有的基础字段。
    • Create: 用于 API 创建操作的输入验证。通常不包含 id、时间戳等由数据库或服务器生成的字段。
    • Update: 用于 API 更新操作的输入验证。字段通常是可选的 (Optional)。
    • 无后缀 (如 Todo, User): 用于 API 读取操作的输出。包含数据库中的所有相关字段,包括 id 和时间戳。
  • Optional[T] 表示该字段是可选的。
  • EmailStr 是 Pydantic 提供的特殊类型,用于验证电子邮件格式。
  • model_config = ConfigDict(from_attributes=True) (Pydantic V2) 或 Config.orm_mode = True (Pydantic V1) 是关键!它告诉 Pydantic 模型可以直接从 ORM 对象(具有属性访问权限的对象)而不是仅仅从字典中读取数据。这使得我们可以直接将 SQLAlchemy 查询结果传递给 Pydantic 模型进行序列化。
  • User schema 中,todos: List[Todo] = [] 定义了嵌套的 Schema,用于在获取用户信息时一并返回其拥有的 Todo 列表。from_attributes=True 会自动处理这种关系加载(如果 SQLAlchemy 对象已经加载了关系数据)。

五、实现 CRUD 操作 (crud.py)

将数据库操作逻辑(创建、读取、更新、删除)封装在单独的函数中是一种良好的实践,这有助于保持 API 路由处理程序的简洁性。创建一个名为 crud.py 的文件。

“`python

crud.py

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select # SQLAlchemy 2.0+ 风格的 select
from sqlalchemy.orm import selectinload # 用于预加载关系数据
from . import models, schemas
from typing import List, Optional
import bcrypt # 用于密码哈希

— User CRUD —

async def get_user(db: AsyncSession, user_id: int) -> Optional[models.User]:
“””根据 ID 获取单个用户”””
result = await db.execute(select(models.User).filter(models.User.id == user_id))
return result.scalar_one_or_none()

async def get_user_by_email(db: AsyncSession, email: str) -> Optional[models.User]:
“””根据 Email 获取单个用户”””
result = await db.execute(select(models.User).filter(models.User.email == email))
return result.scalar_one_or_none()

async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> List[models.User]:
“””获取用户列表,支持分页”””
result = await db.execute(
select(models.User).offset(skip).limit(limit)
)
return result.scalars().all()

def get_password_hash(password: str) -> str:
“””哈希密码”””
# 注意:实际应用中 salt 应妥善管理,或使用更高级的库如 passlib
pwd_bytes = password.encode(‘utf-8’)
salt = bcrypt.gensalt()
hashed_password = bcrypt.hashpw(pwd_bytes, salt)
return hashed_password.decode(‘utf-8’) # 存储为字符串

async def create_user(db: AsyncSession, user: schemas.UserCreate) -> models.User:
“””创建新用户”””
hashed_password = get_password_hash(user.password)
db_user = models.User(
email=user.email,
hashed_password=hashed_password
# is_active 默认为 True, created_at/updated_at 由数据库自动生成
)
db.add(db_user)
# 注意:这里不 commit,由 get_async_db 依赖项处理事务
await db.flush() # 将对象添加到会话,但不提交,以便获取 ID(如果需要)
await db.refresh(db_user) # 刷新对象状态,获取数据库生成的默认值
return db_user

— Todo CRUD —

async def get_todo(db: AsyncSession, todo_id: int) -> Optional[models.Todo]:
“””根据 ID 获取单个 Todo 项”””
result = await db.execute(
select(models.Todo).filter(models.Todo.id == todo_id)
)
return result.scalar_one_or_none()

async def get_todos(db: AsyncSession, skip: int = 0, limit: int = 100) -> List[models.Todo]:
“””获取 Todo 列表,支持分页”””
result = await db.execute(
select(models.Todo).offset(skip).limit(limit)
)
return result.scalars().all()

async def get_todos_by_owner(db: AsyncSession, owner_id: int, skip: int = 0, limit: int = 100) -> List[models.Todo]:
“””获取特定用户的 Todo 列表”””
result = await db.execute(
select(models.Todo)
.filter(models.Todo.owner_id == owner_id)
.offset(skip)
.limit(limit)
)
return result.scalars().all()

async def create_user_todo(db: AsyncSession, todo: schemas.TodoCreate, user_id: int) -> models.Todo:
“””为特定用户创建新的 Todo 项”””
db_todo = models.Todo(
**todo.model_dump(), # 从 Pydantic 模型解包字段
owner_id=user_id
# completed 默认为 False, created_at/updated_at 由数据库自动生成
)
db.add(db_todo)
await db.flush()
await db.refresh(db_todo)
return db_todo

async def update_todo(db: AsyncSession, todo_id: int, todo_update: schemas.TodoUpdate) -> Optional[models.Todo]:
“””更新 Todo 项”””
db_todo = await get_todo(db, todo_id)
if db_todo is None:
return None

# 获取 Pydantic 模型中已设置(非 None)的字段进行更新
update_data = todo_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
    setattr(db_todo, key, value)

db.add(db_todo) # 将更改添加到会话
await db.flush()
await db.refresh(db_todo)
return db_todo

async def delete_todo(db: AsyncSession, todo_id: int) -> Optional[models.Todo]:
“””删除 Todo 项”””
db_todo = await get_todo(db, todo_id)
if db_todo:
await db.delete(db_todo)
# 注意:删除操作也需要 flush 或 commit 才能生效
await db.flush() # 或者直接在 get_async_db 中 commit
return db_todo
return None

async def get_user_with_todos(db: AsyncSession, user_id: int) -> Optional[models.User]:
“””获取用户及其所有 Todo 项(使用关系预加载)”””
# selectinload 用于高效加载一对多或多对多关系
result = await db.execute(
select(models.User)
.options(selectinload(models.User.todos))
.filter(models.User.id == user_id)
)
return result.scalar_one_or_none()
“`

说明:

  • 所有数据库操作函数都接受 db: AsyncSession 作为第一个参数,这个会话将由 FastAPI 的依赖注入系统提供。
  • 使用了 async defawait,因为我们配置的是异步数据库连接和会话。
  • 使用了 SQLAlchemy 2.0+ 风格的 select() 语句,这是推荐的现代用法。
  • db.add(): 将新创建或已修改的 ORM 对象添加到会话中,准备持久化。
  • await db.flush(): 将会话中的更改发送到数据库,但不提交事务。这对于获取数据库生成的 ID 或检查约束很有用。
  • await db.refresh(obj): 在 flushcommit 后,用数据库中的最新数据更新 Python 对象的状态(例如获取自动生成的主键或默认值)。
  • await db.delete(obj): 标记一个对象以便在下次 flushcommit 时删除。
  • result.scalar_one_or_none(): 执行查询并返回单个结果,如果没有结果则返回 None
  • result.scalars().all(): 执行查询并返回所有结果的列表。.scalars() 用于获取每行的第一个元素(即我们的 ORM 对象)。
  • model_dump() (Pydantic V2) 或 dict() (Pydantic V1): 用于将 Pydantic 模型转换为字典,方便地传递给 SQLAlchemy 模型构造函数或更新操作。exclude_unset=True 在更新时很有用,它只包含显式设置了值的字段。
  • selectinload: 是一种关系加载策略,用于在查询主对象(如 User)时,通过一次额外的查询(或 JOIN,取决于配置)预先加载关联的对象(如 Todos)。这可以避免在后续访问关系属性时产生 N+1 查询问题,提高性能。
  • 密码哈希使用了 bcrypt 库。注意: 这是一个基础示例。生产环境中应使用更健壮的密码管理库(如 passlib)并妥善处理 salt。

六、创建 FastAPI 应用和 API 路由 (main.py)

现在我们将所有部分组合起来,创建 FastAPI 应用实例并定义 API 端点。创建一个名为 main.py 的文件。

“`python

main.py

from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List

from . import crud, models, schemas
from .database import async_engine, Base, get_async_db # 导入异步相关项

— 数据库初始化 —

注意:在生产环境中,通常使用数据库迁移工具(如 Alembic)来管理表结构

async def create_db_and_tables():
async with async_engine.begin() as conn:
# 如果表已存在,不会重复创建
# await conn.run_sync(Base.metadata.drop_all) # 可选:开发时清空数据库
await conn.run_sync(Base.metadata.create_all)
print(“数据库表已创建 (如果不存在)”)

— FastAPI 应用实例 —

app = FastAPI(title=”FastAPI ORM Tutorial”, description=”一个使用 FastAPI 和 SQLAlchemy (异步) 的示例 API”)

— 应用启动事件 —

在应用启动时尝试创建数据库表

@app.on_event(“startup”)
async def on_startup():
await create_db_and_tables()

— API 路由 —

用户相关的端点

@app.post(“/users/”, response_model=schemas.User, status_code=status.HTTP_201_CREATED, tags=[“Users”])
async def create_new_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_async_db)):
“””创建一个新用户”””
db_user = await crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=”邮箱已被注册”)
created_user = await crud.create_user(db=db, user=user)
return created_user

@app.get(“/users/”, response_model=List[schemas.User], tags=[“Users”])
async def read_all_users(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_async_db)):
“””获取用户列表(带分页)”””
users = await crud.get_users(db, skip=skip, limit=limit)
return users

@app.get(“/users/{user_id}”, response_model=schemas.User, tags=[“Users”])
async def read_single_user(user_id: int, db: AsyncSession = Depends(get_async_db)):
“””根据 ID 获取单个用户”””
db_user = await crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”用户未找到”)
return db_user

@app.get(“/users/{user_id}/details”, response_model=schemas.User, tags=[“Users”])
async def read_user_with_todos(user_id: int, db: AsyncSession = Depends(get_async_db)):
“””获取用户及其所有 Todo 项”””
db_user = await crud.get_user_with_todos(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”用户未找到”)
# Pydantic 的 from_attributes=True 会自动处理嵌套的 todos 列表
return db_user

Todo 相关的端点

@app.post(“/users/{user_id}/todos/”, response_model=schemas.Todo, status_code=status.HTTP_201_CREATED, tags=[“Todos”])
async def create_todo_for_user(
user_id: int, todo: schemas.TodoCreate, db: AsyncSession = Depends(get_async_db)
):
“””为指定用户创建 Todo 项”””
# 可以在这里加一步验证 user_id 是否存在,或者让数据库外键约束处理
db_user = await crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”用户未找到,无法创建 Todo”)
created_todo = await crud.create_user_todo(db=db, todo=todo, user_id=user_id)
return created_todo

@app.get(“/todos/”, response_model=List[schemas.Todo], tags=[“Todos”])
async def read_all_todos(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_async_db)):
“””获取所有 Todo 项(带分页)”””
todos = await crud.get_todos(db, skip=skip, limit=limit)
return todos

@app.get(“/users/{user_id}/todos/”, response_model=List[schemas.Todo], tags=[“Todos”])
async def read_user_todos(user_id: int, skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_async_db)):
“””获取指定用户的所有 Todo 项”””
# 验证用户是否存在
db_user = await crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”用户未找到”)
todos = await crud.get_todos_by_owner(db=db, owner_id=user_id, skip=skip, limit=limit)
return todos

@app.get(“/todos/{todo_id}”, response_model=schemas.Todo, tags=[“Todos”])
async def read_single_todo(todo_id: int, db: AsyncSession = Depends(get_async_db)):
“””根据 ID 获取单个 Todo 项”””
db_todo = await crud.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”Todo 项未找到”)
return db_todo

@app.put(“/todos/{todo_id}”, response_model=schemas.Todo, tags=[“Todos”])
async def update_existing_todo(
todo_id: int, todo_update: schemas.TodoUpdate, db: AsyncSession = Depends(get_async_db)
):
“””更新指定的 Todo 项”””
updated_todo = await crud.update_todo(db=db, todo_id=todo_id, todo_update=todo_update)
if updated_todo is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”Todo 项未找到,无法更新”)
return updated_todo

@app.delete(“/todos/{todo_id}”, status_code=status.HTTP_204_NO_CONTENT, tags=[“Todos”])
async def delete_existing_todo(todo_id: int, db: AsyncSession = Depends(get_async_db)):
“””删除指定的 Todo 项”””
deleted_todo = await crud.delete_todo(db=db, todo_id=todo_id)
if deleted_todo is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”Todo 项未找到,无法删除”)
# 对于 204 状态码,FastAPI 默认不返回任何内容体
# 如果需要返回被删除的对象信息,可以修改 response_model 和 status_code
return None # 或者 return Response(status_code=status.HTTP_204_NO_CONTENT)

根路径

@app.get(“/”, tags=[“Root”])
async def root():
return {“message”: “欢迎来到 FastAPI ORM 教程 API!”}

“`

说明:

  • create_db_and_tables(): 这个异步函数使用 async_engine 来创建 models.py 中定义的所有表(如果它们尚不存在)。我们通过 @app.on_event("startup") 注册它,使其在 FastAPI 应用启动时自动执行。重要提示: 在生产环境中,应使用数据库迁移工具(如 Alembic)来管理数据库模式的变更,而不是依赖 create_allcreate_all 不会处理表的修改或删除。
  • app = FastAPI(...): 创建 FastAPI 应用实例。
  • @app.post(...), @app.get(...), @app.put(...), @app.delete(...): 定义 API 路由(路径操作)。
  • response_model: 指定了 API 端点成功响应时应该返回的数据结构(Pydantic schema)。FastAPI 会自动使用这个模型来序列化返回的数据,并过滤掉不必要的字段。
  • status_code: 定义了成功响应时的 HTTP 状态码。
  • tags: 用于在自动生成的 API 文档(Swagger UI / ReDoc)中对端点进行分组。
  • Depends(get_async_db): 这是 FastAPI 的依赖注入。对于每个需要数据库访问的请求,FastAPI 会调用 get_async_db 函数,并将返回的 AsyncSession 实例注入到路径操作函数的 db 参数中。get_async_db 中的 yield 确保了会话在使用后会被妥善处理(提交或回滚,然后关闭)。
  • 路径参数(如 {user_id})和查询参数(如 skip, limit)通过函数参数的类型提示自动解析和验证。
  • 请求体(如 POST /users/ 中的 user: schemas.UserCreate)通过 Pydantic 模型进行自动解析和验证。如果请求数据不符合 UserCreate 的结构,FastAPI 会自动返回 422 Unprocessable Entity 错误。
  • HTTPException: 用于在特定条件下(如资源未找到、请求无效)返回标准的 HTTP 错误响应。

七、运行应用程序

现在,一切准备就绪,可以运行我们的 FastAPI 应用了。

  1. 确保你在项目根目录 (fastapi_orm_tutorial) 并且虚拟环境已激活。

  2. 启动 Uvicorn 服务器:
    bash
    uvicorn main:app --reload --host 0.0.0.0 --port 8000

    • main:app: 指向 main.py 文件中的 app FastAPI 实例。
    • --reload: 开发时非常有用,当代码文件更改时,服务器会自动重启。
    • --host 0.0.0.0: 使服务器可以从网络中的其他机器访问(如果需要)。
    • --port 8000: 指定监听的端口。

    你应该会看到类似以下的输出,表明服务器正在运行,并且数据库表已创建(如果是第一次运行):

    INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
    INFO: Started reloader process [xxxxx] using statreload
    INFO: Started server process [xxxxx]
    INFO: Waiting for application startup.
    使用异步数据库: sqlite+aiosqlite:///./test_async.db
    INFO: Loading module main from file fastapi_orm_tutorial/main.py...
    INFO: Will watch for changes in directory: fastapi_orm_tutorial/
    INFO: Reloader statreload detected changes in 'main.py'. Reloading...
    INFO: Shutting down
    INFO: Waiting for application shutdown.
    INFO: Application shutdown complete.
    INFO: Finished server process [xxxxx]
    INFO: Started reloader process [xxxxx] using statreload
    INFO: Started server process [xxxxx]
    INFO: Waiting for application startup.
    使用异步数据库: sqlite+aiosqlite:///./test_async.db
    INFO: Application startup complete.
    INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
    数据库表已创建 (如果不存在) # 这句来自我们的启动事件

  3. 访问 API 文档:
    打开浏览器,访问 http://127.0.0.1:8000/docs (或者如果你使用了不同的 host/port,请相应修改)。你应该能看到 FastAPI 自动生成的 Swagger UI 交互式文档。这里列出了我们定义的所有 API 端点,你可以直接在浏览器中测试它们!

    • 尝试创建一个用户 (POST /users/)。
    • 尝试获取用户列表 (GET /users/)。
    • 尝试为用户创建 Todo 项 (POST /users/{user_id}/todos/)。
    • 尝试获取所有 Todo 或特定用户的 Todo。
    • 尝试更新和删除 Todo 项。

八、关于同步与异步的选择

本教程重点演示了异步方式 (asyncio + AsyncSession),因为它最能发挥 FastAPI 的性能优势,尤其是在处理大量并发请求和 I/O 密集型操作(如数据库查询)时。

然而,在某些情况下,同步方式可能仍然适用:

  • 项目初期或简单应用,对并发性能要求不高。
  • 团队对异步编程还不太熟悉。
  • 使用了不支持 asyncio 的旧库或驱动。

如果选择同步,你需要:

  1. 修改 database.py:

    • 使用同步数据库 URL(如 sqlite:///./test_sync.dbpostgresql://user:password@host/dbname)。
    • 使用 create_engine 代替 create_async_engine
    • 使用 sessionmaker 绑定到同步引擎,并不指定 class_=AsyncSession
    • 创建一个同步的 get_db 依赖项。
  2. 修改 crud.py:

    • 函数定义移除 async 关键字。
    • 移除所有 await 关键字。
    • 数据库操作(如 db.execute, db.commit, db.refresh, db.delete)使用同步版本。
    • result.scalar_one_or_none()result.scalars().all() 等用法保持不变。
  3. 修改 main.py:

    • 所有路径操作函数移除 async 关键字。
    • 依赖项改为 Depends(get_sync_db)

同步代码通常更直接,但可能会在数据库操作期间阻塞 FastAPI 的事件循环,影响并发处理能力。对于现代 FastAPI 应用,强烈推荐尽可能使用异步方式

九、进阶主题与最佳实践

这个快速入门指南涵盖了基础,但实际项目中还有更多需要考虑:

  1. 数据库迁移 (Alembic): 如前所述,Base.metadata.create_all 不适用于生产环境。Alembic 是 SQLAlchemy 官方推荐的数据库迁移工具,它可以跟踪模型变化并生成迁移脚本来更新数据库模式,而不会丢失数据。
  2. 更复杂的关系: SQLAlchemy 支持一对一、多对多等更复杂的关系,以及丰富的加载策略(joinedload, subqueryload 等)来优化查询性能。
  3. 错误处理: 更精细地处理数据库约束错误(如唯一性冲突)、连接错误等,并返回更具体的 HTTP 状态码和错误信息。
  4. 依赖注入的进阶使用: 可以创建更复杂的依赖项,例如获取当前登录用户,进行权限检查等。
  5. 测试: 编写单元测试和集成测试来确保 CRUD 操作和 API 端点的正确性。可以使用内存数据库(如 SQLite in-memory)或测试专用的数据库实例。FastAPI 提供了 TestClient 来方便地测试 API。
  6. 代码组织: 对于大型项目,可以将路由、模型、schemas 和 CRUD 操作按功能模块(如 users, items)组织到不同的子目录和文件中,使用 FastAPI 的 APIRouter 来管理。
  7. 配置管理: 使用环境变量、配置文件(如 .env 文件配合 python-dotenv 库,或 Pydantic 的 BaseSettings)来管理数据库连接字符串、密钥等敏感信息,而不是硬编码在代码中。
  8. 安全性:
    • 密码管理: 使用 passlib 等库进行安全的密码哈希和验证。
    • 认证与授权: 实现用户登录、令牌(如 JWT)生成与验证、基于角色的访问控制等。FastAPI 提供了 OAuth2PasswordBearer 等工具。
    • 输入验证: 充分利用 Pydantic 的验证能力,对所有输入数据进行严格校验。
    • SQL 注入防护: 使用 ORM 通常可以防止大多数 SQL 注入,但仍需注意不要手动拼接 SQL 字符串。
  9. 性能调优: 分析慢查询(echo=True 或数据库日志),优化索引,选择合适的查询策略和关系加载方式。
  10. SQLModel: 由 FastAPI 作者创建,旨在结合 SQLAlchemy 和 Pydantic,提供单一的模型类来同时定义数据库表结构和 API 数据模式。如果你喜欢这种整合方式,可以考虑使用 SQLModel。

结论

将 FastAPI 与 ORM(特别是强大的 SQLAlchemy)结合使用,可以构建出高性能、类型安全且易于维护的现代 Web API。通过理解数据库配置、模型定义、Pydantic schemas、CRUD 操作封装以及 FastAPI 的依赖注入机制,你可以高效地开发数据库驱动的应用程序。

本教程提供了一个全面的起点,涵盖了从项目设置到实现基本 CRUD 操作的完整流程,并重点介绍了推荐的异步方式。希望这个详细的指南能帮助你快速上手 FastAPI 和 ORM 的集成,并为进一步探索更高级的功能打下坚实的基础。继续实践,探索文档,并参与社区,你将能够充分利用这对强大组合的潜力!


发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

滚动至顶部