Modern Python in Practice #3: Connecting a DB — SQLAlchemy 2.x + Alembic
In this post we replace the in-memory dict from #2 with a real database. We cover SQLAlchemy 2.x’s new style, async sessions, how they fit FastAPI’s dependency injection, and Alembic migrations for rolling out schema changes safely.
Why SQLAlchemy 2.x? #
Python has many ORMs (SQLAlchemy, Tortoise, SQLModel, Peewee, Django ORM), but from a modern async + type-friendly angle, SQLAlchemy 2.x sits firmly in the standard slot.
2.x is the same library as 1.x, but the usage style has changed enough that it almost feels like a different one.
| 1.x style | 2.x style |
|---|---|
| Column(String, ...) | Mapped[str] = mapped_column(...) |
| session.query(User).filter(...) | session.execute(select(User).where(...)) |
| Sync by default (the asyncio extension only arrived in 1.4) | Built-in async |
| Type hints as a side concern | Type hints are central |
For new code, always go with the 2.x style.
What about SQLModel? #
SQLModel was built by the creator of FastAPI as an attempt to unify SQLAlchemy and Pydantic. It shines at keeping code short, but it doesn’t yet expose all of SQLAlchemy 2.x’s features, so you hit a frustrating ceiling with complex queries. For new projects we recommend using SQLAlchemy 2.x directly and keeping Pydantic separate.
Installation #
uv add sqlalchemy "sqlalchemy[asyncio]" asyncpg
uv add --dev alembic
- sqlalchemy: the ORM itself
- asyncpg: async driver for PostgreSQL (when using async)
- alembic: migration tool
To start with SQLite, swap in the aiosqlite driver in place of asyncpg.
uv add aiosqlite
Engine and sessions #
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession
from app.core.config import settings
engine = create_async_engine(
    settings.database_url,
    echo=False,  # Set True during development for SQL logs
    pool_pre_ping=True,  # Auto-recover dead connections
)
AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Objects remain usable after commit
)
expire_on_commit=False is almost mandatory in async environments. With the default of True, every attribute is expired at commit, so the next attribute access triggers an implicit reload. Under asyncio that hidden I/O has no await to run in, and it fails with a MissingGreenlet error.
Inject the session as a dependency #
from typing import Annotated, AsyncIterator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import AsyncSessionLocal
async def get_session() -> AsyncIterator[AsyncSession]:
    async with AsyncSessionLocal() as session:
        yield session

DBSession = Annotated[AsyncSession, Depends(get_session)]
This is the most powerful application of the Depends pattern from #2. In a route:
@router.get("/{todo_id}")
async def read_todo(todo_id: int, db: DBSession) -> TodoOut:
    ...
Drop DBSession in and a session is created per request and closed automatically when the route ends. Transaction management lives in the same place.
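To see why a yield dependency gives one session per request, here is a framework-free sketch of the lifecycle. FakeSession and handle_request are hypothetical stand-ins, not FastAPI or SQLAlchemy APIs; the sketch only mimics, roughly, how a framework enters the generator before the route and resumes it afterwards.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator

events: list[str] = []


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only records its lifecycle."""

    async def __aenter__(self) -> "FakeSession":
        events.append("open")
        return self

    async def __aexit__(self, *exc_info) -> None:
        events.append("close")


async def get_session() -> AsyncIterator[FakeSession]:
    async with FakeSession() as session:
        yield session  # the route runs while the generator is paused here


async def handle_request(name: str) -> None:
    # Roughly what a framework does: enter the dependency, run the route, exit.
    async with asynccontextmanager(get_session)() as session:
        assert isinstance(session, FakeSession)
        events.append(f"route:{name}")


async def main() -> None:
    await handle_request("a")
    await handle_request("b")


asyncio.run(main())
print(events)
```

Each simulated request opens its own session and closes it once the route body has finished, which is the behavior the DBSession alias relies on.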
Defining models — 2.x style #
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
    pass

from datetime import datetime
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from app.models.base import Base
class Todo(Base):
    __tablename__ = "todos"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    description: Mapped[str | None]
    done: Mapped[bool] = mapped_column(Boolean, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )
The Mapped[T] from Intermediate #2 is used exactly here. It looks like an ordinary type hint, but a SQL column definition happens alongside it.
How to read it:
- Mapped[int]: the column’s Python type
- mapped_column(...): extra DB-side options (constraints, defaults, etc.)
- Mapped[str | None]: NULL allowed
- server_default=func.now(): the DB fills it in
Relationships — one-to-many #
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class User(Base):
    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(200), unique=True, index=True)
    hashed_password: Mapped[str]
    todos: Mapped[list["Todo"]] = relationship(back_populates="owner")

class Todo(Base):
    __tablename__ = "todos"
    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    owner_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    owner: Mapped["User"] = relationship(back_populates="todos")
relationship(back_populates=...) wires up the bidirectional link. Both user.todos and todo.owner work naturally.
CRUD — 2.x style #
Create #
from sqlalchemy.ext.asyncio import AsyncSession
async def create_todo(db: AsyncSession, data: TodoCreate, owner_id: int) -> Todo:
    todo = Todo(
        title=data.title,
        description=data.description,
        owner_id=owner_id,
    )
    db.add(todo)
    await db.commit()
    await db.refresh(todo)  # Reload DB-filled id, created_at, etc.
    return todo
db.refresh(obj) reloads columns the DB filled in automatically (id, timestamp, etc.). An easy step to forget in async code.
Read #
from sqlalchemy import select
async def get_todo(db: AsyncSession, todo_id: int) -> Todo | None:
    result = await db.execute(
        select(Todo).where(Todo.id == todo_id)
    )
    return result.scalar_one_or_none()

async def list_todos(db: AsyncSession, skip: int, limit: int) -> list[Todo]:
    result = await db.execute(
        select(Todo).order_by(Todo.created_at.desc()).offset(skip).limit(limit)
    )
    return list(result.scalars())
Key patterns:
- select(Todo).where(...): query object (not yet executed)
- db.execute(...): actual execution
- .scalar_one_or_none(): 0 or 1 row, None if missing
- .scalars(): every row as ORM objects
The old 1.x db.query(Todo).filter(...).first() pattern still works in 2.x but is classed as legacy. Always use select + execute in new code.
Update #
async def update_todo(db: AsyncSession, todo: Todo, data: TodoUpdate) -> Todo:
    update_dict = data.model_dump(exclude_unset=True)
    for field, value in update_dict.items():
        setattr(todo, field, value)
    await db.commit()
    await db.refresh(todo)
    return todo
Or directly with SQL UPDATE:
from sqlalchemy import update
async def mark_all_done(db: AsyncSession, owner_id: int) -> int:
    result = await db.execute(
        update(Todo)
        .where(Todo.owner_id == owner_id)
        .values(done=True)
    )
    await db.commit()
    return result.rowcount
ORM-object updates get automatic change tracking; direct SQL is faster for bulk updates. Choose based on the situation.
Delete #
async def delete_todo(db: AsyncSession, todo: Todo) -> None:
    await db.delete(todo)
    await db.commit()
N+1 problem and eager loading #
The most common ORM performance trap. Fetching 100 users plus each one’s todos:
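users = (await db.execute(select(User))).scalars()
for user in users:
    todos = user.todos  # ← lazy load: one extra query per user
1 query for the users + 100 queries for their todos = 101 queries. (With AsyncSession the implicit lazy load does not even run; it raises MissingGreenlet.) Solve it in one shot with selectinload or joinedload: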
from sqlalchemy.orm import selectinload
result = await db.execute(
    select(User).options(selectinload(User.todos))
)
users = result.scalars()
for user in users:
    print(user.todos)  # No extra query
- selectinload: fetched in a separate query (good for one-to-many)
- joinedload: JOIN in a single query (good for one-to-one, many-to-one)
The measurement loop from Advanced #7 applies here too — turn on echo=True for SQL logs and catch any N+1 patterns you see.
Wiring routes to the DB #
from fastapi import APIRouter, HTTPException
from app.api.deps import DBSession
from app.schemas.todo import TodoCreate, TodoOut, TodoUpdate
from app.models.todo import Todo
from sqlalchemy import select
router = APIRouter(prefix="/todos", tags=["todos"])
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(payload: TodoCreate, db: DBSession) -> Todo:
    todo = Todo(**payload.model_dump(), owner_id=1)  # owner_id will come from auth in #4
    db.add(todo)
    await db.commit()
    await db.refresh(todo)
    return todo

@router.get("/{todo_id}", response_model=TodoOut)
async def read_todo(todo_id: int, db: DBSession) -> Todo:
    todo = await db.get(Todo, todo_id)
    if todo is None:
        raise HTTPException(404, "Todo not found")
    return todo
response_model=TodoOut + model_config = {"from_attributes": True} (the same combo from #2) automatically converts SQLAlchemy objects into Pydantic models.
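The conversion step can be reproduced outside FastAPI with Pydantic alone. In this sketch the TodoOut fields are assumed (the real schema lives in #2), and TodoRow is a hypothetical plain class standing in for a loaded SQLAlchemy object, since from_attributes only needs attribute access.

```python
from pydantic import BaseModel, ConfigDict


class TodoOut(BaseModel):
    # from_attributes lets validation read obj.title instead of obj["title"].
    model_config = ConfigDict(from_attributes=True)

    id: int
    title: str
    done: bool


class TodoRow:
    """Hypothetical stand-in for a loaded SQLAlchemy Todo instance."""

    def __init__(self, id: int, title: str, done: bool) -> None:
        self.id = id
        self.title = title
        self.done = done
        self.internal_note = "not declared on TodoOut, so it is left out"


row = TodoRow(id=1, title="write post", done=False)
out = TodoOut.model_validate(row)
payload = out.model_dump()
print(payload)
```

Only the fields declared on the response schema end up in the payload, which is also why columns such as hashed_password stay out of responses as long as the schema does not list them.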
Alembic — schema migrations #
DB schemas evolve over time. Adding columns, changing indexes, creating new tables. Alembic is the tool that version-controls those changes.
Initialize #
uv run alembic init -t async alembic
-t async selects the async template. Directory layout:
alembic/
├── env.py # migration environment
├── script.py.mako # template
└── versions/ # migration files
Configuration #
Set the database URL in alembic.ini:
sqlalchemy.url = postgresql+asyncpg://user:pw@localhost/todo
Or set it in alembic/env.py from the app settings, which can read it from an environment variable. That is safer.
from app.core.config import settings
from app.models.base import Base
from app.models import todo, user # Import every model
config = context.config
config.set_main_option("sqlalchemy.url", settings.database_url)
target_metadata = Base.metadata
That target_metadata = Base.metadata line is the key: Alembic compares your model definitions to the current DB state and auto-generates migrations.
First migration #
uv run alembic revision --autogenerate -m "create todos table"
A new file appears in alembic/versions/.
def upgrade() -> None:
    op.create_table(
        "todos",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column("title", sa.String(200)),
        ...
    )

def downgrade() -> None:
    op.drop_table("todos")
Always review the auto-generated code. Autogenerate doesn’t catch every change correctly. Watch out especially for indexes, constraints, and column type changes.
Apply #
uv run alembic upgrade head # to the latest
uv run alembic downgrade -1 # roll back one step
uv run alembic current # current version
uv run alembic history # all migrations
Resolving merge conflicts #
If multiple developers create migrations at the same time, diverging branches appear. Combine them with alembic merge.
uv run alembic merge -m "merge a and b" abc123 def456
Common pitfalls #
1) Don’t take select results with .all() #
result = await db.execute(select(Todo))
rows = result.all()  # each row is a (Todo,) tuple

result = await db.execute(select(Todo))
todos = list(result.scalars())  # list of Todo objects
.scalars() unwraps the ORM objects directly.
2) No sync lazy loads inside an async session #
Use expire_on_commit=False and selectinload/joinedload to preload. Otherwise, lazy loads will trigger when the route builds the response and you’ll hit a MissingGreenlet error.
3) Forgetting the transaction #
Without calling commit, your changes are never persisted. You can also auto-commit inside the dependency:
async def get_session() -> AsyncIterator[AsyncSession]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
That said, explicit commits are safer in some cases. Either way, settle on a consistent project policy.
Recap #
What this post nailed down:
- SQLAlchemy 2.x: the new style with Mapped[T] + mapped_column(...)
- Async: create_async_engine, AsyncSession, await db.execute(...)
- FastAPI dependency injection: per-request sessions via get_session
- CRUD: select(...).where(...), db.add, db.commit, db.refresh, db.delete
- Relationships: relationship + back_populates, avoid N+1 with selectinload/joinedload
- Pydantic from_attributes=True for ORM → response auto-conversion
- Alembic: init -t async, revision --autogenerate, upgrade head
- Pitfalls: .scalars(), expire_on_commit, lazy loads, explicit commits
In the next post (#4 Authentication — OAuth2 + JWT) we layer user sign-up and login on top of this. OAuth2 password flow, JWT issuance, password hashing, and a current_user dependency that wraps it all up.