Contents
26 Chapter

Authentication — OAuth2 password flow + JWT

Password hashing (argon2/bcrypt), the OAuth2 password flow, JWT issuance/verification, and the current_user dependency that wraps it all into a clean auth pattern.

The Todo we connected to a DB in Chapter 25 Connecting a DB — SQLAlchemy 2.x + Alembic now needs to be per user. This chapter covers the standard pattern for sign-up, login, and distinguishing authenticated requests — OAuth2 password flow + JWT.

The auth code in this chapter handles sensitive data like secrets and passwords, so it runs alongside the PII / secret logging-prevention patterns covered in Chapter 31 Logging and observability.

Authentication vs authorization #

Terms first.

  • Authentication — “Who are you?” — login itself
  • Authorization — “Is this person allowed to do this?” — permission check

This chapter focuses on authentication. Authorization (role, permission, etc.) lives in additional middleware, but the patterns are very similar.

Two branches #

Session-basedToken-based (JWT)
StorageServer (DB / Redis)Client
Statestatefulstateless
Distributed systemsNeeds shared storageNatural
RevocationImmediateUntil expiry, or blacklist
Mobile / SPA friendlyOKGood

For an API server (FastAPI) + SPA / mobile app combination, JWT is the standard fit. Session-based auth is better in some cases (server-rendered web apps), but this book uses JWT.

Never store passwords in plaintext #

Even if the database leaks, passwords must not be exposed. Store them as a one-way hash — at login time, hash the input and compare.

Algorithm selection #

AlgorithmVerdict
MD5, SHA-1, SHA-256❌ Never (too fast, GPU-brute-forceable)
bcrypt⭕ Old standard, sufficiently safe
argon2idCurrently recommended (OWASP)
scrypt⭕ Safe, but argon2 is more recommended

New projects pick argon2id first. Simple code and safe defaults.

Installation #

argon2 + JWT
uv add argon2-cffi pyjwt

argon2-cffi is the argon2 implementation, and pyjwt is the standard JWT library.

Compatibility note: passlib, which you often see in old code, is gradually losing maintenance, so new projects commonly use argon2-cffi or bcrypt directly.

Password-hashing module #

app/core/security.py
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError

_hasher = PasswordHasher()    # Initialize with OWASP-recommended defaults

def hash_password(plain: str) -> str:
    return _hasher.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    try:
        _hasher.verify(hashed, plain)
        return True
    except VerifyMismatchError:
        return False

def needs_rehash(hashed: str) -> bool:
    return _hasher.check_needs_rehash(hashed)

needs_rehash checks whether the hash’s parameters have weakened over time — a pattern of re-hashing with new parameters when the user logs in.

Re-hash flow on login
if verify_password(plain, user.hashed_password):
    if needs_rehash(user.hashed_password):
        user.hashed_password = hash_password(plain)
        await db.commit()
    return user

JWT — what a token actually is #

A JWT (JSON Web Token) is a string composed of three parts.

JWT shape
xxxxx.yyyyy.zzzzz
header.payload.signature
  • header — algorithm, type
  • payload — claims (sub, exp, iat, etc.)
  • signature — signs the first two with the secret. Tamper-proofing

The key point: anyone can read the payload after base64 decoding. A JWT is signed, not encrypted — it’s about tamper resistance, not secrecy. Never put sensitive information like passwords into a JWT.

Standard claims #

ClaimMeaning
subsubject — usually the user ID
expexpiration — expiry time (UNIX timestamp)
iatissued at — issue time
nbfnot before — invalid before this time
jtiJWT ID — for revocation / blacklisting
audaudience — who the token is for
ississuer — who issued it

exp almost always. The rest by policy.

JWT issue / verify #

Additions to app/core/security.py
from datetime import datetime, timedelta, timezone
import jwt
from app.core.config import settings

ALGORITHM = "HS256"

def create_access_token(subject: str, expires_minutes: int = 60) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        "sub": subject,
        "iat": now,
        "exp": now + timedelta(minutes=expires_minutes),
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)

def decode_token(token: str) -> dict:
    return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])

HS256 is symmetric-key signing — sign and verify with the same secret. Fine for a single server. If multiple servers only need to verify, use RS256 (asymmetric — RSA public / private key).

Managing the secret #

jwt_secret is read from environment variables via pydantic-settings from Chapter 22.

.env
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

Generate at least 32 bytes (256 bits) of randomness. In the actual environment variable, drop in the random string generated by python -c directly.

Sign-up / login routes #

Pydantic schemas #

app/schemas/user.py
from pydantic import BaseModel, EmailStr, Field

class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

class UserOut(BaseModel):
    id: int
    email: EmailStr
    model_config = {"from_attributes": True}

class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"

CRUD #

app/services/user.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.core.security import hash_password, verify_password

async def create_user(db: AsyncSession, email: str, password: str) -> User:
    user = User(email=email, hashed_password=hash_password(password))
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()

async def authenticate(db: AsyncSession, email: str, password: str) -> User | None:
    user = await get_user_by_email(db, email)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

authenticate does not distinguish between user-not-found and wrong-password. Both return None — that prevents leaking the “does this email exist” signal.

Routes #

app/api/auth.py
from fastapi import APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Annotated
from app.api.deps import DBSession
from app.schemas.user import UserCreate, UserOut, Token
from app.services import user as user_service
from app.core.security import create_access_token

router = APIRouter(prefix="/auth", tags=["auth"])

@router.post("/register", response_model=UserOut, status_code=201)
async def register(payload: UserCreate, db: DBSession) -> UserOut:
    existing = await user_service.get_user_by_email(db, payload.email)
    if existing:
        raise HTTPException(409, "Email already registered")
    user = await user_service.create_user(db, payload.email, payload.password)
    return user

@router.post("/login", response_model=Token)
async def login(
    form: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: DBSession,
) -> Token:
    user = await user_service.authenticate(db, form.username, form.password)
    if user is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token(subject=str(user.id))
    return Token(access_token=token)

What OAuth2PasswordRequestForm does #

form: OAuth2PasswordRequestForm = Depends() automatically parses the standard OAuth2 password-flow form.

  • Receives username, password from an application/x-www-form-urlencoded body
  • Our domain field is email, but the OAuth2 standard field name is username. Use it as-is.
  • The “Authorize” button in Swagger UI is enabled automatically

Build current_user with a dependency #

Pull the token out of the request → verify it → inject the user object into the route.

Extend app/api/deps.py
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError
from app.core.security import decode_token
from app.models.user import User
from app.services import user as user_service

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: DBSession,
) -> User:
    credentials_exc = HTTPException(
        status.HTTP_401_UNAUTHORIZED,
        "Invalid token",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_token(token)
        user_id = int(payload["sub"])
    except (InvalidTokenError, KeyError, ValueError):
        raise credentials_exc

    user = await db.get(User, user_id)
    if user is None:
        raise credentials_exc
    return user

CurrentUser = Annotated[User, Depends(get_current_user)]

What OAuth2PasswordBearer does #

  • Extracts the token from the Authorization: Bearer xxxxx header
  • If the header is missing or malformed, returns 401 automatically
  • Swagger UI’s “Authorize” button watches for this and lights up
  • tokenUrl="/auth/login" is the token-issuing endpoint — for docs / UI

Use in a route #

Extend app/api/todos.py
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(
    payload: TodoCreate,
    current_user: CurrentUser,
    db: DBSession,
) -> Todo:
    todo = Todo(
        **payload.model_dump(),
        owner_id=current_user.id,
    )
    db.add(todo)
    await db.commit()
    await db.refresh(todo)
    return todo

@router.get("/", response_model=list[TodoOut])
async def list_my_todos(
    current_user: CurrentUser,
    db: DBSession,
) -> list[Todo]:
    result = await db.execute(
        select(Todo).where(Todo.owner_id == current_user.id)
    )
    return list(result.scalars())

With a single current_user: CurrentUser line:

  • No or invalid token → 401 automatically
  • Valid → current_user is the authenticated user object

Not a single line of token-verification code lives in the route. Shared logic is handled through dependency injection.

Extra patterns #

Active / inactive users #

Active users only
async def get_current_active_user(current_user: CurrentUser) -> User:
    if not current_user.is_active:
        raise HTTPException(403, "Inactive account")
    return current_user

ActiveUser = Annotated[User, Depends(get_current_active_user)]

Dependency on top of dependency. Stacked like layers.

Permissions (role-based) #

role check
def require_role(role: str):
    async def checker(user: CurrentUser) -> User:
        if user.role != role:
            raise HTTPException(403, f"{role} permission required")
        return user
    return checker

AdminUser = Annotated[User, Depends(require_role("admin"))]

@router.delete("/admin-only")
async def admin_only(admin: AdminUser): ...

Refresh tokens (optional) #

A pattern of short-lived access tokens (15 minutes ~ 1 hour) + long-lived refresh tokens (7 ~ 30 days). When the access token expires, the refresh token issues a new access token. Balances security and usability. Refresh tokens are usually stored in the DB so they can be revoked.

Part 4 of this book stays simple with a single access token. In a real service, add the refresh pattern.

Common pitfalls #

1) Weak JWT secret #

Bad
JWT_SECRET=secret
JWT_SECRET=mypassword
Good
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

Short secrets can be brute-forced.

2) Sensitive data in the JWT payload #

Bad
payload = {"sub": "u1", "password": "...", "ssn": "..."}

Anyone can read a JWT payload. Identifiers like user ID only.

3) No HTTPS #

Tokens travel over HTTPS only. Over HTTP, anyone can intercept the token. Development is fine on localhost (HTTPS-exempt); production is always HTTPS.

4) Token expiry too long #

Bad: 7-day access token
expires_minutes=10080

The longer the expiry, the longer the damage window if it’s stolen. 15 minutes ~ 1 hour is recommended. If you want longer-lived sessions, use the refresh-token pattern.

5) No password-strength validation #

Good: strength check
class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

    @field_validator("password")
    @classmethod
    def strong_enough(cls, v: str) -> str:
        if v.lower() in {"password", "12345678", "qwerty"}:
            raise ValueError("Too common a password")
        return v

Adding leak-check via the Have I Been Pwned API is another option.

Exercises #

  1. Write the three functions hash_password / verify_password / needs_rehash using argon2-cffi. Confirm that hashing the same password twice produces different results (auto-salting), and that verify succeeds for both.
  2. With pyjwt, write create_access_token(subject: str) / decode_token(token: str). Deliberately craft an expired token (with exp in the past) and confirm that decode_token raises ExpiredSignatureError.
  3. Build a current_user dependency with OAuth2PasswordBearer + Depends(get_current_user) and have a GET /me route return the authenticated user. Use Swagger UI’s “Authorize” button to log in, then call the protected endpoint yourself.

In one line: Authentication is who, authorization is permission. JWT is stateless, client-stored, friendly to SPAs / mobile. argon2id is the password-hashing standard (OWASP). JWT standard claims are sub / exp / iat, HS256 symmetric signature, secret of 32 bytes+ randomness. OAuth2PasswordRequestForm auto-parses the form, OAuth2PasswordBearer extracts the header token. A CurrentUser dependency authenticates routes in one line. Pitfalls are weak secret / sensitive payload / HTTP / long expiry / no password strength.

Next chapter #

Next, Chapter 27 Async and background jobs covers work that should not block the response — sending email, heavy conversions, calling external APIs — and the patterns for handling it (BackgroundTasks, external queues).

X