目次
26 章

認証 — OAuth2 パスワードフロー + JWT

パスワードハッシング(argon2/bcrypt)、OAuth2 パスワードフロー、JWT 発行/検証、そして current_user 依存性で認証フローを構成するパターンをまとめます。

第25章 DB 連携 — SQLAlchemy 2.x + Alembic で DB まで接続した Todo が、これからは ユーザー別に 分離される必要があります。会員登録、ログイン、認証されたリクエストを区別する標準パターン — OAuth2 パスワードフロー + JWT を扱います。

本章の認証コードはシークレット / パスワードのような機微情報を扱うので、第31章 logging と観測性 で扱う PII / secret ロギング防止パターンと一緒に運用されます。

認証 vs 認可 #

用語からまず整理。

  • 認証 (Authentication) — 「あなたは誰か?」 — ログイン自体
  • 認可 (Authorization) — 「その人にこれをやる権限があるか?」 — 権限チェック

本章は認証に集中。認可 (role、permission など) は追加のミドルウェア領域ですが、パターンは非常に似ています。

2 つの流れ #

セッション方式トークン方式 (JWT)
保存サーバー (DB / Redis)クライアント
状態statefulstateless
分散システム共有ストレージ必要自然
失効即時可能期限切れまで、またはブラックリスト
モバイル / SPA 親和普通良い

API サーバー (FastAPI) + SPA / モバイルアプリの組み合わせでは JWT が標準解です。セッション方式の方が向く場合もありますが (サーバーレンダリング Web アプリ)、本書は JWT で進めます。

パスワードは絶対に平文で保存しない #

データベースが流出してもパスワードが露出してはなりません。一方向ハッシュ で保存 — ログイン時に入力値を同じハッシュにして比較するだけ。

アルゴリズム選択 #

アルゴリズム評価
MD5、SHA-1、SHA-256❌ 絶対不可 (速すぎる、GPU で brute force)
bcrypt⭕ 旧来の標準、十分安全
argon2id現在の推奨標準 (OWASP)
scrypt⭕ 安全だが argon2 がより推奨

新しいプロジェクトは argon2id を第一選択。シンプルなコードと安全なデフォルト値を持ちます。

インストール #

argon2 + JWT
uv add argon2-cffi pyjwt

argon2-cffi が argon2 の実装、pyjwt が JWT 標準ライブラリ。

互換性メモ: 旧コードでよく見る passlib は徐々にメンテナンスが減っており、新しいプロジェクトは直接 argon2-cffi または bcrypt を使う流れが一般化しました。

パスワードハッシングモジュール #

app/core/security.py
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError

_hasher = PasswordHasher()    # OWASP 推奨デフォルトで初期化

def hash_password(plain: str) -> str:
    return _hasher.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    try:
        _hasher.verify(hashed, plain)
        return True
    except VerifyMismatchError:
        return False

def needs_rehash(hashed: str) -> bool:
    return _hasher.check_needs_rehash(hashed)

needs_rehashハッシュパラメータが時間の経過で弱くなったとき に再ハッシュすべきかを検査 — ユーザーがログインするときに検査して新しいパラメータで再保存するパターン。

ログイン時の再ハッシュフロー
if verify_password(plain, user.hashed_password):
    if needs_rehash(user.hashed_password):
        user.hashed_password = hash_password(plain)
        await db.commit()
    return user

JWT — トークンの正体 #

JWT (JSON Web Token) は 3 つの部分で構成された文字列です。

JWT の形
xxxxx.yyyyy.zzzzz
header.payload.signature
  • header — アルゴリズム、タイプ
  • payload — クレーム (sub、exp、iat など)
  • signature — 上の 2 つをシークレットで署名。改ざん防止

核心: payload は base64 デコードすれば誰でも読めます。 JWT は 暗号化 ではなく 署名 — 改ざん防止であって秘密の保持ではありません。パスワードのような機微情報を JWT に入れてはいけません。

標準クレーム #

クレーム意味
subsubject — 通常はユーザー ID
expexpiration — 有効期限 (UNIX timestamp)
iatissued at — 発行時刻
nbfnot before — この時刻より前は無効
jtiJWT ID — 失効 / ブラックリスト用
audaudience — トークンの対象者
ississuer — 発行者

exp はほぼ常に。残りはポリシーによります。

JWT 発行 / 検証 #

app/core/security.py 追加
from datetime import datetime, timedelta, timezone
import jwt
from app.core.config import settings

ALGORITHM = "HS256"

def create_access_token(subject: str, expires_minutes: int = 60) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        "sub": subject,
        "iat": now,
        "exp": now + timedelta(minutes=expires_minutes),
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)

def decode_token(token: str) -> dict:
    return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])

HS256対称鍵署名 — 同じシークレットで署名し検証。単一サーバーには十分です。複数のサーバーが検証だけ行うなら RS256 (非対称 — RSA 公開 / 秘密鍵) で。

シークレット管理 #

jwt_secret第22章pydantic-settings で環境変数から読みます。

.env
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

最低 32 バイト (256 ビット) のランダム で生成してください。実際の環境変数には python -c で作ったランダム文字列を直接入れます。

会員登録 / ログインルート #

Pydantic スキーマ #

app/schemas/user.py
from pydantic import BaseModel, EmailStr, Field

class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

class UserOut(BaseModel):
    id: int
    email: EmailStr
    model_config = {"from_attributes": True}

class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"

CRUD #

app/services/user.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.core.security import hash_password, verify_password

async def create_user(db: AsyncSession, email: str, password: str) -> User:
    user = User(email=email, hashed_password=hash_password(password))
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()

async def authenticate(db: AsyncSession, email: str, password: str) -> User | None:
    user = await get_user_by_email(db, email)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

authenticateユーザーが存在しない場合とパスワードが間違っている場合を区別しません。 両方とも None — 「メールが存在するかどうかを教える」情報漏洩を防ぎます。

ルート #

app/api/auth.py
from fastapi import APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Annotated
from app.api.deps import DBSession
from app.schemas.user import UserCreate, UserOut, Token
from app.services import user as user_service
from app.core.security import create_access_token

router = APIRouter(prefix="/auth", tags=["auth"])

@router.post("/register", response_model=UserOut, status_code=201)
async def register(payload: UserCreate, db: DBSession) -> UserOut:
    existing = await user_service.get_user_by_email(db, payload.email)
    if existing:
        raise HTTPException(409, "既に登録済みのメール")
    user = await user_service.create_user(db, payload.email, payload.password)
    return user

@router.post("/login", response_model=Token)
async def login(
    form: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: DBSession,
) -> Token:
    user = await user_service.authenticate(db, form.username, form.password)
    if user is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token(subject=str(user.id))
    return Token(access_token=token)

OAuth2PasswordRequestForm の正体 #

form: OAuth2PasswordRequestForm = Depends()標準の OAuth2 パスワードフォーム を自動でパースします。

  • application/x-www-form-urlencoded ボディから usernamepassword を受け取る
  • 自分たちのドメインはメールですが、OAuth2 標準のフィールド名は username。そのまま使ってください。
  • Swagger UI で自動的に「Authorize」ボタンが有効化

依存性で current_user を作る #

リクエストからトークンを取り出し → 検証 → ユーザーオブジェクトをルートに注入。

app/api/deps.py 拡張
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError
from app.core.security import decode_token
from app.models.user import User
from app.services import user as user_service

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: DBSession,
) -> User:
    credentials_exc = HTTPException(
        status.HTTP_401_UNAUTHORIZED,
        "有効でないトークン",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_token(token)
        user_id = int(payload["sub"])
    except (InvalidTokenError, KeyError, ValueError):
        raise credentials_exc

    user = await db.get(User, user_id)
    if user is None:
        raise credentials_exc
    return user

CurrentUser = Annotated[User, Depends(get_current_user)]

OAuth2PasswordBearer がやること #

  • Authorization: Bearer xxxxx ヘッダーからトークンを抽出
  • ヘッダーがない、または形式が誤っていれば自動で 401 レスポンス
  • Swagger UI の「Authorize」ボタンがこれを見て動作
  • tokenUrl="/auth/login" はトークン発行エンドポイント — ドキュメント / UI 用

ルートで使用 #

app/api/todos.py 拡張
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(
    payload: TodoCreate,
    current_user: CurrentUser,
    db: DBSession,
) -> Todo:
    todo = Todo(
        **payload.model_dump(),
        owner_id=current_user.id,
    )
    db.add(todo)
    await db.commit()
    await db.refresh(todo)
    return todo

@router.get("/", response_model=list[TodoOut])
async def list_my_todos(
    current_user: CurrentUser,
    db: DBSession,
) -> list[Todo]:
    result = await db.execute(
        select(Todo).where(Todo.owner_id == current_user.id)
    )
    return list(result.scalars())

current_user: CurrentUser の一行が入ると:

  • トークンがない、または無効なら自動で 401
  • 有効なら current_user が認証されたユーザーオブジェクト

ルートの中でトークン検証コードを 1 行も書いていません。共通ロジックは依存性として分離 されています。

追加パターン #

アクティブ / 非アクティブユーザー #

アクティブユーザーのみ
async def get_current_active_user(current_user: CurrentUser) -> User:
    if not current_user.is_active:
        raise HTTPException(403, "非アクティブアカウント")
    return current_user

ActiveUser = Annotated[User, Depends(get_current_active_user)]

依存性の上に依存性。レイヤーのように 積み上げます。

権限 (ロールベース) #

role チェック
def require_role(role: str):
    async def checker(user: CurrentUser) -> User:
        if user.role != role:
            raise HTTPException(403, f"{role} 権限が必要")
        return user
    return checker

AdminUser = Annotated[User, Depends(require_role("admin"))]

@router.delete("/admin-only")
async def admin_only(admin: AdminUser): ...

Refresh トークン (オプション) #

Access トークンを短く (15 分 〜 1 時間) + Refresh トークンを長く (7 日 〜 30 日) 置くパターン。Access が期限切れになったら Refresh で新しい Access を発行。セキュリティとユーザビリティの均衡。Refresh トークンは DB に保存 して失効可能にしておくのが普通。

本書の 4部はシンプルに access トークン 1 つで進めます。実サービスでは追加してください。

よく出会う落とし穴 #

1) JWT シークレットが弱い #

悪い例
JWT_SECRET=secret
JWT_SECRET=mypassword
良い例
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

短いシークレットは brute force 可能です。

2) JWT payload に機微情報 #

悪い例
payload = {"sub": "u1", "password": "...", "ssn": "..."}

JWT payload は誰でも読めます。ユーザー ID のような識別子だけ

3) HTTPS なし #

トークンは HTTPS でのみ送信。HTTP の上では誰でもトークンを傍受できます。開発は localhost (HTTPS 免除)、プロダクションは無条件に HTTPS。

4) トークンの有効期限が長すぎる #

悪い例: 7 日の access トークン
expires_minutes=10080

期限が長いほど、盗まれた場合の被害時間が長くなります。15 分 〜 1 時間 が推奨。長く使いたければ refresh トークンパターン。

5) パスワード強度の検証なし #

良い例: 強度検証
class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

    @field_validator("password")
    @classmethod
    def strong_enough(cls, v: str) -> str:
        if v.lower() in {"password", "12345678", "qwerty"}:
            raise ValueError("ありふれすぎたパスワード")
        return v

Have I Been Pwned API で漏洩パスワード検証を追加する方法もあります。

練習問題 #

  1. argon2-cffihash_password / verify_password / needs_rehash の 3 つの関数を書いてください。同じパスワードを 2 回ハッシュしたときに異なる結果が出るか (salt の自動付与)、verify が両方とも OK かを確認します。
  2. pyjwtcreate_access_token(subject: str) / decode_token(token: str) を書いてください。わざと期限切れのトークン (exp が過去) を作り、decode_tokenExpiredSignatureError を投げるか確認します。
  3. OAuth2PasswordBearer + Depends(get_current_user)current_user 依存性を実装し、GET /me ルートで認証されたユーザー情報を返すようにしてください。Swagger UI の「Authorize」ボタンでログインし、保護されたエンドポイント呼び出しまで直接確認します。

一行まとめ: 認証は誰か / 認可は権限。JWT は stateless、クライアント保管、SPA / モバイル親和。argon2id がパスワードハッシングの標準 (OWASP)。JWT 標準クレームは sub / exp / iat、HS256 対称署名、シークレット 32 バイト以上のランダム。OAuth2PasswordRequestForm フォーム自動パース、OAuth2PasswordBearer ヘッダートークン抽出。CurrentUser 依存性でルート認証を一行に。落とし穴は弱いシークレット / payload の機微情報 / HTTP / 期限の長さ / パスワード強度。

次の章 #

次の 第27章 非同期とバックグラウンドジョブ では レスポンスを待たずに処理すべき作業 — メール送信、重い変換、外部 API 呼び出し — を扱うパターン (BackgroundTasks、外部キュー) を扱います。

X