Chapter 27

Async and background jobs

When to use async routes, BackgroundTasks for post-response work, the boundary where external queues (Celery, ARQ) start to be needed, and how to safely mix in sync libraries.

This chapter shows how the async covered in Chapter 14 Async intro and Chapter 18 Async in depth fits naturally inside FastAPI, along with the patterns for work that needs to happen after the response (email, heavy conversions, external APIs).

This chapter also spells out the boundary where an external queue (ARQ / Celery, etc.) starts to be needed. Small post-processing fits in BackgroundTasks, but there comes a point where retries, tracking, and separate scaling become necessary.

async def vs def — FastAPI handles it for you #

FastAPI routes accept both.

Both are fine
@router.get("/sync")
def sync_handler():
    ...

@router.get("/async")
async def async_handler():
    ...

The difference:

  • async def: runs directly on the event loop and hands control back to the loop at every await
  • def: FastAPI runs it in a thread pool automatically, so it doesn’t block the event loop

FastAPI handles each appropriately on its own. So a route that uses a sync library can stay as def and won’t block other requests.

Never do this — sync blocking inside async def #

🚫 The biggest trap
@router.get("/items")
async def list_items():
    response = requests.get("https://api.example.com")   # sync, blocking!
    return response.json()

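A sync blocking call inside async def stops the entire event loop, and every other request stops with it. This is where the concurrency model from Chapter 19 GIL and concurrency meets async: the event loop runs on a single thread, so nothing else is scheduled while that thread is blocked.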

Three fixes:

✅ 1. async library
import httpx

@router.get("/items")
async def list_items():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
    return response.json()

✅ 2. sync via thread
import asyncio
import requests

@router.get("/items")
async def list_items():
    response = await asyncio.to_thread(requests.get, "https://api.example.com")
    return response.json()

Or just write it as def — FastAPI sends it to a thread on its own.

✅ 3. Just use def
@router.get("/items")
def list_items():
    response = requests.get("https://api.example.com")
    return response.json()
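The difference between the trap and the to_thread fix can be measured without FastAPI. The following is a minimal stdlib-only sketch, assuming time.sleep stands in for a blocking call such as requests.get; the names count_ticks, blocking_handler, threaded_handler, and measure are made up for this illustration. A ticker task counts how often the event loop gets to run while each handler is in progress.

```python
import asyncio
import time


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how many times the event loop gets a chance to run this task."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def blocking_handler() -> None:
    # Sync call inside async def: the loop thread is stuck here for 0.2 s.
    time.sleep(0.2)


async def threaded_handler() -> None:
    # Same call, but run on a worker thread so the loop stays free.
    await asyncio.to_thread(time.sleep, 0.2)


async def measure(handler) -> int:
    """Run `handler` next to a ticker and return the ticker's count."""
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # let the ticker start before the handler runs
    await handler()
    stop.set()
    return await ticker
```

Running asyncio.run(measure(blocking_handler)) and asyncio.run(measure(threaded_handler)) should show the ticker barely advancing in the first case and advancing many times in the second. In a real server, each missed tick corresponds to another request that could not make progress.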

Practical guide — when which? #

Situation | First try
Async libraries only (httpx, asyncpg, redis-py async) | async def
Sync libraries only (requests, psycopg2) | def
Mixed | async def + wrap sync parts with to_thread
CPU-bound work | def, or async def + ProcessPoolExecutor

Part 4 of this book uses SQLAlchemy async from Chapter 25, so async def is the standard.
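For the CPU-bound row, the usual shape is loop.run_in_executor. This is a rough sketch rather than a recommended setup: count_primes is an invented workload, make_process_pool is a hypothetical helper, and the worker count is arbitrary.

```python
import asyncio
from concurrent.futures import Executor, ProcessPoolExecutor
from typing import Optional


def count_primes(limit: int) -> int:
    """CPU-bound work: count primes below `limit` by trial division."""
    count = 0
    for n in range(2, limit):
        if all(n % d for d in range(2, int(n ** 0.5) + 1)):
            count += 1
    return count


async def count_primes_async(executor: Optional[Executor], limit: int) -> int:
    """Run the CPU-bound function in `executor` without blocking the loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, count_primes, limit)


def make_process_pool() -> ProcessPoolExecutor:
    # Assumption: created once at startup (for example in lifespan) and
    # reused; max_workers=2 is an arbitrary value for this sketch.
    return ProcessPoolExecutor(max_workers=2)
```

Passing None as the executor falls back to the loop's default thread pool. That keeps the loop responsive, but threads still share the GIL, so for heavy computation the process pool from make_process_pool() is the variant that can use more than one core.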

BackgroundTasks — post-response work #

BackgroundTasks is for work such as sending email, notifications, or logs that you want to run in the background after the response goes out.

Basic use
from fastapi import BackgroundTasks

def send_email_sync(to: str, subject: str, body: str):
    # actual email send (smtplib, etc.)
    print(f"sent email to: {to}")

@router.post("/users")
async def create_user(
    payload: UserCreate,
    background_tasks: BackgroundTasks,
    db: DBSession,
) -> UserOut:
    user = await user_service.create_user(db, payload.email, payload.password)
    background_tasks.add_task(
        send_email_sync, user.email, "Welcome", "Welcome to the service",
    )
    return user

The flow:

  1. The route body finishes normally
  2. The response is sent to the client
  3. After that, the functions in background_tasks run in order

Async functions are OK too #

async task
async def send_email_async(to: str, subject: str, body: str):
    async with httpx.AsyncClient() as client:
        await client.post("https://email-api/", json={...})

background_tasks.add_task(send_email_async, user.email, "Welcome", "...")

add_task accepts both sync and async.
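To make the ordering and the sync/async handling concrete, here is a toy model built only on the standard library. ToyBackgroundTasks is an invented stand-in for illustration, not FastAPI's implementation, and handle_request only simulates "response sent" by appending to a list.

```python
import asyncio
import inspect
from typing import Any, Callable


class ToyBackgroundTasks:
    """Toy stand-in for illustration only; not FastAPI's implementation."""

    def __init__(self) -> None:
        self._tasks: list[tuple[Callable[..., Any], tuple, dict]] = []

    def add_task(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        self._tasks.append((func, args, kwargs))

    async def run_all(self) -> None:
        # Tasks run one after another, in the order they were added.
        for func, args, kwargs in self._tasks:
            if inspect.iscoroutinefunction(func):
                await func(*args, **kwargs)  # async task: awaited on the loop
            else:
                await asyncio.to_thread(func, *args, **kwargs)  # sync task: worker thread


events: list[str] = []


def send_email_sync(to: str) -> None:
    events.append(f"sync email to {to}")


async def send_email_async(to: str) -> None:
    events.append(f"async email to {to}")


async def handle_request() -> list[str]:
    events.clear()
    tasks = ToyBackgroundTasks()
    tasks.add_task(send_email_sync, "a@example.com")
    tasks.add_task(send_email_async, "b@example.com")
    events.append("route body finished")
    events.append("response sent")  # simulated: the client already has its answer
    await tasks.run_all()           # only now do the queued tasks run
    return list(events)
```

The list returned by asyncio.run(handle_request()) records the two email entries after "response sent", which is the same order the three-step flow describes.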

Limits — within the same process only #

BackgroundTasks work runs only while the current process is alive. That means:

  • ✅ Quick post-processing (within a few seconds)
  • ✅ Work that doesn’t need retry on failure
  • ❌ Heavy video / image conversion
  • ❌ External API calls that need retry on failure
  • ❌ Work that must survive a server restart

For those cases you need an external queue.

External queues — when to adopt? #

Signal | Need an external queue?
Task takes under 1 second | No, BackgroundTasks is enough
Need a retry policy on failure | Yes
Need to track / query job results | Yes
Want to scale workers separately | Yes
Scheduling (e.g. nightly jobs) | Yes
Need priority queues | Yes

If any of the "Yes" rows apply, bring in separate worker infrastructure.
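To see what "a retry policy" involves, consider a hand-rolled version. The sketch below is an in-process illustration only: run_with_retry and flaky_send are hypothetical names, and the delays are arbitrary. The retry state lives in memory, so it disappears if the process restarts, which is the gap an external queue fills.

```python
import asyncio


async def run_with_retry(job, *, attempts: int = 3, base_delay: float = 0.01):
    """Await job() up to `attempts` times, doubling the delay between tries."""
    for attempt in range(1, attempts + 1):
        try:
            return await job()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


async def flaky_send() -> str:
    """Hypothetical job that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "sent"
```

With the defaults, asyncio.run(run_with_retry(flaky_send)) succeeds on the third call. A queue worker typically adds what this sketch lacks: persistence of pending jobs, visibility into failures, and retries that survive a deploy.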

Choices #

Tool | Verdict
Celery | Oldest, most widely used. Complex. Redis / RabbitMQ backends
ARQ | Async-friendly. Redis only. Simple
Dramatiq | Simple interface. Multiple backends
RQ | Simple. Redis. Sync-oriented
Taskiq | FastAPI-style dependency injection

For an async codebase, ARQ or Taskiq are natural; for large projects, Celery is the battle-tested standard.

ARQ — async queue mini guide #

We’ll look at the simplest possible adoption with ARQ.

Install
uv add arq

app/tasks/worker.py
from arq.connections import RedisSettings
from app.core.config import settings

async def send_email(ctx, to: str, subject: str, body: str):
    print(f"[worker] sent email to: {to}")
    # actual send code

class WorkerSettings:
    redis_settings = RedisSettings.from_dsn(settings.redis_url)
    functions = [send_email]
    max_jobs = 10
    job_timeout = 60

Enqueue from a route
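from arq import create_pool
from arq.connections import RedisSettings

from app.core.config import settings

@router.post("/users")
async def create_user(payload: UserCreate, db: DBSession):
    user = await user_service.create_user(...)
    # Simplest form: a new pool per request. In a real app, create the pool
    # once (for example in lifespan) and reuse it.
    pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
    # The first argument is the name of a function listed in WorkerSettings.functions.
    await pool.enqueue_job("send_email", user.email, "Welcome", "...")
    return user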

Run the worker:

Worker as a separate process
uv run arq app.tasks.worker.WorkerSettings

The web server and worker run as separate processes. One dying doesn’t kill the other, and the worker can scale separately.

No dependency injection — do it yourself #

FastAPI’s Depends doesn’t run inside ARQ. You have to build the DB session and friends inside the worker function yourself.

DB inside the worker
from app.db.session import AsyncSessionLocal

async def send_email(ctx, user_id: int):
    async with AsyncSessionLocal() as db:
        user = await db.get(User, user_id)
        # send logic

Streaming responses — send it as you go #

File downloads, big JSON, Server-Sent Events (SSE) — situations where you want to stream instead of building the whole response at once.

StreamingResponse
from fastapi.responses import StreamingResponse

@router.get("/csv")
async def export_csv():
    async def generate():
        yield "id,title\n"
        async for row in stream_rows():
            yield f"{row.id},{row.title}\n"

    return StreamingResponse(generate(), media_type="text/csv")

The async generator from Chapter 11 Iterables, generators, yield from works as-is, so the response streams row by row without loading everything into memory.
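One detail worth handling when streaming CSV is quoting: a title that contains a comma would break a hand-built f-string row. The sketch below is one possible approach, assuming a hypothetical stream_rows source; it lets the csv module format each row into a small reusable buffer and yields one chunk per row.

```python
import asyncio
import csv
import io
from typing import AsyncIterator


async def stream_rows(total: int) -> AsyncIterator[tuple[int, str]]:
    """Hypothetical row source; a real one would iterate a DB cursor."""
    for i in range(total):
        yield i, f"title, part {i}"  # comma on purpose, to exercise quoting
        await asyncio.sleep(0)       # give other tasks a turn between rows


async def generate_csv(total: int) -> AsyncIterator[str]:
    """Yield the CSV one line at a time; only one row is in memory at once."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(["id", "title"])
    yield buffer.getvalue()
    async for row in stream_rows(total):
        buffer.seek(0)
        buffer.truncate(0)  # reuse the buffer instead of accumulating rows
        writer.writerow(row)
        yield buffer.getvalue()


async def first_chunks(count: int) -> list[str]:
    """Consume only the first `count` chunks of a 10,000-row export."""
    chunks: list[str] = []
    async for chunk in generate_csv(10_000):
        chunks.append(chunk)
        if len(chunks) == count:
            break
    return chunks
```

A generator shaped like generate_csv can be passed to StreamingResponse in the same way as generate() in the route above. first_chunks is only there to show that the consumer receives the header and first row without the remaining rows ever being produced.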

Server-Sent Events (SSE) #

SSE — real-time updates
import asyncio

@router.get("/events")
async def events():
    async def event_stream():
        for i in range(100):
            yield f"data: tick {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
    )

The browser’s EventSource consumes it automatically. Not bidirectional like WebSocket, but plenty for server → client one-way real-time.
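The wire format is simple enough to build by hand: optional id: and event: fields, one data: line per payload line, and a blank line to end the event. The helper below is a small sketch of that framing; format_sse is a made-up name, not part of FastAPI.

```python
from typing import Optional


def format_sse(
    data: str,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
) -> str:
    """Serialize one Server-Sent Event as text."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Every line of the payload needs its own "data:" prefix.
    lines.extend(f"data: {line}" for line in data.split("\n"))
    # A blank line terminates the event.
    return "\n".join(lines) + "\n\n"
```

Inside event_stream(), a line such as yield format_sse(f"tick {i}") would produce the same bytes as the hand-written f-string, and named events let the browser side subscribe with addEventListener instead of onmessage.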

WebSocket #

For bidirectional real-time, use WebSocket.

WebSocket basics
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"[{room}] echo: {data}")
    except WebSocketDisconnect:
        print(f"client disconnected from room {room}")

Use it for chat, real-time collaboration, and games. State management, auth, and room separation are separate topics, though, so a production rollout deserves its own study.

Concurrency limits — protect external APIs #

The Semaphore from Chapter 18 Async in depth is useful here too. If a single route has to call 100 external APIs, cap it at N concurrently:

Cap concurrent calls
import asyncio

import httpx

_external_sem = asyncio.Semaphore(10)

async def call_external(url: str) -> dict:
    async with _external_sem:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()

This helps you stay within the external API’s rate limit and keeps your own service’s memory use bounded.
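The cap is easy to verify with a fake call. In this sketch, fetch_all and fake_fetch are hypothetical names and asyncio.sleep stands in for the HTTP request; the function starts all calls at once with gather and records the highest number that were in flight together.

```python
import asyncio


async def fetch_all(urls: list[str], limit: int) -> tuple[list[str], int]:
    """Fetch every URL with at most `limit` calls in flight; report the peak."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_fetch(url: str) -> str:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the real HTTP call
            in_flight -= 1
            return f"ok:{url}"

    # gather starts all coroutines; the semaphore decides how many proceed.
    results = await asyncio.gather(*(fake_fetch(url) for url in urls))
    return list(results), peak
```

Calling it with 100 URLs and limit=10 returns 100 results in input order, with a peak that never exceeds 10. Without the semaphore, the peak would be 100.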

Timeouts — avoid hanging forever #

External-call timeout
async with asyncio.timeout(5):
    response = await client.get(url)

This stops the request from hanging indefinitely when an external API doesn’t respond. asyncio.timeout (available from Python 3.11) is the tool from Chapter 14 Async intro.
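A timeout is only half of the pattern; the other half is deciding what to return when it fires. The sketch below uses asyncio.wait_for, the older equivalent that also works before Python 3.11, in place of asyncio.timeout. slow_api and the "fallback" value are placeholders for illustration.

```python
import asyncio


async def slow_api() -> str:
    """Hypothetical external call that takes 0.2 s to answer."""
    await asyncio.sleep(0.2)
    return "real answer"


async def call_with_fallback(timeout: float) -> str:
    """Return the API result, or a fallback if it takes longer than `timeout`."""
    try:
        return await asyncio.wait_for(slow_api(), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels slow_api() before raising, so nothing keeps running.
        return "fallback"
```

In a route, the except branch is where you would typically raise an HTTPException with a 504 status or serve a cached value, depending on what the endpoint promises.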

httpx itself has a timeout option too:

httpx timeout
async with httpx.AsyncClient(timeout=10.0) as client:
    ...

Lifecycle events — lifespan #

Set up / tear down resources at server start / shutdown.

lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # On start
    print("server start")
    redis = await create_redis_pool()
    app.state.redis = redis

    yield

    # On shutdown
    print("server shutdown")
    await redis.close()

app = FastAPI(lifespan=lifespan)

The @asynccontextmanager pattern from Chapter 10 Context managers carries straight over. The old @app.on_event("startup") decorator is deprecated.
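The startup/shutdown ordering can be tried without a server. The following is a stdlib-only sketch in which FakePool, the state dict, and serve are invented for illustration. It also wraps the yield in try/finally, so the cleanup runs even when an exception propagates through the context manager.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


class FakePool:
    """Hypothetical resource standing in for a Redis or DB pool."""

    async def close(self) -> None:
        events.append("pool closed")


@asynccontextmanager
async def lifespan(state: dict):
    # Everything before the yield is "on start".
    state["pool"] = FakePool()
    events.append("startup")
    try:
        yield
    finally:
        # Everything after the yield is "on shutdown"; finally makes it run
        # even if the body of the `async with` raised.
        await state["pool"].close()
        events.append("shutdown")


async def serve(fail: bool) -> None:
    """Stand-in for the server's lifetime between startup and shutdown."""
    state: dict = {}
    async with lifespan(state):
        events.append("serving")
        if fail:
            raise RuntimeError("crash while serving")
```

Both asyncio.run(serve(fail=False)) and a failing run leave the same four entries in events, ending with "pool closed" and "shutdown". Whether a real server reaches the shutdown half depends on how it is stopped; a hard kill skips it regardless.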

Common pitfalls #

1) BackgroundTasks swallows exceptions #

🚫 Exception is ignored
def task_that_might_fail():
    raise SomethingWrong()    # response is unaffected, only logged

BackgroundTasks doesn’t reflect exceptions in the response. A failure isn’t visible to the user. For important work, external queue + retry is safer.
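If a task has to stay in BackgroundTasks, one option is to make its failures loud in one place. The decorator below is a sketch under that assumption; log_failures and the "background" logger name are made up, and it covers sync tasks only.

```python
import functools
import logging

logger = logging.getLogger("background")


def log_failures(func):
    """Wrap a sync task so exceptions are logged with a traceback, not raised."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception records the message plus the full traceback.
            logger.exception("background task %s failed", func.__name__)
            return None

    return wrapper


@log_failures
def task_that_might_fail(user_id: int) -> str:
    if user_id < 0:
        raise ValueError("bad user id")
    return f"processed {user_id}"
```

The wrapped function can be passed to add_task as usual. This only improves visibility: the user still never sees the failure, and nothing is retried, so the advice about an external queue for important work still stands.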

2) Handing a dependency-injected DB session to background #

🚫 Session leak
@router.post("/")
async def create(db: DBSession, bg: BackgroundTasks):
    bg.add_task(use_db_later, db)   # ← dangerous
    return ...

When the route ends, the dependency (the DB session) closes. Using it in the background means using an already-closed session, which fails unpredictably. Background work should open a fresh session.

async def use_db_later(user_id: int):
    async with AsyncSessionLocal() as db:
        ...

bg.add_task(use_db_later, user.id)

3) Using an async library inside a def route #

🚫
@router.get("/")
def handler():
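    asyncio.run(something_async())   # starts a second event loop in the worker thread

A def route runs in a worker thread, so asyncio.run there starts a separate event loop on every call, and anything bound to the server’s main loop (connection pools, async clients, locks) can fail inside it. Called from code that is already running on the event loop, asyncio.run raises RuntimeError instead. If a def route needs to run async code, make the route itself async def.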

Exercises #

  1. With BackgroundTasks, write code that sends a welcome email in the background after POST /users (simulate with print instead of real SMTP). Observe the order yourself — the response reaches the client first, the email print follows.
  2. Intentionally make a BackgroundTasks job raise an exception and confirm that the response is unaffected while the task fails quietly. Then start migrating that job to an external queue (ARQ) — without Redis, you can stop at the installation step.
  3. Use StreamingResponse and an async generator to write a GET /csv endpoint. Stream 10,000 dummy rows without ever loading them all into memory. Call it with curl -N and confirm the first bytes arrive immediately (streaming).

In one line: async def on the event loop, def on a thread pool, FastAPI picks automatically. No sync blocking inside async def — fix it with an async library / to_thread / def. BackgroundTasks is for quick post-response work in the same process; external queues (ARQ / Celery, etc.) are for retries / tracking / separate scaling. StreamingResponse + async generators for big responses / SSE. Semaphore for concurrency limits, asyncio.timeout for timeouts, lifespan for startup / shutdown resources. Pitfalls are swallowed background exceptions / closed sessions / def-async confusion.

Next chapter #

Next, Chapter 28 Testing and deploy — pytest, Docker, Railway/Fly is the last chapter of Part 4. It covers integration testing with pytest + httpx, isolating external dependencies with dependency overrides, and Docker plus cloud deployment.
