モダンPython実践 #4 認証 — OAuth2 パスワードフロー + JWT
#3 で DB まで接続した Todo を、これから ユーザー別に 分離する必要があります。会員登録、ログイン、認証済みリクエストを区別する標準パターン — OAuth2 パスワードフロー + JWT を扱います。
認証 vs 認可 #
用語からまず整理。
- 認証 (Authentication) — 「あなたは誰か?」 — ログイン自体
- 認可 (Authorization) — 「その人がこの仕事をしてよいか?」 — 権限チェック
今回は認証に集中。認可 (role、permission など) は追加のミドルウェア領域ですが、パターンは非常に似ています。
2 つの分かれ道の流れ #
| セッションベース | トークンベース (JWT) | |
|---|---|---|
| 保存場所 | サーバー (DB/Redis) | クライアント |
| 状態 | stateful | stateless |
| 分散システム | 共有ストレージが必要 | 自然 |
| 失効 | 即時可能 | 期限まで、またはブラックリスト |
| モバイル/SPA フレンドリー | 普通 | 良い |
API サーバー (FastAPI) + SPA / モバイルアプリの組み合わせでは JWT が標準的な答えです。セッションベースの方が向く場面 (サーバーレンダリングの Web アプリ) もありますが、このシリーズは JWT で進めます。
パスワードは絶対に平文で保存しない #
データベースが流出してもパスワードが露出してはいけません。一方向のハッシュ で保存し、ログイン時に入力値を同じハッシュにして比較するだけ。
アルゴリズムの選択 #
| アルゴリズム | 評価 |
|---|---|
| MD5、SHA-1、SHA-256 | ❌ 絶対に不可 (速すぎる、GPU で brute force) |
| bcrypt | ⭕ 古くからの標準、十分に安全 |
| argon2id | ✅ 現在の推奨標準 (OWASP) |
| scrypt | ⭕ 安全だが argon2 の方が推奨される |
新しいプロジェクトは argon2id を最初の選択に。シンプルなコードと安全なデフォルト値を持ちます。
インストール #
uv add argon2-cffi pyjwtargon2-cffi が argon2 の実装、pyjwt が JWT の標準ライブラリ。
互換性メモ: 古いコードでよく見る
passlibは徐々にメンテナンスが減っており、新しいプロジェクトでは直接argon2-cffiまたはbcryptを使う流れが一般化しました。
パスワードハッシングモジュール #
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
_hasher = PasswordHasher() # OWASP 推奨のデフォルト値で初期化
def hash_password(plain: str) -> str:
return _hasher.hash(plain)
def verify_password(plain: str, hashed: str) -> bool:
try:
_hasher.verify(hashed, plain)
return True
except VerifyMismatchError:
return False
def needs_rehash(hashed: str) -> bool:
return _hasher.check_needs_rehash(hashed)needs_rehash は ハッシュパラメータが時間経過で弱くなったときに 再ハッシュするか検査するもの — ユーザーがログインするたびに検査して新しいパラメータで保存し直すパターン。
if verify_password(plain, user.hashed_password):
if needs_rehash(user.hashed_password):
user.hashed_password = hash_password(plain)
await db.commit()
return userJWT — トークンの正体 #
JWT (JSON Web Token) は 3 つの部分で構成された文字列です。
xxxxx.yyyyy.zzzzz
header.payload.signature- header — アルゴリズム、タイプ
- payload — クレーム (sub、exp、iat など)
- signature — 上の 2 つをシークレットで署名。改ざん防止
核心: payload は base64 でデコードすれば誰でも読めます。 JWT は 暗号化 ではなく 署名 — 改ざん防止であって秘密保持ではありません。パスワードのような機密情報を JWT に入れてはいけません。
標準クレーム #
| クレーム | 意味 |
|---|---|
sub | subject — 通常はユーザー ID |
exp | expiration — 失効時刻 (UNIX timestamp) |
iat | issued at — 発行時刻 |
nbf | not before — この時刻以前は無効 |
jti | JWT ID — 失効 / ブラックリスト用 |
aud | audience — トークンの対象者 |
iss | issuer — 発行者 |
exp はほぼ常に。残りはポリシーに応じて。
JWT の発行 / 検証 #
from datetime import datetime, timedelta, timezone
import jwt
from app.core.config import settings
ALGORITHM = "HS256"
def create_access_token(subject: str, expires_minutes: int = 60) -> str:
now = datetime.now(timezone.utc)
payload = {
"sub": subject,
"iat": now,
"exp": now + timedelta(minutes=expires_minutes),
}
return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)
def decode_token(token: str) -> dict:
return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])HS256 は 対称鍵署名 — 同じシークレットで署名と検証。単一サーバーで十分です。複数のサーバーが検証だけする必要があるなら RS256 (非対称 — RSA 公開 / 秘密鍵) で。
シークレットの管理 #
jwt_secret は #1 の pydantic-settings で環境変数から読み込みます。
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')最低 32 バイト (256 ビット) のランダム で生成してください。実際の環境変数には python -c で作ったランダム文字列を直接入れます。
会員登録 / ログインのルート #
Pydantic スキーマ #
from pydantic import BaseModel, EmailStr, Field
class UserCreate(BaseModel):
email: EmailStr
password: str = Field(min_length=8, max_length=128)
class UserOut(BaseModel):
id: int
email: EmailStr
model_config = {"from_attributes": True}
class Token(BaseModel):
access_token: str
token_type: str = "bearer"CRUD #
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.core.security import hash_password, verify_password
async def create_user(db: AsyncSession, email: str, password: str) -> User:
user = User(email=email, hashed_password=hash_password(password))
db.add(user)
await db.commit()
await db.refresh(user)
return user
async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
result = await db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def authenticate(db: AsyncSession, email: str, password: str) -> User | None:
user = await get_user_by_email(db, email)
if user is None:
return None
if not verify_password(password, user.hashed_password):
return None
return userauthenticate は ユーザーが存在しない場合とパスワードが異なる場合を区別しません。 どちらも None を返すことで、「そのメールアドレスが存在するかどうか」を外部に教えてしまう情報漏洩を防いでいます。
ルート #
from fastapi import APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Annotated
from app.api.deps import DBSession
from app.schemas.user import UserCreate, UserOut, Token
from app.services import user as user_service
from app.core.security import create_access_token
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/register", response_model=UserOut, status_code=201)
async def register(payload: UserCreate, db: DBSession) -> UserOut:
existing = await user_service.get_user_by_email(db, payload.email)
if existing:
raise HTTPException(409, "既に登録済みのメール")
user = await user_service.create_user(db, payload.email, payload.password)
return user
@router.post("/login", response_model=Token)
async def login(
form: Annotated[OAuth2PasswordRequestForm, Depends()],
db: DBSession,
) -> Token:
user = await user_service.authenticate(db, form.username, form.password)
if user is None:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED,
"Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
token = create_access_token(subject=str(user.id))
return Token(access_token=token)OAuth2PasswordRequestForm の正体
#
form: OAuth2PasswordRequestForm = Depends() は 標準 OAuth2 パスワードフローのフォーム を自動でパースします。
application/x-www-form-urlencodedのボディからusername、passwordを受け取る- 私たちのドメインはメールですが、OAuth2 標準のフィールド名は
username。そのまま使ってください。 - Swagger UI で「Authorize」ボタンが自動で有効化
依存性で current_user を作る
#
リクエストからトークンを取り出して → 検証 → ユーザーオブジェクトをルートに注入。
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError
from app.core.security import decode_token
from app.models.user import User
from app.services import user as user_service
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: DBSession,
) -> User:
credentials_exc = HTTPException(
status.HTTP_401_UNAUTHORIZED,
"無効なトークン",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = decode_token(token)
user_id = int(payload["sub"])
except (InvalidTokenError, KeyError, ValueError):
raise credentials_exc
user = await db.get(User, user_id)
if user is None:
raise credentials_exc
return user
CurrentUser = Annotated[User, Depends(get_current_user)]OAuth2PasswordBearer がやること
#
Authorization: Bearer xxxxxヘッダからトークンを抽出- ヘッダがないか形式が違うと自動で 401 レスポンス
- Swagger UI の「Authorize」ボタンがこれを見て動作
tokenUrl="/auth/login"はトークン発行エンドポイント — ドキュメント / UI 用
ルートで使用 #
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(
payload: TodoCreate,
current_user: CurrentUser,
db: DBSession,
) -> Todo:
todo = Todo(
**payload.model_dump(),
owner_id=current_user.id,
)
db.add(todo)
await db.commit()
await db.refresh(todo)
return todo
@router.get("/", response_model=list[TodoOut])
async def list_my_todos(
current_user: CurrentUser,
db: DBSession,
) -> list[Todo]:
result = await db.execute(
select(Todo).where(Todo.owner_id == current_user.id)
)
return list(result.scalars())current_user: CurrentUser の一行が入ると:
- トークンがないか無効なら自動で 401
- 有効なら
current_userが認証されたユーザーオブジェクト
ルートの中ではトークン検証コードを 1 行も書いていません。共通ロジックが依存性できれいに 解かれています。
追加パターン #
有効 / 無効ユーザー #
async def get_current_active_user(current_user: CurrentUser) -> User:
if not current_user.is_active:
raise HTTPException(403, "無効なアカウント")
return current_user
ActiveUser = Annotated[User, Depends(get_current_active_user)]依存性の上に依存性。レイヤーのように 積み重ねていきます。
権限 (ロールベース) #
def require_role(role: str):
async def checker(user: CurrentUser) -> User:
if user.role != role:
raise HTTPException(403, f"{role} 権限が必要")
return user
return checker
AdminUser = Annotated[User, Depends(require_role("admin"))]
@router.delete("/admin-only")
async def admin_only(admin: AdminUser): ...Refresh トークン (オプション) #
Access トークンを短く (15 分~ 1 時間) + Refresh トークンを長く (7 日~ 30 日) するパターン。Access が失効したら Refresh で新しい Access を発行。セキュリティと使いやすさのバランス。Refresh トークンは DB に保存 して失効可能にしておくのが普通。
このシリーズはシンプルに access トークン 1 つで進めます。実サービスでは追加してください。
よく出会う落とし穴 #
1) JWT シークレットが弱い #
JWT_SECRET=secret
JWT_SECRET=mypasswordJWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')短いシークレットは brute force が可能です。
2) JWT payload に機密情報 #
payload = {"sub": "u1", "password": "...", "ssn": "..."}JWT payload は誰でも読めます。ユーザー ID のような識別子だけ。
3) HTTPS なし #
トークンは HTTPS でだけ送信。HTTP の上では誰でもトークンを傍受できます。開発は localhost (HTTPS 免除)、プロダクションは無条件に HTTPS。
4) トークンの失効が長すぎる #
expires_minutes=10080失効が長いほど奪取時の被害時間が長くなります。15 分~ 1 時間 が推奨。長く使いたければ refresh トークンパターン。
5) パスワード強度の検証なし #
class UserCreate(BaseModel):
email: EmailStr
password: str = Field(min_length=8, max_length=128)
@field_validator("password")
@classmethod
def strong_enough(cls, v: str) -> str:
if v.lower() in {"password", "12345678", "qwerty"}:
raise ValueError("ありふれたパスワード")
return vHave I Been Pwned API で漏洩パスワードの検証を追加する場面もあります。
まとめ #
今回つかんだもの:
- 認証 (誰か) vs 認可 (権限)
- JWT — stateless、クライアント保管、SPA / モバイルフレンドリー
- argon2id が現在の標準パスワードハッシング (OWASP 推奨)
argon2-cffi+pyjwtの組み合わせ- JWT 標準クレーム:
sub、exp、iat - HS256 対称署名、シークレットは 32 バイト + ランダム
OAuth2PasswordRequestForm— 標準フォームの自動パースOAuth2PasswordBearer— Bearer トークンの抽出CurrentUser依存性でルート認証が一行- 有効ユーザー、ロールチェックは依存性をレイヤーで
- 落とし穴: 弱いシークレット、payload に機密情報、HTTPS、失効時間、パスワード強度
次回(#5 非同期とバックグラウンドタスク)では レスポンスを待たずに処理すべき作業 — メール送信、重い変換、外部 API 呼び出し — を扱うパターン (BackgroundTasks、外部キュー) を扱います。