Modern Python in Practice #4: Authentication — OAuth2 Password Flow + JWT

8 min read

The Todo we wired to a database in #3 now needs to be separated per user. This post covers the standard pattern for sign-up, login, and authenticated requests — OAuth2 password flow + JWT.

Authentication vs authorization #

Terminology first.

  • Authentication — “Who are you?” — login itself
  • Authorization — “Are you allowed to do this?” — permission checks

This post focuses on authentication. Authorization (roles, permissions, etc.) lives in additional middleware, but the patterns are very similar.

Two paths #

Session-basedToken-based (JWT)
StorageServer (DB/Redis)Client
Statestatefulstateless
Distributed systemsNeeds shared storageNaturally fits
RevocationImmediateUntil expiration or via blacklist
Mobile/SPA friendlyDecentGood

For an API server (FastAPI) + SPA/mobile combination, JWT is the standard answer. Session-based fits better in some cases (such as server-rendered web apps), but this series uses JWT.

Never store passwords in plaintext #

Even if the database leaks, passwords must not be exposed. Store them as a one-way hash — at login, hash the input the same way and compare.

Choosing an algorithm #

AlgorithmVerdict
MD5, SHA-1, SHA-256Never (too fast, GPU brute force)
bcryptOK — long-standing standard, secure enough
argon2idCurrently recommended (OWASP)
scryptOK — secure, but argon2 is preferred

For a new project, choose argon2id first. It has simple code and safe defaults.

Installation #

argon2 + JWT
uv add argon2-cffi pyjwt

argon2-cffi is the argon2 implementation; pyjwt is the standard JWT library.

Compatibility note: the passlib you often see in older code has been receiving fewer updates, so new projects increasingly use argon2-cffi or bcrypt directly.

Password hashing module #

app/core/security.py
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError

_hasher = PasswordHasher()    # Initialized with OWASP-recommended defaults

def hash_password(plain: str) -> str:
    return _hasher.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    try:
        _hasher.verify(hashed, plain)
        return True
    except VerifyMismatchError:
        return False

def needs_rehash(hashed: str) -> bool:
    return _hasher.check_needs_rehash(hashed)

needs_rehash checks whether the hash parameters have grown weaker over time — a pattern where you check on each login and re-store with updated parameters when needed.

Re-hash on login
if verify_password(plain, user.hashed_password):
    if needs_rehash(user.hashed_password):
        user.hashed_password = hash_password(plain)
        await db.commit()
    return user

JWT — what a token really is #

A JWT (JSON Web Token) is a string with three parts.

JWT shape
xxxxx.yyyyy.zzzzz
header.payload.signature
  • header — algorithm, type
  • payload — claims (sub, exp, iat, etc.)
  • signature — signs the first two with a secret. Tamper-resistant

The key point: anyone can read the payload by base64-decoding it. A JWT is signed, not encrypted — it prevents tampering, not disclosure. Never put sensitive data like passwords in a JWT.

Standard claims #

ClaimMeaning
subsubject — usually the user ID
expexpiration — expiry time (UNIX timestamp)
iatissued at — issuance time
nbfnot before — invalid before this time
jtiJWT ID — for revocation/blacklisting
audaudience — who the token is for
ississuer — who issued it

exp is almost always set. The rest depend on your policy.

Issuing/verifying JWTs #

add to app/core/security.py
from datetime import datetime, timedelta, timezone
import jwt
from app.core.config import settings

ALGORITHM = "HS256"

def create_access_token(subject: str, expires_minutes: int = 60) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        "sub": subject,
        "iat": now,
        "exp": now + timedelta(minutes=expires_minutes),
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)

def decode_token(token: str) -> dict:
    return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])

HS256 is symmetric-key signing — sign and verify with the same secret. Fine for a single server. If multiple services need to verify tokens without issuing them, switch to RS256 (asymmetric — RSA public/private key).

Managing the secret #

jwt_secret is read from environment variables via the pydantic-settings setup from #1.

.env
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

Generate at least 32 bytes (256 bits) of randomness. Copy the output of python -c directly into your environment variable.

Sign-up / login routes #

Pydantic schemas #

app/schemas/user.py
from pydantic import BaseModel, EmailStr, Field

class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

class UserOut(BaseModel):
    id: int
    email: EmailStr
    model_config = {"from_attributes": True}

class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"

CRUD #

app/services/user.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.core.security import hash_password, verify_password

async def create_user(db: AsyncSession, email: str, password: str) -> User:
    user = User(email=email, hashed_password=hash_password(password))
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()

async def authenticate(db: AsyncSession, email: str, password: str) -> User | None:
    user = await get_user_by_email(db, email)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

authenticate does not distinguish between a missing user and a wrong password. Both return None — this prevents leaking whether a given email address exists.

Routes #

app/api/auth.py
from fastapi import APIRouter, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from typing import Annotated
from app.api.deps import DBSession
from app.schemas.user import UserCreate, UserOut, Token
from app.services import user as user_service
from app.core.security import create_access_token

router = APIRouter(prefix="/auth", tags=["auth"])

@router.post("/register", response_model=UserOut, status_code=201)
async def register(payload: UserCreate, db: DBSession) -> UserOut:
    existing = await user_service.get_user_by_email(db, payload.email)
    if existing:
        raise HTTPException(409, "Email already registered")
    user = await user_service.create_user(db, payload.email, payload.password)
    return user

@router.post("/login", response_model=Token)
async def login(
    form: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: DBSession,
) -> Token:
    user = await user_service.authenticate(db, form.username, form.password)
    if user is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token(subject=str(user.id))
    return Token(access_token=token)

What OAuth2PasswordRequestForm does #

form: OAuth2PasswordRequestForm = Depends() automatically parses the standard OAuth2 password flow form.

  • It reads username and password from an application/x-www-form-urlencoded body
  • Our domain uses email, but the OAuth2 standard field name is username. Use it as is.
  • Swagger UI’s “Authorize” button gets activated automatically

Building current_user as a dependency #

Pull the token out of the request → verify it → inject the user object into the route.

extend app/api/deps.py
from typing import Annotated
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError
from app.core.security import decode_token
from app.models.user import User
from app.services import user as user_service

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: DBSession,
) -> User:
    credentials_exc = HTTPException(
        status.HTTP_401_UNAUTHORIZED,
        "Invalid token",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_token(token)
        user_id = int(payload["sub"])
    except (InvalidTokenError, KeyError, ValueError):
        raise credentials_exc

    user = await db.get(User, user_id)
    if user is None:
        raise credentials_exc
    return user

CurrentUser = Annotated[User, Depends(get_current_user)]

What OAuth2PasswordBearer does #

  • Extracts the token from the Authorization: Bearer xxxxx header
  • Returns 401 automatically if the header is missing or malformed
  • Powers the Swagger UI “Authorize” button
  • tokenUrl="/auth/login" is the token-issuing endpoint — for docs/UI

Use it in routes #

extend app/api/todos.py
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(
    payload: TodoCreate,
    current_user: CurrentUser,
    db: DBSession,
) -> Todo:
    todo = Todo(
        **payload.model_dump(),
        owner_id=current_user.id,
    )
    db.add(todo)
    await db.commit()
    await db.refresh(todo)
    return todo

@router.get("/", response_model=list[TodoOut])
async def list_my_todos(
    current_user: CurrentUser,
    db: DBSession,
) -> list[Todo]:
    result = await db.execute(
        select(Todo).where(Todo.owner_id == current_user.id)
    )
    return list(result.scalars())

Drop current_user: CurrentUser in and:

  • Missing or invalid token → automatic 401
  • Valid → current_user is the authenticated user object

You wrote zero lines of token-verification code inside the route. Shared logic is cleanly captured in a dependency.

Additional patterns #

Active vs inactive users #

active users only
async def get_current_active_user(current_user: CurrentUser) -> User:
    if not current_user.is_active:
        raise HTTPException(403, "Inactive account")
    return current_user

ActiveUser = Annotated[User, Depends(get_current_active_user)]

A dependency on top of a dependency. Stack them like layers.

Permissions (role-based) #

role check
def require_role(role: str):
    async def checker(user: CurrentUser) -> User:
        if user.role != role:
            raise HTTPException(403, f"{role} permission required")
        return user
    return checker

AdminUser = Annotated[User, Depends(require_role("admin"))]

@router.delete("/admin-only")
async def admin_only(admin: AdminUser): ...

Refresh tokens (optional) #

A pattern with a short-lived access token (15 minutes to 1 hour) and a longer-lived refresh token (7 to 30 days). When the access token expires, the refresh token issues a new one — a balance between security and usability. Refresh tokens are usually stored in the DB so they can be revoked.

This series sticks with a single access token for simplicity. Add refresh tokens for real services.

Common pitfalls #

1) Weak JWT secret #

Bad example
JWT_SECRET=secret
JWT_SECRET=mypassword
Good example
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

Short secrets are brute-forceable.

2) Sensitive data in the JWT payload #

Bad example
payload = {"sub": "u1", "password": "...", "ssn": "..."}

Anyone can read the JWT payload. Identifiers like user ID only.

3) No HTTPS #

Tokens must only travel over HTTPS. Over plain HTTP, anyone on the network can intercept them. Development on localhost is exempt; production must use HTTPS.

4) Token expiration too long #

Bad example: 7-day access token
expires_minutes=10080

The longer the expiration, the larger the window of damage if a token is stolen. 15 minutes to 1 hour is recommended. For long-lived sessions, use the refresh-token pattern.

5) No password strength check #

Good example: strength check
class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

    @field_validator("password")
    @classmethod
    def strong_enough(cls, v: str) -> str:
        if v.lower() in {"password", "12345678", "qwerty"}:
            raise ValueError("Password is too common")
        return v

You can also check leaked passwords against the Have I Been Pwned API.

Recap #

What this post nailed down:

  • Authentication (who) vs authorization (permitted)
  • JWT — stateless, stored on the client, friendly to SPAs/mobile
  • argon2id is today’s standard password hash (OWASP recommended)
  • The argon2-cffi + pyjwt combination
  • Standard JWT claims: sub, exp, iat
  • HS256 symmetric signing; secret is 32+ random bytes
  • OAuth2PasswordRequestForm — standard form parser
  • OAuth2PasswordBearer — extracts the Bearer token
  • A CurrentUser dependency makes route auth a one-liner
  • Active-user and role checks layer dependencies
  • Pitfalls: weak secret, sensitive payload, HTTPS, expiration length, password strength

In the next post (#5 Async and Background Tasks) we cover work that shouldn’t block the response — emails, heavy conversions, external API calls — and the patterns (BackgroundTasks, external queues) that handle them.

X