目次
25 章

DB 連携 — SQLAlchemy 2.x + Alembic

SQLAlchemy 2.x の新スタイル — Mapped/mapped_column、async セッション、FastAPI 依存性注入との接続、Alembic のマイグレーションまでまとめます。

第23章 ルーティング、Pydantic モデル、依存性注入 のインメモリ dict を本章で DB に置き換え ます。SQLAlchemy 2.x の新スタイル、async セッション、FastAPI 依存性注入との接続、そしてスキーマ変更を安全に扱う Alembic のマイグレーションまでです。

本章の Mapped[T] 表記と ORM マッピングは、第16章 ディスクリプタと __set_name__ のディスクリプタと、第17章 メタクラス の ORM DSL パターンが組み合わさってできています。非同期セッションは第14章 非同期入門 / 第18章 非同期の深層 の上に乗ります。

なぜ SQLAlchemy 2.x なのか #

Python の ORM はいくつもありますが (SQLAlchemy、Tortoise、SQLModel、Peewee、Django ORM)、モダンな非同期 + 型フレンドリー の観点で最も標準的な選択肢なのが SQLAlchemy 2.x です。

2.x は 1.x と同じライブラリですが、使用スタイルは事実上変わりました。

1.x スタイル2.x スタイル
Column(String, ...)Mapped[str] = mapped_column(...)
session.query(User).filter(...)session.execute(select(User).where(...))
同期だけ (3rd party 非同期)ビルトイン async
型ヒントは補助的型ヒントが核心

新しいコードは 無条件に 2.x スタイル です。

SQLModel は? #

FastAPI を作った人が設計したライブラリで、SQLAlchemy + Pydantic 統合 を試みたものです。短いコードに有利ですが、まだ SQLAlchemy 2.x のすべての機能を露出していないので、複雑なクエリでは限界 があります。新しいプロジェクトは SQLAlchemy 2.x を直接 + Pydantic 別 を推奨します。

インストール #

DB の依存関係
uv add sqlalchemy "sqlalchemy[asyncio]" asyncpg
uv add --dev alembic
  • sqlalchemy — ORM 本体
  • asyncpg — PostgreSQL 非同期ドライバ (async 使用時)
  • alembic — マイグレーションツール

SQLite で始めるなら aiosqlite (PostgreSQL の代わりに)。

SQLite で素早く
uv add aiosqlite

Engine とセッション #

app/db/session.py
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession
from app.core.config import settings

engine = create_async_engine(
    settings.database_url,
    echo=False,           # 開発中は True にして SQL ログ
    pool_pre_ping=True,   # 切れた接続を自動回復
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,   # コミット後もオブジェクト使用可能
)

expire_on_commit=False は非同期環境でほぼ必須。True だとコミット後の属性アクセスが lazy load をトリガーしますが、async 関数の外では危険です。

依存性でセッションを注入 #

app/api/deps.py
from typing import Annotated, AsyncIterator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import AsyncSessionLocal

async def get_session() -> AsyncIterator[AsyncSession]:
    async with AsyncSessionLocal() as session:
        yield session

DBSession = Annotated[AsyncSession, Depends(get_session)]

これが 第23章 で見た Depends パターンの最も強力な活用。ルートで:

@router.get("/{todo_id}")
async def read_todo(todo_id: int, db: DBSession) -> TodoOut:
    ...

DBSession の一行が入ると リクエスト単位でセッションが作られ、ルートが終わると自動でクローズされます。トランザクション管理もこの依存性の中で扱います。

モデル定義 — 2.x スタイル #

app/models/base.py
from sqlalchemy.orm import DeclarativeBase

class Base(DeclarativeBase):
    pass
app/models/todo.py
from datetime import datetime
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from app.models.base import Base

class Todo(Base):
    __tablename__ = "todos"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    description: Mapped[str | None]
    done: Mapped[bool] = mapped_column(Boolean, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
    )

第9章 typing 本格Mapped[T] がまさにここで使われます。普通の型ヒントのように見えますが、SQL カラム定義 が一緒に行われます。

読み方:

  • Mapped[int] — カラムの Python 型
  • mapped_column(...) — DB 側の追加オプション (制約、デフォルト値など)
  • Mapped[str | None] — NULL 許可
  • server_default=func.now() — DB が自動で埋める

関係 — 1:N #

users と todos の 1:N
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship

class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String(200), unique=True, index=True)
    hashed_password: Mapped[str]

    todos: Mapped[list["Todo"]] = relationship(back_populates="owner")

class Todo(Base):
    __tablename__ = "todos"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(200))
    owner_id: Mapped[int] = mapped_column(ForeignKey("users.id"))

    owner: Mapped["User"] = relationship(back_populates="todos")

relationship(back_populates=...) が双方向の接続を作ります。user.todostodo.owner の両方が自然に。

CRUD — 2.x スタイル #

作成 #

create
from sqlalchemy.ext.asyncio import AsyncSession

async def create_todo(db: AsyncSession, data: TodoCreate, owner_id: int) -> Todo:
    todo = Todo(
        title=data.title,
        description=data.description,
        owner_id=owner_id,
    )
    db.add(todo)
    await db.commit()
    await db.refresh(todo)    # DB が埋めた id、created_at などを再ロード
    return todo

db.refresh(obj)DB が自動生成したカラム を再読み込みします (id、timestamp など)。非同期でよく抜けがちな部分です。

取得 #

select
from sqlalchemy import select

async def get_todo(db: AsyncSession, todo_id: int) -> Todo | None:
    result = await db.execute(
        select(Todo).where(Todo.id == todo_id)
    )
    return result.scalar_one_or_none()

async def list_todos(db: AsyncSession, skip: int, limit: int) -> list[Todo]:
    result = await db.execute(
        select(Todo).offset(skip).limit(limit).order_by(Todo.created_at.desc())
    )
    return list(result.scalars())

核心パターン:

  • select(Todo).where(...) — クエリオブジェクト (実行されない)
  • db.execute(...) — 実際の実行
  • .scalar_one_or_none() — 0 行または 1 行、なければ None
  • .scalars() — すべての行を ORM オブジェクトとして

旧 1.x の db.query(Todo).filter(...).first() パターンは 2.x で deprecated です。新しいコードは無条件に select + execute

更新 #

update — オブジェクト修正
async def update_todo(db: AsyncSession, todo: Todo, data: TodoUpdate) -> Todo:
    update_dict = data.model_dump(exclude_unset=True)
    for field, value in update_dict.items():
        setattr(todo, field, value)
    await db.commit()
    await db.refresh(todo)
    return todo

または SQL UPDATE を直接:

update — SQL 直接
from sqlalchemy import update

async def mark_all_done(db: AsyncSession, owner_id: int) -> int:
    result = await db.execute(
        update(Todo)
        .where(Todo.owner_id == owner_id)
        .values(done=True)
    )
    await db.commit()
    return result.rowcount

ORM オブジェクト単位の更新は変更追跡が自動、SQL 直接は大量更新で速い。場面に応じて選択。

削除 #

delete
async def delete_todo(db: AsyncSession, todo: Todo) -> None:
    await db.delete(todo)
    await db.commit()

N+1 問題と即時ロード #

ORM の最もよくある性能の落とし穴。ユーザー 100 人 + それぞれの todos を取得するとき:

🚫 N+1
users = (await db.execute(select(User))).scalars()
for user in users:
    todos = user.todos    # ← 毎回追加クエリ

100 人 + 100 回のクエリ = 101 回のクエリ。selectinload または joinedload で一度に:

✅ selectinload
from sqlalchemy.orm import selectinload

result = await db.execute(
    select(User).options(selectinload(User.todos))
)
users = result.scalars()
for user in users:
    print(user.todos)    # 追加クエリなし
  • selectinload — 別クエリで取得 (1:N に向く)
  • joinedload — JOIN で一度に (1:1、N:1 に向く)

第21章 性能 — cProfile、py-spy、メモリプロファイリング の測定の流れがここでも — echo=True で SQL ログを ON にして N+1 パターンが見えたら捕まえてください。

ルートとの結合 #

app/api/todos.py — DB 版
from fastapi import APIRouter, HTTPException
from app.api.deps import DBSession
from app.schemas.todo import TodoCreate, TodoOut, TodoUpdate
from app.models.todo import Todo
from sqlalchemy import select

router = APIRouter(prefix="/todos", tags=["todos"])

@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(payload: TodoCreate, db: DBSession) -> Todo:
    todo = Todo(**payload.model_dump(), owner_id=1)   # owner_id は第26章で認証
    db.add(todo)
    await db.commit()
    await db.refresh(todo)
    return todo

@router.get("/{todo_id}", response_model=TodoOut)
async def read_todo(todo_id: int, db: DBSession) -> Todo:
    todo = await db.get(Todo, todo_id)
    if todo is None:
        raise HTTPException(404, "Todo not found")
    return todo

response_model=TodoOut + model_config = {"from_attributes": True} (第23章 で見たそれ) が SQLAlchemy オブジェクトを Pydantic に自動変換します。

Alembic — スキーママイグレーション #

DB スキーマは時間とともに変わります。カラム追加、インデックス変更、テーブルの新規作成など。その変更を バージョン管理 する道具が Alembic です。

初期化 #

alembic 初期化
uv run alembic init -t async alembic

-t async が非同期テンプレート。ディレクトリ構造:

alembic/
├── env.py              # マイグレーション環境
├── script.py.mako       # テンプレート
└── versions/            # マイグレーションファイル

設定 #

alembic.ini — 一部だけ修正
sqlalchemy.url = postgresql+asyncpg://user:pw@localhost/todo

または環境変数から読むのが安全です。

alembic/env.py — 核心部分
from app.core.config import settings
from app.models.base import Base
from app.models import todo, user   # すべてのモデルを import

config = context.config
config.set_main_option("sqlalchemy.url", settings.database_url)

target_metadata = Base.metadata

target_metadata = Base.metadata の一行が核心 — Alembic がモデル定義と現在の DB 状態を比較してマイグレーションを自動生成します。

最初のマイグレーション #

自動生成
uv run alembic revision --autogenerate -m "create todos table"

alembic/versions/ に新しいファイルが生成されます。

versions/abc123_create_todos_table.py
def upgrade() -> None:
    op.create_table(
        "todos",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column("title", sa.String(200)),
        ...
    )

def downgrade() -> None:
    op.drop_table("todos")

自動生成されたコードは必ずレビュー してください。autogenerate がすべての変化を正確に拾うわけではありません。特にインデックス、制約、カラム型変更には注意。

適用 #

upgrade
uv run alembic upgrade head    # 最新まで
uv run alembic downgrade -1    # 一段戻す
uv run alembic current         # 現在のバージョン
uv run alembic history         # すべてのマイグレーション

マージ衝突の解決 #

複数の開発者が同時にマイグレーションを作ると分岐が生まれます。alembic merge で合わせます。

merge
uv run alembic merge -m "merge a and b" abc123 def456

よく出会う落とし穴 #

1) select の結果を .all() で受け取らない #

🚫
result = await db.execute(select(Todo))
rows = result.all()    # 各 row が (Todo,) タプル
result = await db.execute(select(Todo))
todos = list(result.scalars())    # Todo オブジェクトのリスト

.scalars() が ORM オブジェクトを直接返してくれます。

2) 非同期セッションの中で同期 lazy load 禁止 #

expire_on_commit=False + selectinload / joinedload で事前ロード。そうしないとルートがレスポンスを作るときに lazy load がトリガーされて MissingGreenlet エラーになります。

3) トランザクションの欠落 #

commit を呼ばないと変更が反映されません。依存性で自動 commit するパターンも可能:

自動 commit の依存性
async def get_session() -> AsyncIterator[AsyncSession]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

ただし 明示的な commit の方が安全な場面もあります。プロジェクトのポリシーで決めてください。

練習問題 #

  1. SQLite + aiosqlite で todo-api プロジェクトに app/db/session.pyapp/models/{base,todo,user}.py を書いてください。Mapped[T] + mapped_column(...) スタイルでカラム定義。SQL echo を ON にして最初の select を実行し、ログを直接確認します。
  2. UserTodo の 1:N 関係を定義してください。わざと N+1 クエリが出るコードを書いた後、selectinload(User.todos) で修正して SQL ログでクエリ回数の変化を確認します。
  3. Alembic を初期化し (init -t async)、最初のマイグレーションを自動生成します (revision --autogenerate)。生成されたファイルをレビューした後、upgrade head で適用し、downgrade -1 で戻す流れまで直接確認します。

一行まとめ: SQLAlchemy 2.x の Mapped[T] + mapped_column(...) が標準。非同期は create_async_engine + AsyncSessionexpire_on_commit=FalseDepends 依存性でリクエスト単位のセッション。CRUD は select(...).where(...) + db.execute(...).scalars() / .scalar_one_or_none()。関係は relationship(back_populates=...)、N+1 は selectinload / joinedload で回避。Pydantic from_attributes=True で ORM → レスポンスを自動。Alembic でスキーママイグレーションのバージョン管理。

次の章 #

次の 第26章 認証 — OAuth2 パスワードフロー + JWT では本章の上に ユーザーの会員登録 / ログイン を載せます。OAuth2 パスワードフロー、JWT 発行、パスワードハッシング、そして依存性で解く current_user までです。

X