Django DRF #4: Async Work with Celery
By #3, reads and writes are fast. But when work like sending emails, calling external APIs, resizing images, or generating PDFs happens inside a view, responses can block for several seconds, and the API slows down whenever an external service does.
The answer is always the same — detach from the request flow and send it to the background. The standard tool in the Python/Django world is Celery.
Where Celery fits #
| Category | Suitable work |
|---|---|
| Heavy CPU/IO | Image resizing, PDF/report generation, data aggregation |
| External APIs | Email (SES/SendGrid), Slack notifications, calls to payment providers |
| Deferrable work | Welcome email after signup (1 sec late is OK), stats ingestion |
| Scheduling (cron) | Daily early-morning reports, expired token cleanup |
| Fan-out | “Notify all users” — N-item work distributed |
The core benefit is detaching from the response. The user gets a 200 response immediately, and the actual work is processed by a worker.
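To make the idea concrete without any Celery at all, here is a minimal standard-library sketch. A `queue.Queue` plays the broker and a thread plays the worker; `handle_request` and the 0.2-second job are made up for illustration and are not how Celery is implemented.

```python
import queue
import threading
import time

jobs: "queue.Queue[tuple[str, float]]" = queue.Queue()
done: list[str] = []


def worker() -> None:
    """Stand-in for a Celery worker: pull jobs off the queue and run them."""
    while True:
        name, seconds = jobs.get()
        time.sleep(seconds)  # pretend this is the slow email/PDF work
        done.append(name)
        jobs.task_done()


threading.Thread(target=worker, daemon=True).start()


def handle_request(name: str) -> dict:
    """Stand-in for a view: enqueue the work and answer right away."""
    jobs.put((name, 0.2))
    return {"status": 200, "queued": name}


start = time.perf_counter()
response = handle_request("welcome-email")
elapsed = time.perf_counter() - start  # measured before the job has finished

jobs.join()  # only this demo waits for the worker; a real view would not
```

The response is built before the job runs, which is the whole point: the time the user waits no longer depends on how slow the background work is.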
Alternatives in one line #
| Tool | Verdict |
|---|---|
| Celery | De facto standard. Rich features, big ecosystem, docs |
| RQ | Lightweight. Redis only. When you need a simple queue |
| Dramatiq | Modern, simpler API. Celery alternative |
| Django Q2 | DB backend possible, strong Django integration. Small projects |
| Huey | Very lightweight. Good fit for a single machine |
This post goes deep on the Celery + Redis combination, the most common setup in practice.
Broker — Redis vs RabbitMQ #
Celery talks to workers through a broker (message queue).
| | Redis | RabbitMQ |
|---|---|---|
| Setup difficulty | Very easy | Moderate |
| Operations burden | Low | Moderate |
| Feature richness | Simple | Complex routing possible |
| Performance | Very fast | Fast |
| Reliability | Good (with persistence) | Very good (AMQP standard) |
| Use as result backend too | Yes | Separate needed |
| Best fit | General web services (most cases) | Complex routing, large-scale distributed |
This series uses Redis as both broker and result backend, a setup that covers most small-to-medium services.
Install #
uv add celery redis
docker run -d -p 6379:6379 redis:7-alpine
Celery setup: Django integration #
# mysite/celery.py
import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery("mysite")
# Read all settings from Django settings with 'CELERY_' prefix
app.config_from_object("django.conf:settings", namespace="CELERY")
# Auto-discover tasks.py in all INSTALLED_APPS
app.autodiscover_tasks()

# mysite/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)

The import in __init__.py is the key: when Django starts, the Celery app instance has to load alongside it so that @shared_task can find it.
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60  # 30-minute hard limit

Thanks to namespace="CELERY", each CELERY_xxx entry in settings becomes Celery’s lowercase xxx setting (CELERY_BROKER_URL maps to broker_url).
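As a rough mental model of that mapping, the sketch below strips the prefix and lowercases the rest. `celery_keys` is a hypothetical helper written for this post, not Celery's actual loader, and it ignores everything Celery does beyond the renaming.

```python
def celery_keys(django_settings: dict, namespace: str = "CELERY") -> dict:
    """Keep only NAMESPACE_-prefixed settings and return them as lowercase keys."""
    prefix = f"{namespace}_"
    return {
        name[len(prefix):].lower(): value
        for name, value in django_settings.items()
        if name.startswith(prefix)
    }


settings = {
    "DEBUG": True,  # no prefix, so Celery never sees it
    "CELERY_BROKER_URL": "redis://localhost:6379/0",
    "CELERY_TASK_TIME_LIMIT": 30 * 60,
}
mapped = celery_keys(settings)
```

Anything without the prefix is left alone, which is why Django and Celery settings can share one file without colliding.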
First task #
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_welcome_email(user_id: int) -> str:
    from django.contrib.auth import get_user_model

    User = get_user_model()
    user = User.objects.get(pk=user_id)
    send_mail(
        subject="Welcome",
        message=f"Welcome, {user.username}.",
        from_email="noreply@example.com",
        recipient_list=[user.email],
    )
    return f"sent to {user.email}"

@shared_task creates a task that isn’t tied to a specific Celery app instance, so you can import and use it the same way from any Django app.
Calling a task — delay vs apply_async
from .tasks import send_welcome_email

# Simplest
send_welcome_email.delay(user.id)

# When you need options
send_welcome_email.apply_async(
    args=[user.id],
    countdown=10,    # 10 seconds later
    expires=300,     # discard if not started in 5 min
    queue="emails",  # different worker queue
)

delay() is the shortcut for apply_async(). With no options, delay is enough.
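The relationship between the two calls can be sketched with a toy class. `ToyTask` is hypothetical and only records what would be sent; the point is that `delay` forwards its star-arguments to `apply_async` and has no way to carry options.

```python
class ToyTask:
    """Hypothetical stand-in for a Celery task; records calls instead of queueing."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.sent: list[dict] = []

    def apply_async(self, args=None, kwargs=None, **options) -> dict:
        message = {
            "task": self.name,
            "args": list(args or []),
            "kwargs": dict(kwargs or {}),
            "options": options,
        }
        self.sent.append(message)
        return message

    def delay(self, *args, **kwargs) -> dict:
        # Star-arguments in, apply_async(args, kwargs) out, with no options.
        return self.apply_async(args, kwargs)


task = ToyTask("send_welcome_email")
short = task.delay(42)
full = task.apply_async(args=[42], countdown=10, queue="emails")
```

Both calls carry the same task arguments; only the `apply_async` form has anywhere to put `countdown`, `expires`, or `queue`.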
Calling inside View / Signal — transaction.on_commit
The most common trap is a task running before the DB transaction commits. The worker can’t find the user that was just created.
from rest_framework.decorators import api_view
from rest_framework.response import Response

@api_view(["POST"])
def register(request):
    user = User.objects.create_user(...)
    send_welcome_email.delay(user.id)  # ← worker may not find the user
    return Response({"ok": True})

Inside @transaction.atomic or with ATOMIC_REQUESTS enabled, the commit happens only when the atomic block or the request finishes, which is after delay() has already queued the task. If a worker picks up the task in the meantime, the user may not yet exist in the DB.
The answer is transaction.on_commit — call only after commit finishes (the post-transaction processing pattern from Advanced #5).
from django.db import transaction
from rest_framework.decorators import api_view
from rest_framework.response import Response

@api_view(["POST"])
def register(request):
    user = User.objects.create_user(...)
    transaction.on_commit(lambda: send_welcome_email.delay(user.id))
    return Response({"ok": True})

Same trap in signals #
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .tasks import send_welcome_email

@receiver(post_save, sender=get_user_model())
def on_user_created(sender, instance, created, **kwargs):
    if not created:
        return
    transaction.on_commit(lambda: send_welcome_email.delay(instance.id))

post_save fires immediately after save, but if it fires inside a transaction, the commit hasn’t happened yet. Always wrap the task call with on_commit.
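The race is easier to see in a toy model. The sketch below is not Django's implementation: `ToyTransaction` and `worker_lookup` are hypothetical names, and a plain `set` stands in for the rows other connections can see.

```python
class ToyTransaction:
    """Hypothetical model of a DB transaction with on_commit hooks."""

    def __init__(self, db: set) -> None:
        self.db = db
        self.pending_rows: list[int] = []
        self.hooks: list = []

    def insert(self, row_id: int) -> None:
        self.pending_rows.append(row_id)  # invisible to other connections

    def on_commit(self, hook) -> None:
        self.hooks.append(hook)  # deferred, not run yet

    def commit(self) -> None:
        self.db.update(self.pending_rows)  # rows become visible first
        for hook in self.hooks:  # then the hooks fire
            hook()


db: set = set()
log: list[str] = []


def worker_lookup(user_id: int) -> None:
    """What the worker sees when it queries for the user."""
    log.append("found" if user_id in db else "missing")


tx = ToyTransaction(db)
tx.insert(1)
worker_lookup(1)  # like calling .delay() inside the transaction
tx.on_commit(lambda: worker_lookup(1))  # like transaction.on_commit(...)
tx.commit()
```

The first lookup runs while the row is still pending and misses it; the deferred one runs after commit and finds it.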
Getting the result #
The return value of a shared_task is stored in the result backend (Redis).
result = send_welcome_email.delay(user.id)
print(result.id)               # task UUID
print(result.status)           # PENDING / STARTED / SUCCESS / FAILURE
print(result.get(timeout=10))  # result (blocking)

A common pattern is to return the task ID to the client in the API response, then expose progress via a separate /api/tasks/{id}/ endpoint, which is useful for long jobs with a progress UI.
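The body of such a status endpoint might look roughly like this. `task_status_payload` is a hypothetical helper; in a real view the argument would be `celery.result.AsyncResult(task_id)`, and the `SimpleNamespace` objects below only imitate its `id`, `status`, and `result` attributes so the sketch runs without a broker.

```python
from types import SimpleNamespace


def task_status_payload(result) -> dict:
    """Build a JSON-friendly status body from an AsyncResult-like object."""
    payload = {"id": result.id, "status": result.status}
    if result.status == "SUCCESS":
        payload["result"] = result.result
    elif result.status == "FAILURE":
        # On failure, .result holds the exception; expose only its text.
        payload["error"] = str(result.result)
    return payload


# Stand-ins for AsyncResult(task_id) in two different states.
pending = SimpleNamespace(id="abc", status="PENDING", result=None)
finished = SimpleNamespace(id="abc", status="SUCCESS", result="sent to a@example.com")
failed = SimpleNamespace(id="abc", status="FAILURE", result=ValueError("no such user"))
```

One caveat worth keeping in mind: Celery also reports PENDING for task IDs it has never seen, so a client cannot tell "queued" from "unknown ID" by status alone.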
Retry — retry policy #
When an external API temporarily returns 503, discarding the task on the first failure is not an option. Auto retry is the standard solution.
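import httpx
from celery import shared_task

@shared_task(
    bind=True,  # receives self
    # httpx raises its own exception types, not the built-in ConnectionError/TimeoutError.
    # HTTPStatusError also covers 4xx responses; narrow this in real code.
    autoretry_for=(httpx.TransportError, httpx.HTTPStatusError),
    retry_kwargs={"max_retries": 5},
    retry_backoff=True,     # 1, 2, 4, 8, 16 sec (exponential)
    retry_backoff_max=600,  # max 10 min
    retry_jitter=True,      # random delay between 0 and the backoff value (prevents thundering herd)
)
def call_external_api(self, url: str) -> dict:
    response = httpx.get(url, timeout=5)
    response.raise_for_status()  # a 503 raises httpx.HTTPStatusError
    return response.json()

Manual retry #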
# payment_gateway and TemporaryFailure stand in for your payment client and its transient-error type
@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id: int):
    try:
        result = payment_gateway.charge(order_id)
    except TemporaryFailure as exc:
        # Retry after 60 sec, max 3 times
        raise self.retry(exc=exc, countdown=60)

The heart of the retry pattern #
- idempotency: the same task running multiple times must produce the same result (design on the assumption that an email may be sent twice)
- backoff + jitter: so that retries after a transient outage don’t all hit at the same moment
- dead letter: once the maximum number of retries has also failed, send the task to a separate queue for a human to review
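The backoff and jitter arithmetic is small enough to write out. This is a sketch modeled on how `retry_backoff`, `retry_backoff_max`, and `retry_jitter` are documented to behave, not a copy of Celery's code; `backoff_delay` is a name invented here.

```python
import random


def backoff_delay(retries: int, factor: int = 1, maximum: int = 600,
                  jitter: bool = False) -> int:
    """Exponential backoff in seconds, capped at `maximum`."""
    delay = min(maximum, factor * 2 ** retries)
    if jitter:
        # Full jitter: pick anywhere between 0 and the computed delay.
        delay = random.randint(0, delay)
    return delay


plain = [backoff_delay(n) for n in range(5)]  # doubling sequence
capped = backoff_delay(20)                    # 2**20 s is clamped to the maximum
jittered = [backoff_delay(n, jitter=True) for n in range(5)]
```

With jitter on, the exponential value acts as an upper bound rather than the exact delay, so a thousand tasks that failed together spread their retries out instead of returning in lockstep.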
Celery Beat — periodic tasks (cron) #
When you need scheduling, use Celery’s sister tool Beat.
uv add django-celery-beat
INSTALLED_APPS += ["django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
uv run python manage.py migrate
Define in code #
from celery.schedules import crontab

app.conf.beat_schedule = {
    "cleanup-expired-tokens": {
        "task": "blog.tasks.cleanup_expired_tokens",
        "schedule": crontab(hour=3, minute=0),  # daily 03:00
    },
    "daily-summary-email": {
        "task": "blog.tasks.send_daily_summary",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon-fri"),
    },
    "every-5-min-healthcheck": {
        "task": "blog.tasks.ping_external",
        "schedule": 300.0,  # in seconds
    },
}
Define in admin (django-celery-beat’s strength) #
With DatabaseScheduler, you can add/edit schedules from the Django Admin GUI — meaning schedule changes in production require no code deploy.
Running the worker #
# Worker
uv run celery -A mysite worker -l info
# Separate terminal — Beat (scheduler)
uv run celery -A mysite beat -l info
# Or in one process (dev only, not for production)
uv run celery -A mysite worker -l info -B
Concurrency #
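# 4 worker processes, consuming the "default" and "emails" queues.
# Celery's default queue is named "celery" unless task_default_queue is changed.
# (Comments cannot follow a trailing backslash, so they live up here.)
celery -A mysite worker -l info \
  --concurrency=4 \
  --queues=default,emails \
  --hostname=worker1@%h

The default concurrency model is prefork (multiprocess). For I/O-heavy tasks, a greenlet-based pool (--pool=gevent --concurrency=100, with gevent installed) can be more efficient.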
Queue separation #
@shared_task(queue="emails")
def send_welcome_email(user_id: int): ...
@shared_task(queue="reports")
def generate_pdf(...): ...

Run separate workers per queue to prevent heavy work from blocking light work. For example, run email workers with high concurrency and report workers on a separate, memory-rich machine.
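Routing can also be kept in settings instead of on each decorator. The fragment below is a sketch using Celery's `task_routes` and `task_default_queue` settings through the `CELERY_` namespace; the `blog.tasks` module path is an assumption about where these tasks live.

```python
# settings.py (sketch): route by task name instead of decorating each task.
CELERY_TASK_ROUTES = {
    "blog.tasks.send_welcome_email": {"queue": "emails"},
    "blog.tasks.generate_pdf": {"queue": "reports"},
}

# Unrouted tasks go to Celery's default queue, which is named "celery"
# unless this is set. A worker started with --queues=default,... needs it.
CELERY_TASK_DEFAULT_QUEUE = "default"
```

Keeping routes in one place makes it easier to move a task to another queue during an incident without touching task code.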
Monitoring — Flower, Sentry #
Flower — real-time dashboard #
uv add flower
celery -A mysite flower --port=5555
Open http://localhost:5555 in a browser to see:
- Worker status (online/offline, load)
- In-progress / completed / failed task lists
- Average execution time per task
- Per-queue backlog status
Sentry integration #
import os

import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn=os.environ["SENTRY_DSN"],
    integrations=[DjangoIntegration(), CeleryIntegration()],
    traces_sample_rate=0.1,
)

CeleryIntegration automatically reports task failures to Sentry, so which task failed and with which arguments is collected in one place.
Bring it up together with docker-compose #
This is covered in depth in #6, but the minimal form looks like this.
services:
  web:
    build: .
    command: gunicorn mysite.wsgi --bind 0.0.0.0:8000
    depends_on: [db, redis]
  worker:
    build: .
    command: celery -A mysite worker -l info
    depends_on: [db, redis]
  beat:
    build: .
    command: celery -A mysite beat -l info
    depends_on: [db, redis]
  redis:
    image: redis:7-alpine
  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: blog
      POSTGRES_USER: blog
      POSTGRES_PASSWORD: blog

Three containers (web, worker, beat) are built from the same image. Same code, only the start command differs.
Common pitfalls #
1) Missing transaction.on_commit
The trap covered above: calling task.delay() inside a transaction may leave the worker unable to find the data. Always use on_commit.
2) Model objects as task arguments #
send_welcome_email.delay(user)  # don't: passes the ORM object
Celery serializes arguments (JSON). ORM objects don’t serialize, and even if they did, the worker might receive stale state. Pass IDs only and re-query inside the worker.
send_welcome_email.delay(user.id)  # do: pass the ID
3) Reduce backend load if you don’t use the result #
@shared_task(ignore_result=True)
def fire_and_forget(...): ...

With ignore_result=True, the return value is not stored in the result backend. For tasks whose result is never looked up, this reduces Redis pressure.
4) Multiple Beat instances trigger N times #
celery beat should run as exactly one instance. Multiple instances trigger the same schedule N times. When deploying via containers, ensure replicas=1.
5) Tasks that are too big #
If one task processes 10,000 items, a failure means redoing them all. Split the work into chunks and dispatch each chunk as a separate task (fan-out pattern). Celery’s group, chord, and chain primitives are built for this.
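The chunking step itself needs no Celery. Here is a small sketch, with `chunked` as a hypothetical helper and ten made-up user IDs; each resulting batch could then be handed to one task signature (for instance a hypothetical `process_batch.s(batch)`).

```python
from typing import Iterator, Sequence


def chunked(items: Sequence[int], size: int) -> Iterator[list[int]]:
    """Yield consecutive slices of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


user_ids = list(range(1, 11))  # 10 hypothetical user IDs
batches = list(chunked(user_ids, 4))
```

Chunk size is a trade-off: smaller chunks mean less rework after a failure but more messages through the broker.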
from celery import group

job = group(process_one.s(uid) for uid in user_ids)
result = job.apply_async()
Real example: notify + reindex after publishing #
A common scenario in a blog API. When a post turns published:
- Send notification emails to subscribers
- Update the search index
- Invalidate RSS/Sitemap cache
All detached from the response flow.
from celery import shared_task
from django.core.cache import cache
from django.core.mail import send_mail

from .models import Post  # assumed location of the Post model

# Note: a retry reruns the whole loop, so earlier subscribers may be emailed twice.
@shared_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def notify_subscribers(post_id: int):
    post = Post.objects.get(pk=post_id)
    subscribers = post.author.subscribers.all()
    for sub in subscribers:
        send_mail("New post", post.title, "noreply@example.com", [sub.email])

@shared_task
def reindex_post(post_id: int):
    # Index in OpenSearch / Meilisearch
    ...

@shared_task(ignore_result=True)
def invalidate_caches(post_id: int):
    cache.delete_many(["rss:feed", "sitemap:posts"])

from django.db import transaction
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

from .tasks import notify_subscribers, reindex_post, invalidate_caches

class PostViewSet(viewsets.ModelViewSet):
    @action(detail=True, methods=["post"])
    def publish(self, request, pk=None):
        post = self.get_object()
        post.published = True
        post.save()
        transaction.on_commit(lambda: notify_subscribers.delay(post.id))
        transaction.on_commit(lambda: reindex_post.delay(post.id))
        transaction.on_commit(lambda: invalidate_caches.delay(post.id))
        return Response({"published": True})

The view responds with 200 immediately. The three tasks run in the background. Even if one fails, the others and the response itself are unaffected.
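One thing to watch in notify_subscribers: because it retries on any exception, a failure halfway through the loop reruns the loop from the start. A possible guard, sketched below in plain Python, is to record each (post, recipient) pair once it has been sent. `notify_once`, `ledger`, and `flaky_send` are hypothetical; in a real project the ledger would be a DB table or a Redis set rather than an in-memory `set`.

```python
def notify_once(post_id: int, emails: list[str], already_sent: set, send) -> int:
    """Send to each address at most once per post; return how many were sent now."""
    sent_now = 0
    for email in emails:
        key = (post_id, email)
        if key in already_sent:
            continue  # delivered on an earlier attempt
        send(email)
        already_sent.add(key)  # record only after a successful send
        sent_now += 1
    return sent_now


outbox: list[str] = []
ledger: set = set()  # stand-in for a DB table or Redis set
attempts = {"n": 0}


def flaky_send(email: str) -> None:
    """Fail once on the second recipient to mimic a transient SMTP error."""
    if email == "b@example.com" and attempts["n"] == 0:
        attempts["n"] += 1
        raise ConnectionError("temporary failure")
    outbox.append(email)


recipients = ["a@example.com", "b@example.com", "c@example.com"]
try:
    notify_once(7, recipients, ledger, flaky_send)  # first attempt dies midway
except ConnectionError:
    pass
retried = notify_once(7, recipients, ledger, flaky_send)  # the retry
```

The retry skips the recipient who already got the mail and finishes the rest. Another option is to fan out one task per subscriber, so a retry only ever repeats a single send.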
Recap #
What this post nailed down:
- Where Celery fits: heavy work / external IO / scheduling / fan-out
- Broker: Redis is common, RabbitMQ for complex routing
- Setup: mysite/celery.py, celery_app in __init__.py, autodiscover_tasks
- @shared_task: app-independent task
- task.delay() (shortcut) vs apply_async() (with options)
- transaction.on_commit for post-commit calls (loops back to Advanced #5)
- Results: result.get(), result.status, result backend
- Retry: autoretry_for, retry_backoff, retry_jitter, idempotency
- Celery Beat: crontab or DB-based scheduler (managed in admin)
- Running workers: celery -A mysite worker, concurrency, queue separation
- Monitoring: Flower (real-time), Sentry (failure tracking)
- web/worker/beat together in docker-compose
- Pitfalls: on_commit, passing ORM objects, ignore_result, single Beat instance, splitting big tasks
Celery is a tool that doesn’t fit in one post: workflows (group/chord/chain), task routing, priority queues, signals, and more all go deeper. The patterns above cover most day-to-day needs, and for everything beyond them the official Celery docs are approachable.
The next post (#5 OpenAPI docs (drf-spectacular)) covers automatically documenting the API you’ve built: Swagger UI, ReDoc, and even auto-generated client SDKs.