장고 실전 #4 Celery로 비동기 작업
#3 까지 읽기/쓰기는 빠릅니다. 그런데 이메일 발송, 외부 API 호출, 이미지 리사이즈, PDF 생성 같은 작업이 view 안에서 일어나면 응답이 몇 초씩 막히고, 외부 서비스가 느려지면 우리 API도 같이 느려집니다.
답은 늘 같습니다 — 요청 흐름에서 떼어 내고 백그라운드로 보낸다. Python/Django 진영에서 그 역할의 표준 도구가 Celery입니다.
Celery의 쓰임 #
| 용도 | 어울리는 작업 |
|---|---|
| 무거운 CPU/IO | 이미지 리사이즈, PDF/리포트 생성, 데이터 집계 |
| 외부 API | 이메일 (SES/SendGrid), Slack 알림, 결제 외부 호출 |
| 지연 가능 작업 | 회원가입 환영 메일 (1초 늦어도 OK), 통계 적재 |
| 스케줄링 (cron) | 매일 새벽 리포트, 만료 토큰 정리 |
| 팬아웃 | “전 사용자에게 알림” 같은 N건 작업 분산 |
응답에서 떼어내는 게 핵심 효용입니다. 사용자는 즉시 200 응답을 받고, 실제 작업은 워커가 처리.
대안 한 줄 #
| 평가 | |
|---|---|
| Celery | 사실상 표준. 풍부한 기능, 큰 생태계, 문서 |
| RQ | 가벼움. Redis만 지원. 단순 큐가 필요할 때 |
| Dramatiq | 모던, 더 단순한 API. Celery의 대안 |
| Django Q2 | DB 백엔드 가능, Django 통합 강함. 작은 프로젝트 |
| Huey | 매우 가벼움. 단일 머신에 적합 |
이 글은 Celery + Redis 조합을 깊게 봅니다 — 시장에서 가장 많이 만나는 형태.
브로커 — Redis vs RabbitMQ #
Celery는 **브로커 (메시지 큐)**를 통해 워커와 소통합니다.
| Redis | RabbitMQ | |
|---|---|---|
| 셋업 난이도 | 매우 쉬움 | 보통 |
| 운영 부담 | 낮음 | 보통 |
| 기능 풍부함 | 단순 | 복잡한 라우팅 가능 |
| 성능 | 매우 빠름 | 빠름 |
| 신뢰성 | 좋음 (영속화 옵션) | 매우 좋음 (AMQP 표준) |
| 결과 백엔드 같이 쓰기 | 됨 | 별도 필요 |
| 어울리는 경우 | 일반 웹 서비스 (대부분) | 복잡한 라우팅, 대규모 분산 |
이 시리즈는 Redis로 갑니다 — 브로커 + result backend 한 번에 정리합니다. 작은~중간 규모 서비스에서 가장 흔한 형태입니다.
설치 #
uv add celery redisdocker run -d -p 6379:6379 redis:7-alpineCelery 셋업 — Django와의 통합 #
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery("mysite")
# 모든 설정을 Django settings에서 'CELERY_' prefix로 읽음
app.config_from_object("django.conf:settings", namespace="CELERY")
# 모든 INSTALLED_APPS의 tasks.py 자동 발견
app.autodiscover_tasks()from .celery import app as celery_app
__all__ = ("celery_app",)__init__.py의 import가 핵심입니다 — Django가 시작될 때 Celery 앱 인스턴스가 같이 로드돼야 @shared_task가 그걸 발견할 수 있습니다.
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # 30분 hard limitnamespace="CELERY" 덕분에 settings의 CELERY_xxx가 Celery의 xxx 설정이 됩니다.
첫 task #
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_welcome_email(user_id: int) -> str:
from django.contrib.auth import get_user_model
User = get_user_model()
user = User.objects.get(pk=user_id)
send_mail(
subject="환영합니다",
message=f"{user.username} 님, 가입을 환영합니다.",
from_email="noreply@example.com",
recipient_list=[user.email],
)
return f"sent to {user.email}"@shared_task는 앱에 묶이지 않은 task를 만듭니다 — Django의 어느 앱에서든 같은 방식으로 import 해 쓸 수 있습니다.
task 호출 — delay vs apply_async
#
from .tasks import send_welcome_email
# 가장 단순
send_welcome_email.delay(user.id)
# 옵션이 필요하면
send_welcome_email.apply_async(
args=[user.id],
countdown=10, # 10초 뒤
expires=300, # 5분 안에 시작 안 되면 폐기
queue="emails", # 다른 워커 큐
)delay()는 apply_async()의 단축형. 옵션이 없으면 delay로 충분합니다.
View / Signal 안에서 호출 — transaction.on_commit
#
가장 흔한 함정은 DB 트랜잭션이 커밋되기 전에 task가 실행되는 경우. 워커가 막 만든 사용자를 못 찾습니다.
@router.post("/register")
def register(request):
user = User.objects.create_user(...)
send_welcome_email.delay(user.id) # ← 워커가 user를 못 찾을 수 있음
return Response({"ok": True})@transaction.atomic 안이거나 ATOMIC_REQUESTS가 켜져 있으면, 응답 직전에 커밋됩니다. 그 사이에 워커가 task를 집어가면 사용자가 아직 DB에 없습니다.
답은 transaction.on_commit — 커밋이 끝난 뒤에만 호출 (고급 #5의 트랜잭션 후 처리 패턴).
from django.db import transaction
@router.post("/register")
def register(request):
user = User.objects.create_user(...)
transaction.on_commit(lambda: send_welcome_email.delay(user.id))
return Response({"ok": True})Signal에서도 같은 함정 #
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .tasks import send_welcome_email
@receiver(post_save, sender=get_user_model())
def on_user_created(sender, instance, created, **kwargs):
if not created:
return
transaction.on_commit(lambda: send_welcome_email.delay(instance.id))post_save는 저장 직후 호출되지만, 트랜잭션 안에서 호출됐다면 아직 커밋 전입니다. 늘 on_commit으로 감싸세요.
결과 받아오기 #
shared_task의 반환값은 result backend (Redis)에 저장됩니다.
result = send_welcome_email.delay(user.id)
print(result.id) # task UUID
print(result.status) # PENDING / STARTED / SUCCESS / FAILURE
print(result.get(timeout=10)) # 결과 (블로킹)API 응답에서 task.id를 클라이언트에 돌려주고, 별도 /api/tasks/{id}/ 엔드포인트로 진행 상태를 확인하는 패턴이 일반적입니다 (긴 작업, 진행률 UI).
재시도 — retry policy #
외부 API가 일시적으로 503을 줄 때, 한 번 실패로 task를 버리면 안 됩니다. 자동 재시도가 표준.
@shared_task(
bind=True, # self 인자 받음
autoretry_for=(ConnectionError, TimeoutError),
retry_kwargs={"max_retries": 5},
retry_backoff=True, # 1, 2, 4, 8, 16초 (지수)
retry_backoff_max=600, # 최대 10분
retry_jitter=True, # ± 랜덤 (thundering herd 방지)
)
def call_external_api(self, url: str) -> dict:
response = httpx.get(url, timeout=5)
response.raise_for_status()
return response.json()수동 retry #
@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id: int):
try:
result = payment_gateway.charge(order_id)
except TemporaryFailure as exc:
# 60초 뒤 재시도, 최대 3회
raise self.retry(exc=exc, countdown=60)재시도 패턴의 핵심 #
- idempotency — 같은 task가 여러 번 실행돼도 결과가 같아야 함 (이메일은 중복 발송될 수 있다는 전제로 설계)
- backoff + jitter — 일시 장애에서 동시에 몰리지 않도록
- dead letter — 최대 재시도 후에도 실패하면 별도 큐로 — 사람이 보고 처리
Celery Beat — periodic tasks (cron) #
스케줄링이 필요하면 Celery의 자매 도구 Beat.
uv add django-celery-beatINSTALLED_APPS += ["django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"uv run python manage.py migrate코드로 정의 #
from celery.schedules import crontab
app.conf.beat_schedule = {
"cleanup-expired-tokens": {
"task": "blog.tasks.cleanup_expired_tokens",
"schedule": crontab(hour=3, minute=0), # 매일 03:00
},
"daily-summary-email": {
"task": "blog.tasks.send_daily_summary",
"schedule": crontab(hour=9, minute=0, day_of_week="mon-fri"),
},
"every-5-min-healthcheck": {
"task": "blog.tasks.ping_external",
"schedule": 300.0, # 초 단위
},
}Admin에서 정의 (django-celery-beat의 강점) #
DatabaseScheduler 면 Django Admin에서 GUI로 스케줄을 추가/수정 할 수 있습니다 — 코드 배포 없이 운영 중에 스케줄을 바꾸는 방식입니다.
워커 실행 #
# 워커
uv run celery -A mysite worker -l info
# 별도 터미널 — Beat (스케줄러)
uv run celery -A mysite beat -l info
# 또는 한 프로세스로 (개발만, 프로덕션 비추)
uv run celery -A mysite worker -l info -B동시성 #
celery -A mysite worker -l info \
--concurrency=4 \ # 워커 프로세스 4개
--queues=default,emails \ # 처리할 큐
--hostname=worker1@%h기본 동시성 모델은 prefork (멀티프로세스). I/O 많은 task는 --pool=gevent --concurrency=100로 비동기 풀이 더 효율적인 경우도 있습니다.
큐 분리 #
@shared_task(queue="emails")
def send_welcome_email(user_id: int): ...
@shared_task(queue="reports")
def generate_pdf(...): ...각 큐에 워커를 따로 띄우면 무거운 작업이 가벼운 작업을 막지 않게 분리할 수 있습니다. 이메일 워커는 동시성 높게, 리포트 워커는 메모리 많은 머신에 따로.
모니터링 — Flower, Sentry #
Flower — 실시간 대시보드 #
uv add flower
celery -A mysite flower --port=5555브라우저에서 http://localhost:5555를 열면:
- 워커 상태 (online/offline, load)
- 진행 중/완료/실패 task 목록
- task 별 평균 실행 시간
- 큐별 적체 상태
Sentry 통합 #
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
sentry_sdk.init(
dsn=os.environ["SENTRY_DSN"],
integrations=[DjangoIntegration(), CeleryIntegration()],
traces_sample_rate=0.1,
)CeleryIntegration이 task 실패를 자동으로 Sentry에 보고합니다. 어느 task가 어떤 인자로 실패했는지 한곳에 모입니다.
docker-compose로 같이 띄우기 #
#6에서 깊게 다루지만, 최소 형태는 이 구조입니다.
services:
web:
build: .
command: gunicorn mysite.wsgi --bind 0.0.0.0:8000
depends_on: [db, redis]
worker:
build: .
command: celery -A mysite worker -l info
depends_on: [db, redis]
beat:
build: .
command: celery -A mysite beat -l info
depends_on: [db, redis]
redis:
image: redis:7-alpine
db:
image: postgres:16-alpine
environment:
POSTGRES_DB: blog
POSTGRES_USER: blog
POSTGRES_PASSWORD: blog같은 이미지로 web / worker / beat 세 컨테이너를 띄웁니다. 코드는 같고 시작 명령만 다릅니다.
자주 만나는 함정 #
1) transaction.on_commit 누락
#
방금 본 그 함정. 트랜잭션 안에서 task.delay() 하면 워커가 데이터를 못 찾을 수 있습니다. 늘 on_commit.
2) task 인자에 모델 객체 #
send_welcome_email.delay(user)Celery는 인자를 직렬화 (JSON) 합니다. ORM 객체는 직렬화 안 되거나, 직렬화돼도 워커가 받을 때는 stale 한 상태일 수 있습니다. ID만 전달, 워커 안에서 다시 조회.
send_welcome_email.delay(user.id)3) 결과를 안 쓸 거면 백엔드 부하 줄이기 #
@shared_task(ignore_result=True)
def fire_and_forget(...): ...result backend에 저장 안 함. 결과 조회가 필요 없는 task는 이걸로 Redis 부담을 줄이세요.
4) Beat가 여러 개 돌면 task가 N 번 #
celery beat는 딱 한 인스턴스만 돌아야 합니다. 여러 개 돌면 같은 스케줄을 N번 트리거. 컨테이너로 띄울 때 replicas=1 보장.
5) task가 너무 큼 #
한 task가 1만 건을 처리하면 실패할 때 다 다시. chunk로 나누고 각 chunk를 별도 task로 (팬아웃 패턴) — Celery의 group, chord, chain이 그 일을 합니다.
from celery import group
job = group(process_one.s(uid) for uid in user_ids)
result = job.apply_async()실전 예 — 글 발행 후 알림 + 인덱스 갱신 #
블로그 API에서 흔한 시나리오. 글이 published로 바뀌면:
- 구독자에게 알림 이메일
- 검색 인덱스 갱신
- RSS/Sitemap 캐시 무효화
전부 응답 흐름에서 떼어냅니다.
@shared_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def notify_subscribers(post_id: int):
post = Post.objects.get(pk=post_id)
subscribers = post.author.subscribers.all()
for sub in subscribers:
send_mail("새 글", post.title, "noreply@example.com", [sub.email])
@shared_task
def reindex_post(post_id: int):
# OpenSearch / Meilisearch에 인덱싱
...
@shared_task(ignore_result=True)
def invalidate_caches(post_id: int):
cache.delete_many(["rss:feed", "sitemap:posts"])from django.db import transaction
from .tasks import notify_subscribers, reindex_post, invalidate_caches
class PostViewSet(viewsets.ModelViewSet):
@action(detail=True, methods=["post"])
def publish(self, request, pk=None):
post = self.get_object()
post.published = True
post.save()
transaction.on_commit(lambda: notify_subscribers.delay(post.id))
transaction.on_commit(lambda: reindex_post.delay(post.id))
transaction.on_commit(lambda: invalidate_caches.delay(post.id))
return Response({"published": True})view는 즉시 200으로 응답. 세 작업은 워커가 백그라운드로 처리. 한 작업이 실패해도 다른 작업과 응답 자체에는 영향 없음.
정리 #
이번 글에서 잡은 것:
- Celery의 쓰임 — 무거운 작업 / 외부 IO / 스케줄링 / 팬아웃
- 브로커 — Redis가 일반적, RabbitMQ는 복잡한 라우팅
- 셋업 —
mysite/celery.py,__init__.py의celery_app,autodiscover_tasks @shared_task— 앱 독립 tasktask.delay()(단축) vsapply_async()(옵션)- **
transaction.on_commit**으로 커밋 후 호출 (고급 #5 회귀) - 결과 —
result.get(),result.status, result backend - 재시도 —
autoretry_for,retry_backoff,retry_jitter, idempotency - Celery Beat —
crontab또는 DB 기반 스케줄러 (admin에서 관리) - 워커 실행 —
celery -A mysite worker, 동시성, 큐 분리 - 모니터링 — Flower (실시간), Sentry (실패 추적)
- docker-compose에 web/worker/beat 한곳에
- 함정 — on_commit, ORM 객체 전달, ignore_result, Beat 인스턴스 1개, 큰 task 분할
Celery는 한 글로는 다 못 다루는 도구입니다 — workflow (group/chord/chain), task routing, 우선순위 큐, signals 등 깊이가 더 있습니다. 일단 위 패턴이 90%를 채워주고, 더 깊은 부분은 Celery 공식 문서가 친절합니다.
다음 글(#5 OpenAPI 문서 (drf-spectacular))에서는 지금까지 만든 API를 자동 문서화 — Swagger UI, ReDoc, 클라이언트 SDK 자동 생성까지 — 하는 흐름을 다룹니다.