Django DRF #4 Celery で非同期作業
#3 まで読み書きは速いです。ですが メール送信、外部 API 呼び出し、画像のリサイズ、PDF 生成 のような作業が view の中で起きるとレスポンスが何秒もブロックされ、外部サービスが遅くなると私たちの API も一緒に遅くなります。
答えはいつも同じ — リクエストの流れから切り離してバックグラウンドに送る。Python/Django 陣営の標準的な道具は Celery。
Celery の位置づけ #
| 選択肢 | 向く作業 |
|---|---|
| 重い CPU/IO | 画像リサイズ、PDF / レポート生成、データ集計 |
| 外部 API | メール (SES/SendGrid)、Slack 通知、決済外部呼び出し |
| 遅延可能な作業 | 会員登録のウェルカムメール (1 秒遅れても OK)、統計取り込み |
| スケジューリング (cron) | 毎日早朝のレポート、期限切れトークンの整理 |
| ファンアウト | 「全ユーザーへ通知」のような N 件作業の分散 |
レスポンスから切り離すのが核心の効用です。ユーザーは即座に 200 レスポンス を受け取り、実際の作業はワーカーが処理します。
代替手段一行で #
| 評価 | |
|---|---|
| Celery | 事実上の標準。豊富な機能、大きなエコシステム、ドキュメント |
| RQ | 軽量。Redis のみサポート。シンプルなキューが必要なとき |
| Dramatiq | モダン、よりシンプルな API。Celery の代替 |
| Django Q2 | DB バックエンド可、Django との統合が強い。小さなプロジェクト |
| Huey | 非常に軽量。単一マシンに適合 |
この記事は Celery + Redis の組み合わせを深く見ます — 市場で最も多く出会う形。
ブローカー — Redis vs RabbitMQ #
Celery は ブローカー (メッセージキュー) を通じてワーカーと通信します。
| Redis | RabbitMQ | |
|---|---|---|
| セットアップの難易度 | 非常に簡単 | 普通 |
| 運用負担 | 低い | 普通 |
| 機能の豊富さ | シンプル | 複雑なルーティングが可能 |
| パフォーマンス | 非常に速い | 速い |
| 信頼性 | 良い (永続化オプション) | 非常に良い (AMQP 標準) |
| 結果バックエンドを一緒に使う | 可能 | 別途必要 |
| 向く場面 | 一般的な Web サービス (大半) | 複雑なルーティング、大規模分散 |
このシリーズは Redis で行きます — ブローカー + result backend を一度に。小~中規模サービスの 90% のケース。
インストール #
uv add celery redisdocker run -d -p 6379:6379 redis:7-alpineCelery セットアップ — Django との統合 #
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery("mysite")
# すべての設定を Django settings から 'CELERY_' prefix で読む
app.config_from_object("django.conf:settings", namespace="CELERY")
# すべての INSTALLED_APPS の tasks.py を自動発見
app.autodiscover_tasks()from .celery import app as celery_app
__all__ = ("celery_app",)__init__.py の import が核心です — Django が起動するときに Celery アプリのインスタンスが一緒にロードされなければ @shared_task がそれを発見できません。
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 分の hard limitnamespace="CELERY" のおかげで settings の CELERY_xxx が Celery の xxx 設定になります。
最初の task #
from celery import shared_task
from django.core.mail import send_mail
@shared_task
def send_welcome_email(user_id: int) -> str:
from django.contrib.auth import get_user_model
User = get_user_model()
user = User.objects.get(pk=user_id)
send_mail(
subject="ようこそ",
message=f"{user.username} 様、ご登録ありがとうございます。",
from_email="noreply@example.com",
recipient_list=[user.email],
)
return f"sent to {user.email}"@shared_task は アプリに紐づかない task を作ります — Django のどのアプリからも同じ方法で import して使えます。
task の呼び出し — delay vs apply_async
#
from .tasks import send_welcome_email
# 最もシンプル
send_welcome_email.delay(user.id)
# オプションが必要なら
send_welcome_email.apply_async(
args=[user.id],
countdown=10, # 10 秒後
expires=300, # 5 分以内に開始しなければ破棄
queue="emails", # 別のワーカーキュー
)delay() は apply_async() の短縮形。オプションがなければ delay で十分です。
View / Signal の中で呼び出し — transaction.on_commit
#
最もよくある落とし穴は DB トランザクションがコミットされる前に task が実行されるケース。ワーカーが作りたてのユーザーを見つけられません。
@router.post("/register")
def register(request):
user = User.objects.create_user(...)
send_welcome_email.delay(user.id) # ← ワーカーが user を見つけられない可能性
return Response({"ok": True})@transaction.atomic の中、または ATOMIC_REQUESTS が ON だと、レスポンス直前にコミットされます。その間にワーカーが task を取りに来るとユーザーがまだ DB にいません。
答えは transaction.on_commit — コミットが終わった後にだけ呼び出し (上級 #5 のトランザクション後処理パターン)。
from django.db import transaction
@router.post("/register")
def register(request):
user = User.objects.create_user(...)
transaction.on_commit(lambda: send_welcome_email.delay(user.id))
return Response({"ok": True})Signal でも同じ落とし穴 #
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .tasks import send_welcome_email
@receiver(post_save, sender=get_user_model())
def on_user_created(sender, instance, created, **kwargs):
if not created:
return
transaction.on_commit(lambda: send_welcome_email.delay(instance.id))post_save は 保存直後 に呼び出されますが、トランザクションの中で呼ばれていればまだコミット前です。常に on_commit で包んでください。
結果の取得 #
shared_task の戻り値は result backend (Redis) に保存されます。
result = send_welcome_email.delay(user.id)
print(result.id) # task UUID
print(result.status) # PENDING / STARTED / SUCCESS / FAILURE
print(result.get(timeout=10)) # 結果 (ブロッキング)API レスポンスで task.id をクライアントに返し、別途 /api/tasks/{id}/ エンドポイントで進捗を確認するパターンが一般的です (長い作業、進捗 UI)。
再試行 — retry policy #
外部 API が一時的に 503 を返すとき、1 度の失敗で task を捨ててはいけません。自動再試行 が標準。
@shared_task(
bind=True, # self 引数を受け取る
autoretry_for=(ConnectionError, TimeoutError),
retry_kwargs={"max_retries": 5},
retry_backoff=True, # 1、2、4、8、16 秒 (指数)
retry_backoff_max=600, # 最大 10 分
retry_jitter=True, # ± ランダム (thundering herd 防止)
)
def call_external_api(self, url: str) -> dict:
response = httpx.get(url, timeout=5)
response.raise_for_status()
return response.json()手動 retry #
@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id: int):
try:
result = payment_gateway.charge(order_id)
except TemporaryFailure as exc:
# 60 秒後に再試行、最大 3 回
raise self.retry(exc=exc, countdown=60)再試行パターンの核心 #
- idempotency — 同じ task が複数回実行されても結果が同じであるべき (メールは重複送信されうる前提で設計)
- backoff + jitter — 一時障害で同時に殺到しないように
- dead letter — 最大再試行後も失敗したら別キューへ — 人が見て処理
Celery Beat — periodic tasks (cron) #
スケジューリングが必要なら Celery の姉妹ツール Beat。
uv add django-celery-beatINSTALLED_APPS += ["django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"uv run python manage.py migrateコードで定義 #
from celery.schedules import crontab
app.conf.beat_schedule = {
"cleanup-expired-tokens": {
"task": "blog.tasks.cleanup_expired_tokens",
"schedule": crontab(hour=3, minute=0), # 毎日 03:00
},
"daily-summary-email": {
"task": "blog.tasks.send_daily_summary",
"schedule": crontab(hour=9, minute=0, day_of_week="mon-fri"),
},
"every-5-min-healthcheck": {
"task": "blog.tasks.ping_external",
"schedule": 300.0, # 秒単位
},
}Admin で定義 (django-celery-beat の強み) #
DatabaseScheduler なら Django Admin から GUI でスケジュールを追加 / 修正 できます — コードのデプロイなしで運用中にスケジュールを変えられる仕組み。
ワーカーの実行 #
# ワーカー
uv run celery -A mysite worker -l info
# 別のターミナル — Beat (スケジューラ)
uv run celery -A mysite beat -l info
# または 1 プロセスで (開発のみ、プロダクション非推奨)
uv run celery -A mysite worker -l info -B同時性 #
celery -A mysite worker -l info \
--concurrency=4 \ # ワーカープロセス 4 個
--queues=default,emails \ # 処理するキュー
--hostname=worker1@%hデフォルトの並行性モデルは prefork (マルチプロセス)。I/O が多い task は --pool=gevent --concurrency=100 で非同期プールのほうが効率的なケースもあります。
キューの分離 #
@shared_task(queue="emails")
def send_welcome_email(user_id: int): ...
@shared_task(queue="reports")
def generate_pdf(...): ...各キューにワーカーを別途起動すれば 重い作業が軽い作業をブロックしない ように分離できます。メールワーカーは並行性高く、レポートワーカーはメモリの多いマシンに別途。
モニタリング — Flower、Sentry #
Flower — リアルタイムダッシュボード #
uv add flower
celery -A mysite flower --port=5555ブラウザで http://localhost:5555 を開くと:
- ワーカーの状態 (online/offline、load)
- 進行中 / 完了 / 失敗の task 一覧
- task 別の平均実行時間
- キュー別の積み上がり状況
Sentry 統合 #
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration
sentry_sdk.init(
dsn=os.environ["SENTRY_DSN"],
integrations=[DjangoIntegration(), CeleryIntegration()],
traces_sample_rate=0.1,
)CeleryIntegration が task の失敗を自動で Sentry に報告します。どの task がどの引数で失敗したかが一カ所に集まります。
docker-compose で一緒に立ち上げる #
#6 で深く扱いますが、最小の形はこの形です。
services:
web:
build: .
command: gunicorn mysite.wsgi --bind 0.0.0.0:8000
depends_on: [db, redis]
worker:
build: .
command: celery -A mysite worker -l info
depends_on: [db, redis]
beat:
build: .
command: celery -A mysite beat -l info
depends_on: [db, redis]
redis:
image: redis:7-alpine
db:
image: postgres:16-alpine
environment:
POSTGRES_DB: blog
POSTGRES_USER: blog
POSTGRES_PASSWORD: blog同じイメージで web / worker / beat の 3 つのコンテナを起動します。コードは同じで開始コマンドだけが違います。
よく出会う落とし穴 #
1) transaction.on_commit の漏れ
#
たった今見たその落とし穴。トランザクションの中で task.delay() するとワーカーがデータを見つけられない可能性があります。常に on_commit。
2) task の引数にモデルオブジェクト #
send_welcome_email.delay(user)Celery は引数をシリアライズ (JSON) します。ORM オブジェクトはシリアライズできないか、シリアライズできてもワーカーが受け取るときには stale な状態の可能性があります。ID だけ渡し、ワーカーの中で再度クエリ。
send_welcome_email.delay(user.id)3) 結果を使わないならバックエンドの負荷を減らす #
@shared_task(ignore_result=True)
def fire_and_forget(...): ...result backend に保存しません。結果取得が不要な task はこれで Redis の負担を減らしてください。
4) Beat が複数走ると task が N 回 #
celery beat は 必ず 1 インスタンスだけ 走るべきです。複数走ると同じスケジュールを N 回トリガー。コンテナで起動するときは replicas=1 を保証。
5) task が大きすぎる #
1 つの task が 1 万件を処理すると失敗時に全部やり直し。chunk に分けて各 chunk を別 task に (ファンアウトパターン) — Celery の group、chord、chain がその出番。
from celery import group
job = group(process_one.s(uid) for uid in user_ids)
result = job.apply_async()実戦例 — 記事公開後の通知 + インデックス更新 #
ブログ API でよくあるシナリオ。記事が published になると:
- 購読者に通知メール
- 検索インデックス更新
- RSS/Sitemap キャッシュの無効化
すべてレスポンスの流れから切り離します。
@shared_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def notify_subscribers(post_id: int):
post = Post.objects.get(pk=post_id)
subscribers = post.author.subscribers.all()
for sub in subscribers:
send_mail("新着記事", post.title, "noreply@example.com", [sub.email])
@shared_task
def reindex_post(post_id: int):
# OpenSearch / Meilisearch にインデキシング
...
@shared_task(ignore_result=True)
def invalidate_caches(post_id: int):
cache.delete_many(["rss:feed", "sitemap:posts"])from django.db import transaction
from .tasks import notify_subscribers, reindex_post, invalidate_caches
class PostViewSet(viewsets.ModelViewSet):
@action(detail=True, methods=["post"])
def publish(self, request, pk=None):
post = self.get_object()
post.published = True
post.save()
transaction.on_commit(lambda: notify_subscribers.delay(post.id))
transaction.on_commit(lambda: reindex_post.delay(post.id))
transaction.on_commit(lambda: invalidate_caches.delay(post.id))
return Response({"published": True})view は即座に 200 でレスポンス。3 つの作業はワーカーがバックグラウンドで処理。1 つの作業が失敗しても他の作業とレスポンス自体には影響なし。
まとめ #
今回つかんだもの:
- Celery の位置づけ — 重い作業 / 外部 IO / スケジューリング / ファンアウト
- ブローカー — Redis が一般的、RabbitMQ は複雑なルーティング
- セットアップ —
mysite/celery.py、__init__.pyのcelery_app、autodiscover_tasks @shared_task— アプリ非依存の tasktask.delay()(短縮) vsapply_async()(オプション)transaction.on_commitでコミット後の呼び出し (上級 #5 の回帰)- 結果 —
result.get()、result.status、result backend - 再試行 —
autoretry_for、retry_backoff、retry_jitter、idempotency - Celery Beat —
crontabまたは DB ベースのスケジューラ (admin で管理) - ワーカー実行 —
celery -A mysite worker、並行性、キューの分離 - モニタリング — Flower (リアルタイム)、Sentry (失敗追跡)
- docker-compose に web/worker/beat を一カ所に
- 落とし穴 — on_commit、ORM オブジェクトを渡す、ignore_result、Beat インスタンス 1 個、大きな task の分割
Celery は 1 つの記事ではすべてを扱えない道具です — workflow (group/chord/chain)、task routing、優先度キュー、signals などまだ深さがあります。とりあえず上のパターンが 90% のケースを埋めてくれて、より深い部分は Celery 公式ドキュメント が親切です。
次回 (#5 OpenAPI ドキュメント (drf-spectacular)) ではここまで作った API を 自動ドキュメント化 — Swagger UI、ReDoc、クライアント SDK の自動生成まで — する方法を扱います。