Modern Python in Practice #5: Async and Background Tasks

This post pulls together how the async we covered in Intermediate #7 and Advanced #4 fits naturally inside FastAPI — and the patterns for work that runs after the response (emails, heavy conversions, external APIs).

async def vs def — FastAPI handles both #

FastAPI accepts both kinds of route.

Both are fine
@router.get("/sync")
def sync_handler():
    ...

@router.get("/async")
async def async_handler():
    ...

Differences:

  • async def — runs directly on the event loop. Yields when an await happens
  • def — automatically dispatched to a thread pool. Doesn’t block the event loop

FastAPI chooses the right execution mode automatically. That’s why a route using a sync library can stay as plain def without blocking other requests.

Never do this — sync blocking inside async def #

🚫 the biggest pitfall
@router.get("/items")
async def list_items():
    response = requests.get("https://api.example.com")   # sync, blocking!
    return response.json()

Calling a sync blocking function inside async def freezes the entire event loop. Every other request stops too. This is the GIL + async clash from Advanced #5.

Two solutions:

✅ 1. async library
import httpx

@router.get("/items")
async def list_items():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
    return response.json()
✅ 2. sync via thread
import asyncio
import requests

@router.get("/items")
async def list_items():
    response = await asyncio.to_thread(requests.get, "https://api.example.com")
    return response.json()

Or just write def — FastAPI will move it to a thread for you.

✅ 3. write def
@router.get("/items")
def list_items():
    response = requests.get("https://api.example.com")
    return response.json()

Practical guide — what to use when #

SituationFirst try
Async libraries only (httpx, asyncpg, redis-py async)async def
Sync libraries only (requests, psycopg2)def
Mixedasync def + wrap sync parts with to_thread
CPU-bounddef or async def + ProcessPoolExecutor

Since we used SQLAlchemy async in #3, this series defaults to async def.

BackgroundTasks — work after the response #

Use this when you want emails, notifications, or logs to run after the response is sent, in the background.

basic use
from fastapi import BackgroundTasks

def send_email_sync(to: str, subject: str, body: str):
    # Real email send (smtplib etc.)
    print(f"Sent email: {to}")

@router.post("/users")
async def create_user(
    payload: UserCreate,
    background_tasks: BackgroundTasks,
    db: DBSession,
) -> UserOut:
    user = await user_service.create_user(db, payload.email, payload.password)
    background_tasks.add_task(
        send_email_sync, user.email, "Welcome", "Welcome aboard!",
    )
    return user

Flow:

  1. Route body returns normally
  2. The response goes to the client
  3. After that, background_tasks functions run in order

Async functions are fine too #

async task
async def send_email_async(to: str, subject: str, body: str):
    async with httpx.AsyncClient() as client:
        await client.post("https://email-api/", json={...})

background_tasks.add_task(send_email_async, user.email, "Welcome", "...")

add_task accepts both sync and async functions.

Limitation — same process only #

BackgroundTasks work runs only while the current process is alive. That means:

  • Good for fast post-processing (within seconds)
  • Good for tasks that don’t need retries on failure
  • Bad for heavy video/image conversion
  • Bad for external APIs that need retry on failure
  • Bad for tasks that must survive a server restart

Those situations need an external queue.

External queues — when to introduce one? #

SignalExternal queue needed?
Tasks under 1 secondNo, BackgroundTasks is fine
Need retry policy on failure
Need to track/inspect task results
Want to scale workers separately
Scheduling (e.g., daily-at-midnight)
Need priority queues

If any of those apply, bring in dedicated worker infrastructure.

Options #

ToolVerdict
CeleryOldest and most widely used. Complex. Redis/RabbitMQ backends
ARQasync-friendly. Redis only. Simple
DramatiqSimple interface. Multiple backends
RQSimple. Redis. Sync-oriented
TaskiqFastAPI-style dependency injection

For an async codebase, ARQ or Taskiq fit naturally; large projects tend to land on Celery as the proven standard.

ARQ — async queue mini guide #

Let’s look at ARQ as the simplest introduction.

install
uv add arq
app/tasks/worker.py
from arq.connections import RedisSettings
from app.core.config import settings

async def send_email(ctx, to: str, subject: str, body: str):
    print(f"[worker] Sent email: {to}")
    # real send code

class WorkerSettings:
    redis_settings = RedisSettings.from_dsn(settings.redis_url)
    functions = [send_email]
    max_jobs = 10
    job_timeout = 60
enqueue from a route
from arq import create_pool
from arq.connections import RedisSettings

@router.post("/users")
async def create_user(payload: UserCreate, db: DBSession):
    user = await user_service.create_user(...)
    pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
    await pool.enqueue_job("send_email", user.email, "Welcome", "...")
    return user

Run the worker:

worker as a separate process
uv run arq app.tasks.worker.WorkerSettings

The web server and worker run as separate processes. If one dies, the other keeps running, and you can scale workers independently.

Dependency injection is on you #

ARQ doesn’t go through FastAPI’s Depends. You build the DB session etc. yourself inside the worker function.

DB inside a worker
from app.db.session import AsyncSessionLocal

async def send_email(ctx, user_id: int):
    async with AsyncSessionLocal() as db:
        user = await db.get(User, user_id)
        # send logic

Streaming responses — long sends #

Use this when you want to stream the response — file downloads, large JSON, Server-Sent Events (SSE) — instead of building it all up in memory at once.

StreamingResponse
from fastapi.responses import StreamingResponse

@router.get("/csv")
async def export_csv():
    async def generate():
        yield "id,title\n"
        async for row in stream_rows():
            yield f"{row.id},{row.title}\n"

    return StreamingResponse(generate(), media_type="text/csv")

The async generator from Intermediate #4 works as is. You stream out without loading everything into memory.

Server-Sent Events (SSE) #

SSE — real-time notifications
@router.get("/events")
async def events():
    async def event_stream():
        for i in range(100):
            yield f"data: tick {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
    )

The browser’s EventSource picks it up automatically. Not as bidirectional as WebSocket, but it’s enough for server-to-client one-way real-time.

WebSocket #

When you need bidirectional real-time, use WebSocket.

WebSocket basics
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"[{room}] echo: {data}")
    except WebSocketDisconnect:
        print(f"Client disconnected")

Good for chat, real-time collaboration, or games. State management, auth, and room separation add up quickly, though, so a serious implementation deserves its own study.

Concurrency limits — protecting external APIs #

The Semaphore from Advanced #4 is useful here too. If a route has to call 100 external APIs, limit it to N concurrent calls:

cap concurrent calls
import asyncio

_external_sem = asyncio.Semaphore(10)

async def call_external(url: str) -> dict:
    async with _external_sem:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()

Helps both with the external API’s rate limits and with protecting your own service’s memory.

Timeouts — preventing infinite waits #

timeout for external calls
async with asyncio.timeout(5):
    response = await client.get(url)

Stops the request from hanging forever when an external API doesn’t respond. We covered this tool in Intermediate #7.

httpx itself has a timeout option:

httpx timeout
async with httpx.AsyncClient(timeout=10.0) as client:
    ...

Lifecycle events — lifespan #

Set up and tear down resources at server start/stop.

lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # at startup
    print("Server started")
    redis = await create_redis_pool()
    app.state.redis = redis

    yield

    # at shutdown
    print("Server stopped")
    await redis.close()

app = FastAPI(lifespan=lifespan)

The @asynccontextmanager pattern from Intermediate #3 applies here. The old @app.on_event("startup") decorator is deprecated.

Common pitfalls #

1) Ignoring exceptions in BackgroundTasks #

🚫 swallowed exception
def task_that_might_fail():
    raise SomethingWrong()    # response unaffected, only logs

BackgroundTasks doesn’t surface exceptions in the response. Failed work is invisible to the user. Important work is safer with an external queue + retries.

2) Passing a DB session received from a dependency to a background task #

🚫 session leak
@router.post("/")
async def create(db: DBSession, bg: BackgroundTasks):
    bg.add_task(use_db_later, db)   # ← dangerous
    return ...

When the route returns, the dependency (DB session) is closed. Using it from a background task means working with an already-closed session. Have background tasks create a fresh session instead.

async def use_db_later(user_id: int):
    async with AsyncSessionLocal() as db:
        ...

bg.add_task(use_db_later, user.id)

3) Using async libraries inside a def route #

🚫
@router.get("/")
def handler():
    asyncio.run(something_async())   # nested event loop — error

If a def route needs async code, switch the route itself to async def.

Recap #

What this post nailed down:

  • async def runs on the event loop, def on the thread pool — FastAPI picks
  • Sync blocking inside async def = event loop frozen
  • Fixes: async library / to_thread / switch to def
  • BackgroundTasks — post-response work in the same process, suited for fast tasks
  • External queues (ARQ/Celery/Dramatiq) — when you need retries, tracking, or independent scaling
  • ARQ mini guide — async-friendly, Redis only
  • StreamingResponse + async generators — large responses, SSE
  • WebSocket for bidirectional real-time
  • Semaphore for concurrency limits, asyncio.timeout for timeouts
  • lifespan for startup/shutdown resource management (@asynccontextmanager)
  • Pitfalls: swallowed background exceptions, using closed sessions, def/async confusion

The next post (#6 Testing and Deployment) closes the series. We cover integration tests with pytest + httpx, isolating external dependencies via dependency overrides, and Docker plus cloud deployment in one place.

X