DB 連携 — SQLAlchemy 2.x + Alembic
SQLAlchemy 2.x の新スタイル — Mapped/mapped_column、async セッション、FastAPI 依存性注入との接続、Alembic のマイグレーションまでまとめます。
第23章 ルーティング、Pydantic モデル、依存性注入 のインメモリ dict を本章で DB に置き換え ます。SQLAlchemy 2.x の新スタイル、async セッション、FastAPI 依存性注入との接続、そしてスキーマ変更を安全に扱う Alembic のマイグレーションまでです。
本章の Mapped[T] 表記と ORM マッピングは、第16章 ディスクリプタと __set_name__ のディスクリプタと、第17章 メタクラス の ORM DSL パターンが組み合わさってできています。非同期セッションは第14章 非同期入門 / 第18章 非同期の深層 の上に乗ります。
なぜ SQLAlchemy 2.x なのか #
Python の ORM はいくつもありますが (SQLAlchemy、Tortoise、SQLModel、Peewee、Django ORM)、モダンな非同期 + 型フレンドリー の観点で最も標準的な選択肢なのが SQLAlchemy 2.x です。
2.x は 1.x と同じライブラリですが、使用スタイルは事実上変わりました。
| 1.x スタイル | 2.x スタイル |
|---|---|
Column(String, ...) | Mapped[str] = mapped_column(...) |
session.query(User).filter(...) | session.execute(select(User).where(...)) |
| 同期だけ (3rd party 非同期) | ビルトイン async |
| 型ヒントは補助的 | 型ヒントが核心 |
新しいコードは 無条件に 2.x スタイル です。
SQLModel は? #
FastAPI を作った人が設計したライブラリで、SQLAlchemy + Pydantic 統合 を試みたものです。短いコードに有利ですが、まだ SQLAlchemy 2.x のすべての機能を露出していないので、複雑なクエリでは限界 があります。新しいプロジェクトは SQLAlchemy 2.x を直接 + Pydantic 別 を推奨します。
インストール #
uv add sqlalchemy "sqlalchemy[asyncio]" asyncpg
uv add --dev alembic- sqlalchemy — ORM 本体
- asyncpg — PostgreSQL 非同期ドライバ (async 使用時)
- alembic — マイグレーションツール
SQLite で始めるなら aiosqlite (PostgreSQL の代わりに)。
uv add aiosqliteEngine とセッション #
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession
from app.core.config import settings
engine = create_async_engine(
settings.database_url,
echo=False, # 開発中は True にして SQL ログ
pool_pre_ping=True, # 切れた接続を自動回復
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # コミット後もオブジェクト使用可能
)expire_on_commit=False は非同期環境でほぼ必須。True だとコミット後の属性アクセスが lazy load をトリガーしますが、async 関数の外では危険です。
依存性でセッションを注入 #
from typing import Annotated, AsyncIterator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import AsyncSessionLocal
async def get_session() -> AsyncIterator[AsyncSession]:
async with AsyncSessionLocal() as session:
yield session
DBSession = Annotated[AsyncSession, Depends(get_session)]これが 第23章 で見た Depends パターンの最も強力な活用。ルートで:
@router.get("/{todo_id}")
async def read_todo(todo_id: int, db: DBSession) -> TodoOut:
...DBSession の一行が入ると リクエスト単位でセッションが作られ、ルートが終わると自動でクローズされます。トランザクション管理もこの依存性の中で扱います。
モデル定義 — 2.x スタイル #
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
passfrom datetime import datetime
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from app.models.base import Base
class Todo(Base):
__tablename__ = "todos"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
description: Mapped[str | None]
done: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
)第9章 typing 本格 の Mapped[T] がまさにここで使われます。普通の型ヒントのように見えますが、SQL カラム定義 が一緒に行われます。
読み方:
Mapped[int]— カラムの Python 型mapped_column(...)— DB 側の追加オプション (制約、デフォルト値など)Mapped[str | None]— NULL 許可server_default=func.now()— DB が自動で埋める
関係 — 1:N #
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(200), unique=True, index=True)
hashed_password: Mapped[str]
todos: Mapped[list["Todo"]] = relationship(back_populates="owner")
class Todo(Base):
__tablename__ = "todos"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
owner_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
owner: Mapped["User"] = relationship(back_populates="todos")relationship(back_populates=...) が双方向の接続を作ります。user.todos、todo.owner の両方が自然に。
CRUD — 2.x スタイル #
作成 #
from sqlalchemy.ext.asyncio import AsyncSession
async def create_todo(db: AsyncSession, data: TodoCreate, owner_id: int) -> Todo:
todo = Todo(
title=data.title,
description=data.description,
owner_id=owner_id,
)
db.add(todo)
await db.commit()
await db.refresh(todo) # DB が埋めた id、created_at などを再ロード
return tododb.refresh(obj) が DB が自動生成したカラム を再読み込みします (id、timestamp など)。非同期でよく抜けがちな部分です。
取得 #
from sqlalchemy import select
async def get_todo(db: AsyncSession, todo_id: int) -> Todo | None:
result = await db.execute(
select(Todo).where(Todo.id == todo_id)
)
return result.scalar_one_or_none()
async def list_todos(db: AsyncSession, skip: int, limit: int) -> list[Todo]:
result = await db.execute(
select(Todo).offset(skip).limit(limit).order_by(Todo.created_at.desc())
)
return list(result.scalars())核心パターン:
select(Todo).where(...)— クエリオブジェクト (実行されない)db.execute(...)— 実際の実行.scalar_one_or_none()— 0 行または 1 行、なければ None.scalars()— すべての行を ORM オブジェクトとして
旧 1.x の db.query(Todo).filter(...).first() パターンは 2.x で deprecated です。新しいコードは無条件に select + execute。
更新 #
async def update_todo(db: AsyncSession, todo: Todo, data: TodoUpdate) -> Todo:
update_dict = data.model_dump(exclude_unset=True)
for field, value in update_dict.items():
setattr(todo, field, value)
await db.commit()
await db.refresh(todo)
return todoまたは SQL UPDATE を直接:
from sqlalchemy import update
async def mark_all_done(db: AsyncSession, owner_id: int) -> int:
result = await db.execute(
update(Todo)
.where(Todo.owner_id == owner_id)
.values(done=True)
)
await db.commit()
return result.rowcountORM オブジェクト単位の更新は変更追跡が自動、SQL 直接は大量更新で速い。場面に応じて選択。
削除 #
async def delete_todo(db: AsyncSession, todo: Todo) -> None:
await db.delete(todo)
await db.commit()N+1 問題と即時ロード #
ORM の最もよくある性能の落とし穴。ユーザー 100 人 + それぞれの todos を取得するとき:
users = (await db.execute(select(User))).scalars()
for user in users:
todos = user.todos # ← 毎回追加クエリ100 人 + 100 回のクエリ = 101 回のクエリ。selectinload または joinedload で一度に:
from sqlalchemy.orm import selectinload
result = await db.execute(
select(User).options(selectinload(User.todos))
)
users = result.scalars()
for user in users:
print(user.todos) # 追加クエリなしselectinload— 別クエリで取得 (1:N に向く)joinedload— JOIN で一度に (1:1、N:1 に向く)
第21章 性能 — cProfile、py-spy、メモリプロファイリング の測定の流れがここでも — echo=True で SQL ログを ON にして N+1 パターンが見えたら捕まえてください。
ルートとの結合 #
from fastapi import APIRouter, HTTPException
from app.api.deps import DBSession
from app.schemas.todo import TodoCreate, TodoOut, TodoUpdate
from app.models.todo import Todo
from sqlalchemy import select
router = APIRouter(prefix="/todos", tags=["todos"])
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(payload: TodoCreate, db: DBSession) -> Todo:
todo = Todo(**payload.model_dump(), owner_id=1) # owner_id は第26章で認証
db.add(todo)
await db.commit()
await db.refresh(todo)
return todo
@router.get("/{todo_id}", response_model=TodoOut)
async def read_todo(todo_id: int, db: DBSession) -> Todo:
todo = await db.get(Todo, todo_id)
if todo is None:
raise HTTPException(404, "Todo not found")
return todoresponse_model=TodoOut + model_config = {"from_attributes": True} (第23章 で見たそれ) が SQLAlchemy オブジェクトを Pydantic に自動変換します。
Alembic — スキーママイグレーション #
DB スキーマは時間とともに変わります。カラム追加、インデックス変更、テーブルの新規作成など。その変更を バージョン管理 する道具が Alembic です。
初期化 #
uv run alembic init -t async alembic-t async が非同期テンプレート。ディレクトリ構造:
alembic/
├── env.py # マイグレーション環境
├── script.py.mako # テンプレート
└── versions/ # マイグレーションファイル設定 #
sqlalchemy.url = postgresql+asyncpg://user:pw@localhost/todoまたは環境変数から読むのが安全です。
from app.core.config import settings
from app.models.base import Base
from app.models import todo, user # すべてのモデルを import
config = context.config
config.set_main_option("sqlalchemy.url", settings.database_url)
target_metadata = Base.metadatatarget_metadata = Base.metadata の一行が核心 — Alembic がモデル定義と現在の DB 状態を比較してマイグレーションを自動生成します。
最初のマイグレーション #
uv run alembic revision --autogenerate -m "create todos table"alembic/versions/ に新しいファイルが生成されます。
def upgrade() -> None:
op.create_table(
"todos",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("title", sa.String(200)),
...
)
def downgrade() -> None:
op.drop_table("todos")自動生成されたコードは必ずレビュー してください。autogenerate がすべての変化を正確に拾うわけではありません。特にインデックス、制約、カラム型変更には注意。
適用 #
uv run alembic upgrade head # 最新まで
uv run alembic downgrade -1 # 一段戻す
uv run alembic current # 現在のバージョン
uv run alembic history # すべてのマイグレーションマージ衝突の解決 #
複数の開発者が同時にマイグレーションを作ると分岐が生まれます。alembic merge で合わせます。
uv run alembic merge -m "merge a and b" abc123 def456よく出会う落とし穴 #
1) select の結果を .all() で受け取らない
#
result = await db.execute(select(Todo))
rows = result.all() # 各 row が (Todo,) タプルresult = await db.execute(select(Todo))
todos = list(result.scalars()) # Todo オブジェクトのリスト.scalars() が ORM オブジェクトを直接返してくれます。
2) 非同期セッションの中で同期 lazy load 禁止 #
expire_on_commit=False + selectinload / joinedload で事前ロード。そうしないとルートがレスポンスを作るときに lazy load がトリガーされて MissingGreenlet エラーになります。
3) トランザクションの欠落 #
commit を呼ばないと変更が反映されません。依存性で自動 commit するパターンも可能:
async def get_session() -> AsyncIterator[AsyncSession]:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raiseただし 明示的な commit の方が安全な場面もあります。プロジェクトのポリシーで決めてください。
練習問題 #
- SQLite + aiosqlite で
todo-apiプロジェクトにapp/db/session.py、app/models/{base,todo,user}.pyを書いてください。Mapped[T]+mapped_column(...)スタイルでカラム定義。SQL echo を ON にして最初のselectを実行し、ログを直接確認します。 User↔Todoの 1:N 関係を定義してください。わざと N+1 クエリが出るコードを書いた後、selectinload(User.todos)で修正して SQL ログでクエリ回数の変化を確認します。- Alembic を初期化し (
init -t async)、最初のマイグレーションを自動生成します (revision --autogenerate)。生成されたファイルをレビューした後、upgrade headで適用し、downgrade -1で戻す流れまで直接確認します。
一行まとめ: SQLAlchemy 2.x の
Mapped[T]+mapped_column(...)が標準。非同期はcreate_async_engine+AsyncSession、expire_on_commit=False。Depends依存性でリクエスト単位のセッション。CRUD はselect(...).where(...)+db.execute(...)、.scalars()/.scalar_one_or_none()。関係はrelationship(back_populates=...)、N+1 はselectinload/joinedloadで回避。Pydanticfrom_attributes=Trueで ORM → レスポンスを自動。Alembic でスキーママイグレーションのバージョン管理。
次の章 #
次の 第26章 認証 — OAuth2 パスワードフロー + JWT では本章の上に ユーザーの会員登録 / ログイン を載せます。OAuth2 パスワードフロー、JWT 発行、パスワードハッシング、そして依存性で解く current_user までです。