モダンPython実践 #4 認証 — OAuth2 パスワードフロー + JWT

読了 7分

#3 で DB まで接続した Todo を、これから ユーザー別に 分離する必要があります。会員登録、ログイン、認証済みリクエストを区別する標準パターン — OAuth2 パスワードフロー + JWT を扱います。

認証 vs 認可 #

用語からまず整理。

  • 認証 (Authentication) — 「あなたは誰か?」 — ログイン自体
  • 認可 (Authorization) — 「その人がこの仕事をしてよいか?」 — 権限チェック

今回は認証に集中。認可 (role、permission など) は追加のミドルウェア領域ですが、パターンは非常に似ています。

2 つの分かれ道の流れ #

セッションベーストークンベース (JWT)
保存場所サーバー (DB/Redis)クライアント
状態statefulstateless
分散システム共有ストレージが必要自然
失効即時可能期限まで、またはブラックリスト
モバイル/SPA フレンドリー普通良い

API サーバー (FastAPI) + SPA / モバイルアプリの組み合わせでは JWT が標準的な答えです。セッションベースの方が向く場面 (サーバーレンダリングの Web アプリ) もありますが、このシリーズは JWT で進めます。

パスワードは絶対に平文で保存しない #

データベースが流出してもパスワードが露出してはいけません。一方向のハッシュ で保存し、ログイン時に入力値を同じハッシュにして比較するだけ。

アルゴリズムの選択 #

アルゴリズム評価
MD5、SHA-1、SHA-256❌ 絶対に不可 (速すぎる、GPU で brute force)
bcrypt⭕ 古くからの標準、十分に安全
argon2id現在の推奨標準 (OWASP)
scrypt⭕ 安全だが argon2 の方が推奨される

新しいプロジェクトは argon2id を最初の選択に。シンプルなコードと安全なデフォルト値を持ちます。

インストール #

argon2 + JWT
uv add argon2-cffi pyjwt

argon2-cffi が argon2 の実装、pyjwt が JWT の標準ライブラリ。

互換性メモ: 古いコードでよく見る passlib は徐々にメンテナンスが減っており、新しいプロジェクトでは直接 argon2-cffi または bcrypt を使う流れが一般化しました。

パスワードハッシングモジュール #

app/core/security.py
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError

_hasher = PasswordHasher()    # OWASP 推奨のデフォルト値で初期化

def hash_password(plain: str) -> str:
    return _hasher.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    try:
        _hasher.verify(hashed, plain)
        return True
    except VerifyMismatchError:
        return False

def needs_rehash(hashed: str) -> bool:
    return _hasher.check_needs_rehash(hashed)

needs_rehashハッシュパラメータが時間経過で弱くなったときに 再ハッシュするか検査するもの — ユーザーがログインするたびに検査して新しいパラメータで保存し直すパターン。

ログイン時の再ハッシュの流れ
if verify_password(plain, user.hashed_password):
    if needs_rehash(user.hashed_password):
        user.hashed_password = hash_password(plain)
        await db.commit()
    return user

JWT — トークンの正体 #

JWT (JSON Web Token) は 3 つの部分で構成された文字列です。

JWT の形
xxxxx.yyyyy.zzzzz
header.payload.signature
  • header — アルゴリズム、タイプ
  • payload — クレーム (sub、exp、iat など)
  • signature — 上の 2 つをシークレットで署名。改ざん防止

核心: payload は base64 でデコードすれば誰でも読めます。 JWT は 暗号化 ではなく 署名 — 改ざん防止であって秘密保持ではありません。パスワードのような機密情報を JWT に入れてはいけません。

標準クレーム #

クレーム意味
subsubject — 通常はユーザー ID
expexpiration — 失効時刻 (UNIX timestamp)
iatissued at — 発行時刻
nbfnot before — この時刻以前は無効
jtiJWT ID — 失効 / ブラックリスト用
audaudience — トークンの対象者
ississuer — 発行者

exp はほぼ常に。残りはポリシーに応じて。

JWT の発行 / 検証 #

app/core/security.py に追加
from datetime import datetime, timedelta, timezone
import jwt
from app.core.config import settings

ALGORITHM = "HS256"

def create_access_token(subject: str, expires_minutes: int = 60) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        "sub": subject,
        "iat": now,
        "exp": now + timedelta(minutes=expires_minutes),
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)

def decode_token(token: str) -> dict:
    return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])

HS256対称鍵署名 — 同じシークレットで署名と検証。単一サーバーで十分です。複数のサーバーが検証だけする必要があるなら RS256 (非対称 — RSA 公開 / 秘密鍵) で。

シークレットの管理 #

jwt_secret#1pydantic-settings で環境変数から読み込みます。

.env
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

最低 32 バイト (256 ビット) のランダム で生成してください。実際の環境変数には python -c で作ったランダム文字列を直接入れます。

会員登録 / ログインのルート #

Pydantic スキーマ #

app/schemas/user.py
from pydantic import BaseModel, EmailStr, Field

class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

class UserOut(BaseModel):
    id: int
    email: EmailStr
    model_config = {"from_attributes": True}

class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"

CRUD #

app/services/user.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.core.security import hash_password, verify_password

async def create_user(db: AsyncSession, email: str, password: str) -> User:
    user = User(email=email, hashed_password=hash_password(password))
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()

async def authenticate(db: AsyncSession, email: str, password: str) -> User | None:
    user = await get_user_by_email(db, email)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

authenticateユーザーが存在しない場合とパスワードが異なる場合を区別しません。 どちらも None を返すことで、「そのメールアドレスが存在するかどうか」を外部に教えてしまう情報漏洩を防いでいます。

ルート #

app/api/auth.py
from fastapi import APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Annotated
from app.api.deps import DBSession
from app.schemas.user import UserCreate, UserOut, Token
from app.services import user as user_service
from app.core.security import create_access_token

router = APIRouter(prefix="/auth", tags=["auth"])

@router.post("/register", response_model=UserOut, status_code=201)
async def register(payload: UserCreate, db: DBSession) -> UserOut:
    existing = await user_service.get_user_by_email(db, payload.email)
    if existing:
        raise HTTPException(409, "既に登録済みのメール")
    user = await user_service.create_user(db, payload.email, payload.password)
    return user

@router.post("/login", response_model=Token)
async def login(
    form: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: DBSession,
) -> Token:
    user = await user_service.authenticate(db, form.username, form.password)
    if user is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token(subject=str(user.id))
    return Token(access_token=token)

OAuth2PasswordRequestForm の正体 #

form: OAuth2PasswordRequestForm = Depends()標準 OAuth2 パスワードフローのフォーム を自動でパースします。

  • application/x-www-form-urlencoded のボディから usernamepassword を受け取る
  • 私たちのドメインはメールですが、OAuth2 標準のフィールド名は username。そのまま使ってください。
  • Swagger UI で「Authorize」ボタンが自動で有効化

依存性で current_user を作る #

リクエストからトークンを取り出して → 検証 → ユーザーオブジェクトをルートに注入。

app/api/deps.py を拡張
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError
from app.core.security import decode_token
from app.models.user import User
from app.services import user as user_service

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: DBSession,
) -> User:
    credentials_exc = HTTPException(
        status.HTTP_401_UNAUTHORIZED,
        "無効なトークン",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_token(token)
        user_id = int(payload["sub"])
    except (InvalidTokenError, KeyError, ValueError):
        raise credentials_exc

    user = await db.get(User, user_id)
    if user is None:
        raise credentials_exc
    return user

CurrentUser = Annotated[User, Depends(get_current_user)]

OAuth2PasswordBearer がやること #

  • Authorization: Bearer xxxxx ヘッダからトークンを抽出
  • ヘッダがないか形式が違うと自動で 401 レスポンス
  • Swagger UI の「Authorize」ボタンがこれを見て動作
  • tokenUrl="/auth/login" はトークン発行エンドポイント — ドキュメント / UI 用

ルートで使用 #

app/api/todos.py を拡張
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(
    payload: TodoCreate,
    current_user: CurrentUser,
    db: DBSession,
) -> Todo:
    todo = Todo(
        **payload.model_dump(),
        owner_id=current_user.id,
    )
    db.add(todo)
    await db.commit()
    await db.refresh(todo)
    return todo

@router.get("/", response_model=list[TodoOut])
async def list_my_todos(
    current_user: CurrentUser,
    db: DBSession,
) -> list[Todo]:
    result = await db.execute(
        select(Todo).where(Todo.owner_id == current_user.id)
    )
    return list(result.scalars())

current_user: CurrentUser の一行が入ると:

  • トークンがないか無効なら自動で 401
  • 有効なら current_user が認証されたユーザーオブジェクト

ルートの中ではトークン検証コードを 1 行も書いていません。共通ロジックが依存性できれいに 解かれています。

追加パターン #

有効 / 無効ユーザー #

有効ユーザーだけ
async def get_current_active_user(current_user: CurrentUser) -> User:
    if not current_user.is_active:
        raise HTTPException(403, "無効なアカウント")
    return current_user

ActiveUser = Annotated[User, Depends(get_current_active_user)]

依存性の上に依存性。レイヤーのように 積み重ねていきます。

権限 (ロールベース) #

role チェック
def require_role(role: str):
    async def checker(user: CurrentUser) -> User:
        if user.role != role:
            raise HTTPException(403, f"{role} 権限が必要")
        return user
    return checker

AdminUser = Annotated[User, Depends(require_role("admin"))]

@router.delete("/admin-only")
async def admin_only(admin: AdminUser): ...

Refresh トークン (オプション) #

Access トークンを短く (15 分~ 1 時間) + Refresh トークンを長く (7 日~ 30 日) するパターン。Access が失効したら Refresh で新しい Access を発行。セキュリティと使いやすさのバランス。Refresh トークンは DB に保存 して失効可能にしておくのが普通。

このシリーズはシンプルに access トークン 1 つで進めます。実サービスでは追加してください。

よく出会う落とし穴 #

1) JWT シークレットが弱い #

悪い例
JWT_SECRET=secret
JWT_SECRET=mypassword
良い例
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

短いシークレットは brute force が可能です。

2) JWT payload に機密情報 #

悪い例
payload = {"sub": "u1", "password": "...", "ssn": "..."}

JWT payload は誰でも読めます。ユーザー ID のような識別子だけ

3) HTTPS なし #

トークンは HTTPS でだけ送信。HTTP の上では誰でもトークンを傍受できます。開発は localhost (HTTPS 免除)、プロダクションは無条件に HTTPS。

4) トークンの失効が長すぎる #

悪い例: 7 日間の access トークン
expires_minutes=10080

失効が長いほど奪取時の被害時間が長くなります。15 分~ 1 時間 が推奨。長く使いたければ refresh トークンパターン。

5) パスワード強度の検証なし #

良い例: 強度の検証
class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

    @field_validator("password")
    @classmethod
    def strong_enough(cls, v: str) -> str:
        if v.lower() in {"password", "12345678", "qwerty"}:
            raise ValueError("ありふれたパスワード")
        return v

Have I Been Pwned API で漏洩パスワードの検証を追加する場面もあります。

まとめ #

今回つかんだもの:

  • 認証 (誰か) vs 認可 (権限)
  • JWT — stateless、クライアント保管、SPA / モバイルフレンドリー
  • argon2id が現在の標準パスワードハッシング (OWASP 推奨)
  • argon2-cffi + pyjwt の組み合わせ
  • JWT 標準クレーム: subexpiat
  • HS256 対称署名、シークレットは 32 バイト + ランダム
  • OAuth2PasswordRequestForm — 標準フォームの自動パース
  • OAuth2PasswordBearer — Bearer トークンの抽出
  • CurrentUser 依存性でルート認証が一行
  • 有効ユーザー、ロールチェックは依存性をレイヤーで
  • 落とし穴: 弱いシークレット、payload に機密情報、HTTPS、失効時間、パスワード強度

次回(#5 非同期とバックグラウンドタスク)では レスポンスを待たずに処理すべき作業 — メール送信、重い変換、外部 API 呼び出し — を扱うパターン (BackgroundTasks、外部キュー) を扱います。

X