Authentication — OAuth2 password flow + JWT
Password hashing (argon2/bcrypt), the OAuth2 password flow, JWT issuance/verification, and the current_user dependency that wraps it all into a clean auth pattern.
The Todo we connected to a DB in Chapter 25 Connecting a DB — SQLAlchemy 2.x + Alembic now needs to be per user. This chapter covers the standard pattern for sign-up, login, and distinguishing authenticated requests — OAuth2 password flow + JWT.
The auth code in this chapter handles sensitive data like secrets and passwords, so it runs alongside the PII / secret logging-prevention patterns covered in Chapter 31 Logging and observability.
Authentication vs authorization #
Terms first.
- Authentication — “Who are you?” — login itself
- Authorization — “Is this person allowed to do this?” — permission check
This chapter focuses on authentication. Authorization (roles, permissions, and so on) is layered on top of it as an additional check, and the patterns are very similar.
Two branches #
| | Session-based | Token-based (JWT) |
|---|---|---|
| Storage | Server (DB / Redis) | Client |
| State | stateful | stateless |
| Distributed systems | Needs shared storage | Natural |
| Revocation | Immediate | Until expiry, or blacklist |
| Mobile / SPA friendly | OK | Good |
For an API server (FastAPI) + SPA / mobile app combination, JWT is the standard fit. Session-based auth is better in some cases (server-rendered web apps), but this book uses JWT.
Never store passwords in plaintext #
Even if the database leaks, passwords must not be exposed. Store them as a one-way hash — at login time, hash the input and compare.
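As a minimal sketch of that hash-and-compare idea, the snippet below uses scrypt from Python's standard library purely as a stand-in so that it runs without extra dependencies. The helper names toy_hash and toy_verify and the cost parameters are illustrative assumptions, not this chapter's code.

```python
import hashlib
import hmac
import secrets

# Illustrative cost parameters only; they are not tuned recommendations.
_SCRYPT = {"n": 2**14, "r": 8, "p": 1}


def toy_hash(plain: str) -> str:
    """Return 'salt$digest' (both hex) for storage; the plaintext is never stored."""
    salt = secrets.token_bytes(16)  # fresh random salt for every password
    digest = hashlib.scrypt(plain.encode(), salt=salt, **_SCRYPT)
    return f"{salt.hex()}${digest.hex()}"


def toy_verify(plain: str, stored: str) -> bool:
    """Re-hash the login input with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.scrypt(plain.encode(), salt=bytes.fromhex(salt_hex), **_SCRYPT)
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
```

Because the salt is random, hashing the same password twice gives two different stored strings, and both still verify.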
Algorithm selection #
| Algorithm | Verdict |
|---|---|
| MD5, SHA-1, SHA-256 | ❌ Never (too fast, GPU-brute-forceable) |
| bcrypt | ⭕ Old standard, sufficiently safe |
| argon2id | ✅ Currently recommended (OWASP) |
| scrypt | ⭕ Safe, but argon2 is more recommended |
For new projects, choose argon2id first: the code is simple and the defaults are safe.
Installation #
uv add argon2-cffi pyjwt

argon2-cffi is the argon2 implementation, and pyjwt is the standard JWT library.
Compatibility note:
passlib, which you often see in old code, is gradually losing maintenance, so new projects commonly use argon2-cffi or bcrypt directly.
Password-hashing module #
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError

_hasher = PasswordHasher()  # argon2id with the library's default parameters

def hash_password(plain: str) -> str:
    return _hasher.hash(plain)

def verify_password(plain: str, hashed: str) -> bool:
    try:
        _hasher.verify(hashed, plain)
        return True
    except VerifyMismatchError:
        return False

def needs_rehash(hashed: str) -> bool:
    return _hasher.check_needs_rehash(hashed)

needs_rehash checks whether a stored hash was created with parameters that differ from the hasher's current ones, for example after you raise the cost settings. The usual pattern is to re-hash with the current parameters the next time the user logs in, because that is the only moment the plaintext is available.
if verify_password(plain, user.hashed_password):
    if needs_rehash(user.hashed_password):
        user.hashed_password = hash_password(plain)
        await db.commit()
    return user

JWT: what a token actually is #
A JWT (JSON Web Token) is a string composed of three parts.
xxxxx.yyyyy.zzzzz
header.payload.signature

- header: algorithm, type
- payload: claims (sub, exp, iat, etc.)
- signature: signs the first two parts with the secret, which makes tampering detectable
The key point: anyone can read the payload after base64 decoding. A JWT is signed, not encrypted — it’s about tamper resistance, not secrecy. Never put sensitive information like passwords into a JWT.
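To make that concrete, here is a small sketch that reads the claims of a token without knowing any secret. The token is hand-built for the demo with a dummy signature, and peek_payload is a hypothetical helper name; it is meant for inspection only and must never be used to authenticate anyone.

```python
import base64
import json


def b64url_encode(raw: bytes) -> str:
    # JWTs use URL-safe base64 with the trailing '=' padding stripped.
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()


def b64url_decode(segment: str) -> bytes:
    # Restore the padding that JWT encoding removed.
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def peek_payload(token: str) -> dict:
    """Read the claims WITHOUT verifying the signature."""
    _header, payload, _signature = token.split(".")
    return json.loads(b64url_decode(payload))


# A hand-built token; the signature part is a placeholder and is never checked.
demo_token = ".".join([
    b64url_encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode()),
    b64url_encode(json.dumps({"sub": "42", "exp": 1700000000}).encode()),
    "signature-not-checked",
])

print(peek_payload(demo_token))
```

Anything placed in the payload is visible this way to whoever holds the token.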
Standard claims #
| Claim | Meaning |
|---|---|
| sub | subject: usually the user ID |
| exp | expiration: expiry time (UNIX timestamp) |
| iat | issued at: issue time |
| nbf | not before: invalid before this time |
| jti | JWT ID: for revocation / blacklisting |
| aud | audience: who the token is for |
| iss | issuer: who issued it |
Set exp on almost every token. The other claims depend on your policy.
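As one illustration of a policy-driven claim, the sketch below shows how jti could back a revocation list. It is a minimal in-memory version under stated assumptions: JtiDenylist and new_claims are hypothetical names, and a real service would keep the list in shared storage such as a database or Redis.

```python
import time
import uuid
from typing import Optional


def new_claims(subject: str, ttl_seconds: int = 900) -> dict:
    """Build claims that include a unique jti alongside sub / iat / exp."""
    now = int(time.time())
    return {"sub": subject, "iat": now, "exp": now + ttl_seconds, "jti": uuid.uuid4().hex}


class JtiDenylist:
    """Remembers revoked token IDs until the tokens would have expired anyway."""

    def __init__(self) -> None:
        self._revoked: dict = {}  # jti -> exp timestamp

    def revoke(self, jti: str, exp: float) -> None:
        self._revoked[jti] = exp

    def is_revoked(self, jti: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # Entries past their exp are dropped: an expired token is rejected regardless.
        self._revoked = {j: e for j, e in self._revoked.items() if e > now}
        return jti in self._revoked
```

A verification step would decode the token first and then reject it if is_revoked(payload["jti"]) returns True.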
JWT issue / verify #
from datetime import datetime, timedelta, timezone

import jwt

from app.core.config import settings

ALGORITHM = "HS256"

def create_access_token(subject: str, expires_minutes: int = 60) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        "sub": subject,
        "iat": now,
        "exp": now + timedelta(minutes=expires_minutes),
    }
    return jwt.encode(payload, settings.jwt_secret, algorithm=ALGORITHM)

def decode_token(token: str) -> dict:
    # Verifies the signature and exp; raises a jwt.InvalidTokenError subclass on failure
    return jwt.decode(token, settings.jwt_secret, algorithms=[ALGORITHM])

HS256 is symmetric-key signing: the same secret both signs and verifies. That is fine for a single server. If multiple servers only need to verify tokens, use RS256 (asymmetric: an RSA private key signs, the public key verifies).
Managing the secret #
jwt_secret is read from environment variables via pydantic-settings from Chapter 22.
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

Generate at least 32 bytes (256 bits) of randomness. In the actual environment variable, paste the random string that the python -c command prints, not the command itself.
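A possible companion to that rule is a startup guard that refuses to run with an obviously short secret. The sketch below is an assumption about how one might do it: check_jwt_secret and MIN_SECRET_BYTES are hypothetical names, and a length check is only a rough proxy, since it cannot tell whether the value is actually random.

```python
import secrets

MIN_SECRET_BYTES = 32  # 256 bits, the minimum suggested above


def generate_jwt_secret(n_bytes: int = 64) -> str:
    """Same call as the shell one-liner: n_bytes of randomness, URL-safe encoded."""
    return secrets.token_urlsafe(n_bytes)


def check_jwt_secret(secret: str) -> None:
    """Hypothetical startup guard: fail fast on secrets that are clearly too short."""
    if len(secret.encode()) < MIN_SECRET_BYTES:
        raise ValueError(
            f"JWT secret is {len(secret.encode())} bytes; need at least {MIN_SECRET_BYTES}"
        )


check_jwt_secret(generate_jwt_secret())  # passes silently
```

Calling check_jwt_secret("secret") raises ValueError, so a placeholder value is caught before the first token is issued.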
Sign-up / login routes #
Pydantic schemas #
from pydantic import BaseModel, EmailStr, Field

class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

class UserOut(BaseModel):
    id: int
    email: EmailStr

    model_config = {"from_attributes": True}

class Token(BaseModel):
    access_token: str
    token_type: str = "bearer"

CRUD #
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.user import User
from app.core.security import hash_password, verify_password

async def create_user(db: AsyncSession, email: str, password: str) -> User:
    user = User(email=email, hashed_password=hash_password(password))
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user

async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()

async def authenticate(db: AsyncSession, email: str, password: str) -> User | None:
    user = await get_user_by_email(db, email)
    if user is None:
        return None
    if not verify_password(password, user.hashed_password):
        return None
    return user

authenticate deliberately does not distinguish between user-not-found and wrong-password. Both return None, so the login response does not leak whether an email is registered.
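The same signal can also leak through response time, because the user-not-found path returns before any hashing happens. One common mitigation, sketched here with an in-memory dict and standard-library scrypt as stand-ins, is to verify against a dummy hash when the user is missing. USERS, the underscore-prefixed helpers, and the cost parameters are illustrative assumptions, not this chapter's code.

```python
import hashlib
import hmac
import secrets
from typing import Optional


def _hash(plain: str, salt: bytes) -> bytes:
    # Stand-in for the real password hasher; parameters are illustrative.
    return hashlib.scrypt(plain.encode(), salt=salt, n=2**14, r=8, p=1)


# Hash of a throwaway random password, used when no user row exists.
_DUMMY_SALT = secrets.token_bytes(16)
_DUMMY_HASH = _hash(secrets.token_urlsafe(16), _DUMMY_SALT)

# Hypothetical in-memory "users table": email -> (salt, hash)
USERS: dict = {}


def register(email: str, password: str) -> None:
    salt = secrets.token_bytes(16)
    USERS[email] = (salt, _hash(password, salt))


def authenticate(email: str, password: str) -> Optional[str]:
    """Do the same hashing work whether or not the email exists."""
    salt, stored = USERS.get(email, (_DUMMY_SALT, _DUMMY_HASH))
    matches = hmac.compare_digest(_hash(password, salt), stored)
    if email not in USERS or not matches:
        return None  # one result for both failure cases
    return email
```

Both failure cases now cost one hash computation and return the same value.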
Routes #
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm

from app.api.deps import DBSession
from app.schemas.user import UserCreate, UserOut, Token
from app.services import user as user_service
from app.core.security import create_access_token

router = APIRouter(prefix="/auth", tags=["auth"])

@router.post("/register", response_model=UserOut, status_code=201)
async def register(payload: UserCreate, db: DBSession) -> UserOut:
    existing = await user_service.get_user_by_email(db, payload.email)
    if existing:
        raise HTTPException(409, "Email already registered")
    user = await user_service.create_user(db, payload.email, payload.password)
    return user

@router.post("/login", response_model=Token)
async def login(
    form: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: DBSession,
) -> Token:
    # The OAuth2 form field is called username; we pass it as the email
    user = await user_service.authenticate(db, form.username, form.password)
    if user is None:
        raise HTTPException(
            status.HTTP_401_UNAUTHORIZED,
            "Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    token = create_access_token(subject=str(user.id))
    return Token(access_token=token)

What OAuth2PasswordRequestForm does #
form: Annotated[OAuth2PasswordRequestForm, Depends()] automatically parses the standard OAuth2 password-flow form.
- Receives username and password from an application/x-www-form-urlencoded body
- Our domain field is email, but the OAuth2 standard field name is username. Use it as-is.
- Matches the form that Swagger UI’s “Authorize” dialog submits, so logging in from the docs page needs no extra code
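For reference, this is roughly what such a form body looks like on the wire. The sketch uses only the standard library to encode and decode it; the email and password values are made up, and grant_type is the field the OAuth2 password flow defines for this request.

```python
from urllib.parse import parse_qs, urlencode

# What a client sends to the login endpoint: form fields, not JSON.
body = urlencode({
    "username": "alice@example.com",  # the email goes in the username field
    "password": "correct horse",
    "grant_type": "password",
})
print(body)
# username=alice%40example.com&password=correct+horse&grant_type=password

# What the server side recovers after parsing the form.
fields = {key: values[0] for key, values in parse_qs(body).items()}
email = fields["username"]
```

A client therefore posts this string with the Content-Type header set to application/x-www-form-urlencoded instead of sending a JSON document.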
Build current_user with a dependency #
Pull the token out of the request → verify it → inject the user object into the route.
from typing import Annotated

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jwt import InvalidTokenError

from app.core.security import decode_token
from app.models.user import User
from app.services import user as user_service

# DBSession is the session dependency imported from app.api.deps in the routes above;
# import it here too if this code lives in a different module

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: DBSession,
) -> User:
    credentials_exc = HTTPException(
        status.HTTP_401_UNAUTHORIZED,
        "Invalid token",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_token(token)
        user_id = int(payload["sub"])
    except (InvalidTokenError, KeyError, ValueError):
        # Bad signature, expired token, missing sub, or non-numeric sub
        raise credentials_exc
    user = await db.get(User, user_id)
    if user is None:
        raise credentials_exc
    return user

CurrentUser = Annotated[User, Depends(get_current_user)]

What OAuth2PasswordBearer does #
- Extracts the token from the Authorization: Bearer xxxxx header
- If the header is missing or malformed, returns 401 automatically
- Registers the security scheme in the OpenAPI schema, which is what makes Swagger UI’s “Authorize” button appear
- tokenUrl="/auth/login" is the token-issuing endpoint, used for docs / UI
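The header handling in the first two bullets can be approximated in a few lines. This is a simplified sketch of the idea, not FastAPI's actual implementation; extract_bearer_token is a hypothetical helper that returns None where the real dependency would respond with 401.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, else None."""
    if not authorization:
        return None  # header missing
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None  # wrong scheme or empty token
    return token.strip()


print(extract_bearer_token("Bearer abc.def.ghi"))   # abc.def.ghi
print(extract_bearer_token("Basic dXNlcjpwYXNz"))   # None
```

Only the raw token string comes out of this step; checking its signature and expiry is still the job of decode_token.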
Use in a route #
@router.post("/", response_model=TodoOut, status_code=201)
async def create_todo(
    payload: TodoCreate,
    current_user: CurrentUser,
    db: DBSession,
) -> Todo:
    todo = Todo(
        **payload.model_dump(),
        owner_id=current_user.id,
    )
    db.add(todo)
    await db.commit()
    await db.refresh(todo)
    return todo

@router.get("/", response_model=list[TodoOut])
async def list_my_todos(
    current_user: CurrentUser,
    db: DBSession,
) -> list[Todo]:
    result = await db.execute(
        select(Todo).where(Todo.owner_id == current_user.id)
    )
    return list(result.scalars())

With a single current_user: CurrentUser line:
- No or invalid token → 401 automatically
- Valid → current_user is the authenticated user object
Not a single line of token-verification code lives in the route. Shared logic is handled through dependency injection.
Extra patterns #
Active / inactive users #
async def get_current_active_user(current_user: CurrentUser) -> User:
    if not current_user.is_active:
        raise HTTPException(403, "Inactive account")
    return current_user

ActiveUser = Annotated[User, Depends(get_current_active_user)]

This is a dependency built on top of another dependency; they stack like layers.
Permissions (role-based) #
def require_role(role: str):
    async def checker(user: CurrentUser) -> User:
        if user.role != role:
            raise HTTPException(403, f"{role} permission required")
        return user
    return checker

AdminUser = Annotated[User, Depends(require_role("admin"))]

@router.delete("/admin-only")
async def admin_only(admin: AdminUser): ...

Refresh tokens (optional) #
This pattern pairs short-lived access tokens (15 minutes to 1 hour) with long-lived refresh tokens (7 to 30 days). When the access token expires, the client presents the refresh token to obtain a new access token. This balances security and usability. Refresh tokens are usually stored in the DB so they can be revoked.
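A minimal sketch of the server-side half of that pattern follows, with an in-memory dict standing in for the DB table. RefreshTokenStore and its method names are hypothetical. The refresh token here is an opaque random string, and only its SHA-256 digest is stored; a fast hash is acceptable in this case because the token is long and random, unlike a human-chosen password.

```python
import hashlib
import secrets
import time
from typing import Optional


class RefreshTokenStore:
    """In-memory stand-in for a refresh_tokens table."""

    def __init__(self, ttl_seconds: int = 7 * 24 * 3600) -> None:
        self._ttl = ttl_seconds
        self._rows: dict = {}  # sha256(token) -> (user_id, expires_at)

    @staticmethod
    def _digest(token: str) -> str:
        return hashlib.sha256(token.encode()).hexdigest()

    def issue(self, user_id: str, now: Optional[float] = None) -> str:
        now = time.time() if now is None else now
        token = secrets.token_urlsafe(32)
        self._rows[self._digest(token)] = (user_id, now + self._ttl)
        return token  # the raw token goes to the client only

    def redeem(self, token: str, now: Optional[float] = None) -> Optional[str]:
        """Return the user ID if the token is known and unexpired, else None."""
        now = time.time() if now is None else now
        row = self._rows.get(self._digest(token))
        if row is None or row[1] <= now:
            return None
        return row[0]

    def revoke(self, token: str) -> None:
        self._rows.pop(self._digest(token), None)
```

A refresh endpoint would call redeem and, on success, issue a fresh access token for the returned user ID. Calling revoke on logout is what makes immediate invalidation possible.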
Part 4 of this book stays simple with a single access token. In a real service, add the refresh pattern.
Common pitfalls #
1) Weak JWT secret #
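# Bad
JWT_SECRET=secret
JWT_SECRET=mypassword

# Good
JWT_SECRET=$(python -c 'import secrets; print(secrets.token_urlsafe(64))')

Short or guessable secrets can be brute-forced offline: an attacker who holds a single token can test candidate secrets against its signature.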
2) Sensitive data in the JWT payload #
# Bad
payload = {"sub": "u1", "password": "...", "ssn": "..."}

Anyone can read a JWT payload. Put only identifiers such as the user ID in it.
3) No HTTPS #
Send tokens over HTTPS only. Over plain HTTP, anyone on the network path can intercept the token. Development on localhost is fine without HTTPS; production is always HTTPS.
4) Token expiry too long #
expires_minutes=10080  # 7 days

The longer the expiry, the longer the damage window if the token is stolen. 15 minutes to 1 hour is recommended. If you want longer-lived sessions, use the refresh-token pattern.
5) No password-strength validation #
from pydantic import BaseModel, EmailStr, Field, field_validator

class UserCreate(BaseModel):
    email: EmailStr
    password: str = Field(min_length=8, max_length=128)

    @field_validator("password")
    @classmethod
    def strong_enough(cls, v: str) -> str:
        # Reject a few well-known weak passwords; extend the set as needed
        if v.lower() in {"password", "12345678", "qwerty"}:
            raise ValueError("Too common a password")
        return v

Adding a leak check via the Have I Been Pwned API is another option.
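The Pwned Passwords range API is designed so that the password never leaves your server: you send only the first five hex characters of its SHA-1 hash and search the returned suffixes locally. The sketch below covers the hashing and the response parsing and leaves the HTTP call out. The function names are hypothetical, and the sample response body in the usage note is made up for illustration.

```python
import hashlib


def hibp_prefix_suffix(password: str) -> tuple:
    """Split the uppercase SHA-1 hex digest into (first 5 chars, remaining 35)."""
    digest = hashlib.sha1(password.encode("utf-8")).hexdigest().upper()
    return digest[:5], digest[5:]


def count_in_range_response(suffix: str, body: str) -> int:
    """Look up our suffix in a range response made of 'SUFFIX:COUNT' lines."""
    for line in body.splitlines():
        candidate, _, count = line.partition(":")
        if candidate.strip().upper() == suffix:
            return int(count)
    return 0  # not present in this range, so no known leak


prefix, suffix = hibp_prefix_suffix("password")
print(prefix)  # 5BAA6
```

The prefix is what gets requested from https://api.pwnedpasswords.com/range/ followed by those five characters. Given a response text body, count_in_range_response(suffix, body) returns how many times the password appeared in known leaks, and a validator could reject anything above zero.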
Exercises #
- Write the three functions hash_password / verify_password / needs_rehash using argon2-cffi. Confirm that hashing the same password twice produces different results (auto-salting), and that verify succeeds for both.
- With pyjwt, write create_access_token(subject: str) / decode_token(token: str). Deliberately craft an expired token (with exp in the past) and confirm that decode_token raises ExpiredSignatureError.
- Build a current_user dependency with OAuth2PasswordBearer + Depends(get_current_user) and have a GET /me route return the authenticated user. Use Swagger UI’s “Authorize” button to log in, then call the protected endpoint yourself.
In one line: Authentication is who, authorization is permission. JWT is stateless, client-stored, friendly to SPAs / mobile. argon2id is the password-hashing standard (OWASP). JWT standard claims are sub / exp / iat, HS256 symmetric signature, secret of 32 bytes+ randomness. OAuth2PasswordRequestForm auto-parses the form, OAuth2PasswordBearer extracts the header token. A CurrentUser dependency authenticates routes in one line. Pitfalls are weak secret / sensitive payload / HTTP / long expiry / no password strength.
Next chapter #
Next, Chapter 27 Async and background jobs covers work that should not block the response — sending email, heavy conversions, calling external APIs — and the patterns for handling it (BackgroundTasks, external queues).