장고 실전 #4 Celery로 비동기 작업

8 분 소요

#3 까지 읽기/쓰기는 빠릅니다. 그런데 이메일 발송, 외부 API 호출, 이미지 리사이즈, PDF 생성 같은 작업이 view 안에서 일어나면 응답이 몇 초씩 막히고, 외부 서비스가 느려지면 우리 API도 같이 느려집니다.

답은 늘 같습니다 — 요청 흐름에서 떼어 내고 백그라운드로 보낸다. Python/Django 진영에서 그 역할의 표준 도구가 Celery입니다.

Celery의 쓰임 #

용도어울리는 작업
무거운 CPU/IO이미지 리사이즈, PDF/리포트 생성, 데이터 집계
외부 API이메일 (SES/SendGrid), Slack 알림, 결제 외부 호출
지연 가능 작업회원가입 환영 메일 (1초 늦어도 OK), 통계 적재
스케줄링 (cron)매일 새벽 리포트, 만료 토큰 정리
팬아웃“전 사용자에게 알림” 같은 N건 작업 분산

응답에서 떼어내는 게 핵심 효용입니다. 사용자는 즉시 200 응답을 받고, 실제 작업은 워커가 처리.

대안 한 줄 #

평가
Celery사실상 표준. 풍부한 기능, 큰 생태계, 문서
RQ가벼움. Redis만 지원. 단순 큐가 필요할 때
Dramatiq모던, 더 단순한 API. Celery의 대안
Django Q2DB 백엔드 가능, Django 통합 강함. 작은 프로젝트
Huey매우 가벼움. 단일 머신에 적합

이 글은 Celery + Redis 조합을 깊게 봅니다 — 시장에서 가장 많이 만나는 형태.

브로커 — Redis vs RabbitMQ #

Celery는 **브로커 (메시지 큐)**를 통해 워커와 소통합니다.

RedisRabbitMQ
셋업 난이도매우 쉬움보통
운영 부담낮음보통
기능 풍부함단순복잡한 라우팅 가능
성능매우 빠름빠름
신뢰성좋음 (영속화 옵션)매우 좋음 (AMQP 표준)
결과 백엔드 같이 쓰기별도 필요
어울리는 경우일반 웹 서비스 (대부분)복잡한 라우팅, 대규모 분산

이 시리즈는 Redis로 갑니다 — 브로커 + result backend 한 번에 정리합니다. 작은~중간 규모 서비스에서 가장 흔한 형태입니다.

설치 #

패키지
uv add celery redis
Redis도커로 띄우기 (개발)
docker run -d -p 6379:6379 redis:7-alpine

Celery 셋업 — Django와의 통합 #

mysite/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

app = Celery("mysite")

# 모든 설정을 Django settings에서 'CELERY_' prefix로 읽음
app.config_from_object("django.conf:settings", namespace="CELERY")

# 모든 INSTALLED_APPS의 tasks.py 자동 발견
app.autodiscover_tasks()
mysite/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)

__init__.py의 import가 핵심입니다 — Django가 시작될 때 Celery 앱 인스턴스가 같이 로드돼야 @shared_task가 그걸 발견할 수 있습니다.

mysite/settings.py — Celery 설정
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60   # 30분 hard limit

namespace="CELERY" 덕분에 settings의 CELERY_xxx가 Celery의 xxx 설정이 됩니다.

첫 task #

blog/tasks.py
from celery import shared_task
from django.core.mail import send_mail


@shared_task
def send_welcome_email(user_id: int) -> str:
    from django.contrib.auth import get_user_model
    User = get_user_model()
    user = User.objects.get(pk=user_id)
    send_mail(
        subject="환영합니다",
        message=f"{user.username} 님, 가입을 환영합니다.",
        from_email="noreply@example.com",
        recipient_list=[user.email],
    )
    return f"sent to {user.email}"

@shared_task앱에 묶이지 않은 task를 만듭니다 — Django의 어느 앱에서든 같은 방식으로 import 해 쓸 수 있습니다.

task 호출 — delay vs apply_async #

호출
from .tasks import send_welcome_email

# 가장 단순
send_welcome_email.delay(user.id)

# 옵션이 필요하면
send_welcome_email.apply_async(
    args=[user.id],
    countdown=10,                      # 10초 뒤
    expires=300,                       # 5분 안에 시작 안 되면 폐기
    queue="emails",                    # 다른 워커 큐
)

delay()apply_async()의 단축형. 옵션이 없으면 delay로 충분합니다.

View / Signal 안에서 호출 — transaction.on_commit #

가장 흔한 함정은 DB 트랜잭션이 커밋되기 전에 task가 실행되는 경우. 워커가 막 만든 사용자를 못 찾습니다.

🚫 트랜잭션 안에서 바로 호출
@router.post("/register")
def register(request):
    user = User.objects.create_user(...)
    send_welcome_email.delay(user.id)   # ← 워커가 user를 못 찾을 수 있음
    return Response({"ok": True})

@transaction.atomic 안이거나 ATOMIC_REQUESTS가 켜져 있으면, 응답 직전에 커밋됩니다. 그 사이에 워커가 task를 집어가면 사용자가 아직 DB에 없습니다.

답은 transaction.on_commit — 커밋이 끝난 뒤에만 호출 (고급 #5의 트랜잭션 후 처리 패턴).

✅ 커밋 후
from django.db import transaction


@router.post("/register")
def register(request):
    user = User.objects.create_user(...)
    transaction.on_commit(lambda: send_welcome_email.delay(user.id))
    return Response({"ok": True})

Signal에서도 같은 함정 #

signals.py
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .tasks import send_welcome_email


@receiver(post_save, sender=get_user_model())
def on_user_created(sender, instance, created, **kwargs):
    if not created:
        return
    transaction.on_commit(lambda: send_welcome_email.delay(instance.id))

post_save저장 직후 호출되지만, 트랜잭션 안에서 호출됐다면 아직 커밋 전입니다. 늘 on_commit으로 감싸세요.

결과 받아오기 #

shared_task의 반환값은 result backend (Redis)에 저장됩니다.

결과 조회
result = send_welcome_email.delay(user.id)
print(result.id)            # task UUID
print(result.status)        # PENDING / STARTED / SUCCESS / FAILURE
print(result.get(timeout=10))  # 결과 (블로킹)

API 응답에서 task.id를 클라이언트에 돌려주고, 별도 /api/tasks/{id}/ 엔드포인트로 진행 상태를 확인하는 패턴이 일반적입니다 (긴 작업, 진행률 UI).

재시도 — retry policy #

외부 API가 일시적으로 503을 줄 때, 한 번 실패로 task를 버리면 안 됩니다. 자동 재시도가 표준.

기본 retry
@shared_task(
    bind=True,                          # self 인자 받음
    autoretry_for=(ConnectionError, TimeoutError),
    retry_kwargs={"max_retries": 5},
    retry_backoff=True,                 # 1, 2, 4, 8, 16초 (지수)
    retry_backoff_max=600,              # 최대 10분
    retry_jitter=True,                  # ± 랜덤 (thundering herd 방지)
)
def call_external_api(self, url: str) -> dict:
    response = httpx.get(url, timeout=5)
    response.raise_for_status()
    return response.json()

수동 retry #

조건부 재시도
@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id: int):
    try:
        result = payment_gateway.charge(order_id)
    except TemporaryFailure as exc:
        # 60초 뒤 재시도, 최대 3회
        raise self.retry(exc=exc, countdown=60)

재시도 패턴의 핵심 #

  • idempotency — 같은 task가 여러 번 실행돼도 결과가 같아야 함 (이메일은 중복 발송될 수 있다는 전제로 설계)
  • backoff + jitter — 일시 장애에서 동시에 몰리지 않도록
  • dead letter — 최대 재시도 후에도 실패하면 별도 큐로 — 사람이 보고 처리

Celery Beat — periodic tasks (cron) #

스케줄링이 필요하면 Celery의 자매 도구 Beat.

설치 (DB 기반 스케줄)
uv add django-celery-beat
settings.py
INSTALLED_APPS += ["django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
마이그레이션
uv run python manage.py migrate

코드로 정의 #

mysite/celery.py 또는 settings.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    "cleanup-expired-tokens": {
        "task": "blog.tasks.cleanup_expired_tokens",
        "schedule": crontab(hour=3, minute=0),   # 매일 03:00
    },
    "daily-summary-email": {
        "task": "blog.tasks.send_daily_summary",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon-fri"),
    },
    "every-5-min-healthcheck": {
        "task": "blog.tasks.ping_external",
        "schedule": 300.0,   # 초 단위
    },
}

Admin에서 정의 (django-celery-beat의 강점) #

DatabaseScheduler 면 Django Admin에서 GUI로 스케줄을 추가/수정 할 수 있습니다 — 코드 배포 없이 운영 중에 스케줄을 바꾸는 방식입니다.

워커 실행 #

개발 모드 — 워커 + Beat
# 워커
uv run celery -A mysite worker -l info

# 별도 터미널 — Beat (스케줄러)
uv run celery -A mysite beat -l info

# 또는 한 프로세스로 (개발만, 프로덕션 비추)
uv run celery -A mysite worker -l info -B

동시성 #

옵션
celery -A mysite worker -l info \
  --concurrency=4 \           # 워커 프로세스 4개
  --queues=default,emails \   # 처리할 큐
  --hostname=worker1@%h

기본 동시성 모델은 prefork (멀티프로세스). I/O 많은 task는 --pool=gevent --concurrency=100로 비동기 풀이 더 효율적인 경우도 있습니다.

큐 분리 #

task 별 큐 지정
@shared_task(queue="emails")
def send_welcome_email(user_id: int): ...

@shared_task(queue="reports")
def generate_pdf(...): ...

각 큐에 워커를 따로 띄우면 무거운 작업이 가벼운 작업을 막지 않게 분리할 수 있습니다. 이메일 워커는 동시성 높게, 리포트 워커는 메모리 많은 머신에 따로.

모니터링 — Flower, Sentry #

Flower — 실시간 대시보드 #

설치 + 실행
uv add flower
celery -A mysite flower --port=5555

브라우저에서 http://localhost:5555를 열면:

  • 워커 상태 (online/offline, load)
  • 진행 중/완료/실패 task 목록
  • task 별 평균 실행 시간
  • 큐별 적체 상태

Sentry 통합 #

settings.py
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn=os.environ["SENTRY_DSN"],
    integrations=[DjangoIntegration(), CeleryIntegration()],
    traces_sample_rate=0.1,
)

CeleryIntegration이 task 실패를 자동으로 Sentry에 보고합니다. 어느 task가 어떤 인자로 실패했는지 한곳에 모입니다.

docker-compose로 같이 띄우기 #

#6에서 깊게 다루지만, 최소 형태는 이 구조입니다.

docker-compose.yml — 발췌
services:
  web:
    build: .
    command: gunicorn mysite.wsgi --bind 0.0.0.0:8000
    depends_on: [db, redis]

  worker:
    build: .
    command: celery -A mysite worker -l info
    depends_on: [db, redis]

  beat:
    build: .
    command: celery -A mysite beat -l info
    depends_on: [db, redis]

  redis:
    image: redis:7-alpine

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: blog
      POSTGRES_USER: blog
      POSTGRES_PASSWORD: blog

같은 이미지로 web / worker / beat 세 컨테이너를 띄웁니다. 코드는 같고 시작 명령만 다릅니다.

자주 만나는 함정 #

1) transaction.on_commit 누락 #

방금 본 그 함정. 트랜잭션 안에서 task.delay() 하면 워커가 데이터를 못 찾을 수 있습니다. on_commit.

2) task 인자에 모델 객체 #

🚫 ORM 객체 전달
send_welcome_email.delay(user)

Celery는 인자를 직렬화 (JSON) 합니다. ORM 객체는 직렬화 안 되거나, 직렬화돼도 워커가 받을 때는 stale 한 상태일 수 있습니다. ID만 전달, 워커 안에서 다시 조회.

✅ ID만
send_welcome_email.delay(user.id)

3) 결과를 안 쓸 거면 백엔드 부하 줄이기 #

ignore_result
@shared_task(ignore_result=True)
def fire_and_forget(...): ...

result backend에 저장 안 함. 결과 조회가 필요 없는 task는 이걸로 Redis 부담을 줄이세요.

4) Beat가 여러 개 돌면 task가 N 번 #

celery beat딱 한 인스턴스만 돌아야 합니다. 여러 개 돌면 같은 스케줄을 N번 트리거. 컨테이너로 띄울 때 replicas=1 보장.

5) task가 너무 큼 #

한 task가 1만 건을 처리하면 실패할 때 다 다시. chunk로 나누고 각 chunk를 별도 task로 (팬아웃 패턴) — Celery의 group, chord, chain이 그 일을 합니다.

group으로 팬아웃
from celery import group

job = group(process_one.s(uid) for uid in user_ids)
result = job.apply_async()

실전 예 — 글 발행 후 알림 + 인덱스 갱신 #

블로그 API에서 흔한 시나리오. 글이 published로 바뀌면:

  1. 구독자에게 알림 이메일
  2. 검색 인덱스 갱신
  3. RSS/Sitemap 캐시 무효화

전부 응답 흐름에서 떼어냅니다.

blog/tasks.py
@shared_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def notify_subscribers(post_id: int):
    post = Post.objects.get(pk=post_id)
    subscribers = post.author.subscribers.all()
    for sub in subscribers:
        send_mail("새 글", post.title, "noreply@example.com", [sub.email])

@shared_task
def reindex_post(post_id: int):
    # OpenSearch / Meilisearch에 인덱싱
    ...

@shared_task(ignore_result=True)
def invalidate_caches(post_id: int):
    cache.delete_many(["rss:feed", "sitemap:posts"])
blog/views.py — publish 액션
from django.db import transaction
from .tasks import notify_subscribers, reindex_post, invalidate_caches


class PostViewSet(viewsets.ModelViewSet):
    @action(detail=True, methods=["post"])
    def publish(self, request, pk=None):
        post = self.get_object()
        post.published = True
        post.save()

        transaction.on_commit(lambda: notify_subscribers.delay(post.id))
        transaction.on_commit(lambda: reindex_post.delay(post.id))
        transaction.on_commit(lambda: invalidate_caches.delay(post.id))

        return Response({"published": True})

view는 즉시 200으로 응답. 세 작업은 워커가 백그라운드로 처리. 한 작업이 실패해도 다른 작업과 응답 자체에는 영향 없음.

정리 #

이번 글에서 잡은 것:

  • Celery의 쓰임 — 무거운 작업 / 외부 IO / 스케줄링 / 팬아웃
  • 브로커 — Redis가 일반적, RabbitMQ는 복잡한 라우팅
  • 셋업 — mysite/celery.py, __init__.pycelery_app, autodiscover_tasks
  • @shared_task — 앱 독립 task
  • task.delay() (단축) vs apply_async() (옵션)
  • **transaction.on_commit**으로 커밋 후 호출 (고급 #5 회귀)
  • 결과 — result.get(), result.status, result backend
  • 재시도autoretry_for, retry_backoff, retry_jitter, idempotency
  • Celery Beatcrontab 또는 DB 기반 스케줄러 (admin에서 관리)
  • 워커 실행 — celery -A mysite worker, 동시성, 큐 분리
  • 모니터링 — Flower (실시간), Sentry (실패 추적)
  • docker-compose에 web/worker/beat 한곳에
  • 함정 — on_commit, ORM 객체 전달, ignore_result, Beat 인스턴스 1개, 큰 task 분할

Celery는 한 글로는 다 못 다루는 도구입니다 — workflow (group/chord/chain), task routing, 우선순위 큐, signals 등 깊이가 더 있습니다. 일단 위 패턴이 90%를 채워주고, 더 깊은 부분은 Celery 공식 문서가 친절합니다.

다음 글(#5 OpenAPI 문서 (drf-spectacular))에서는 지금까지 만든 API를 자동 문서화 — Swagger UI, ReDoc, 클라이언트 SDK 자동 생성까지 — 하는 흐름을 다룹니다.

X