Modern Python in Practice #4: Authentication — OAuth2 Password Flow + JWT
The Todo we wired to a database in #3 now needs to be separated per user. This post covers the standard pattern for sign-up, login, and authenticated requests — OAuth2 password flow + JWT.
Authentication vs authorization
Terminology first.
- Authentication — “Who are you?” — login itself
- Authorization — “Are you allowed to do this?” — permission checks
This post focuses on authentication. Authorization (roles, permissions, etc.) is layered on top of it as additional checks, and the patterns are very similar.
Two paths
| | Session-based | Token-based (JWT) |
|---|---|---|
| Storage | Server (DB/Redis) | Client |
| State | stateful | stateless |
| Distributed systems | Needs shared storage | Naturally fits |
| Revocation | Immediate | Until expiration or via blacklist |
| Mobile/SPA friendly | Decent | Good |
For an API server (FastAPI) + SPA/mobile combination, JWT is the standard answer. Session-based fits better in some cases (such as server-rendered web apps), but this series uses JWT.
Never store passwords in plaintext
Even if the database leaks, passwords must not be exposed. Store them as a one-way hash — at login, hash the input the same way and compare.
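To make "store a one-way hash and compare at login" concrete, here is a minimal sketch. It uses the standard library's `hashlib.scrypt` only so that it runs without extra packages; the module later in this post uses argon2id instead. The function names, the `salt$digest` storage format, and the cost parameters are illustrative assumptions, not recommendations.

```python
import hashlib
import hmac
import secrets

# Illustrative cost parameters (assumption); real services should tune these.
_N, _R, _P = 2**14, 8, 1


def demo_hash(plain: str) -> str:
    """Return 'salt_hex$digest_hex' for a password, using a fresh random salt."""
    salt = secrets.token_bytes(16)
    digest = hashlib.scrypt(plain.encode(), salt=salt, n=_N, r=_R, p=_P, dklen=32)
    return f"{salt.hex()}${digest.hex()}"


def demo_verify(plain: str, stored: str) -> bool:
    """Re-derive the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.scrypt(
        plain.encode(), salt=bytes.fromhex(salt_hex), n=_N, r=_R, p=_P, dklen=32
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored = demo_hash("correct horse battery staple")
print(demo_verify("correct horse battery staple", stored))  # True
print(demo_verify("wrong guess", stored))  # False
```

Because the salt is random, hashing the same password twice gives two different strings. That is why verification re-derives the digest from the stored salt rather than comparing two freshly computed hashes.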
Choosing an algorithm
| Algorithm | Verdict |
|---|---|
| MD5, SHA-1, SHA-256 | Never (too fast, GPU brute force) |
| bcrypt | OK — long-standing standard, secure enough |
| argon2id | Currently recommended (OWASP) |
| scrypt | OK — secure, but argon2 is preferred |
For a new project, choose argon2id first. It has simple code and safe defaults.
Installation

```bash
uv add argon2-cffi pyjwt
```

`argon2-cffi` is the argon2 implementation; `pyjwt` is the de facto standard JWT library.
Compatibility note: the `passlib` you often see in older code has been receiving fewer updates, so new projects increasingly use `argon2-cffi` or `bcrypt` directly.
Password hashing module

```python
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError

# Library defaults (the RFC 9106 low-memory profile in recent releases)
_hasher = PasswordHasher()


def hash_password(plain: str) -> str:
    return _hasher.hash(plain)


def verify_password(plain: str, hashed: str) -> bool:
    try:
        # Argument order: stored hash first, then the candidate password
        _hasher.verify(hashed, plain)
        return True
    except VerifyMismatchError:
        return False


def needs_rehash(hashed: str) -> bool:
    return _hasher.check_needs_rehash(hashed)
```

`needs_rehash` checks whether a stored hash was created with parameters that no longer match the hasher's current settings, for example after you raise the cost parameters. The usual pattern is to check on each successful login and re-store the hash with the current parameters when needed.

```python
# Inside the login flow, after looking up `user`:
if verify_password(plain, user.hashed_password):
    if needs_rehash(user.hashed_password):
        user.hashed_password = hash_password(plain)
        await db.commit()
    return user
```

JWT: what a token really is
A JWT (JSON Web Token) is a string with three parts.

```
xxxxx.yyyyy.zzzzz
header.payload.signature
```

- header: algorithm, type
- payload: claims (`sub`, `exp`, `iat`, etc.)
- signature: signs the first two parts with a secret, so any tampering is detectable
The key point: anyone can read the payload by base64url-decoding it. A JWT is signed, not encrypted; it prevents tampering, not disclosure. Never put sensitive data like passwords in a JWT.
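A short sketch of why the payload is public: the snippet below reads the claims out of a token-shaped string using only the standard library and no secret. The helper names (`b64url_decode`, `b64url_encode`, `peek_payload`) and the sample claims are made up for illustration, and the signature is a dummy placeholder.

```python
import base64
import json


def b64url_decode(segment: str) -> bytes:
    """Decode a base64url segment, restoring the padding that JWTs strip."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def b64url_encode(raw: bytes) -> str:
    """Encode bytes as unpadded base64url, the form used in JWT segments."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def peek_payload(token: str) -> dict:
    """Read the claims WITHOUT verifying the signature. Never trust the result."""
    _header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(payload))


# Build a token-shaped string by hand; the signature is a dummy placeholder.
header = b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
payload = b64url_encode(json.dumps({"sub": "42", "exp": 1700000000}).encode())
token = f"{header}.{payload}.not-a-real-signature"

print(peek_payload(token))  # {'sub': '42', 'exp': 1700000000}
```

Reading the claims this way is fine for debugging, but a server must only act on claims after the signature has been verified.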
Standard claims
| Claim | Meaning |
|---|---|
| `sub` | subject: usually the user ID |
| `exp` | expiration: expiry time (UNIX timestamp) |
| `iat` | issued at: issuance time |
| `nbf` | not before: invalid before this time |
| `jti` | JWT ID: for revocation/blacklisting |
| `aud` | audience: who the token is for |
| `iss` | issuer: who issued it |
exp is almost always set. The rest depend on your policy.
Issuing/verifying JWTs

```python
from datetime import datetime, timedelta, timezone

import jwt

from app.core.config import settings

ALGORITHM = "HS256"


def create_access_token(subject: str, expires_minutes: int = 60) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        "sub": subject,
        "iat": now,
        "exp": now + timedelta(minutes=expires_minutes),
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)


def decode_token(token: str) -> dict:
    # Verifies the signature and the exp claim; raises jwt.InvalidTokenError
    # (or a subclass such as ExpiredSignatureError) on failure.
    return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])
```

HS256 is symmetric-key signing: the same secret both signs and verifies. That is fine for a single server. If multiple services need to verify tokens without being able to issue them, switch to RS256 (asymmetric, with an RSA public/private key pair).
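To show what "sign and verify with the same secret" means mechanically, here is a rough standard-library sketch of HS256. It is not how you should issue tokens in the app (use `pyjwt` as above, which also checks `exp` and the algorithm); the helper names and the demo secret are assumptions for illustration only.

```python
import base64
import hashlib
import hmac
import json


def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def sign_hs256(claims: dict, secret: bytes) -> str:
    """Build header.payload.signature, signing with HMAC-SHA256."""
    header = _b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    signature = hmac.new(secret, signing_input, hashlib.sha256).digest()
    return f"{header}.{payload}.{_b64url(signature)}"


def signature_is_valid(token: str, secret: bytes) -> bool:
    """Recompute the signature with the same secret and compare."""
    header, payload, signature = token.split(".")
    signing_input = f"{header}.{payload}".encode()
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    return hmac.compare_digest(_b64url(expected), signature)


secret = b"demo-secret-for-illustration-only"
token = sign_hs256({"sub": "42"}, secret)

# Swap in a forged payload while keeping the original signature.
header, _payload, signature = token.split(".")
forged_payload = _b64url(json.dumps({"sub": "1"}).encode())
forged = f"{header}.{forged_payload}.{signature}"

print(signature_is_valid(token, secret))  # True
print(signature_is_valid(forged, secret))  # False
print(signature_is_valid(token, b"other-secret"))  # False
```

Anyone who can verify an HS256 token can also mint one, since both operations need the same secret. That is the limitation RS256 removes.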
Managing the secret
`jwt_secret` is read from environment variables via the pydantic-settings setup from #1.

```bash
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')
```

Generate at least 32 bytes (256 bits) of randomness; `token_urlsafe(64)` draws 64. Copy the output of `python -c` directly into your environment variable.
Sign-up / login routes
Pydantic schemas

```python
from pydantic import BaseModel, EmailStr, Field


class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)


class UserOut(BaseModel):
    id: int
    email: EmailStr

    model_config = {"from_attributes": True}


class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"
```

CRUD

```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import hash_password, verify_password
from app.models.user import User


async def create_user(db: AsyncSession, email: str, password: str) -> User:
    user = User(email=email, hashed_password=hash_password(password))
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user


async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()


async def authenticate(db: AsyncSession, email: str, password: str) -> User | None:
    user = await get_user_by_email(db, email)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user
```

`authenticate` does not distinguish between a missing user and a wrong password. Both return `None`, which prevents leaking whether a given email address exists.
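Returning `None` in both cases hides the difference in the response body, but not necessarily in response time: the missing-user path above skips the slow hash verification. One common mitigation is to run a verification against a throwaway hash when the user does not exist. The sketch below shows the idea with an in-memory dict and a fake verifier so that it runs on its own; `DemoUser`, `authenticate_even_work`, and `fake_verify` are hypothetical names, and in the real service `verify` would be `verify_password`.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class DemoUser:
    email: str
    hashed_password: str


def authenticate_even_work(
    users: dict[str, DemoUser],
    verify: Callable[[str, str], bool],
    dummy_hash: str,
    email: str,
    password: str,
) -> Optional[DemoUser]:
    """Always run one hash verification, whether or not the user exists."""
    user = users.get(email)
    # For unknown emails, verify against a throwaway hash so both paths do
    # the same expensive work; that result is never used to grant access.
    hashed = user.hashed_password if user else dummy_hash
    password_ok = verify(password, hashed)
    if user is None or not password_ok:
        return None
    return user


calls: list[str] = []


def fake_verify(plain: str, hashed: str) -> bool:
    # Stand-in for verify_password; records each call for the demo.
    calls.append(hashed)
    return hashed == f"hash-of:{plain}"


users = {"a@example.com": DemoUser("a@example.com", "hash-of:hunter22")}
dummy = "hash-of:never-matches-anything"

found = authenticate_even_work(users, fake_verify, dummy, "a@example.com", "hunter22")
ghost = authenticate_even_work(users, fake_verify, dummy, "ghost@example.com", "hunter22")
print(found is not None)  # True
print(ghost)  # None
print(len(calls))  # 2
```

The verifier ran once per attempt, including for the unknown email. With argon2, the dummy hash would be a real hash of some random password generated once at startup.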
Routes

```python
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm

from app.api.deps import DBSession
from app.core.security import create_access_token
from app.schemas.user import Token, UserCreate, UserOut
from app.services import user as user_service

router = APIRouter(prefix="/auth", tags=["auth"])


@router.post("/register", response_model=UserOut, status_code=201)
async def register(payload: UserCreate, db: DBSession) -> UserOut:
    existing = await user_service.get_user_by_email(db, payload.email)
    if existing:
        raise HTTPException(409, "Email already registered")
    user = await user_service.create_user(db, payload.email, payload.password)
    return user


@router.post("/login", response_model=Token)
async def login(
    form: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: DBSession,
) -> Token:
    # The OAuth2 form calls the field "username"; we pass the email in it.
    user = await user_service.authenticate(db, form.username, form.password)
    if user is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token(subject=str(user.id))
    return Token(access_token=token)
```

What `OAuth2PasswordRequestForm` does
`form: Annotated[OAuth2PasswordRequestForm, Depends()]` automatically parses the standard OAuth2 password flow form. FastAPI needs the `python-multipart` package installed to parse form bodies.
- It reads `username` and `password` from an `application/x-www-form-urlencoded` body
- Our domain uses email, but the OAuth2 standard field name is `username`. Use it as is.
- Swagger UI's "Authorize" dialog posts exactly this form, so it works against the endpoint with no extra code
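From the client's side, the points above amount to the following. This is a small standard-library sketch with made-up credentials; the `auth_header` helper is a hypothetical name, and an HTTP client of your choice would send the body to `/auth/login`.

```python
from urllib.parse import urlencode

# What a client sends to POST /auth/login. The email goes in the field that
# the OAuth2 spec calls "username". These credentials are made up.
body = urlencode({"username": "alice@example.com", "password": "s3cret-pass"})
login_headers = {"Content-Type": "application/x-www-form-urlencoded"}


def auth_header(access_token: str) -> dict[str, str]:
    """Header to attach to later requests once login has returned a token."""
    return {"Authorization": f"Bearer {access_token}"}


print(body)  # username=alice%40example.com&password=s3cret-pass
print(auth_header("abc.def.ghi"))  # {'Authorization': 'Bearer abc.def.ghi'}
```

Sending the same fields as a JSON body would fail validation, because the endpoint expects form encoding.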
Building `current_user` as a dependency
Pull the token out of the request → verify it → inject the user object into the route.

```python
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError

from app.api.deps import DBSession  # omit if this code lives in app/api/deps.py itself
from app.core.security import decode_token
from app.models.user import User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: DBSession,
) -> User:
    credentials_exc = HTTPException(
        status.HTTP_401_UNAUTHORIZED,
        "Invalid token",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_token(token)
        user_id = int(payload["sub"])
    except (InvalidTokenError, KeyError, ValueError):
        # Bad signature, expired token, missing sub, or non-numeric sub
        raise credentials_exc
    user = await db.get(User, user_id)
    if user is None:
        raise credentials_exc
    return user


CurrentUser = Annotated[User, Depends(get_current_user)]
```

What `OAuth2PasswordBearer` does
- Extracts the token from the `Authorization: Bearer xxxxx` header
- Returns 401 automatically if the header is missing or malformed
- Powers the Swagger UI "Authorize" button
- `tokenUrl="/auth/login"` is the token-issuing endpoint, used for docs/UI
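As a rough picture of the first two bullets, the extraction step behaves much like the helper below. This is a hypothetical standard-library approximation (`extract_bearer_token` is not FastAPI's actual implementation), where a `None` result corresponds to the automatic 401.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if absent or malformed."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


print(extract_bearer_token("Bearer abc.def.ghi"))  # abc.def.ghi
print(extract_bearer_token("Basic dXNlcjpwdw=="))  # None
print(extract_bearer_token(None))  # None
```

Note that this step only pulls the string out of the header. Whether the token is valid is decided later, by `decode_token` inside `get_current_user`.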
Use it in routes

```python
# Todo, TodoCreate, TodoOut, select, and DBSession come from the todo router in #3.
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(
    payload: TodoCreate,
    current_user: CurrentUser,
    db: DBSession,
) -> Todo:
    todo = Todo(
        **payload.model_dump(),
        owner_id=current_user.id,
    )
    db.add(todo)
    await db.commit()
    await db.refresh(todo)
    return todo


@router.get("/", response_model=list[TodoOut])
async def list_my_todos(
    current_user: CurrentUser,
    db: DBSession,
) -> list[Todo]:
    result = await db.execute(
        select(Todo).where(Todo.owner_id == current_user.id)
    )
    return list(result.scalars())
```

Drop `current_user: CurrentUser` in and:
- Missing or invalid token → automatic 401
- Valid → `current_user` is the authenticated user object
You wrote zero lines of token-verification code inside the route. Shared logic is cleanly captured in a dependency.
Additional patterns
Active vs inactive users

```python
async def get_current_active_user(current_user: CurrentUser) -> User:
    if not current_user.is_active:
        raise HTTPException(403, "Inactive account")
    return current_user


ActiveUser = Annotated[User, Depends(get_current_active_user)]
```

A dependency on top of a dependency. Stack them like layers.
Permissions (role-based)

```python
def require_role(role: str):
    # Factory: returns a dependency that only lets users with `role` through
    async def checker(user: CurrentUser) -> User:
        if user.role != role:
            raise HTTPException(403, f"{role} permission required")
        return user

    return checker


AdminUser = Annotated[User, Depends(require_role("admin"))]


@router.delete("/admin-only")
async def admin_only(admin: AdminUser): ...
```

Refresh tokens (optional)
A pattern with a short-lived access token (15 minutes to 1 hour) and a longer-lived refresh token (7 to 30 days). When the access token expires, the refresh token issues a new one — a balance between security and usability. Refresh tokens are usually stored in the DB so they can be revoked.
This series sticks with a single access token for simplicity. Add refresh tokens for real services.
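For orientation, here is one possible shape of the refresh-token side, as a hedged sketch: opaque random tokens, stored only as hashes, and rotated on each use. A plain dict stands in for the DB table, and every name here (`issue_refresh_token`, `rotate_refresh_token`, the 14-day default) is an assumption for illustration, not part of this series' codebase.

```python
import hashlib
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

# Stand-in for a DB table: sha256(token) -> (user_id, expires_at).
_refresh_store: dict[str, tuple[int, datetime]] = {}


def _digest(token: str) -> str:
    return hashlib.sha256(token.encode()).hexdigest()


def issue_refresh_token(user_id: int, days: int = 14) -> str:
    """Create an opaque random token and store only its hash."""
    token = secrets.token_urlsafe(32)
    expires_at = datetime.now(timezone.utc) + timedelta(days=days)
    _refresh_store[_digest(token)] = (user_id, expires_at)
    return token


def rotate_refresh_token(token: str) -> Optional[tuple[int, str]]:
    """Consume a refresh token; return (user_id, new_token) or None if invalid."""
    record = _refresh_store.pop(_digest(token), None)  # single use
    if record is None:
        return None
    user_id, expires_at = record
    if expires_at <= datetime.now(timezone.utc):
        return None
    return user_id, issue_refresh_token(user_id)


first = issue_refresh_token(user_id=42)
user_id, second = rotate_refresh_token(first)
print(user_id)  # 42
print(rotate_refresh_token(first))  # None, because it was already used
```

A refresh endpoint would call `rotate_refresh_token` and, on success, return a new access token from `create_access_token(str(user_id))` together with the new refresh token. Deleting a row from the store revokes that session immediately.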
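Common pitfalls
1) Weak JWT secret

```bash
# Weak: short and guessable
JWT_SECRET=secret
JWT_SECRET=mypassword

# Strong: 64 random bytes
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')
```

Short secrets are brute-forceable: anyone holding a single valid token can test guesses offline until the signature matches.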
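One way to catch this pitfall early is to refuse to start with a short secret. The check below is a minimal sketch: `check_jwt_secret` is a hypothetical helper, and it can only measure length, not randomness, so it complements generating the value with `secrets` rather than replacing it.

```python
import secrets


def check_jwt_secret(secret: str, min_bytes: int = 32) -> str:
    """Return the secret unchanged, or raise if it is too short to be safe."""
    if len(secret.encode()) < min_bytes:
        raise ValueError(f"JWT secret must be at least {min_bytes} bytes")
    return secret


for candidate in ("secret", "mypassword", secrets.token_urlsafe(64)):
    try:
        check_jwt_secret(candidate)
        print("accepted")
    except ValueError as exc:
        print("rejected:", exc)
```

Called once at startup, for example where the settings object is created, this turns a weak secret into an immediate, visible failure instead of a silent risk.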
2) Sensitive data in the JWT payload

```python
payload = {"sub": "u1", "password": "...", "ssn": "..."}
```

Anyone can read the JWT payload. Put only identifiers such as the user ID in it.
3) No HTTPS
Tokens must only travel over HTTPS. Over plain HTTP, anyone on the network can intercept them. Development on localhost is exempt; production must use HTTPS.
4) Token expiration too long

```python
expires_minutes=10080  # 7 days
```

The longer the expiration, the larger the window of damage if a token is stolen. 15 minutes to 1 hour is recommended. For long-lived sessions, use the refresh-token pattern.
5) No password strength check

```python
from pydantic import BaseModel, EmailStr, Field, field_validator


class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

    @field_validator("password")
    @classmethod
    def strong_enough(cls, v: str) -> str:
        # Reject a few of the most common passwords outright
        if v.lower() in {"password", "12345678", "qwerty"}:
            raise ValueError("Password is too common")
        return v
```

You can also check leaked passwords against the Have I Been Pwned API.
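The Have I Been Pwned password check uses a range lookup: you send only the first five characters of the password's SHA-1 hash and search the returned `SUFFIX:COUNT` lines locally. The sketch below assumes that response format and uses hypothetical helper names; the demo at the bottom runs offline against a made-up response, and the `User-Agent` value is a placeholder.

```python
import hashlib
import urllib.request


def sha1_prefix_suffix(password: str) -> tuple[str, str]:
    """Split the uppercase SHA-1 hex digest into (first 5 chars, remaining 35)."""
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return digest[:5], digest[5:]


def count_in_range_response(suffix: str, response_text: str) -> int:
    """Find our suffix in a 'SUFFIX:COUNT' listing; 0 means not found."""
    for line in response_text.splitlines():
        candidate, _, count = line.partition(":")
        if candidate.strip().upper() == suffix:
            return int(count)
    return 0


def pwned_count(password: str) -> int:
    """Query the range endpoint; only the 5-char hash prefix leaves the machine."""
    prefix, suffix = sha1_prefix_suffix(password)
    url = f"https://api.pwnedpasswords.com/range/{prefix}"
    request = urllib.request.Request(url, headers={"User-Agent": "todo-api-demo"})
    with urllib.request.urlopen(request, timeout=5) as response:
        return count_in_range_response(suffix, response.read().decode("utf-8"))


# Offline demo with a made-up response body (the counts are invented).
prefix, suffix = sha1_prefix_suffix("password")
other_suffix = "0" * 34 + "A"
fake_response = f"{other_suffix}:3\r\n{suffix}:12345\r\n"
print(count_in_range_response(suffix, fake_response))  # 12345
```

`pwned_count` makes a blocking network call, so in an async FastAPI app it belongs in the registration service (run in a thread or via an async HTTP client) rather than inside the Pydantic validator.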
Recap
What this post nailed down:
- Authentication (who) vs authorization (permitted)
- JWT — stateless, stored on the client, friendly to SPAs/mobile
- argon2id is today’s standard password hash (OWASP recommended)
- The `argon2-cffi` + `pyjwt` combination
- Standard JWT claims: `sub`, `exp`, `iat`
- HS256 symmetric signing; secret is 32+ random bytes
- `OAuth2PasswordRequestForm`: standard form parser
- `OAuth2PasswordBearer`: extracts the Bearer token
- A `CurrentUser` dependency makes route auth a one-liner
- Active-user and role checks layer dependencies
- Pitfalls: weak secret, sensitive payload, HTTPS, expiration length, password strength
In the next post (#5 Async and Background Tasks) we cover work that shouldn’t block the response — emails, heavy conversions, external API calls — and the patterns (BackgroundTasks, external queues) that handle them.