FastAPI ORM 教程:快速入门指南 (详细版)
引言
FastAPI 以其卓越的性能、易用性和强大的类型提示支持,迅速成为现代 Web API 开发的热门选择。它基于 Starlette 和 Pydantic 构建,提供了异步支持、自动数据验证和交互式 API 文档等开箱即用的功能。然而,任何复杂的 Web 应用程序通常都需要与数据库进行交互。直接编写 SQL 查询语句虽然灵活,但往往冗长、容易出错,且难以维护。
这就是对象关系映射器(Object-Relational Mapper, ORM)发挥作用的地方。ORM 允许开发者使用面向对象的方式来操作数据库,将数据库表映射为 Python 类(模型),将表记录映射为类的实例。这极大地提高了开发效率,减少了重复代码,并增强了代码的可读性和可维护性。
在 Python 生态系统中,SQLAlchemy 是最成熟、功能最强大、社区最活跃的 ORM 之一。它与 FastAPI 结合使用,能够构建出既高性能又易于维护的数据库驱动的 API。
本教程将详细指导你如何在 FastAPI 项目中集成 SQLAlchemy,实现基本的 CRUD(创建、读取、更新、删除)操作,并理解其核心概念。我们将从项目设置开始,逐步构建一个简单的待办事项(Todo)API 作为示例。虽然我们将重点关注 SQLAlchemy,但其核心概念也适用于其他与 FastAPI 兼容的 ORM(如 Tortoise ORM 或 SQLModel)。
为什么选择 FastAPI + ORM (SQLAlchemy)?
- 开发效率: ORM 让你用 Python 代码操作数据库,无需编写繁琐的 SQL 语句。模型定义清晰,业务逻辑更集中。
- 类型安全: FastAPI 强制使用 Pydantic 进行数据验证和序列化,结合 SQLAlchemy 的模型,可以在多个层面确保数据类型的一致性,减少运行时错误。
- 代码可维护性: ORM 抽象了数据库细节,使得更换数据库(如从 SQLite 切换到 PostgreSQL)变得相对容易。代码结构更清晰,更易于团队协作和长期维护。
- 性能: FastAPI 本身是高性能的异步框架。SQLAlchemy 也提供了对异步操作的支持(通过
asyncio
扩展),允许你在 I/O 密集型的数据库操作中充分利用异步优势,避免阻塞事件循环。 - 生态系统: FastAPI 和 SQLAlchemy 都拥有庞大而活跃的社区,提供了丰富的文档、教程和第三方扩展。
前提条件
- 熟悉 Python 基础语法。
- 了解基本的 Web API 概念 (HTTP 方法, 请求/响应)。
- 安装了 Python 3.7+。
- 熟悉命令行/终端操作。
一、项目设置与环境准备
首先,我们需要创建一个项目目录,并设置一个虚拟环境来隔离项目依赖。
-
创建项目目录:
bash
mkdir fastapi_orm_tutorial
cd fastapi_orm_tutorial -
创建并激活虚拟环境:
- Linux/macOS:
bash
python3 -m venv venv
source venv/bin/activate - Windows:
bash
python -m venv venv
.\venv\Scripts\activate
激活后,你的终端提示符前应该会显示(venv)
。
- Linux/macOS:
-
安装必要的库:
我们将安装 FastAPI、Uvicorn(ASGI 服务器)、SQLAlchemy 以及用于与数据库交互的驱动程序。为了简单起见,我们将首先使用 SQLite 数据库,它不需要单独的数据库服务器。对于异步操作,我们将需要aiosqlite
。对于生产环境或需要更强大功能的场景,通常会使用 PostgreSQL 或 MySQL,对应的异步驱动分别是asyncpg
和aiomysql
。bash
pip install fastapi "uvicorn[standard]" sqlalchemy "pydantic[email]" aiosqlite asyncpg psycopg2-binary
*fastapi
: 核心框架。
*uvicorn[standard]
: ASGI 服务器,用于运行 FastAPI 应用。[standard]
包含httptools
和uvloop
(在 Linux/macOS 上) 以获得更好的性能。
*sqlalchemy
: 核心 ORM 库。
*pydantic[email]
: FastAPI 用于数据验证的基础库。[email]
是可选的,用于 Pydantic 的 Email 类型验证。
*aiosqlite
: SQLite 的异步驱动。
*asyncpg
: PostgreSQL 的异步驱动(如果计划使用 PostgreSQL)。
*psycopg2-binary
: PostgreSQL 的同步驱动(某些 SQLAlchemy 功能可能仍需)。
二、数据库配置 (database.py)
我们需要配置 SQLAlchemy 与数据库的连接。创建一个名为 database.py
的文件,用于存放数据库相关的设置和会话管理。
我们将同时展示同步和异步的设置方式,但在初级示例中,先侧重异步方式,因为它与 FastAPI 的异步特性更契合。
“`python
database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, declarative_base
import os
from dotenv import load_dotenv
优先从 .env 文件加载环境变量(如果存在)
load_dotenv()
— 异步配置 —
使用环境变量或默认值配置数据库 URL
对于 SQLite (异步): “sqlite+aiosqlite:///./test.db”
对于 PostgreSQL (异步): “postgresql+asyncpg://user:password@host:port/dbname”
ASYNC_DATABASE_URL = os.getenv(“ASYNC_DATABASE_URL”, “sqlite+aiosqlite:///./test_async.db”)
创建异步引擎
echo=True 会打印执行的 SQL 语句,便于调试
async_engine = create_async_engine(ASYNC_DATABASE_URL, echo=True)
创建异步会话工厂
expire_on_commit=False 防止在提交后访问已提交的对象时需要重新查询
AsyncSessionLocal = sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False
)
— 同步配置 (可选,用于对比或特定场景) —
SYNC_DATABASE_URL = os.getenv(“SYNC_DATABASE_URL”, “sqlite:///./test_sync.db”)
sync_engine = create_engine(SYNC_DATABASE_URL, echo=True)
SyncSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=sync_engine)
— 数据库模型基类 —
所有 ORM 模型都需要继承这个 Base 类
Base = declarative_base()
— 异步数据库会话依赖 —
async def get_async_db():
“””
FastAPI 依赖项,用于获取异步数据库会话。
使用 async with 语句确保会话在使用后正确关闭。
“””
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit() # 尝试提交事务
except Exception as e:
await session.rollback() # 发生错误时回滚
raise e
finally:
await session.close() # 确保会话关闭
— 同步数据库会话依赖 (可选) —
def get_sync_db():
db = SyncSessionLocal()
try:
yield db
finally:
db.close()
print(f”使用异步数据库: {ASYNC_DATABASE_URL}”)
print(f”使用同步数据库: {SYNC_DATABASE_URL}”) # 如果使用同步
“`
说明:
ASYNC_DATABASE_URL
: 定义了数据库连接字符串。格式为driver+dialect://user:password@host:port/database
。我们默认使用sqlite+aiosqlite
指向当前目录下的test_async.db
文件。create_async_engine
: 创建 SQLAlchemy 的异步引擎实例。echo=True
用于调试,它会打印所有执行的 SQL 语句。AsyncSessionLocal
: 创建一个异步会话工厂 (sessionmaker
)。AsyncSession
是 SQLAlchemy 提供的异步会话类。expire_on_commit=False
是一个常用设置,防止在事务提交后 ORM 对象过期。Base
:declarative_base()
返回一个类,我们的所有数据库模型都需要继承它。SQLAlchemy 会通过这个基类来发现和管理所有的表模型。get_async_db
: 这是一个异步生成器函数,设计为 FastAPI 的依赖项 (Dependency)。它创建一个新的AsyncSession
,通过yield
提供给路径操作函数,并在请求处理完成后(无论成功或失败)确保会话被正确关闭(通过async with
和finally
)。添加了try...except...finally
块来处理事务:成功时提交,出错时回滚。
三、定义 ORM 模型 (models.py)
模型是数据库表的 Python 表示。创建一个名为 models.py
的文件,定义我们的数据模型。我们将创建一个简单的 Todo
模型。
“`python
models.py
from sqlalchemy import Column, Integer, String, Boolean, ForeignKey, DateTime
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func # 用于数据库级别的默认值,如时间戳
from .database import Base # 从 database.py 导入 Base
class User(Base):
tablename = “users”
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), onupdate=func.now())
# 定义关系:一个用户可以有多个 Todo 项
todos = relationship("Todo", back_populates="owner")
class Todo(Base):
tablename = “todos” # 表名
# 定义列
id = Column(Integer, primary_key=True, index=True) # 主键,自动增长,加索引
title = Column(String, index=True, nullable=False) # 标题,加索引,不允许为空
description = Column(String, index=True, nullable=True) # 描述,加索引,允许为空
completed = Column(Boolean, default=False) # 完成状态,默认为 False
created_at = Column(DateTime(timezone=True), server_default=func.now()) # 创建时间,数据库自动设置
updated_at = Column(DateTime(timezone=True), onupdate=func.now()) # 更新时间,数据库自动更新
# 外键,关联到 users 表的 id 列
owner_id = Column(Integer, ForeignKey("users.id"))
# 定义关系:这个 Todo 项属于哪个 User
# back_populates 与 User 模型中的 'todos' 关系相对应,建立双向关系
owner = relationship("User", back_populates="todos")
“`
说明:
- 每个类都继承自
database.Base
。 __tablename__
: 指定了数据库中对应的表名。Column
: 定义表的列。参数包括:- 类型 (
Integer
,String
,Boolean
,DateTime
等)。 primary_key=True
: 指定为主键。index=True
: 在该列上创建索引,加快查询速度。unique=True
: 该列的值必须唯一。nullable=False
: 该列不允许为空。default
: Python 级别的默认值。server_default
: 数据库级别的默认值(如func.now()
获取当前时间戳)。onupdate
: 数据库级别的更新触发器(如func.now()
在更新时设置时间戳)。
- 类型 (
ForeignKey
: 定义外键约束,链接到users
表的id
列。relationship
: 定义模型之间的关系。Todo
中的owner = relationship("User", back_populates="todos")
表示一个 Todo 属于一个 User,并且通过User
模型中名为todos
的关系属性进行反向关联。User
中的todos = relationship("Todo", back_populates="owner")
表示一个 User 可以拥有多个 Todo 项,通过Todo
模型中名为owner
的关系属性进行反向关联。back_populates
建立了这种双向链接。
四、定义 Pydantic Schemas (schemas.py)
虽然 SQLAlchemy 模型定义了数据库结构,但 API 的输入和输出通常需要不同的结构或字段子集。例如,创建用户时不应传入 id
,响应中可能不包含密码。Pydantic 模型(在 FastAPI 中称为 Schemas)用于定义 API 的数据接口。
创建一个名为 schemas.py
的文件。
“`python
schemas.py
from pydantic import BaseModel, EmailStr, ConfigDict
from typing import Optional, List
from datetime import datetime
— Todo Schemas —
基础 Schema,包含共享字段
class TodoBase(BaseModel):
title: str
description: Optional[str] = None # description 是可选的
创建 Todo 时使用的 Schema (输入)
继承自 TodoBase,不需要 id, created_at, updated_at, owner_id, completed (有默认值)
class TodoCreate(TodoBase):
pass # 目前与 TodoBase 相同,但可以添加特定于创建的字段
更新 Todo 时使用的 Schema (输入)
所有字段都是可选的,因为是部分更新
class TodoUpdate(BaseModel):
title: Optional[str] = None
description: Optional[str] = None
completed: Optional[bool] = None
从数据库读取 Todo 时使用的 Schema (输出)
继承自 TodoBase,并包含数据库生成的字段
class Todo(TodoBase):
id: int
completed: bool
created_at: datetime
updated_at: Optional[datetime] = None # 可能还没有更新过
owner_id: int
# Pydantic V2 配置: 允许从 ORM 对象属性读取数据
model_config = ConfigDict(from_attributes=True)
# Pydantic V1 写法:
# class Config:
# orm_mode = True
— User Schemas —
class UserBase(BaseModel):
email: EmailStr # 使用 Pydantic 的 EmailStr 进行邮箱格式验证
class UserCreate(UserBase):
password: str # 创建用户时需要密码
class User(UserBase):
id: int
is_active: bool
created_at: datetime
updated_at: Optional[datetime] = None
todos: List[Todo] = [] # 包含关联的 Todo 列表
model_config = ConfigDict(from_attributes=True)
# Pydantic V1 写法:
# class Config:
# orm_mode = True
“`
说明:
- 我们为
Todo
和User
定义了多个 Pydantic 模型:Base
: 包含所有模型共有的基础字段。Create
: 用于 API 创建操作的输入验证。通常不包含id
、时间戳等由数据库或服务器生成的字段。Update
: 用于 API 更新操作的输入验证。字段通常是可选的 (Optional
)。- 无后缀 (如
Todo
,User
): 用于 API 读取操作的输出。包含数据库中的所有相关字段,包括id
和时间戳。
Optional[T]
表示该字段是可选的。EmailStr
是 Pydantic 提供的特殊类型,用于验证电子邮件格式。model_config = ConfigDict(from_attributes=True)
(Pydantic V2) 或Config.orm_mode = True
(Pydantic V1) 是关键!它告诉 Pydantic 模型可以直接从 ORM 对象(具有属性访问权限的对象)而不是仅仅从字典中读取数据。这使得我们可以直接将 SQLAlchemy 查询结果传递给 Pydantic 模型进行序列化。- 在
User
schema 中,todos: List[Todo] = []
定义了嵌套的 Schema,用于在获取用户信息时一并返回其拥有的Todo
列表。from_attributes=True
会自动处理这种关系加载(如果 SQLAlchemy 对象已经加载了关系数据)。
五、实现 CRUD 操作 (crud.py)
将数据库操作逻辑(创建、读取、更新、删除)封装在单独的函数中是一种良好的实践,这有助于保持 API 路由处理程序的简洁性。创建一个名为 crud.py
的文件。
“`python
crud.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select # SQLAlchemy 2.0+ 风格的 select
from sqlalchemy.orm import selectinload # 用于预加载关系数据
from . import models, schemas
from typing import List, Optional
import bcrypt # 用于密码哈希
— User CRUD —
async def get_user(db: AsyncSession, user_id: int) -> Optional[models.User]:
“””根据 ID 获取单个用户”””
result = await db.execute(select(models.User).filter(models.User.id == user_id))
return result.scalar_one_or_none()
async def get_user_by_email(db: AsyncSession, email: str) -> Optional[models.User]:
“””根据 Email 获取单个用户”””
result = await db.execute(select(models.User).filter(models.User.email == email))
return result.scalar_one_or_none()
async def get_users(db: AsyncSession, skip: int = 0, limit: int = 100) -> List[models.User]:
“””获取用户列表,支持分页”””
result = await db.execute(
select(models.User).offset(skip).limit(limit)
)
return result.scalars().all()
def get_password_hash(password: str) -> str:
“””哈希密码”””
# 注意:实际应用中 salt 应妥善管理,或使用更高级的库如 passlib
pwd_bytes = password.encode(‘utf-8’)
salt = bcrypt.gensalt()
hashed_password = bcrypt.hashpw(pwd_bytes, salt)
return hashed_password.decode(‘utf-8’) # 存储为字符串
async def create_user(db: AsyncSession, user: schemas.UserCreate) -> models.User:
“””创建新用户”””
hashed_password = get_password_hash(user.password)
db_user = models.User(
email=user.email,
hashed_password=hashed_password
# is_active 默认为 True, created_at/updated_at 由数据库自动生成
)
db.add(db_user)
# 注意:这里不 commit,由 get_async_db 依赖项处理事务
await db.flush() # 将对象添加到会话,但不提交,以便获取 ID(如果需要)
await db.refresh(db_user) # 刷新对象状态,获取数据库生成的默认值
return db_user
— Todo CRUD —
async def get_todo(db: AsyncSession, todo_id: int) -> Optional[models.Todo]:
“””根据 ID 获取单个 Todo 项”””
result = await db.execute(
select(models.Todo).filter(models.Todo.id == todo_id)
)
return result.scalar_one_or_none()
async def get_todos(db: AsyncSession, skip: int = 0, limit: int = 100) -> List[models.Todo]:
“””获取 Todo 列表,支持分页”””
result = await db.execute(
select(models.Todo).offset(skip).limit(limit)
)
return result.scalars().all()
async def get_todos_by_owner(db: AsyncSession, owner_id: int, skip: int = 0, limit: int = 100) -> List[models.Todo]:
“””获取特定用户的 Todo 列表”””
result = await db.execute(
select(models.Todo)
.filter(models.Todo.owner_id == owner_id)
.offset(skip)
.limit(limit)
)
return result.scalars().all()
async def create_user_todo(db: AsyncSession, todo: schemas.TodoCreate, user_id: int) -> models.Todo:
“””为特定用户创建新的 Todo 项”””
db_todo = models.Todo(
**todo.model_dump(), # 从 Pydantic 模型解包字段
owner_id=user_id
# completed 默认为 False, created_at/updated_at 由数据库自动生成
)
db.add(db_todo)
await db.flush()
await db.refresh(db_todo)
return db_todo
async def update_todo(db: AsyncSession, todo_id: int, todo_update: schemas.TodoUpdate) -> Optional[models.Todo]:
“””更新 Todo 项”””
db_todo = await get_todo(db, todo_id)
if db_todo is None:
return None
# 获取 Pydantic 模型中已设置(非 None)的字段进行更新
update_data = todo_update.model_dump(exclude_unset=True)
for key, value in update_data.items():
setattr(db_todo, key, value)
db.add(db_todo) # 将更改添加到会话
await db.flush()
await db.refresh(db_todo)
return db_todo
async def delete_todo(db: AsyncSession, todo_id: int) -> Optional[models.Todo]:
“””删除 Todo 项”””
db_todo = await get_todo(db, todo_id)
if db_todo:
await db.delete(db_todo)
# 注意:删除操作也需要 flush 或 commit 才能生效
await db.flush() # 或者直接在 get_async_db 中 commit
return db_todo
return None
async def get_user_with_todos(db: AsyncSession, user_id: int) -> Optional[models.User]:
“””获取用户及其所有 Todo 项(使用关系预加载)”””
# selectinload 用于高效加载一对多或多对多关系
result = await db.execute(
select(models.User)
.options(selectinload(models.User.todos))
.filter(models.User.id == user_id)
)
return result.scalar_one_or_none()
“`
说明:
- 所有数据库操作函数都接受
db: AsyncSession
作为第一个参数,这个会话将由 FastAPI 的依赖注入系统提供。 - 使用了
async def
和await
,因为我们配置的是异步数据库连接和会话。 - 使用了 SQLAlchemy 2.0+ 风格的
select()
语句,这是推荐的现代用法。 db.add()
: 将新创建或已修改的 ORM 对象添加到会话中,准备持久化。await db.flush()
: 将会话中的更改发送到数据库,但不提交事务。这对于获取数据库生成的 ID 或检查约束很有用。await db.refresh(obj)
: 在flush
或commit
后,用数据库中的最新数据更新 Python 对象的状态(例如获取自动生成的主键或默认值)。await db.delete(obj)
: 标记一个对象以便在下次flush
或commit
时删除。result.scalar_one_or_none()
: 执行查询并返回单个结果,如果没有结果则返回None
。result.scalars().all()
: 执行查询并返回所有结果的列表。.scalars()
用于获取每行的第一个元素(即我们的 ORM 对象)。model_dump()
(Pydantic V2) 或dict()
(Pydantic V1): 用于将 Pydantic 模型转换为字典,方便地传递给 SQLAlchemy 模型构造函数或更新操作。exclude_unset=True
在更新时很有用,它只包含显式设置了值的字段。selectinload
: 是一种关系加载策略,用于在查询主对象(如 User)时,通过一次额外的查询(或 JOIN,取决于配置)预先加载关联的对象(如 Todos)。这可以避免在后续访问关系属性时产生 N+1 查询问题,提高性能。- 密码哈希使用了
bcrypt
库。注意: 这是一个基础示例。生产环境中应使用更健壮的密码管理库(如passlib
)并妥善处理 salt。
六、创建 FastAPI 应用和 API 路由 (main.py)
现在我们将所有部分组合起来,创建 FastAPI 应用实例并定义 API 端点。创建一个名为 main.py
的文件。
“`python
main.py
from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from . import crud, models, schemas
from .database import async_engine, Base, get_async_db # 导入异步相关项
— 数据库初始化 —
注意:在生产环境中,通常使用数据库迁移工具(如 Alembic)来管理表结构
async def create_db_and_tables():
async with async_engine.begin() as conn:
# 如果表已存在,不会重复创建
# await conn.run_sync(Base.metadata.drop_all) # 可选:开发时清空数据库
await conn.run_sync(Base.metadata.create_all)
print(“数据库表已创建 (如果不存在)”)
— FastAPI 应用实例 —
app = FastAPI(title=”FastAPI ORM Tutorial”, description=”一个使用 FastAPI 和 SQLAlchemy (异步) 的示例 API”)
— 应用启动事件 —
在应用启动时尝试创建数据库表
@app.on_event(“startup”)
async def on_startup():
await create_db_and_tables()
— API 路由 —
用户相关的端点
@app.post(“/users/”, response_model=schemas.User, status_code=status.HTTP_201_CREATED, tags=[“Users”])
async def create_new_user(user: schemas.UserCreate, db: AsyncSession = Depends(get_async_db)):
“””创建一个新用户”””
db_user = await crud.get_user_by_email(db, email=user.email)
if db_user:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=”邮箱已被注册”)
created_user = await crud.create_user(db=db, user=user)
return created_user
@app.get(“/users/”, response_model=List[schemas.User], tags=[“Users”])
async def read_all_users(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_async_db)):
“””获取用户列表(带分页)”””
users = await crud.get_users(db, skip=skip, limit=limit)
return users
@app.get(“/users/{user_id}”, response_model=schemas.User, tags=[“Users”])
async def read_single_user(user_id: int, db: AsyncSession = Depends(get_async_db)):
“””根据 ID 获取单个用户”””
db_user = await crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”用户未找到”)
return db_user
@app.get(“/users/{user_id}/details”, response_model=schemas.User, tags=[“Users”])
async def read_user_with_todos(user_id: int, db: AsyncSession = Depends(get_async_db)):
“””获取用户及其所有 Todo 项”””
db_user = await crud.get_user_with_todos(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”用户未找到”)
# Pydantic 的 from_attributes=True 会自动处理嵌套的 todos 列表
return db_user
Todo 相关的端点
@app.post(“/users/{user_id}/todos/”, response_model=schemas.Todo, status_code=status.HTTP_201_CREATED, tags=[“Todos”])
async def create_todo_for_user(
user_id: int, todo: schemas.TodoCreate, db: AsyncSession = Depends(get_async_db)
):
“””为指定用户创建 Todo 项”””
# 可以在这里加一步验证 user_id 是否存在,或者让数据库外键约束处理
db_user = await crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”用户未找到,无法创建 Todo”)
created_todo = await crud.create_user_todo(db=db, todo=todo, user_id=user_id)
return created_todo
@app.get(“/todos/”, response_model=List[schemas.Todo], tags=[“Todos”])
async def read_all_todos(skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_async_db)):
“””获取所有 Todo 项(带分页)”””
todos = await crud.get_todos(db, skip=skip, limit=limit)
return todos
@app.get(“/users/{user_id}/todos/”, response_model=List[schemas.Todo], tags=[“Todos”])
async def read_user_todos(user_id: int, skip: int = 0, limit: int = 100, db: AsyncSession = Depends(get_async_db)):
“””获取指定用户的所有 Todo 项”””
# 验证用户是否存在
db_user = await crud.get_user(db, user_id=user_id)
if db_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”用户未找到”)
todos = await crud.get_todos_by_owner(db=db, owner_id=user_id, skip=skip, limit=limit)
return todos
@app.get(“/todos/{todo_id}”, response_model=schemas.Todo, tags=[“Todos”])
async def read_single_todo(todo_id: int, db: AsyncSession = Depends(get_async_db)):
“””根据 ID 获取单个 Todo 项”””
db_todo = await crud.get_todo(db, todo_id=todo_id)
if db_todo is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”Todo 项未找到”)
return db_todo
@app.put(“/todos/{todo_id}”, response_model=schemas.Todo, tags=[“Todos”])
async def update_existing_todo(
todo_id: int, todo_update: schemas.TodoUpdate, db: AsyncSession = Depends(get_async_db)
):
“””更新指定的 Todo 项”””
updated_todo = await crud.update_todo(db=db, todo_id=todo_id, todo_update=todo_update)
if updated_todo is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”Todo 项未找到,无法更新”)
return updated_todo
@app.delete(“/todos/{todo_id}”, status_code=status.HTTP_204_NO_CONTENT, tags=[“Todos”])
async def delete_existing_todo(todo_id: int, db: AsyncSession = Depends(get_async_db)):
“””删除指定的 Todo 项”””
deleted_todo = await crud.delete_todo(db=db, todo_id=todo_id)
if deleted_todo is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=”Todo 项未找到,无法删除”)
# 对于 204 状态码,FastAPI 默认不返回任何内容体
# 如果需要返回被删除的对象信息,可以修改 response_model 和 status_code
return None # 或者 return Response(status_code=status.HTTP_204_NO_CONTENT)
根路径
@app.get(“/”, tags=[“Root”])
async def root():
return {“message”: “欢迎来到 FastAPI ORM 教程 API!”}
“`
说明:
create_db_and_tables()
: 这个异步函数使用async_engine
来创建models.py
中定义的所有表(如果它们尚不存在)。我们通过@app.on_event("startup")
注册它,使其在 FastAPI 应用启动时自动执行。重要提示: 在生产环境中,应使用数据库迁移工具(如 Alembic)来管理数据库模式的变更,而不是依赖create_all
。create_all
不会处理表的修改或删除。app = FastAPI(...)
: 创建 FastAPI 应用实例。@app.post(...)
,@app.get(...)
,@app.put(...)
,@app.delete(...)
: 定义 API 路由(路径操作)。response_model
: 指定了 API 端点成功响应时应该返回的数据结构(Pydantic schema)。FastAPI 会自动使用这个模型来序列化返回的数据,并过滤掉不必要的字段。status_code
: 定义了成功响应时的 HTTP 状态码。tags
: 用于在自动生成的 API 文档(Swagger UI / ReDoc)中对端点进行分组。Depends(get_async_db)
: 这是 FastAPI 的依赖注入。对于每个需要数据库访问的请求,FastAPI 会调用get_async_db
函数,并将返回的AsyncSession
实例注入到路径操作函数的db
参数中。get_async_db
中的yield
确保了会话在使用后会被妥善处理(提交或回滚,然后关闭)。- 路径参数(如
{user_id}
)和查询参数(如skip
,limit
)通过函数参数的类型提示自动解析和验证。 - 请求体(如
POST /users/
中的user: schemas.UserCreate
)通过 Pydantic 模型进行自动解析和验证。如果请求数据不符合UserCreate
的结构,FastAPI 会自动返回 422 Unprocessable Entity 错误。 HTTPException
: 用于在特定条件下(如资源未找到、请求无效)返回标准的 HTTP 错误响应。
七、运行应用程序
现在,一切准备就绪,可以运行我们的 FastAPI 应用了。
-
确保你在项目根目录 (
fastapi_orm_tutorial
) 并且虚拟环境已激活。 -
启动 Uvicorn 服务器:
bash
uvicorn main:app --reload --host 0.0.0.0 --port 8000main:app
: 指向main.py
文件中的app
FastAPI 实例。--reload
: 开发时非常有用,当代码文件更改时,服务器会自动重启。--host 0.0.0.0
: 使服务器可以从网络中的其他机器访问(如果需要)。--port 8000
: 指定监听的端口。
你应该会看到类似以下的输出,表明服务器正在运行,并且数据库表已创建(如果是第一次运行):
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
INFO: Started reloader process [xxxxx] using statreload
INFO: Started server process [xxxxx]
INFO: Waiting for application startup.
使用异步数据库: sqlite+aiosqlite:///./test_async.db
INFO: Loading module main from file fastapi_orm_tutorial/main.py...
INFO: Will watch for changes in directory: fastapi_orm_tutorial/
INFO: Reloader statreload detected changes in 'main.py'. Reloading...
INFO: Shutting down
INFO: Waiting for application shutdown.
INFO: Application shutdown complete.
INFO: Finished server process [xxxxx]
INFO: Started reloader process [xxxxx] using statreload
INFO: Started server process [xxxxx]
INFO: Waiting for application startup.
使用异步数据库: sqlite+aiosqlite:///./test_async.db
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)
数据库表已创建 (如果不存在) # 这句来自我们的启动事件 -
访问 API 文档:
打开浏览器,访问http://127.0.0.1:8000/docs
(或者如果你使用了不同的 host/port,请相应修改)。你应该能看到 FastAPI 自动生成的 Swagger UI 交互式文档。这里列出了我们定义的所有 API 端点,你可以直接在浏览器中测试它们!- 尝试创建一个用户 (
POST /users/
)。 - 尝试获取用户列表 (
GET /users/
)。 - 尝试为用户创建 Todo 项 (
POST /users/{user_id}/todos/
)。 - 尝试获取所有 Todo 或特定用户的 Todo。
- 尝试更新和删除 Todo 项。
- 尝试创建一个用户 (
八、关于同步与异步的选择
本教程重点演示了异步方式 (asyncio
+ AsyncSession
),因为它最能发挥 FastAPI 的性能优势,尤其是在处理大量并发请求和 I/O 密集型操作(如数据库查询)时。
然而,在某些情况下,同步方式可能仍然适用:
- 项目初期或简单应用,对并发性能要求不高。
- 团队对异步编程还不太熟悉。
- 使用了不支持
asyncio
的旧库或驱动。
如果选择同步,你需要:
-
修改
database.py
:- 使用同步数据库 URL(如
sqlite:///./test_sync.db
或postgresql://user:password@host/dbname
)。 - 使用
create_engine
代替create_async_engine
。 - 使用
sessionmaker
绑定到同步引擎,并不指定class_=AsyncSession
。 - 创建一个同步的
get_db
依赖项。
- 使用同步数据库 URL(如
-
修改
crud.py
:- 函数定义移除
async
关键字。 - 移除所有
await
关键字。 - 数据库操作(如
db.execute
,db.commit
,db.refresh
,db.delete
)使用同步版本。 result.scalar_one_or_none()
和result.scalars().all()
等用法保持不变。
- 函数定义移除
-
修改
main.py
:- 所有路径操作函数移除
async
关键字。 - 依赖项改为
Depends(get_sync_db)
。
- 所有路径操作函数移除
同步代码通常更直接,但可能会在数据库操作期间阻塞 FastAPI 的事件循环,影响并发处理能力。对于现代 FastAPI 应用,强烈推荐尽可能使用异步方式。
九、进阶主题与最佳实践
这个快速入门指南涵盖了基础,但实际项目中还有更多需要考虑:
- 数据库迁移 (Alembic): 如前所述,
Base.metadata.create_all
不适用于生产环境。Alembic 是 SQLAlchemy 官方推荐的数据库迁移工具,它可以跟踪模型变化并生成迁移脚本来更新数据库模式,而不会丢失数据。 - 更复杂的关系: SQLAlchemy 支持一对一、多对多等更复杂的关系,以及丰富的加载策略(
joinedload
,subqueryload
等)来优化查询性能。 - 错误处理: 更精细地处理数据库约束错误(如唯一性冲突)、连接错误等,并返回更具体的 HTTP 状态码和错误信息。
- 依赖注入的进阶使用: 可以创建更复杂的依赖项,例如获取当前登录用户,进行权限检查等。
- 测试: 编写单元测试和集成测试来确保 CRUD 操作和 API 端点的正确性。可以使用内存数据库(如 SQLite in-memory)或测试专用的数据库实例。FastAPI 提供了
TestClient
来方便地测试 API。 - 代码组织: 对于大型项目,可以将路由、模型、schemas 和 CRUD 操作按功能模块(如
users
,items
)组织到不同的子目录和文件中,使用 FastAPI 的APIRouter
来管理。 - 配置管理: 使用环境变量、配置文件(如
.env
文件配合python-dotenv
库,或 Pydantic 的BaseSettings
)来管理数据库连接字符串、密钥等敏感信息,而不是硬编码在代码中。 - 安全性:
- 密码管理: 使用
passlib
等库进行安全的密码哈希和验证。 - 认证与授权: 实现用户登录、令牌(如 JWT)生成与验证、基于角色的访问控制等。FastAPI 提供了
OAuth2PasswordBearer
等工具。 - 输入验证: 充分利用 Pydantic 的验证能力,对所有输入数据进行严格校验。
- SQL 注入防护: 使用 ORM 通常可以防止大多数 SQL 注入,但仍需注意不要手动拼接 SQL 字符串。
- 密码管理: 使用
- 性能调优: 分析慢查询(
echo=True
或数据库日志),优化索引,选择合适的查询策略和关系加载方式。 - SQLModel: 由 FastAPI 作者创建,旨在结合 SQLAlchemy 和 Pydantic,提供单一的模型类来同时定义数据库表结构和 API 数据模式。如果你喜欢这种整合方式,可以考虑使用 SQLModel。
结论
将 FastAPI 与 ORM(特别是强大的 SQLAlchemy)结合使用,可以构建出高性能、类型安全且易于维护的现代 Web API。通过理解数据库配置、模型定义、Pydantic schemas、CRUD 操作封装以及 FastAPI 的依赖注入机制,你可以高效地开发数据库驱动的应用程序。
本教程提供了一个全面的起点,涵盖了从项目设置到实现基本 CRUD 操作的完整流程,并重点介绍了推荐的异步方式。希望这个详细的指南能帮助你快速上手 FastAPI 和 ORM 的集成,并为进一步探索更高级的功能打下坚实的基础。继续实践,探索文档,并参与社区,你将能够充分利用这对强大组合的潜力!