Async and background jobs
When to use async routes, BackgroundTasks for post-response work, the boundary where external queues (Celery, ARQ) start to be needed, and how to safely mix in sync libraries.
This chapter shows how the async model from Chapter 14 Async intro and Chapter 18 Async in depth fits inside FastAPI, and covers the patterns for work that has to happen after the response (email, heavy conversions, external APIs).
It also spells out the boundary where an external queue (ARQ, Celery, etc.) becomes necessary. Small post-processing fits in BackgroundTasks, but at some point you need retries, tracking, and separate scaling.
async def vs def: FastAPI handles it for you
FastAPI routes accept both.

    @router.get("/sync")
    def sync_handler():
        ...

    @router.get("/async")
    async def async_handler():
        ...

The difference:
- async def: runs directly on the event loop and can yield control at each await
- def: handed off to a thread pool automatically, so it doesn’t block the event loop
FastAPI handles each appropriately on its own. So a route that uses a sync library can stay as def and won’t block other requests.
Never do this: sync blocking inside async def

    @router.get("/items")
    async def list_items():
        response = requests.get("https://api.example.com")  # sync, blocking!
        return response.json()

A sync blocking call inside async def stops the entire event loop, so every other request stalls until the call returns. This is where the GIL discussion from Chapter 19 GIL and concurrency meets async.
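To see the stall in isolation, here is a minimal sketch in plain asyncio, with no FastAPI involved. The names heartbeat, blocking_handler, threaded_handler, and max_gap are made up for this illustration, and time.sleep stands in for a blocking call such as requests.get. A heartbeat task ticks every 10 ms, and the largest gap between ticks shows how long the loop was frozen.

```python
import asyncio
import time


async def heartbeat(ticks: list[float], interval: float = 0.01) -> None:
    """Record a timestamp every `interval` seconds while the loop is free."""
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def blocking_handler() -> None:
    time.sleep(0.2)  # blocks the event loop thread itself


async def threaded_handler() -> None:
    await asyncio.to_thread(time.sleep, 0.2)  # blocks a worker thread instead


async def max_gap(handler) -> float:
    """Run `handler` next to a heartbeat and return the longest pause between ticks."""
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0.05)  # let a few ticks land first
    await handler()
    await asyncio.sleep(0.05)
    beat.cancel()
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


if __name__ == "__main__":
    print(f"blocking:  {asyncio.run(max_gap(blocking_handler)):.3f}s")
    print(f"to_thread: {asyncio.run(max_gap(threaded_handler)):.3f}s")
```

With the blocking handler the longest gap should be roughly the full 0.2 seconds. With to_thread it should stay close to the 10 ms tick interval, because the loop keeps running while the worker thread sleeps.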
Two fixes that keep the route async def:

    import httpx

    @router.get("/items")
    async def list_items():
        # Option 1: use an async HTTP client
        async with httpx.AsyncClient() as client:
            response = await client.get("https://api.example.com")
            return response.json()

    import asyncio
    import requests

    @router.get("/items")
    async def list_items():
        # Option 2: push the sync call onto a worker thread
        response = await asyncio.to_thread(requests.get, "https://api.example.com")
        return response.json()

Or just write it as def. FastAPI sends it to a thread on its own.

    @router.get("/items")
    def list_items():
        response = requests.get("https://api.example.com")
        return response.json()

Practical guide: when to use which
| Situation | First try |
|---|---|
| Async libraries only (httpx, asyncpg, redis-py async) | async def |
| Sync libraries only (requests, psycopg2) | def |
| Mixed | async def + wrap sync parts with to_thread |
| CPU-bound work | def, or async def + ProcessPoolExecutor |
Part 4 of this book uses SQLAlchemy async from Chapter 25, so async def is the standard.
BackgroundTasks: post-response work
Use BackgroundTasks for work such as sending email, notifications, or logs that should run after the response goes out.

    from fastapi import BackgroundTasks

    def send_email_sync(to: str, subject: str, body: str):
        # actual email send (smtplib, etc.)
        print(f"sent email to: {to}")

    @router.post("/users")
    async def create_user(
        payload: UserCreate,
        background_tasks: BackgroundTasks,
        db: DBSession,
    ) -> UserOut:
        user = await user_service.create_user(db, payload.email, payload.password)
        background_tasks.add_task(
            send_email_sync, user.email, "Welcome", "Welcome to the service",
        )
        return user

The flow:
- The route body finishes normally
- The response is sent to the client
- After that, the functions in background_tasks run in order
Async functions are OK too

    async def send_email_async(to: str, subject: str, body: str):
        async with httpx.AsyncClient() as client:
            await client.post("https://email-api/", json={...})

    background_tasks.add_task(send_email_async, user.email, "Welcome", "...")

add_task accepts both sync and async functions.
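As a rough mental model of why both kinds of function work, the sketch below queues callables and runs them in order once the handler has produced its response, awaiting coroutine functions and pushing plain functions to a thread. MiniBackgroundTasks and fake_request are invented for this illustration. It is a simplified model of the behavior described above, not the framework’s actual implementation.

```python
import asyncio
import inspect
from typing import Any, Callable


class MiniBackgroundTasks:
    """Toy stand-in for BackgroundTasks (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        self._tasks: list[tuple[Callable[..., Any], tuple[Any, ...]]] = []

    def add_task(self, func: Callable[..., Any], *args: Any) -> None:
        self._tasks.append((func, args))

    async def run_all(self) -> None:
        for func, args in self._tasks:  # strictly in the order they were added
            if inspect.iscoroutinefunction(func):
                await func(*args)
            else:
                await asyncio.to_thread(func, *args)  # keep sync work off the loop


log: list[str] = []


def send_email_sync(to: str) -> None:
    log.append(f"sync email to {to}")


async def send_email_async(to: str) -> None:
    await asyncio.sleep(0)
    log.append(f"async email to {to}")


async def fake_request() -> str:
    tasks = MiniBackgroundTasks()
    tasks.add_task(send_email_sync, "a@example.com")
    tasks.add_task(send_email_async, "b@example.com")
    response = "201 Created"
    log.append(f"response sent: {response}")  # the response goes out first
    await tasks.run_all()  # then the queued work runs
    return response


if __name__ == "__main__":
    asyncio.run(fake_request())
    print(log)
```

The log ends up with the response entry first, followed by the two emails in the order they were added.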
Limits: within the same process only
BackgroundTasks work runs only while the current process is alive, and nothing retries or persists it. In the list below, ✅ marks work that fits and ❌ marks work that doesn’t:
- ✅ Quick post-processing (within a few seconds)
- ✅ Work that doesn’t need retry on failure
- ❌ Heavy video / image conversion
- ❌ External API calls that need retry on failure
- ❌ Work that must survive a server restart
For those cases you need an external queue.
External queues: when to adopt?
| Signal | Need an external queue? |
|---|---|
| Task takes under 1 second | No, BackgroundTasks is enough |
| Need a retry policy on failure | ✅ |
| Need to track / query job results | ✅ |
| Want to scale workers separately | ✅ |
| Scheduling (e.g. nightly jobs) | ✅ |
| Need priority queues | ✅ |
If any of the ✅ rows apply, bring in separate worker infrastructure.
Choices
| Tool | Verdict |
|---|---|
| Celery | Oldest, most widely used. Complex. Redis / RabbitMQ backends |
| ARQ | Async-friendly. Redis only. Simple |
| Dramatiq | Simple interface. Multiple backends |
| RQ | Simple. Redis. Sync-oriented |
| Taskiq | FastAPI-style dependency injection |
For an async codebase, ARQ or Taskiq are natural; for large projects, Celery is the battle-tested standard.
ARQ: async queue mini guide
We’ll look at the simplest possible adoption with ARQ.
Install it:

    uv add arq

Define the worker (in app/tasks/worker.py, to match the run command used later):

    from arq.connections import RedisSettings
    from app.core.config import settings

    async def send_email(ctx, to: str, subject: str, body: str):
        print(f"[worker] sent email to: {to}")
        # actual send code

    class WorkerSettings:
        redis_settings = RedisSettings.from_dsn(settings.redis_url)
        functions = [send_email]
        max_jobs = 10
        job_timeout = 60

Enqueue the job from the route:

    from arq import create_pool
    from arq.connections import RedisSettings
    from app.core.config import settings

    @router.post("/users")
    async def create_user(payload: UserCreate, db: DBSession):
        user = await user_service.create_user(...)
        # Created per request only for brevity; a real app would create the pool once and reuse it
        pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
        await pool.enqueue_job("send_email", user.email, "Welcome", "...")
        return user

Run the worker:

    uv run arq app.tasks.worker.WorkerSettings

The web server and worker run as separate processes. One dying doesn’t kill the other, and the worker can scale separately.
No dependency injection: do it yourself
FastAPI’s Depends doesn’t run inside ARQ. You have to build the DB session and other dependencies inside the worker function yourself.

    from app.db.session import AsyncSessionLocal

    async def send_email(ctx, user_id: int):
        # Open a fresh session inside the worker; Depends is not available here
        async with AsyncSessionLocal() as db:
            user = await db.get(User, user_id)
            # send logic

Streaming responses: send it as you go
Use streaming for file downloads, big JSON, and Server-Sent Events (SSE): cases where you want to send data as it is produced instead of building the whole response first.

    from fastapi.responses import StreamingResponse

    @router.get("/csv")
    async def export_csv():
        async def generate():
            yield "id,title\n"
            async for row in stream_rows():
                yield f"{row.id},{row.title}\n"

        return StreamingResponse(generate(), media_type="text/csv")

The async generator from Chapter 11 Iterables, generators, yield from works as-is, so the response streams without loading everything into memory.
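The CSV snippet leaves stream_rows undefined. A minimal stand-in, assuming a hypothetical Row dataclass and generated dummy data in place of a real database cursor, could look like the sketch below. The point is that rows are produced one at a time, so memory use does not grow with the row count.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class Row:
    id: int
    title: str


async def stream_rows(total: int = 10_000) -> AsyncIterator[Row]:
    """Yield rows one by one (stand-in for a streaming DB query)."""
    for i in range(1, total + 1):
        if i % 1000 == 0:
            await asyncio.sleep(0)  # give other tasks a turn now and then
        yield Row(id=i, title=f"item {i}")


async def generate(total: int = 10_000) -> AsyncIterator[str]:
    """Same shape as the generator passed to StreamingResponse."""
    yield "id,title\n"
    async for row in stream_rows(total):
        yield f"{row.id},{row.title}\n"


async def first_chunks(n: int) -> list[str]:
    """Pull only the first n chunks; the remaining rows are never built."""
    chunks: list[str] = []
    async for chunk in generate():
        chunks.append(chunk)
        if len(chunks) == n:
            break
    return chunks


if __name__ == "__main__":
    print(asyncio.run(first_chunks(3)))
```

Pulling three chunks yields the header and the first two rows. The other 9,998 rows are never created, which is the same laziness a streaming response relies on.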
Server-Sent Events (SSE)

    import asyncio

    @router.get("/events")
    async def events():
        async def event_stream():
            for i in range(100):
                yield f"data: tick {i}\n\n"
                await asyncio.sleep(1)

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
        )

The browser’s EventSource consumes it automatically. It isn’t bidirectional like WebSocket, but it is plenty for one-way, server-to-client real-time updates.
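The wire format in the example is the bare minimum: a data: line followed by a blank line. A small helper can also fill in the optional event and id fields and split multi-line payloads, since each line of a payload needs its own data: prefix. format_sse is a hypothetical helper name, not part of FastAPI.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None, event_id: Optional[str] = None) -> str:
    """Serialize one Server-Sent Events message (hypothetical helper)."""
    lines: list[str] = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # Every line of the payload gets its own "data:" field
    for part in data.split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the message
    return "\n".join(lines) + "\n\n"


if __name__ == "__main__":
    print(repr(format_sse("tick 1")))
    print(repr(format_sse("line one\nline two", event="update", event_id="7")))
```

Inside an event_stream generator, each yield would then become yield format_sse(...), which keeps the framing rules in one place.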
WebSocket
For bidirectional real-time, use WebSocket.

    from fastapi import WebSocket, WebSocketDisconnect

    @app.websocket("/ws/{room}")
    async def websocket_endpoint(websocket: WebSocket, room: str):
        await websocket.accept()
        try:
            while True:
                data = await websocket.receive_text()
                await websocket.send_text(f"[{room}] echo: {data}")
        except WebSocketDisconnect:
            print("client disconnected")

Use it for chat, real-time collaboration, and games. That said, state management, auth, and room separation are additional territory, so a real-world rollout is its own study.
Concurrency limits: protect external APIs
The Semaphore from Chapter 18 Async in depth is useful here too. If a single route has to call 100 external APIs, cap it at N concurrent calls:

    import asyncio
    import httpx

    _external_sem = asyncio.Semaphore(10)  # at most 10 calls in flight

    async def call_external(url: str) -> dict:
        async with _external_sem:
            async with httpx.AsyncClient() as client:
                response = await client.get(url)
                return response.json()

This helps with the external API’s rate limit and with your own service’s memory.
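To check that the cap actually holds, the sketch below fires 100 fake calls at once with asyncio.gather and records the highest number running at the same moment. run_calls and fake_call are illustrative names, and asyncio.sleep stands in for the HTTP round trip.

```python
import asyncio

MAX_CONCURRENT = 10


async def run_calls(total: int = 100) -> int:
    """Run `total` fake calls under a semaphore and return the peak concurrency seen."""
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    in_flight = 0
    peak = 0

    async def fake_call(i: int) -> int:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the HTTP round trip
            in_flight -= 1
            return i

    results = await asyncio.gather(*(fake_call(i) for i in range(total)))
    assert results == list(range(total))  # gather preserves input order
    return peak


if __name__ == "__main__":
    print("peak concurrency:", asyncio.run(run_calls()))
```

All 100 tasks are created up front, but the peak never exceeds 10: the route code can stay a simple gather while the semaphore does the throttling.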
Timeouts: avoid hanging forever

    async with asyncio.timeout(5):  # Python 3.11+
        response = await client.get(url)

This stops the request from hanging indefinitely when an external API doesn’t respond. It is the tool from Chapter 14 Async intro.
httpx itself has a timeout option too:

    async with httpx.AsyncClient(timeout=10.0) as client:
        ...

Lifecycle events: lifespan
Set up and tear down resources at server start and shutdown.

    from contextlib import asynccontextmanager
    from fastapi import FastAPI

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # On start
        print("server start")
        redis = await create_redis_pool()
        app.state.redis = redis
        yield
        # On shutdown
        print("server shutdown")
        await redis.close()

    app = FastAPI(lifespan=lifespan)

The @asynccontextmanager pattern from Chapter 10 Context managers carries straight over. The old @app.on_event("startup") decorator is deprecated.
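The ordering can be tried without a server. In the sketch below, FakePool and the SimpleNamespace app are stand-ins invented for this illustration, and the async with block plays the role of the server running. The try/finally is a variation on the example above, added here so cleanup also runs if the block exits with an error.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

events: list[str] = []


class FakePool:
    """Stand-in for a Redis or DB pool (hypothetical)."""

    async def close(self) -> None:
        events.append("pool closed")


@asynccontextmanager
async def lifespan(app):
    app.state.pool = FakePool()
    events.append("startup")
    try:
        yield
    finally:
        # Runs on shutdown, even if the block using the context exits with an error
        await app.state.pool.close()
        events.append("shutdown")


async def main() -> None:
    app = SimpleNamespace(state=SimpleNamespace())  # fake app with a .state bag
    async with lifespan(app):  # roughly what the server does around its run
        events.append("serving requests")


if __name__ == "__main__":
    asyncio.run(main())
    print(events)
```

Everything before yield runs once at startup, and everything after it runs once at shutdown, in that order.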
Common pitfalls
1) BackgroundTasks swallows exceptions

    def task_that_might_fail():
        raise SomethingWrong()  # response is unaffected, only logged

BackgroundTasks doesn’t reflect exceptions in the response, so a failure isn’t visible to the user. For important work, an external queue with retries is safer.
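For tasks that do stay in BackgroundTasks, one option is to make failures at least easy to find. The decorator below is a hypothetical helper, not a FastAPI feature: it wraps a sync task, logs the traceback with some context, and returns None instead of raising. It does not retry anything.

```python
import functools
import logging

logger = logging.getLogger("background")


def log_failures(func):
    """Wrap a sync task so failures are logged with context instead of passing unnoticed."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the traceback; a real app might also alert here
            logger.exception("background task %s failed (args=%r)", func.__name__, args)
            return None

    return wrapper


@log_failures
def task_that_might_fail(user_id: int) -> None:
    raise RuntimeError(f"mail server rejected user {user_id}")


if __name__ == "__main__":
    task_that_might_fail(42)  # logs the error, does not raise
```

The wrapped function is registered the usual way, for example background_tasks.add_task(task_that_might_fail, user.id).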
2) Handing a dependency-injected DB session to background

    @router.post("/")
    async def create(db: DBSession, bg: BackgroundTasks):
        bg.add_task(use_db_later, db)  # ← dangerous
        return ...

When the route ends, the dependency (DB session) closes. Using it in the background means using an already-closed session, which is a bug waiting to happen. Background work should open a fresh session.

    async def use_db_later(user_id: int):
        async with AsyncSessionLocal() as db:
            ...

    bg.add_task(use_db_later, user.id)

3) Using an async library inside a def route
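
    @router.get("/")
    def handler():
        asyncio.run(something_async())  # spins up a separate event loop on every call

If a def route needs to run async code, make the route itself async def. A def route runs in a worker thread, so asyncio.run there creates a new event loop per request, and clients or pools bound to the server’s loop can’t be shared with it. Called from code that is already running on the event loop, asyncio.run raises RuntimeError.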
Exercises
- With BackgroundTasks, write code that sends a welcome email in the background after POST /users (simulate with print instead of real SMTP). Observe the order yourself: the response reaches the client first, the email print follows.
- Intentionally make a BackgroundTasks job raise an exception and confirm that the response is unaffected while the task fails quietly. Then start migrating that job to an external queue (ARQ). Without Redis, you can stop at the installation step.
- Use StreamingResponse and an async generator to write a GET /csv endpoint. Stream 10,000 dummy rows without ever loading them all into memory. Call it with curl -N and confirm the first bytes arrive immediately (streaming).
In one line:
async def runs on the event loop, def on a thread pool, and FastAPI picks automatically. No sync blocking inside async def: fix it with an async library, to_thread, or def. BackgroundTasks is for quick post-response work in the same process; external queues (ARQ, Celery, etc.) are for retries, tracking, and separate scaling. StreamingResponse plus async generators for big responses and SSE. Semaphore for concurrency limits, asyncio.timeout for timeouts, lifespan for startup and shutdown resources. The pitfalls are swallowed background exceptions, closed sessions, and def/async confusion.
Next chapter
Next, Chapter 28 Testing and deploy — pytest, Docker, Railway/Fly is the last chapter of Part 4. It covers integration testing with pytest + httpx, isolating external dependencies with dependency overrides, and Docker plus cloud deployment.