Django DRF #4: Async Work with Celery

10 min read

By #3, reads and writes are fast. But when work like sending emails, calling external APIs, resizing images, or generating PDFs happens inside a view, responses can block for several seconds, and the API slows down whenever an external service does.

The fix is nearly always the same: take that work out of the request flow and run it in the background. In the Python/Django world, the standard tool for this is Celery.

Where Celery fits

Kind of work | Good fits
Heavy CPU/IO | Image resizing, PDF/report generation, data aggregation
External APIs | Email (SES/SendGrid), Slack notifications, calls to payment gateways
Deferrable work | Welcome email after signup (a second’s delay is fine), stats ingestion
Scheduling (cron) | Daily early-morning reports, expired-token cleanup
Fan-out | “Notify all users”: one job split into N smaller tasks

The core benefit is decoupling the work from the response. The user gets a 200 response immediately, and a worker processes the actual work.
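To make the “respond now, work later” split concrete, here is a toy, in-process analogy using only the standard library. The `queue.Queue` stands in for the broker and the thread for a Celery worker; `register_view` and `worker` are hypothetical names, and none of this is Celery’s API. A real broker lives outside the web process, survives restarts, and feeds workers on other machines, which this sketch does not attempt.

```python
import queue
import threading

# Toy analogy only: the Queue plays the broker, the thread plays a worker.
jobs: "queue.Queue[int]" = queue.Queue()
processed: list = []


def worker() -> None:
    # Pull a job, do the slow part, mark it done; loop forever
    while True:
        user_id = jobs.get()
        processed.append(user_id)  # e.g. send the welcome email here
        jobs.task_done()


def register_view(user_id: int) -> dict:
    # The "view" only enqueues and returns; it never waits for the slow work
    jobs.put(user_id)
    return {"ok": True}


threading.Thread(target=worker, daemon=True).start()
response = register_view(1)  # returns immediately
jobs.join()  # demo only: wait until the worker has drained the queue
```

The view’s latency is now just the cost of `put()`, no matter how slow the email provider is.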

Alternatives in one line

Tool | Verdict
Celery | De facto standard. Rich features, big ecosystem, good docs
RQ | Lightweight. Redis only. For when you just need a simple queue
Dramatiq | Modern, simpler API. A Celery alternative
Django Q2 | Can use the database as its broker, strong Django integration. Small projects
Huey | Very lightweight. A good fit for a single machine

This post goes deep on Celery + Redis, the most common combination in practice.

Broker: Redis vs RabbitMQ

Celery talks to workers through a broker (message queue).

Aspect | Redis | RabbitMQ
Setup difficulty | Very easy | Moderate
Operations burden | Low | Moderate
Features | Simple | Complex routing possible
Performance | Very fast | Fast
Reliability | Good (with persistence) | Very good (AMQP standard)
Also usable as result backend | Yes | Limited (rpc:// backend only); usually paired with a separate store
Best fit | General web services (most cases) | Complex routing, large-scale distributed systems

This series uses Redis as both the broker and the result backend, which covers the vast majority of small-to-medium services.

Install

Packages
uv add celery redis
Run Redis with Docker (dev)
docker run -d -p 6379:6379 redis:7-alpine

Celery setup: Django integration

mysite/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

app = Celery("mysite")

# Read all settings from Django settings with 'CELERY_' prefix
app.config_from_object("django.conf:settings", namespace="CELERY")

# Auto-discover tasks.py in all INSTALLED_APPS
app.autodiscover_tasks()
mysite/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)

The import in __init__.py is the key: it loads the Celery app instance whenever Django starts, so that @shared_task tasks can find it.

mysite/settings.py — Celery config
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60   # 30-minute hard limit

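Thanks to namespace="CELERY", every CELERY_XXX entry in settings maps to Celery’s lowercase xxx setting (for example, CELERY_BROKER_URL becomes broker_url). Keys without the prefix are ignored by Celery.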
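The prefix mapping can be illustrated with a few lines of plain Python. This is a sketch of the naming rule only, assuming nothing beyond the settings shown above; `strip_namespace` is a hypothetical helper, and Celery’s real loader does more (defaults, validation, old-style names).

```python
def strip_namespace(settings: dict, namespace: str = "CELERY") -> dict:
    """Illustrates the CELERY_ prefix rule only; not Celery's loader."""
    prefix = namespace + "_"
    return {
        # Drop the prefix and lowercase the rest: CELERY_BROKER_URL -> broker_url
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


django_settings = {
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_TASK_TIME_LIMIT": 30 * 60,
    "DEBUG": True,  # no CELERY_ prefix, so it is not a Celery setting
}
celery_conf = strip_namespace(django_settings)
```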

First task

blog/tasks.py
from celery import shared_task
from django.core.mail import send_mail


@shared_task
def send_welcome_email(user_id: int) -> str:
    from django.contrib.auth import get_user_model
    User = get_user_model()
    user = User.objects.get(pk=user_id)
    send_mail(
        subject="Welcome",
        message=f"Welcome, {user.username}.",
        from_email="noreply@example.com",
        recipient_list=[user.email],
    )
    return f"sent to {user.email}"

@shared_task creates a task that isn’t bound to a specific Celery app instance, so any Django app (including reusable ones) can define tasks without importing mysite.celery.

Calling a task: delay vs apply_async

Calling
from .tasks import send_welcome_email

# Simplest
send_welcome_email.delay(user.id)

# When you need options
send_welcome_email.apply_async(
    args=[user.id],
    countdown=10,                      # 10 seconds later
    expires=300,                       # discard if not started in 5 min
    queue="emails",                    # different worker queue
)

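delay(*args, **kwargs) is a shortcut for apply_async(args, kwargs). Because delay() treats every keyword as a task argument, it can’t take execution options; use apply_async() when you need countdown, expires, queue, and so on.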
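A toy stand-in makes the argument split visible. `ToyTask` is hypothetical: it records calls instead of sending messages, and returns None where Celery would return an AsyncResult. The `delay` body mirrors the documented shortcut.

```python
from typing import Any, Dict, List, Optional, Sequence


class ToyTask:
    """Records calls instead of sending messages; not Celery's Task class."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def apply_async(self, args: Optional[Sequence] = None,
                    kwargs: Optional[dict] = None, **options: Any) -> None:
        # Positional args, keyword args and execution options stay separate
        self.calls.append({
            "args": tuple(args or ()),
            "kwargs": dict(kwargs or {}),
            "options": options,
        })

    def delay(self, *args: Any, **kwargs: Any) -> None:
        # Shortcut: everything goes to the task itself, no execution options
        return self.apply_async(args, kwargs)


task = ToyTask()
task.delay(42)
task.apply_async(args=[42], countdown=10, queue="emails")
```

Calling `task.delay(42, countdown=10)` here would pass `countdown` to the task function as a keyword argument, not schedule it.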

Calling inside View / Signal: transaction.on_commit

The most common trap is a task running before the DB transaction commits. The worker can’t find the user that was just created.

🚫 Direct call inside transaction
@api_view(["POST"])   # DRF function view; assume ATOMIC_REQUESTS = True
def register(request):
    user = User.objects.create_user(...)
    send_welcome_email.delay(user.id)   # ← worker may not find the user
    return Response({"ok": True})

Inside @transaction.atomic, or with ATOMIC_REQUESTS enabled, the commit happens only after the view returns, just before the response is sent. If a worker picks up the task in that window, the user may not exist in the DB yet.

The answer is transaction.on_commit — call only after commit finishes (the post-transaction processing pattern from Advanced #5).

✅ After commit
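from django.contrib.auth import get_user_model
from django.db import transaction
from rest_framework.decorators import api_view
from rest_framework.response import Response

from .tasks import send_welcome_email

User = get_user_model()


@api_view(["POST"])
def register(request):
    user = User.objects.create_user(...)
    # Enqueue only after the surrounding transaction commits
    transaction.on_commit(lambda: send_welcome_email.delay(user.id))
    return Response({"ok": True})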
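The behaviour that makes on_commit safe can be modelled in a few lines. `ToyAtomic` is a hypothetical toy, not Django’s implementation: callbacks are queued while the transaction is open, run after commit, and are thrown away on rollback, so a rolled-back signup never triggers an email. Real Django additionally runs the callback immediately when no transaction is active, which the toy leaves out.

```python
from typing import Callable, List


class ToyAtomic:
    """Toy model of a transaction with on_commit hooks (not Django's code)."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[], None]] = []

    def on_commit(self, func: Callable[[], None]) -> None:
        # While the transaction is open, callbacks are queued, not run
        self._callbacks.append(func)

    def commit(self) -> None:
        # After a successful commit, queued callbacks run in registration order
        callbacks, self._callbacks = self._callbacks, []
        for func in callbacks:
            func()

    def rollback(self) -> None:
        # On rollback the callbacks are discarded: no task is ever enqueued
        self._callbacks.clear()
```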

Same trap in signals

signals.py
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .tasks import send_welcome_email


@receiver(post_save, sender=get_user_model())
def on_user_created(sender, instance, created, **kwargs):
    if not created:
        return
    transaction.on_commit(lambda: send_welcome_email.delay(instance.id))

post_save fires immediately after save, but if it fires inside a transaction, the commit hasn’t happened yet. Always wrap the task call with on_commit.

Getting the result

The return value of a shared_task is stored in the result backend (Redis).

Reading the result
result = send_welcome_email.delay(user.id)
print(result.id)            # task UUID
print(result.status)        # PENDING / STARTED / SUCCESS / FAILURE
print(result.get(timeout=10))  # blocks until done; don't call inside a view or another task

A common pattern is to return the task ID (result.id) to the client in the API response, then expose progress through a separate /api/tasks/{id}/ endpoint. This works well for long jobs with a progress UI.
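The body of such a status endpoint might look like the sketch below. `task_status_payload` is a hypothetical helper that only reads `.id`, `.status` and `.result`, so it works with a real `celery.result.AsyncResult` (for example `celery_app.AsyncResult(task_id)` inside a DRF view) as well as with the stand-ins used here. Two things to keep in mind: Celery reports unknown task IDs as PENDING, and on FAILURE `.result` holds the exception, which you may not want to show clients verbatim.

```python
from types import SimpleNamespace
from typing import Any, Dict


def task_status_payload(result: Any) -> Dict[str, Any]:
    """Build a JSON-friendly body from an AsyncResult-like object."""
    payload: Dict[str, Any] = {"id": result.id, "status": result.status}
    if result.status == "SUCCESS":
        payload["result"] = result.result        # the task's return value
    elif result.status == "FAILURE":
        payload["error"] = repr(result.result)   # .result holds the exception
    return payload


# Stand-ins for AsyncResult so the sketch runs without a broker
done = SimpleNamespace(id="abc", status="SUCCESS", result="sent to a@example.com")
failed = SimpleNamespace(id="def", status="FAILURE", result=ValueError("bad address"))
```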

Retry: retry policy

When an external API temporarily returns 503, discarding the task on the first failure is not an option. Auto retry is the standard solution.

Basic retry
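import httpx
from celery import shared_task


@shared_task(
    bind=True,                          # receives self
    # httpx errors don't subclass the built-in ConnectionError/TimeoutError
    autoretry_for=(httpx.TransportError, httpx.HTTPStatusError),
    retry_kwargs={"max_retries": 5},
    retry_backoff=True,                 # 1, 2, 4, 8, 16 sec (exponential)
    retry_backoff_max=600,              # cap at 10 min
    retry_jitter=True,                  # random delay in 0..backoff (prevents thundering herd)
)
def call_external_api(self, url: str) -> dict:
    response = httpx.get(url, timeout=5)
    response.raise_for_status()         # 4xx/5xx (e.g. 503) raise HTTPStatusError, so they retry
    return response.json()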
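The delay schedule those three options produce can be reproduced in plain Python. This is a re-implementation for illustration, following the behaviour Celery documents for retry_backoff (factor 1 when set to True), retry_backoff_max, and retry_jitter (a random value between zero and the computed delay); `backoff_countdown` is a hypothetical name.

```python
import random
from typing import Optional


def backoff_countdown(retries: int, factor: int = 1, maximum: int = 600,
                      jitter: bool = True,
                      rng: Optional[random.Random] = None) -> int:
    """Seconds to wait before the next retry, given `retries` already done."""
    # Exponential growth, capped at `maximum` (retry_backoff_max)
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        # Full jitter: a random whole number of seconds in 0..countdown
        countdown = (rng or random).randrange(countdown + 1)
    return countdown
```

Without jitter the first five retries wait 1, 2, 4, 8 and 16 seconds, and the tenth would wait 1024 seconds if the cap didn’t clamp it to 600. With jitter, a delay of 0 (an immediate retry) is possible.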

Manual retry

Conditional retry
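from celery import shared_task


@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id: int):
    # payment_gateway / TemporaryFailure: stand-ins for your payment client and its transient error
    try:
        result = payment_gateway.charge(order_id)
    except TemporaryFailure as exc:
        # Retry after 60 sec, max 3 times
        raise self.retry(exc=exc, countdown=60)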

The heart of the retry pattern

  • idempotency: running the same task more than once must leave the same end result (design assuming an email may be sent twice)
  • backoff + jitter: so retries after a transient outage don’t all hit the service at the same moment
  • dead letter: when the final retry also fails, move the task to a separate queue (or table) for a human to review
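One common way to get idempotency is an idempotency key: record that a unit of work finished, and skip it if it shows up again. The sketch below is in-memory only; `IdempotencyStore` and key names like "welcome-email:1" are hypothetical. In a real deployment the check-and-record step must be atomic in shared storage, for example a unique database constraint or Django’s `cache.add()`, which only stores a key if it doesn’t exist yet.

```python
from typing import Callable, Dict, Hashable


class IdempotencyStore:
    """In-memory stand-in for a unique DB constraint or an atomic cache.add().

    Not safe across processes: real workers need the check-and-record step
    to be atomic in shared storage.
    """

    def __init__(self) -> None:
        self._done: Dict[Hashable, object] = {}

    def run_once(self, key: Hashable, func: Callable[[], object]) -> object:
        # Already completed under this key: return the stored result, skip the work
        if key in self._done:
            return self._done[key]
        result = func()
        # Record only after success, so a failed attempt can still be retried
        self._done[key] = result
        return result
```

There is still a small window between “email sent” and “key recorded”; if the worker dies in between, the retry sends again. That is why the first bullet says to design for duplicates rather than assume they can’t happen.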

Celery Beat: periodic tasks (cron)

When you need scheduling, use Beat, the periodic scheduler that ships with Celery.

Install (DB-based scheduler)
uv add django-celery-beat
settings.py
INSTALLED_APPS += ["django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
Migration
uv run python manage.py migrate

Define in code

mysite/celery.py (in settings.py, the same dict goes in CELERY_BEAT_SCHEDULE instead)
from celery.schedules import crontab

app.conf.beat_schedule = {
    "cleanup-expired-tokens": {
        "task": "blog.tasks.cleanup_expired_tokens",
        "schedule": crontab(hour=3, minute=0),   # daily 03:00
    },
    "daily-summary-email": {
        "task": "blog.tasks.send_daily_summary",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon-fri"),
    },
    "every-5-min-healthcheck": {
        "task": "blog.tasks.ping_external",
        "schedule": 300.0,   # in seconds
    },
}
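The crontab hours above are interpreted in CELERY_TIMEZONE (Asia/Seoul in this series), not UTC. A toy next-run calculation, not Celery’s scheduler, makes the effect concrete. `next_daily_run` is a hypothetical helper, and it assumes a fixed UTC+9 offset, which is valid for Seoul because it has no daylight saving time.

```python
from datetime import datetime, timedelta, timezone

# Asia/Seoul is UTC+9 with no DST, so a fixed offset is enough for this sketch
SEOUL = timezone(timedelta(hours=9))


def next_daily_run(now: datetime, hour: int, minute: int,
                   tz: timezone = SEOUL) -> datetime:
    """Next time a daily crontab(hour=..., minute=...) entry would fire, in tz."""
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    local_now = now.astimezone(tz)
    candidate = local_now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= local_now:
        # Today's slot has passed: fire tomorrow at the same local time
        candidate += timedelta(days=1)
    return candidate
```

So “daily 03:00” in Seoul fires at 18:00 UTC on the previous calendar day, which is worth remembering when reading UTC-based logs.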

Define in admin (django-celery-beat’s strength)

With DatabaseScheduler, you can add/edit schedules from the Django Admin GUI — meaning schedule changes in production require no code deploy.

Running the worker

Dev mode — worker + Beat
# Worker
uv run celery -A mysite worker -l info

# Separate terminal — Beat (scheduler)
uv run celery -A mysite beat -l info

# Or in one process (dev only, not for production)
uv run celery -A mysite worker -l info -B

Concurrency

Options
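# --concurrency: 4 worker processes
# --queues: queues to consume ("default" needs CELERY_TASK_DEFAULT_QUEUE = "default";
#           Celery's built-in default queue is named "celery")
# --hostname: unique node name; %h expands to the host name
celery -A mysite worker -l info \
  --concurrency=4 \
  --queues=default,emails \
  --hostname=worker1@%h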

The default pool is prefork (multiple processes), and concurrency defaults to the number of CPU cores. For I/O-heavy tasks, a green-thread pool such as --pool=gevent --concurrency=100 can be more efficient; it requires the gevent package to be installed.

Queue separation

Per-task queue
from celery import shared_task


@shared_task(queue="emails")
def send_welcome_email(user_id: int): ...


@shared_task(queue="reports")
def generate_pdf(report_id: int): ...   # hypothetical signature

Run a separate worker per queue so heavy work can’t block light work: for example, give the email workers high concurrency and run the report workers on a separate machine with plenty of memory.

Monitoring: Flower, Sentry

Flower: real-time dashboard

Install + run
uv add flower
celery -A mysite flower --port=5555

Open http://localhost:5555 in a browser to see:

  • Worker status (online/offline, load)
  • In-progress / completed / failed task lists
  • Average execution time per task
  • Per-queue backlog status

Sentry integration

settings.py
import os

import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn=os.environ["SENTRY_DSN"],
    integrations=[DjangoIntegration(), CeleryIntegration()],
    traces_sample_rate=0.1,
)

CeleryIntegration automatically reports task failures to Sentry, so you can see which task failed, and with which arguments, in one place.

Bring it up together with docker-compose

This is covered in depth in #6, but the minimal form looks like this.

docker-compose.yml — excerpt
services:
  web:
    build: .
    command: gunicorn mysite.wsgi --bind 0.0.0.0:8000
    depends_on: [db, redis]

  worker:
    build: .
    command: celery -A mysite worker -l info
    depends_on: [db, redis]

  beat:
    build: .
    command: celery -A mysite beat -l info
    depends_on: [db, redis]

  redis:
    image: redis:7-alpine

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: blog
      POSTGRES_USER: blog
      POSTGRES_PASSWORD: blog

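Three containers (web, worker, beat) are built from the same image: same code, only the start command differs. Inside Compose, containers reach Redis by its service name, so the broker and result backend URLs must point at redis://redis:6379/0 and /1 rather than localhost (for example, by reading them from environment variables in settings.py).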

Common pitfalls

1) Missing transaction.on_commit

The trap covered above: calling task.delay() inside a transaction may leave the worker unable to find the data. Always use on_commit.

2) Model objects as task arguments

🚫 Passing an ORM object
send_welcome_email.delay(user)

Celery serializes task arguments, here as JSON. An ORM object can’t be serialized to JSON, and even with a serializer that could handle it, the worker would get a snapshot that may be stale by the time the task runs. Pass IDs only and re-query inside the worker.

✅ ID only
send_welcome_email.delay(user.id)
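With only the standard library you can see roughly what the JSON serializer faces. The `User` class below is a hypothetical stand-in for a model instance, and the bare list mimics task arguments; Celery surfaces a similar “not JSON serializable” error when you call .delay() with such an object.

```python
import json


class User:
    """Hypothetical stand-in for a Django model instance."""

    def __init__(self, pk: int, email: str) -> None:
        self.pk = pk
        self.email = email


user = User(pk=7, email="a@example.com")

error = ""
try:
    json.dumps([user])            # passing the object itself
except TypeError as exc:
    error = str(exc)              # the object has no JSON representation

message = json.dumps([user.pk])   # passing the ID serializes cleanly
```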

3) Reduce backend load if you don’t use the result

ignore_result
@shared_task(ignore_result=True)
def fire_and_forget(*args, **kwargs): ...

With ignore_result=True, the return value is never written to the result backend. For tasks whose results nobody looks up, this reduces load on Redis.

4) Multiple Beat instances trigger N times

celery beat should run as exactly one instance. Multiple instances trigger the same schedule N times. When deploying via containers, ensure replicas=1.

5) Tasks that are too big

If one task processes 10,000 items, a single failure means redoing all of them. Split the work into chunks and dispatch each chunk as a separate task (the fan-out pattern); Celery’s group, chord, and chain primitives are built for this.

Fan-out with group
from celery import group

# process_one: a @shared_task taking one user ID; user_ids: a list of ints
job = group(process_one.s(uid) for uid in user_ids)
result = job.apply_async()
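The group above fans out one task per item. To dispatch chunks instead, as the paragraph suggests, you first need the IDs in fixed-size batches. A minimal standard-library helper, with the hypothetical name `chunked`:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items; the last chunk may be shorter."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    # islice pulls the next `size` items; an empty list means we're done
    while chunk := list(islice(it, size)):
        yield chunk
```

With a hypothetical `process_batch` task that takes a list of IDs, the fan-out becomes `group(process_batch.s(chunk) for chunk in chunked(user_ids, 500))`: a failure now costs one batch of 500, not all 10,000. On Python 3.12+, `itertools.batched` does the same job but yields tuples.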

Real example: notify + reindex after publishing

A common scenario in a blog API. When a post is published, we need to:

  1. Send notification emails to subscribers
  2. Update the search index
  3. Invalidate RSS/Sitemap cache

All detached from the response flow.

blog/tasks.py
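from celery import shared_task
from django.core.cache import cache
from django.core.mail import send_mail

from .models import Post


# A retry re-sends to every subscriber; keep it idempotent (see the retry section)
@shared_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def notify_subscribers(post_id: int):
    post = Post.objects.get(pk=post_id)
    subscribers = post.author.subscribers.all()   # assumes a subscribers relation
    for sub in subscribers:
        send_mail("New post", post.title, "noreply@example.com", [sub.email])


@shared_task
def reindex_post(post_id: int):
    # Index in OpenSearch / Meilisearch
    ...


@shared_task(ignore_result=True)
def invalidate_caches(post_id: int):
    cache.delete_many(["rss:feed", "sitemap:posts"])
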
blog/views.py — publish action
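from django.db import transaction
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

from .models import Post
from .serializers import PostSerializer   # serializer from earlier posts (assumed)
from .tasks import notify_subscribers, reindex_post, invalidate_caches


class PostViewSet(viewsets.ModelViewSet):
    queryset = Post.objects.all()
    serializer_class = PostSerializer

    @action(detail=True, methods=["post"])
    def publish(self, request, pk=None):
        post = self.get_object()
        post.published = True
        post.save()

        # Each task is enqueued only after the publish commits
        transaction.on_commit(lambda: notify_subscribers.delay(post.id))
        transaction.on_commit(lambda: reindex_post.delay(post.id))
        transaction.on_commit(lambda: invalidate_caches.delay(post.id))

        return Response({"published": True})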

The view responds with 200 immediately. The three tasks run in the background. Even if one fails, the others and the response itself are unaffected.

Recap

What this post nailed down:

  • Where Celery fits — heavy work / external IO / scheduling / fan-out
  • Broker — Redis is common, RabbitMQ for complex routing
  • Setup — mysite/celery.py, celery_app in __init__.py, autodiscover_tasks
  • @shared_task — app-independent task
  • task.delay() (shortcut) vs apply_async() (with options)
  • transaction.on_commit for post-commit calls (loops back to Advanced #5)
  • Results — result.get(), result.status, result backend
  • Retry: autoretry_for, retry_backoff, retry_jitter, idempotency
  • Celery Beat: crontab or DB-based scheduler (managed in admin)
  • Running workers — celery -A mysite worker, concurrency, queue separation
  • Monitoring — Flower (real-time), Sentry (failure tracking)
  • web/worker/beat together in docker-compose
  • Pitfalls — on_commit, passing ORM objects, ignore_result, single Beat instance, splitting big tasks

Celery is too big for one post: workflows (group/chord/chain), task routing, priority queues, signals, and more go much deeper. The patterns above cover most everyday needs, and the official Celery docs are a friendly guide for the rest.

The next post, #5 OpenAPI docs (drf-spectacular), covers automatically documenting the API you’ve built: Swagger UI, ReDoc, and even generated client SDKs.
