Connecting a DB — SQLAlchemy 2.x + Alembic
SQLAlchemy 2.x's new style — Mapped/mapped_column, async sessions, integration with FastAPI's dependency injection, and Alembic migrations.
This chapter swaps the in-memory dict from Chapter 23 Routing, Pydantic models, dependency injection for a real database. It covers SQLAlchemy 2.x’s new style, async sessions, the natural fit with FastAPI’s dependency injection, and Alembic migrations for safely rolling out schema changes.
The Mapped[T] notation and ORM mapping in this chapter combine the descriptors of Chapter 16 Descriptors and __set_name__ with the ORM-DSL pattern from Chapter 17 Metaclasses. Async sessions sit on top of Chapter 14 Async intro and Chapter 18 Async in depth.
Why SQLAlchemy 2.x #
Python has many ORMs (SQLAlchemy, Tortoise, SQLModel, Peewee, Django ORM), but from a modern async + type-friendly angle, SQLAlchemy 2.x is the standard choice.
2.x is the same library as 1.x, but the usage style has changed enough that it almost feels like a different library.
| 1.x style | 2.x style |
|---|---|
| Column(String, ...) | Mapped[str] = mapped_column(...) |
| session.query(User).filter(...) | session.execute(select(User).where(...)) |
| Sync only (third-party async) | Built-in async |
| Type hints as a side concern | Type hints are central |
For new code, always go with the 2.x style.
What about SQLModel? #
SQLModel was built by the author of FastAPI as an attempt to unify SQLAlchemy and Pydantic. It shines in short code, but it doesn’t yet expose all of SQLAlchemy 2.x’s features, so you hit a frustrating ceiling with complex queries. For new projects we recommend using SQLAlchemy 2.x directly and keeping Pydantic separate.
Installation #
    uv add sqlalchemy "sqlalchemy[asyncio]" asyncpg
    uv add --dev alembic

- sqlalchemy: the ORM itself
- asyncpg: async driver for PostgreSQL (when using async)
- alembic: migration tool
To start with SQLite instead of PostgreSQL, swap asyncpg for the aiosqlite driver.

    uv add aiosqlite

Engine and sessions #
    # app/db/session.py
    from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

    from app.core.config import settings

    engine = create_async_engine(
        settings.database_url,
        echo=False,  # Set True during development for SQL logs
        pool_pre_ping=True,  # Test connections on checkout and replace dead ones
    )

    AsyncSessionLocal = async_sessionmaker(
        engine,
        class_=AsyncSession,
        expire_on_commit=False,  # Objects remain usable after commit
    )

expire_on_commit=False is almost mandatory in async environments. With the default of True, every attribute is expired on commit, so the next attribute access triggers an implicit lazy load, which an async session cannot perform and which fails with a MissingGreenlet error.
Inject the session as a dependency #
    # app/api/deps.py
    from typing import Annotated, AsyncIterator

    from fastapi import Depends
    from sqlalchemy.ext.asyncio import AsyncSession

    from app.db.session import AsyncSessionLocal


    async def get_session() -> AsyncIterator[AsyncSession]:
        # One session per request; the context manager closes it afterwards.
        async with AsyncSessionLocal() as session:
            yield session


    DBSession = Annotated[AsyncSession, Depends(get_session)]

This is the most powerful application of the Depends pattern from Chapter 23. In a route:

    @router.get("/{todo_id}")
    async def read_todo(todo_id: int, db: DBSession) -> TodoOut:
        ...

Drop DBSession in and a session is created per request and closed automatically once the request has been handled. Transaction management can live in the same place.
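The lifecycle of a yield-based dependency can be imitated with the standard library alone. This is only a rough emulation under stated assumptions: FakeSession and handle_request are hypothetical stand-ins, and the real framework does more than this. It does show why the code after yield works as cleanup.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records its state."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def get_session() -> AsyncIterator[FakeSession]:
    session = FakeSession()
    try:
        yield session  # the route body runs while the generator is suspended here
    finally:
        await session.close()  # runs after the route body has finished


async def handle_request() -> tuple[bool, bool]:
    """Emulate one request: enter the dependency, run a route, exit it."""
    async with asynccontextmanager(get_session)() as session:
        open_during_route = not session.closed
    return open_during_route, session.closed


result = asyncio.run(handle_request())
print(result)
```

The session is open while the route runs and closed afterwards, without the route ever calling close() itself.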
Defining models — 2.x style #
    # app/models/base.py
    from sqlalchemy.orm import DeclarativeBase


    class Base(DeclarativeBase):
        pass

    # app/models/todo.py
    from datetime import datetime

    from sqlalchemy import Boolean, DateTime, String, func
    from sqlalchemy.orm import Mapped, mapped_column

    from app.models.base import Base


    class Todo(Base):
        __tablename__ = "todos"

        id: Mapped[int] = mapped_column(primary_key=True)
        title: Mapped[str] = mapped_column(String(200))
        description: Mapped[str | None]
        done: Mapped[bool] = mapped_column(Boolean, default=False)
        created_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True),
            server_default=func.now(),
        )
        updated_at: Mapped[datetime] = mapped_column(
            DateTime(timezone=True),
            server_default=func.now(),
            onupdate=func.now(),
        )

The generic type hints from Chapter 9 Typing in earnest pay off here. Mapped[T] looks like an ordinary type hint, but it also declares a SQL column.

How to read it:

- Mapped[int]: the column’s Python type
- mapped_column(...): extra DB-side options (constraints, defaults, etc.)
- Mapped[str | None]: NULL allowed
- server_default=func.now(): the DB fills it in
Relationships — one-to-many #
    from sqlalchemy import ForeignKey, String
    from sqlalchemy.orm import Mapped, mapped_column, relationship

    from app.models.base import Base


    class User(Base):
        __tablename__ = "users"

        id: Mapped[int] = mapped_column(primary_key=True)
        email: Mapped[str] = mapped_column(String(200), unique=True, index=True)
        hashed_password: Mapped[str]
        # "One" side: a list of the user's todos
        todos: Mapped[list["Todo"]] = relationship(back_populates="owner")


    class Todo(Base):
        __tablename__ = "todos"

        id: Mapped[int] = mapped_column(primary_key=True)
        title: Mapped[str] = mapped_column(String(200))
        owner_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
        # "Many" side: the single owning user
        owner: Mapped["User"] = relationship(back_populates="todos")

relationship(back_populates=...) wires up the bidirectional link. Both user.todos and todo.owner work naturally.
CRUD — 2.x style #
Create #
    from sqlalchemy.ext.asyncio import AsyncSession

    from app.models.todo import Todo
    from app.schemas.todo import TodoCreate


    async def create_todo(db: AsyncSession, data: TodoCreate, owner_id: int) -> Todo:
        todo = Todo(
            title=data.title,
            description=data.description,
            owner_id=owner_id,
        )
        db.add(todo)
        await db.commit()
        await db.refresh(todo)  # Reload DB-filled id, created_at, etc.
        return todo

db.refresh(obj) reloads columns the DB filled in automatically (id, timestamps, etc.). It is an easy step to forget in async code, where missing values cannot be lazy-loaded on attribute access.
Read #
    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession

    from app.models.todo import Todo


    async def get_todo(db: AsyncSession, todo_id: int) -> Todo | None:
        result = await db.execute(
            select(Todo).where(Todo.id == todo_id)
        )
        return result.scalar_one_or_none()


    async def list_todos(db: AsyncSession, skip: int, limit: int) -> list[Todo]:
        result = await db.execute(
            select(Todo).order_by(Todo.created_at.desc()).offset(skip).limit(limit)
        )
        return list(result.scalars())

Key patterns:

- select(Todo).where(...): query object (not yet executed)
- db.execute(...): actual execution
- .scalar_one_or_none(): 0 or 1 row, None if missing (raises if more than one row matches)
- .scalars(): every row as ORM objects

The old 1.x db.query(Todo).filter(...).first() pattern is legacy in 2.x: it still works on a sync Session, but AsyncSession does not offer it at all. Always use select + execute in new code.
Update #
    async def update_todo(db: AsyncSession, todo: Todo, data: TodoUpdate) -> Todo:
        # Only touch the fields the client actually sent.
        update_dict = data.model_dump(exclude_unset=True)
        for field, value in update_dict.items():
            setattr(todo, field, value)
        await db.commit()
        await db.refresh(todo)
        return todo

Or directly with SQL UPDATE:

    from sqlalchemy import update


    async def mark_all_done(db: AsyncSession, owner_id: int) -> int:
        result = await db.execute(
            update(Todo)
            .where(Todo.owner_id == owner_id)
            .values(done=True)
        )
        await db.commit()
        return result.rowcount  # number of rows matched by the UPDATE

ORM-object updates get automatic change tracking; direct SQL is faster for bulk updates. Pick based on the situation.
Delete #
    async def delete_todo(db: AsyncSession, todo: Todo) -> None:
        await db.delete(todo)
        await db.commit()

N+1 problem and eager loading #
The most common ORM performance trap. Fetching 100 users plus each one’s todos:

    users = (await db.execute(select(User))).scalars()
    for user in users:
        todos = user.todos  # ← extra query each time

1 query for the users + 100 queries for their todos = 101 queries. (With an AsyncSession the implicit load is not even allowed and raises MissingGreenlet; see pitfall 2 below.) Solve it in one shot with selectinload or joinedload:

    from sqlalchemy.orm import selectinload

    result = await db.execute(
        select(User).options(selectinload(User.todos))
    )
    users = result.scalars()
    for user in users:
        print(user.todos)  # No extra query

- selectinload: fetched in a separate query using IN (good for one-to-many)
- joinedload: JOIN in a single query (good for one-to-one, many-to-one)
The measurement loop from Chapter 21 Performance — cProfile, py-spy, memory profiling applies here too — turn on echo=True for SQL logs and catch any N+1 patterns you see.
Wiring routes to the DB #
    from fastapi import APIRouter, HTTPException
    from sqlalchemy import select

    from app.api.deps import DBSession
    from app.models.todo import Todo
    from app.schemas.todo import TodoCreate, TodoOut, TodoUpdate

    router = APIRouter(prefix="/todos", tags=["todos"])


    @router.post("/", response_model=TodoOut, status_code=201)
    async def create_todo(payload: TodoCreate, db: DBSession) -> Todo:
        todo = Todo(**payload.model_dump(), owner_id=1)  # owner_id comes from auth in Chapter 26
        db.add(todo)
        await db.commit()
        await db.refresh(todo)
        return todo


    @router.get("/{todo_id}", response_model=TodoOut)
    async def read_todo(todo_id: int, db: DBSession) -> Todo:
        todo = await db.get(Todo, todo_id)  # primary-key lookup
        if todo is None:
            raise HTTPException(404, "Todo not found")
        return todo

response_model=TodoOut + model_config = {"from_attributes": True} (the same combo from Chapter 23) automatically converts SQLAlchemy objects into Pydantic models.
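The conversion step can be tried in isolation. A minimal sketch, assuming Pydantic v2; the TodoOut fields shown here are hypothetical, and TodoRow is a plain dataclass standing in for a loaded SQLAlchemy object.

```python
from dataclasses import dataclass

from pydantic import BaseModel, ConfigDict


class TodoOut(BaseModel):  # hypothetical field set for illustration
    model_config = ConfigDict(from_attributes=True)

    id: int
    title: str
    done: bool


@dataclass
class TodoRow:  # hypothetical stand-in for a loaded SQLAlchemy Todo
    id: int
    title: str
    done: bool
    internal_note: str = "never leaves the server"


# from_attributes=True lets Pydantic read obj.id, obj.title, ... instead of
# requiring a dict; attributes that are not fields of TodoOut are ignored.
out = TodoOut.model_validate(TodoRow(id=1, title="write docs", done=False))
payload = out.model_dump()
print(payload)
```

This is also why the response schema doubles as a filter: only the fields declared on TodoOut end up in the payload.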
Alembic — schema migrations #
DB schemas evolve over time: columns are added, indexes change, new tables appear. Alembic is the tool that version-controls those changes.
Initialize #
    uv run alembic init -t async alembic

-t async selects the async template. Directory layout:

    alembic/
    ├── env.py          # migration environment
    ├── script.py.mako  # template
    └── versions/       # migration files

Configuration #

In alembic.ini:

    sqlalchemy.url = postgresql+asyncpg://user:pw@localhost/todo

Or read it from an environment variable, which is safer.
    # alembic/env.py
    from alembic import context

    from app.core.config import settings
    from app.models.base import Base
    from app.models import todo, user  # Import every model so it registers on Base.metadata

    config = context.config
    config.set_main_option("sqlalchemy.url", settings.database_url)
    target_metadata = Base.metadata

That target_metadata = Base.metadata line is the heart of it: Alembic compares your model definitions to the current DB state and auto-generates migrations.
First migration #
    uv run alembic revision --autogenerate -m "create todos table"

A new file appears in alembic/versions/.

    def upgrade() -> None:
        op.create_table(
            "todos",
            sa.Column("id", sa.Integer(), primary_key=True),
            sa.Column("title", sa.String(200)),
            ...
        )


    def downgrade() -> None:
        op.drop_table("todos")

Always review the auto-generated code. Autogenerate doesn’t catch every change correctly: a renamed table or column, for example, shows up as a drop plus an add. Watch out especially for indexes, constraints, and column type changes.
Apply #
    uv run alembic upgrade head   # to the latest
    uv run alembic downgrade -1   # roll back one step
    uv run alembic current        # current version
    uv run alembic history        # all migrations

Resolving merge conflicts #

If multiple developers create migrations from the same parent revision, the history branches into multiple heads. Combine them with alembic merge.

    uv run alembic merge -m "merge a and b" abc123 def456

Common pitfalls #
1) Don’t take select results with .all() #

    result = await db.execute(select(Todo))
    rows = result.all()  # each row is a (Todo,) tuple

Instead:

    result = await db.execute(select(Todo))
    todos = list(result.scalars())  # list of Todo objects

.scalars() unwraps the ORM objects directly.
2) No sync lazy loads inside an async session #
Set expire_on_commit=False and preload relationships with selectinload/joinedload. Otherwise a lazy load is triggered when the route builds the response, and you’ll hit a MissingGreenlet error.
3) Forgetting the transaction #
Without calling commit, your changes are never persisted. You can also auto-commit inside the dependency:

    async def get_session() -> AsyncIterator[AsyncSession]:
        async with AsyncSessionLocal() as session:
            try:
                yield session
                await session.commit()  # route finished without an exception
            except Exception:
                await session.rollback()  # undo everything on error
                raise

That said, explicit commits are safer in some places. Pick a project policy.
Exercises #
- Using SQLite + aiosqlite in your todo-api project, write app/db/session.py and app/models/{base,todo,user}.py. Define columns in the Mapped[T] + mapped_column(...) style. Turn on SQL echo, run the first select, and confirm the logs directly.
- Define a User ↔ Todo one-to-many relationship. Deliberately write code that produces N+1 queries, then fix it with selectinload(User.todos) and observe the change in query count from the SQL logs.
- Initialize Alembic (init -t async) and auto-generate the first migration (revision --autogenerate). Review the generated file, apply with upgrade head, then roll back with downgrade -1 so you see the full migration workflow.
In one line: SQLAlchemy 2.x’s Mapped[T] + mapped_column(...) is the standard. Async is create_async_engine + AsyncSession, expire_on_commit=False. Per-request session via Depends. CRUD is select(...).where(...) + db.execute(...), .scalars() / .scalar_one_or_none(). Relationships use relationship(back_populates=...), and N+1 is avoided with selectinload / joinedload. Pydantic from_attributes=True for automatic ORM → response. Use Alembic to version-control schema migrations.
Next chapter #
Next, Chapter 26 Authentication — OAuth2 password flow + JWT layers user sign-up and login on top of this chapter. OAuth2 password flow, JWT issuance, password hashing, and a current_user dependency that wraps it all up.