Modern Python in Practice #5: Async and Background Tasks
This post pulls together how the async we covered in Intermediate #7 and Advanced #4 fits naturally inside FastAPI — and the patterns for work that runs after the response (emails, heavy conversions, external APIs).
async def vs def: FastAPI handles both
FastAPI accepts both kinds of route.
    @router.get("/sync")
    def sync_handler():
        ...

    @router.get("/async")
    async def async_handler():
        ...

Differences:
- async def: runs directly on the event loop and yields whenever an await happens
- def: automatically dispatched to a thread pool, so it doesn’t block the event loop
FastAPI chooses the right execution mode automatically. That’s why a route using a sync library can stay as plain def without blocking other requests.
Never do this: sync blocking inside async def

    @router.get("/items")
    async def list_items():
        response = requests.get("https://api.example.com")  # sync, blocking!
        return response.json()

Calling a sync blocking function inside async def freezes the entire event loop. Every other request stops too. This is the GIL + async clash from Advanced #5.
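To make the freeze concrete, here is a minimal stdlib-only sketch with no FastAPI involved. The names count_ticks, measure, blocking_handler, and offloaded_handler are illustrative assumptions, and time.sleep stands in for a blocking call such as requests.get. A ticker task counts how often the event loop gets to run it while each handler is in progress.

```python
import asyncio
import time


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how many times the event loop gets a chance to run this task."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def measure(handler) -> int:
    """Run `handler` next to the ticker and report the ticks seen meanwhile."""
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # let the ticker start before the handler runs
    await handler()
    stop.set()
    return await ticker


async def blocking_handler() -> None:
    time.sleep(0.2)  # sync call: the loop cannot switch to other tasks


async def offloaded_handler() -> None:
    await asyncio.to_thread(time.sleep, 0.2)  # same call, run in a worker thread


def compare() -> tuple[int, int]:
    blocked = asyncio.run(measure(blocking_handler))
    offloaded = asyncio.run(measure(offloaded_handler))
    return blocked, offloaded


if __name__ == "__main__":
    blocked, offloaded = compare()
    print(f"ticks while blocked: {blocked}, ticks while offloaded: {offloaded}")
```

The ticker barely advances next to the blocking handler and keeps ticking next to the offloaded one. In a real server, other requests play the role of the ticker.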
Two solutions:
    # Option 1: use an async HTTP client
    import httpx

    @router.get("/items")
    async def list_items():
        async with httpx.AsyncClient() as client:
            response = await client.get("https://api.example.com")
        return response.json()

    # Option 2: keep the sync library, but run the call in a worker thread
    import asyncio
    import requests

    @router.get("/items")
    async def list_items():
        response = await asyncio.to_thread(requests.get, "https://api.example.com")
        return response.json()

Or just write def, and FastAPI will move it to a thread for you.

    import requests

    @router.get("/items")
    def list_items():
        response = requests.get("https://api.example.com")
        return response.json()

Practical guide: what to use when
| Situation | First try |
|---|---|
| Async libraries only (httpx, asyncpg, redis-py async) | async def |
| Sync libraries only (requests, psycopg2) | def |
| Mixed | async def + wrap sync parts with to_thread |
| CPU-bound | def or async def + ProcessPoolExecutor |
Since we used SQLAlchemy async in #3, this series defaults to async def.
BackgroundTasks: work after the response
Use this when you want emails, notifications, or logs to run after the response is sent, in the background.
    from fastapi import BackgroundTasks

    def send_email_sync(to: str, subject: str, body: str):
        # Real email send (smtplib etc.)
        print(f"Sent email: {to}")

    @router.post("/users")
    async def create_user(
        payload: UserCreate,
        background_tasks: BackgroundTasks,
        db: DBSession,
    ) -> UserOut:
        user = await user_service.create_user(db, payload.email, payload.password)
        background_tasks.add_task(
            send_email_sync, user.email, "Welcome", "Welcome aboard!",
        )
        return user

Flow:
- Route body returns normally
- The response goes to the client
- After that, background_tasks functions run in order
Async functions are fine too

    async def send_email_async(to: str, subject: str, body: str):
        async with httpx.AsyncClient() as client:
            await client.post("https://email-api/", json={...})

    background_tasks.add_task(send_email_async, user.email, "Welcome", "...")

add_task accepts both sync and async functions.
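The flow and the sync/async handling can be modelled in a few lines of plain asyncio. This is a toy sketch, not FastAPI's or Starlette's actual code: ToyBackgroundTasks and its run_all method are hypothetical names, and the assumption is simply that queued callables run in order after the response, with sync functions pushed to a thread so they don't block the loop.

```python
import asyncio
import inspect
from typing import Any, Callable


class ToyBackgroundTasks:
    """Hypothetical stand-in for BackgroundTasks, for illustration only."""

    def __init__(self) -> None:
        self._tasks: list[tuple[Callable[..., Any], tuple, dict]] = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        # Only records the call; nothing runs yet.
        self._tasks.append((func, args, kwargs))

    async def run_all(self) -> None:
        # Run queued tasks one after another, in the order they were added.
        for func, args, kwargs in self._tasks:
            if inspect.iscoroutinefunction(func):
                await func(*args, **kwargs)
            else:
                await asyncio.to_thread(func, *args, **kwargs)


async def demo() -> list[str]:
    log: list[str] = []

    def send_sync(to: str) -> None:
        log.append(f"sync:{to}")

    async def send_async(to: str) -> None:
        log.append(f"async:{to}")

    tasks = ToyBackgroundTasks()
    tasks.add_task(send_sync, "a@example.com")
    tasks.add_task(send_async, "b@example.com")
    log.append("response sent")  # the route has returned at this point
    await tasks.run_all()        # background work happens afterwards
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The log shows the response entry first, then the two tasks in the order they were queued.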
Limitation: same process only
BackgroundTasks run inside the web server process, so queued work is lost if that process stops before the work finishes. That means:
- Good for fast post-processing (within seconds)
- Good for tasks that don’t need retries on failure
- Bad for heavy video/image conversion
- Bad for external APIs that need retry on failure
- Bad for tasks that must survive a server restart
Those situations need an external queue.
External queues: when to introduce one?
| Signal | External queue needed? |
|---|---|
| Tasks under 1 second | No, BackgroundTasks is fine |
| Need retry policy on failure | ✅ |
| Need to track/inspect task results | ✅ |
| Want to scale workers separately | ✅ |
| Scheduling (e.g., daily-at-midnight) | ✅ |
| Need priority queues | ✅ |
If any of the ✅ signals apply, bring in dedicated worker infrastructure.
Options
| Tool | Verdict |
|---|---|
| Celery | Oldest and most widely used. Complex. Redis/RabbitMQ backends |
| ARQ | async-friendly. Redis only. Simple |
| Dramatiq | Simple interface. Multiple backends |
| RQ | Simple. Redis. Sync-oriented |
| Taskiq | FastAPI-style dependency injection |
For an async codebase, ARQ or Taskiq fit naturally; large projects tend to land on Celery as the proven standard.
ARQ: async queue mini guide
Let’s look at ARQ as the simplest introduction.
Install it:

    uv add arq

Define the worker:

    from arq.connections import RedisSettings
    from app.core.config import settings

    async def send_email(ctx, to: str, subject: str, body: str):
        print(f"[worker] Sent email: {to}")
        # real send code

    class WorkerSettings:
        redis_settings = RedisSettings.from_dsn(settings.redis_url)
        functions = [send_email]
        max_jobs = 10
        job_timeout = 60

Enqueue from the route:

    from arq import create_pool
    from arq.connections import RedisSettings

    @router.post("/users")
    async def create_user(payload: UserCreate, db: DBSession):
        user = await user_service.create_user(...)
        pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
        await pool.enqueue_job("send_email", user.email, "Welcome", "...")
        return user

Run the worker:

    uv run arq app.tasks.worker.WorkerSettings

The web server and worker run as separate processes. If one dies, the other keeps running, and you can scale workers independently.
Dependency injection is on you
ARQ doesn’t go through FastAPI’s Depends. You build the DB session etc. yourself inside the worker function.
    from app.db.session import AsyncSessionLocal

    async def send_email(ctx, user_id: int):
        async with AsyncSessionLocal() as db:
            user = await db.get(User, user_id)
            # send logic

Streaming responses: long sends
Use this when you want to stream the response — file downloads, large JSON, Server-Sent Events (SSE) — instead of building it all up in memory at once.
    from fastapi.responses import StreamingResponse

    @router.get("/csv")
    async def export_csv():
        async def generate():
            yield "id,title\n"
            async for row in stream_rows():
                yield f"{row.id},{row.title}\n"
        return StreamingResponse(generate(), media_type="text/csv")

The async generator from Intermediate #4 works as is. You stream out without loading everything into memory.
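The generator can be exercised on its own, without a server. The sketch below assumes a hypothetical stream_rows that yields three hard-coded rows (a real one might page through a database cursor) and an illustrative Row dataclass. It only shows that chunks are produced one at a time.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator


@dataclass
class Row:
    id: int
    title: str


async def stream_rows() -> AsyncIterator[Row]:
    """Hypothetical data source, hard-coded for the example."""
    for i, title in enumerate(["first", "second", "third"], start=1):
        await asyncio.sleep(0)  # pretend to wait on I/O between rows
        yield Row(id=i, title=title)


async def generate() -> AsyncIterator[str]:
    yield "id,title\n"  # header line goes out before any row is fetched
    async for row in stream_rows():
        yield f"{row.id},{row.title}\n"


async def collect() -> list[str]:
    # Stand-in for the server, which would send each chunk as it arrives.
    return [chunk async for chunk in generate()]


if __name__ == "__main__":
    for chunk in asyncio.run(collect()):
        print(repr(chunk))
```

Titles that contain commas or quotes would need proper CSV quoting, for example via the standard csv module. The f-string is kept here only to mirror the route above.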
Server-Sent Events (SSE)

    import asyncio

    @router.get("/events")
    async def events():
        async def event_stream():
            for i in range(100):
                yield f"data: tick {i}\n\n"
                await asyncio.sleep(1)
        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
        )

The browser’s EventSource picks it up automatically. Unlike WebSocket it isn’t bidirectional, but it’s enough for one-way, server-to-client real-time updates.
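The wire format is simple enough to build by hand: each event is a set of "field: value" lines followed by a blank line. The helper below is an illustrative sketch (format_sse is a hypothetical name, not part of FastAPI) that also handles multi-line payloads and the optional event and id fields.

```python
from typing import Optional


def format_sse(
    data: str,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
) -> str:
    """Format one Server-Sent Events message as a string."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Every line of the payload needs its own "data:" prefix.
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the message.
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(repr(format_sse("tick 1")))
    print(repr(format_sse("a\nb", event="update", event_id="7")))
```

Inside event_stream, yielding format_sse(f"tick {i}") would produce the same bytes as the hand-written f-string.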
WebSocket
When you need bidirectional real-time, use WebSocket.

    from fastapi import WebSocket, WebSocketDisconnect

    @app.websocket("/ws/{room}")
    async def websocket_endpoint(websocket: WebSocket, room: str):
        await websocket.accept()
        try:
            while True:
                data = await websocket.receive_text()
                await websocket.send_text(f"[{room}] echo: {data}")
        except WebSocketDisconnect:
            print(f"Client disconnected from {room}")

Good for chat, real-time collaboration, or games. State management, auth, and room separation add up quickly, though, so a serious implementation deserves its own study.
Concurrency limits: protecting external APIs
The Semaphore from Advanced #4 is useful here too. If a route has to call 100 external APIs, limit it to N concurrent calls:

    import asyncio
    import httpx

    _external_sem = asyncio.Semaphore(10)

    async def call_external(url: str) -> dict:
        async with _external_sem:
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                return response.json()

This helps both with the external API’s rate limits and with protecting your own service’s memory.
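A quick way to convince yourself the limit holds is to count calls in flight. In this stdlib-only sketch, run_limited and fake_call are illustrative names and asyncio.sleep stands in for the HTTP request. It launches many calls at once and records the peak concurrency.

```python
import asyncio


async def run_limited(n_calls: int, limit: int) -> int:
    """Start n_calls tasks at once and return the peak number running together."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_call(i: int) -> None:
        nonlocal in_flight, peak
        async with sem:  # at most `limit` tasks get past this line at a time
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for the external request
            in_flight -= 1

    await asyncio.gather(*(fake_call(i) for i in range(n_calls)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_limited(100, 10)))
```

All 100 tasks are created immediately, but the semaphore keeps the peak at the configured limit.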
Timeouts: preventing infinite waits

    async with asyncio.timeout(5):
        response = await client.get(url)

Stops the request from hanging forever when an external API doesn’t respond. We covered this tool in Intermediate #7.
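A timeout only helps if the caller decides what to do when it fires. The sketch below shows one option, returning a fallback value. It swaps in asyncio.wait_for because asyncio.timeout requires Python 3.11 or later, while wait_for also runs on older versions. slow_api and fetch_with_fallback are hypothetical names.

```python
import asyncio


async def slow_api() -> str:
    """Stand-in for an external call that takes a while to answer."""
    await asyncio.sleep(0.2)
    return "real data"


async def fetch_with_fallback(timeout: float) -> str:
    try:
        # wait_for cancels slow_api() if it exceeds the timeout.
        return await asyncio.wait_for(slow_api(), timeout=timeout)
    except asyncio.TimeoutError:
        return "fallback"


if __name__ == "__main__":
    print(asyncio.run(fetch_with_fallback(0.02)))
    print(asyncio.run(fetch_with_fallback(1.0)))
```

Whether a fallback, a retry, or an error response is right depends on the endpoint. The point is that the timeout error is handled explicitly.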
httpx itself has a timeout option:

    async with httpx.AsyncClient(timeout=10.0) as client:
        ...

Lifecycle events: lifespan
Set up and tear down resources at server start/stop.
    from contextlib import asynccontextmanager
    from fastapi import FastAPI

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # at startup
        print("Server started")
        redis = await create_redis_pool()
        app.state.redis = redis
        yield
        # at shutdown
        print("Server stopped")
        await redis.close()

    app = FastAPI(lifespan=lifespan)

The @asynccontextmanager pattern from Intermediate #3 applies here. The old @app.on_event("startup") decorator is deprecated.
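The startup/shutdown ordering can be checked without a server. In this sketch FakePool is a hypothetical stand-in for a Redis pool and a plain dict plays the role of app.state. The try/finally is an addition so that cleanup still runs if the body raises.

```python
import asyncio
from contextlib import asynccontextmanager


class FakePool:
    """Hypothetical stand-in for a connection pool."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


@asynccontextmanager
async def lifespan(state: dict):
    pool = FakePool()        # startup: create the resource
    state["redis"] = pool
    try:
        yield                # the application would serve requests here
    finally:
        await pool.close()   # shutdown: always release the resource


async def demo() -> tuple[bool, bool]:
    state: dict = {}
    async with lifespan(state):
        closed_while_running = state["redis"].closed
    return closed_while_running, state["redis"].closed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The pool is open while the context is active and closed once it exits.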
Common pitfalls
1) Ignoring exceptions in BackgroundTasks

    def task_that_might_fail():
        raise SomethingWrong()  # response unaffected; the error only shows up in logs

BackgroundTasks doesn’t surface exceptions in the response, so failed work is invisible to the user. Important work is safer with an external queue + retries.
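For tasks that stay in BackgroundTasks, one modest safeguard is to make failures loud in your own logs. The decorator below is a sketch under that assumption. log_failures and the "background" logger name are illustrative, and it only covers sync functions, so an async task would need an async wrapper.

```python
import functools
import logging

logger = logging.getLogger("background")


def log_failures(func):
    """Wrap a sync task so any exception is logged with a traceback."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the message plus the full traceback.
            logger.exception("background task %s failed", func.__name__)
            return None

    return wrapper


@log_failures
def task_that_might_fail():
    raise RuntimeError("SMTP server unreachable")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    task_that_might_fail()  # logs the error instead of failing silently
```

This gives visibility only. It does not retry anything, which is where an external queue comes in.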
2) Passing a DB session received from a dependency to a background task

    @router.post("/")
    async def create(db: DBSession, bg: BackgroundTasks):
        bg.add_task(use_db_later, db)  # ← dangerous
        return ...

When the route returns, the dependency (DB session) is closed. Using it from a background task means working with an already-closed session. Have background tasks create a fresh session instead.

    async def use_db_later(user_id: int):
        async with AsyncSessionLocal() as db:
            ...

    bg.add_task(use_db_later, user.id)

3) Using async libraries inside a def route
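    @router.get("/")
    def handler():
        asyncio.run(something_async())  # starts a second event loop in the worker thread

A def route runs in a worker thread, so asyncio.run() there creates a separate, throwaway event loop on every request, and clients or connection pools bound to the application’s loop can’t be safely shared with it. Called from code that is already running on the event loop, asyncio.run() raises RuntimeError outright. If a def route needs async code, switch the route itself to async def.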
Recap
What this post nailed down:
- async def runs on the event loop, def on the thread pool; FastAPI picks
- Sync blocking inside async def = event loop frozen
- Fixes: async library / to_thread / switch to def
- BackgroundTasks: post-response work in the same process, suited for fast tasks
- External queues (ARQ/Celery/Dramatiq): when you need retries, tracking, or independent scaling
- ARQ mini guide: async-friendly, Redis only
- StreamingResponse + async generators: large responses, SSE
- WebSocket for bidirectional real-time
- Semaphore for concurrency limits, asyncio.timeout for timeouts
- lifespan for startup/shutdown resource management (@asynccontextmanager)
- Pitfalls: swallowed background exceptions, using closed sessions, def/async confusion
The next post (#6 Testing and Deployment) closes the series. We cover integration tests with pytest + httpx, isolating external dependencies via dependency overrides, and Docker plus cloud deployment in one place.