認証 — OAuth2 パスワードフロー + JWT
パスワードハッシング(argon2/bcrypt)、OAuth2 パスワードフロー、JWT 発行/検証、そして current_user 依存性で認証フローを構成するパターンをまとめます。
第25章 DB 連携 — SQLAlchemy 2.x + Alembic で DB まで接続した Todo が、これからは ユーザー別に 分離される必要があります。会員登録、ログイン、認証されたリクエストを区別する標準パターン — OAuth2 パスワードフロー + JWT を扱います。
本章の認証コードはシークレット / パスワードのような機微情報を扱うので、第31章 logging と観測性 で扱う PII / secret ロギング防止パターンと一緒に運用されます。
認証 vs 認可 #
用語からまず整理。
- 認証 (Authentication) — 「あなたは誰か?」 — ログイン自体
- 認可 (Authorization) — 「その人にこれをやる権限があるか?」 — 権限チェック
本章は認証に集中。認可 (role、permission など) は追加のミドルウェア領域ですが、パターンは非常に似ています。
2 つの流れ #
| セッション方式 | トークン方式 (JWT) | |
|---|---|---|
| 保存 | サーバー (DB / Redis) | クライアント |
| 状態 | stateful | stateless |
| 分散システム | 共有ストレージ必要 | 自然 |
| 失効 | 即時可能 | 期限切れまで、またはブラックリスト |
| モバイル / SPA 親和 | 普通 | 良い |
API サーバー (FastAPI) + SPA / モバイルアプリの組み合わせでは JWT が標準解です。セッション方式の方が向く場合もありますが (サーバーレンダリング Web アプリ)、本書は JWT で進めます。
パスワードは絶対に平文で保存しない #
データベースが流出してもパスワードが露出してはなりません。一方向ハッシュ で保存 — ログイン時に入力値を同じハッシュにして比較するだけ。
アルゴリズム選択 #
| アルゴリズム | 評価 |
|---|---|
| MD5、SHA-1、SHA-256 | ❌ 絶対不可 (速すぎる、GPU で brute force) |
| bcrypt | ⭕ 旧来の標準、十分安全 |
| argon2id | ✅ 現在の推奨標準 (OWASP) |
| scrypt | ⭕ 安全だが argon2 がより推奨 |
新しいプロジェクトは argon2id を第一選択。シンプルなコードと安全なデフォルト値を持ちます。
インストール #
uv add argon2-cffi pyjwtargon2-cffi が argon2 の実装、pyjwt が JWT 標準ライブラリ。
互換性メモ: 旧コードでよく見る
passlibは徐々にメンテナンスが減っており、新しいプロジェクトは直接argon2-cffiまたはbcryptを使う流れが一般化しました。
パスワードハッシングモジュール #
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
_hasher = PasswordHasher() # OWASP 推奨デフォルトで初期化
def hash_password(plain: str) -> str:
return _hasher.hash(plain)
def verify_password(plain: str, hashed: str) -> bool:
try:
_hasher.verify(hashed, plain)
return True
except VerifyMismatchError:
return False
def needs_rehash(hashed: str) -> bool:
return _hasher.check_needs_rehash(hashed)needs_rehash は ハッシュパラメータが時間の経過で弱くなったとき に再ハッシュすべきかを検査 — ユーザーがログインするときに検査して新しいパラメータで再保存するパターン。
if verify_password(plain, user.hashed_password):
if needs_rehash(user.hashed_password):
user.hashed_password = hash_password(plain)
await db.commit()
return userJWT — トークンの正体 #
JWT (JSON Web Token) は 3 つの部分で構成された文字列です。
xxxxx.yyyyy.zzzzz
header.payload.signature- header — アルゴリズム、タイプ
- payload — クレーム (sub、exp、iat など)
- signature — 上の 2 つをシークレットで署名。改ざん防止
核心: payload は base64 デコードすれば誰でも読めます。 JWT は 暗号化 ではなく 署名 — 改ざん防止であって秘密の保持ではありません。パスワードのような機微情報を JWT に入れてはいけません。
標準クレーム #
| クレーム | 意味 |
|---|---|
sub | subject — 通常はユーザー ID |
exp | expiration — 有効期限 (UNIX timestamp) |
iat | issued at — 発行時刻 |
nbf | not before — この時刻より前は無効 |
jti | JWT ID — 失効 / ブラックリスト用 |
aud | audience — トークンの対象者 |
iss | issuer — 発行者 |
exp はほぼ常に。残りはポリシーによります。
JWT 発行 / 検証 #
from datetime import datetime, timedelta, timezone
import jwt
from app.core.config import settings
ALGORITHM = "HS256"
def create_access_token(subject: str, expires_minutes: int = 60) -> str:
now = datetime.now(timezone.utc)
payload = {
"sub": subject,
"iat": now,
"exp": now + timedelta(minutes=expires_minutes),
}
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
def decode_token(token: str) -> dict:
return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])HS256 は 対称鍵署名 — 同じシークレットで署名し検証。単一サーバーには十分です。複数のサーバーが検証だけ行うなら RS256 (非対称 — RSA 公開 / 秘密鍵) で。
シークレット管理 #
jwt_secret は 第22章 の pydantic-settings で環境変数から読みます。
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')最低 32 バイト (256 ビット) のランダム で生成してください。実際の環境変数には python -c で作ったランダム文字列を直接入れます。
会員登録 / ログインルート #
Pydantic スキーマ #
from pydantic import BaseModel, EmailStr, Field
class UserCreate(BaseModel):
email: EmailStr
password: str = Field(min_length=8, max_length=128)
class UserOut(BaseModel):
id: int
email: EmailStr
model_config = {"from_attributes": True}
class Token(BaseModel):
access_token: str
token_type: str = "bearer"CRUD #
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.core.security import hash_password, verify_password
async def create_user(db: AsyncSession, email: str, password: str) -> User:
user = User(email=email, hashed_password=hash_password(password))
db.add(user)
await db.commit()
await db.refresh(user)
return user
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def authenticate(db: AsyncSession, email: str, password: str) -> User | None:
user = await get_user_by_email(db, email)
if user is None:
return None
if not verify_password(password, user.hashed_password):
return None
return userauthenticate は ユーザーが存在しない場合とパスワードが間違っている場合を区別しません。 両方とも None — 「メールが存在するかどうかを教える」情報漏洩を防ぎます。
ルート #
from fastapi import APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Annotated
from app.api.deps import DBSession
from app.schemas.user import UserCreate, UserOut, Token
from app.services import user as user_service
from app.core.security import create_access_token
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/register", response_model=UserOut, status_code=201)
async def register(payload: UserCreate, db: DBSession) -> UserOut:
existing = await user_service.get_user_by_email(db, payload.email)
if existing:
raise HTTPException(409, "既に登録済みのメール")
user = await user_service.create_user(db, payload.email, payload.password)
return user
@router.post("/login", response_model=Token)
async def login(
form: Annotated[OAuth2PasswordRequestForm, Depends()],
db: DBSession,
) -> Token:
user = await user_service.authenticate(db, form.username, form.password)
if user is None:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
"Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
token = create_access_token(subject=str(user.id))
return Token(access_token=token)OAuth2PasswordRequestForm の正体
#
form: OAuth2PasswordRequestForm = Depends() は 標準の OAuth2 パスワードフォーム を自動でパースします。
application/x-www-form-urlencodedボディからusername、passwordを受け取る- 自分たちのドメインはメールですが、OAuth2 標準のフィールド名は
username。そのまま使ってください。 - Swagger UI で自動的に「Authorize」ボタンが有効化
依存性で current_user を作る
#
リクエストからトークンを取り出し → 検証 → ユーザーオブジェクトをルートに注入。
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError
from app.core.security import decode_token
from app.models.user import User
from app.services import user as user_service
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: DBSession,
) -> User:
credentials_exc = HTTPException(
status.HTTP_401_UNAUTHORIZED,
"有効でないトークン",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_token(token)
user_id = int(payload["sub"])
except (InvalidTokenError, KeyError, ValueError):
raise credentials_exc
user = await db.get(User, user_id)
if user is None:
raise credentials_exc
return user
CurrentUser = Annotated[User, Depends(get_current_user)]OAuth2PasswordBearer がやること
#
Authorization: Bearer xxxxxヘッダーからトークンを抽出- ヘッダーがない、または形式が誤っていれば自動で 401 レスポンス
- Swagger UI の「Authorize」ボタンがこれを見て動作
tokenUrl="/auth/login"はトークン発行エンドポイント — ドキュメント / UI 用
ルートで使用 #
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(
payload: TodoCreate,
current_user: CurrentUser,
db: DBSession,
) -> Todo:
todo = Todo(
**payload.model_dump(),
owner_id=current_user.id,
)
db.add(todo)
await db.commit()
await db.refresh(todo)
return todo
@router.get("/", response_model=list[TodoOut])
async def list_my_todos(
current_user: CurrentUser,
db: DBSession,
) -> list[Todo]:
result = await db.execute(
select(Todo).where(Todo.owner_id == current_user.id)
)
return list(result.scalars())current_user: CurrentUser の一行が入ると:
- トークンがない、または無効なら自動で 401
- 有効なら
current_userが認証されたユーザーオブジェクト
ルートの中でトークン検証コードを 1 行も書いていません。共通ロジックは依存性として分離 されています。
追加パターン #
アクティブ / 非アクティブユーザー #
async def get_current_active_user(current_user: CurrentUser) -> User:
if not current_user.is_active:
raise HTTPException(403, "非アクティブアカウント")
return current_user
ActiveUser = Annotated[User, Depends(get_current_active_user)]依存性の上に依存性。レイヤーのように 積み上げます。
権限 (ロールベース) #
def require_role(role: str):
async def checker(user: CurrentUser) -> User:
if user.role != role:
raise HTTPException(403, f"{role} 権限が必要")
return user
return checker
AdminUser = Annotated[User, Depends(require_role("admin"))]
@router.delete("/admin-only")
async def admin_only(admin: AdminUser): ...Refresh トークン (オプション) #
Access トークンを短く (15 分 〜 1 時間) + Refresh トークンを長く (7 日 〜 30 日) 置くパターン。Access が期限切れになったら Refresh で新しい Access を発行。セキュリティとユーザビリティの均衡。Refresh トークンは DB に保存 して失効可能にしておくのが普通。
本書の 4部はシンプルに access トークン 1 つで進めます。実サービスでは追加してください。
よく出会う落とし穴 #
1) JWT シークレットが弱い #
JWT_SECRET=secret
JWT_SECRET=mypasswordJWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')短いシークレットは brute force 可能です。
2) JWT payload に機微情報 #
payload = {"sub": "u1", "password": "...", "ssn": "..."}JWT payload は誰でも読めます。ユーザー ID のような識別子だけ。
3) HTTPS なし #
トークンは HTTPS でのみ送信。HTTP の上では誰でもトークンを傍受できます。開発は localhost (HTTPS 免除)、プロダクションは無条件に HTTPS。
4) トークンの有効期限が長すぎる #
expires_minutes=10080期限が長いほど、盗まれた場合の被害時間が長くなります。15 分 〜 1 時間 が推奨。長く使いたければ refresh トークンパターン。
5) パスワード強度の検証なし #
class UserCreate(BaseModel):
email: EmailStr
password: str = Field(min_length=8, max_length=128)
@field_validator("password")
@classmethod
def strong_enough(cls, v: str) -> str:
if v.lower() in {"password", "12345678", "qwerty"}:
raise ValueError("ありふれすぎたパスワード")
return vHave I Been Pwned API で漏洩パスワード検証を追加する方法もあります。
練習問題 #
argon2-cffiでhash_password/verify_password/needs_rehashの 3 つの関数を書いてください。同じパスワードを 2 回ハッシュしたときに異なる結果が出るか (salt の自動付与)、verifyが両方とも OK かを確認します。pyjwtでcreate_access_token(subject: str)/decode_token(token: str)を書いてください。わざと期限切れのトークン (expが過去) を作り、decode_tokenがExpiredSignatureErrorを投げるか確認します。OAuth2PasswordBearer+Depends(get_current_user)でcurrent_user依存性を実装し、GET /meルートで認証されたユーザー情報を返すようにしてください。Swagger UI の「Authorize」ボタンでログインし、保護されたエンドポイント呼び出しまで直接確認します。
一行まとめ: 認証は誰か / 認可は権限。JWT は stateless、クライアント保管、SPA / モバイル親和。argon2id がパスワードハッシングの標準 (OWASP)。JWT 標準クレームは
sub/exp/iat、HS256 対称署名、シークレット 32 バイト以上のランダム。OAuth2PasswordRequestFormフォーム自動パース、OAuth2PasswordBearerヘッダートークン抽出。CurrentUser依存性でルート認証を一行に。落とし穴は弱いシークレット / payload の機微情報 / HTTP / 期限の長さ / パスワード強度。
次の章 #
次の 第27章 非同期とバックグラウンドジョブ では レスポンスを待たずに処理すべき作業 — メール送信、重い変換、外部 API 呼び出し — を扱うパターン (BackgroundTasks、外部キュー) を扱います。