モダンPython実践 #3 DB 連携 — SQLAlchemy 2.x + Alembic
#2 のインメモリ dict を、今回 本物の DB に置き換え ます。SQLAlchemy 2.x の新スタイル、async セッション、FastAPI の依存性注入との自然な結合、そしてスキーマ変更を安全に転がす Alembic のマイグレーションまで。
なぜ SQLAlchemy 2.x なのか #
Python の ORM はいくつもありますが (SQLAlchemy、Tortoise、SQLModel、Peewee、Django ORM)、モダンな非同期 + 型フレンドリー の観点で最も標準のポジションにあるのが SQLAlchemy 2.x です。
2.x は 1.x と同じライブラリですが、使用スタイルがほぼ別ライブラリレベルで変わりました。
| 1.x スタイル | 2.x スタイル |
|---|---|
Column(String, ...) | Mapped[str] = mapped_column(...) |
session.query(User).filter(...) | session.execute(select(User).where(...)) |
| 同期だけ (3rd party 非同期) | ビルトイン async |
| 型ヒントは補助的 | 型ヒントが核心 |
新しいコードでは 必ず 2.x スタイル を使います。
SQLModel は? #
FastAPI を作った人 が作ったライブラリで、SQLAlchemy + Pydantic 統合 を試みたものです。短いコードに有利ですが、まだ SQLAlchemy 2.x のすべての機能を露出していないので、複雑なクエリで窮屈な境界 が来ます。新しいプロジェクトは SQLAlchemy 2.x を直接 + Pydantic 別 を推奨します。
インストール #
uv add sqlalchemy "sqlalchemy[asyncio]" asyncpg
uv add --dev alembic- sqlalchemy — ORM 本体
- asyncpg — PostgreSQL 非同期ドライバ (async 使用時)
- alembic — マイグレーションツール
SQLite で始めるなら aiosqlite (PostgreSQL の代わりに)。
uv add aiosqliteEngine とセッション #
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine, AsyncSession
from app.core.config import settings
engine = create_async_engine(
settings.database_url,
echo=False, # 開発中は True にして SQL ログ
pool_pre_ping=True, # 切れた接続を自動回復
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False, # コミット後もオブジェクト使用可能
)expire_on_commit=False は非同期環境でほぼ必須。True だとコミット後の属性アクセスが lazy load をトリガーしますが、async 関数の外では危険です。
依存性でセッションを注入 #
from typing import Annotated, AsyncIterator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import AsyncSessionLocal
async def get_session() -> AsyncIterator[AsyncSession]:
async with AsyncSessionLocal() as session:
yield session
DBSession = Annotated[AsyncSession, Depends(get_session)]これが #2 で見た Depends パターンの最も強力な活用。ルートで:
@router.get("/{todo_id}")
async def read_todo(todo_id: int, db: DBSession) -> TodoOut:
...DBSession の一行が入ると リクエスト単位でセッションが作られ、ルートが終わると自動でクローズ。トランザクション管理も同じところで。
モデル定義 — 2.x スタイル #
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
passfrom datetime import datetime
from sqlalchemy import String, Boolean, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column
from app.models.base import Base
class Todo(Base):
__tablename__ = "todos"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
description: Mapped[str | None]
done: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
)中級 #2 の Mapped[T] がまさにここで使われます。普通の型ヒントのように見えますが、SQL カラム定義 が一緒に行われます。
読み方:
Mapped[int]— カラムの Python 型mapped_column(...)— DB 側の追加オプション (制約、デフォルト値など)Mapped[str | None]— NULL 許可server_default=func.now()— DB が自動で埋める
関係 — 1:N #
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
email: Mapped[str] = mapped_column(String(200), unique=True, index=True)
hashed_password: Mapped[str]
todos: Mapped[list["Todo"]] = relationship(back_populates="owner")
class Todo(Base):
__tablename__ = "todos"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(200))
owner_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
owner: Mapped["User"] = relationship(back_populates="todos")relationship(back_populates=...) が双方向の接続を作ります。user.todos、todo.owner の両方が自然に。
CRUD — 2.x スタイル #
作成 #
from sqlalchemy.ext.asyncio import AsyncSession
async def create_todo(db: AsyncSession, data: TodoCreate, owner_id: int) -> Todo:
todo = Todo(
title=data.title,
description=data.description,
owner_id=owner_id,
)
db.add(todo)
await db.commit()
await db.refresh(todo) # DB が埋めた id、created_at などを再ロード
return tododb.refresh(obj) が DB が自動生成したカラム を再読み込みします (id、timestamp など)。非同期でよく抜けがちな部分です。
取得 #
from sqlalchemy import select
async def get_todo(db: AsyncSession, todo_id: int) -> Todo | None:
result = await db.execute(
select(Todo).where(Todo.id == todo_id)
)
return result.scalar_one_or_none()
async def list_todos(db: AsyncSession, skip: int, limit: int) -> list[Todo]:
result = await db.execute(
select(Todo).offset(skip).limit(limit).order_by(Todo.created_at.desc())
)
return list(result.scalars())核心パターン:
select(Todo).where(...)— クエリオブジェクト (実行されない)db.execute(...)— 実際の実行.scalar_one_or_none()— 0 行または 1 行、なければ None.scalars()— すべての行を ORM オブジェクトとして
旧 1.x の db.query(Todo).filter(...).first() パターンは 2.x で deprecated です。新しいコードでは必ず select + execute を使います。
更新 #
async def update_todo(db: AsyncSession, todo: Todo, data: TodoUpdate) -> Todo:
update_dict = data.model_dump(exclude_unset=True)
for field, value in update_dict.items():
setattr(todo, field, value)
await db.commit()
await db.refresh(todo)
return todoまたは SQL UPDATE を直接:
from sqlalchemy import update
async def mark_all_done(db: AsyncSession, owner_id: int) -> int:
result = await db.execute(
update(Todo)
.where(Todo.owner_id == owner_id)
.values(done=True)
)
await db.commit()
return result.rowcountORM オブジェクト単位の更新は変更追跡が自動、SQL 直接は大量更新で速い。場面に応じて選択。
削除 #
async def delete_todo(db: AsyncSession, todo: Todo) -> None:
await db.delete(todo)
await db.commit()N+1 問題と即時ロード #
ORM の最もよくある性能の落とし穴。ユーザー 100 人 + それぞれの todos を取得するとき:
users = (await db.execute(select(User))).scalars()
for user in users:
todos = user.todos # ← 毎回追加クエリ100 人 + 100 回のクエリ = 101 回のクエリ。selectinload または joinedload で一度に:
from sqlalchemy.orm import selectinload
result = await db.execute(
select(User).options(selectinload(User.todos))
)
users = result.scalars()
for user in users:
print(user.todos) # 追加クエリなしselectinload— 別クエリで取得 (1:N に向く)joinedload— JOIN で一度に (1:1、N:1 に向く)
上級 #7 の測定の流れがここでも — echo=True で SQL ログを ON にして N+1 パターンが見えたら捕まえてください。
ルートとの結合 #
from fastapi import APIRouter, HTTPException
from app.api.deps import DBSession
from app.schemas.todo import TodoCreate, TodoOut, TodoUpdate
from app.models.todo import Todo
from sqlalchemy import select
router = APIRouter(prefix="/todos", tags=["todos"])
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(payload: TodoCreate, db: DBSession) -> Todo:
todo = Todo(**payload.model_dump(), owner_id=1) # owner_id は #4 で認証
db.add(todo)
await db.commit()
await db.refresh(todo)
return todo
@router.get("/{todo_id}", response_model=TodoOut)
async def read_todo(todo_id: int, db: DBSession) -> Todo:
todo = await db.get(Todo, todo_id)
if todo is None:
raise HTTPException(404, "Todo not found")
return todoresponse_model=TodoOut + model_config = {"from_attributes": True} (#2 で見たそれ) が SQLAlchemy オブジェクトを Pydantic に自動変換します。
Alembic — スキーママイグレーション #
DB スキーマは時間とともに変わります。カラム追加、インデックス変更、テーブルの新規作成など。その変更を バージョン管理 する道具が Alembic です。
初期化 #
uv run alembic init -t async alembic-t async が非同期テンプレート。ディレクトリ構造:
alembic/
├── env.py # マイグレーション環境
├── script.py.mako # テンプレート
└── versions/ # マイグレーションファイル設定 #
sqlalchemy.url = postgresql+asyncpg://user:pw@localhost/todoまたは環境変数から読むのが安全です。
from app.core.config import settings
from app.models.base import Base
from app.models import todo, user # すべてのモデルを import
config = context.config
config.set_main_option("sqlalchemy.url", settings.database_url)
target_metadata = Base.metadatatarget_metadata = Base.metadata の一行が核心 — Alembic がモデル定義と現在の DB 状態を比較してマイグレーションを自動生成します。
最初のマイグレーション #
uv run alembic revision --autogenerate -m "create todos table"alembic/versions/ に新しいファイルが生成されます。
def upgrade() -> None:
op.create_table(
"todos",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("title", sa.String(200)),
...
)
def downgrade() -> None:
op.drop_table("todos")自動生成されたコードは必ずレビュー してください。autogenerate がすべての変化を正確に拾うわけではありません。特にインデックス、制約、カラム型変更には注意。
適用 #
uv run alembic upgrade head # 最新まで
uv run alembic downgrade -1 # 一段戻す
uv run alembic current # 現在のバージョン
uv run alembic history # すべてのマイグレーションマージ衝突の解決 #
複数の開発者が同時にマイグレーションを作ると分岐が生まれます。alembic merge で合わせます。
uv run alembic merge -m "merge a and b" abc123 def456よく出会う落とし穴 #
1) select の結果を .all() で受け取らない
#
result = await db.execute(select(Todo))
rows = result.all() # 各 row が (Todo,) タプルresult = await db.execute(select(Todo))
todos = list(result.scalars()) # Todo オブジェクトのリスト.scalars() が ORM オブジェクトを直接返してくれます。
2) 非同期セッションの中で同期 lazy load 禁止 #
expire_on_commit=False + selectinload/joinedload で事前ロード。そうしないとルートがレスポンスを作るときに lazy load がトリガーされて MissingGreenlet エラーになります。
3) トランザクションの欠落 #
commit を呼ばないと変更が反映されません。依存性で自動 commit するパターンも可能:
async def get_session() -> AsyncIterator[AsyncSession]:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raiseただし 明示的な commit の方が安全な場面もあります。プロジェクトのポリシーで決めてください。
まとめ #
今回つかんだもの:
- SQLAlchemy 2.x —
Mapped[T]+mapped_column(...)の新スタイル - 非同期 —
create_async_engine、AsyncSession、await db.execute(...) - FastAPI の依存性注入 —
get_sessionでリクエスト単位のセッション - CRUD —
select(...).where(...)、db.add、db.commit、db.refresh、db.delete - 関係 —
relationship+back_populates、selectinload/joinedloadで N+1 回避 - Pydantic
from_attributes=Trueで ORM → レスポンスを自動 - Alembic —
init -t async、revision --autogenerate、upgrade head - 落とし穴:
.scalars()、expire_on_commit、lazy load、明示的 commit
次回(#4 認証 — OAuth2 + JWT)ではこの上に ユーザーの会員登録 / ログイン を載せます。OAuth2 パスワードフロー、JWT 発行、パスワードハッシング、そして依存性で解く current_user まで。