Django DRF #4 Celery で非同期作業

読了 9分

#3 まで読み書きは速いです。ですが メール送信外部 API 呼び出し画像のリサイズPDF 生成 のような作業が view の中で起きるとレスポンスが何秒もブロックされ、外部サービスが遅くなると私たちの API も一緒に遅くなります。

答えはいつも同じ — リクエストの流れから切り離してバックグラウンドに送る。Python/Django 陣営の標準的な道具は Celery

Celery の位置づけ #

選択肢向く作業
重い CPU/IO画像リサイズ、PDF / レポート生成、データ集計
外部 APIメール (SES/SendGrid)、Slack 通知、決済外部呼び出し
遅延可能な作業会員登録のウェルカムメール (1 秒遅れても OK)、統計取り込み
スケジューリング (cron)毎日早朝のレポート、期限切れトークンの整理
ファンアウト「全ユーザーへ通知」のような N 件作業の分散

レスポンスから切り離すのが核心の効用です。ユーザーは即座に 200 レスポンス を受け取り、実際の作業はワーカーが処理します。

代替手段一行で #

評価
Celery事実上の標準。豊富な機能、大きなエコシステム、ドキュメント
RQ軽量。Redis のみサポート。シンプルなキューが必要なとき
Dramatiqモダン、よりシンプルな API。Celery の代替
Django Q2DB バックエンド可、Django との統合が強い。小さなプロジェクト
Huey非常に軽量。単一マシンに適合

この記事は Celery + Redis の組み合わせを深く見ます — 市場で最も多く出会う形。

ブローカー — Redis vs RabbitMQ #

Celery は ブローカー (メッセージキュー) を通じてワーカーと通信します。

RedisRabbitMQ
セットアップの難易度非常に簡単普通
運用負担低い普通
機能の豊富さシンプル複雑なルーティングが可能
パフォーマンス非常に速い速い
信頼性良い (永続化オプション)非常に良い (AMQP 標準)
結果バックエンドを一緒に使う可能別途必要
向く場面一般的な Web サービス (大半)複雑なルーティング、大規模分散

このシリーズは Redis で行きます — ブローカー + result backend を一度に。小~中規模サービスの 90% のケース。

インストール #

パッケージ
uv add celery redis
Redis を Docker で起動 (開発)
docker run -d -p 6379:6379 redis:7-alpine

Celery セットアップ — Django との統合 #

mysite/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")

app = Celery("mysite")

# すべての設定を Django settings から 'CELERY_' prefix で読む
app.config_from_object("django.conf:settings", namespace="CELERY")

# すべての INSTALLED_APPS の tasks.py を自動発見
app.autodiscover_tasks()
mysite/__init__.py
from .celery import app as celery_app

__all__ = ("celery_app",)

__init__.py の import が核心です — Django が起動するときに Celery アプリのインスタンスが一緒にロードされなければ @shared_task がそれを発見できません。

mysite/settings.py — Celery 設定
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 30 * 60   # 30 分の hard limit

namespace="CELERY" のおかげで settings の CELERY_xxx が Celery の xxx 設定になります。

最初の task #

blog/tasks.py
from celery import shared_task
from django.core.mail import send_mail


@shared_task
def send_welcome_email(user_id: int) -> str:
    from django.contrib.auth import get_user_model
    User = get_user_model()
    user = User.objects.get(pk=user_id)
    send_mail(
        subject="ようこそ",
        message=f"{user.username} 様、ご登録ありがとうございます。",
        from_email="noreply@example.com",
        recipient_list=[user.email],
    )
    return f"sent to {user.email}"

@shared_taskアプリに紐づかない task を作ります — Django のどのアプリからも同じ方法で import して使えます。

task の呼び出し — delay vs apply_async #

呼び出し
from .tasks import send_welcome_email

# 最もシンプル
send_welcome_email.delay(user.id)

# オプションが必要なら
send_welcome_email.apply_async(
    args=[user.id],
    countdown=10,                      # 10 秒後
    expires=300,                       # 5 分以内に開始しなければ破棄
    queue="emails",                    # 別のワーカーキュー
)

delay()apply_async() の短縮形。オプションがなければ delay で十分です。

View / Signal の中で呼び出し — transaction.on_commit #

最もよくある落とし穴は DB トランザクションがコミットされる前に task が実行されるケース。ワーカーが作りたてのユーザーを見つけられません。

🚫 トランザクションの中で直接呼び出し
@router.post("/register")
def register(request):
    user = User.objects.create_user(...)
    send_welcome_email.delay(user.id)   # ← ワーカーが user を見つけられない可能性
    return Response({"ok": True})

@transaction.atomic の中、または ATOMIC_REQUESTS が ON だと、レスポンス直前にコミットされます。その間にワーカーが task を取りに来るとユーザーがまだ DB にいません。

答えは transaction.on_commit — コミットが終わった後にだけ呼び出し (上級 #5 のトランザクション後処理パターン)。

✅ コミット後
from django.db import transaction


@router.post("/register")
def register(request):
    user = User.objects.create_user(...)
    transaction.on_commit(lambda: send_welcome_email.delay(user.id))
    return Response({"ok": True})

Signal でも同じ落とし穴 #

signals.py
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .tasks import send_welcome_email


@receiver(post_save, sender=get_user_model())
def on_user_created(sender, instance, created, **kwargs):
    if not created:
        return
    transaction.on_commit(lambda: send_welcome_email.delay(instance.id))

post_save保存直後 に呼び出されますが、トランザクションの中で呼ばれていればまだコミット前です。常に on_commit で包んでください。

結果の取得 #

shared_task の戻り値は result backend (Redis) に保存されます。

結果の取得
result = send_welcome_email.delay(user.id)
print(result.id)            # task UUID
print(result.status)        # PENDING / STARTED / SUCCESS / FAILURE
print(result.get(timeout=10))  # 結果 (ブロッキング)

API レスポンスで task.id をクライアントに返し、別途 /api/tasks/{id}/ エンドポイントで進捗を確認するパターンが一般的です (長い作業、進捗 UI)。

再試行 — retry policy #

外部 API が一時的に 503 を返すとき、1 度の失敗で task を捨ててはいけません。自動再試行 が標準。

基本の retry
@shared_task(
    bind=True,                          # self 引数を受け取る
    autoretry_for=(ConnectionError, TimeoutError),
    retry_kwargs={"max_retries": 5},
    retry_backoff=True,                 # 1、2、4、8、16 秒 (指数)
    retry_backoff_max=600,              # 最大 10 分
    retry_jitter=True,                  # ± ランダム (thundering herd 防止)
)
def call_external_api(self, url: str) -> dict:
    response = httpx.get(url, timeout=5)
    response.raise_for_status()
    return response.json()

手動 retry #

条件付き再試行
@shared_task(bind=True, max_retries=3)
def process_payment(self, order_id: int):
    try:
        result = payment_gateway.charge(order_id)
    except TemporaryFailure as exc:
        # 60 秒後に再試行、最大 3 回
        raise self.retry(exc=exc, countdown=60)

再試行パターンの核心 #

  • idempotency — 同じ task が複数回実行されても結果が同じであるべき (メールは重複送信されうる前提で設計)
  • backoff + jitter — 一時障害で同時に殺到しないように
  • dead letter — 最大再試行後も失敗したら別キューへ — 人が見て処理

Celery Beat — periodic tasks (cron) #

スケジューリングが必要なら Celery の姉妹ツール Beat

インストール (DB ベースのスケジュール)
uv add django-celery-beat
settings.py
INSTALLED_APPS += ["django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"
マイグレーション
uv run python manage.py migrate

コードで定義 #

mysite/celery.py または settings.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    "cleanup-expired-tokens": {
        "task": "blog.tasks.cleanup_expired_tokens",
        "schedule": crontab(hour=3, minute=0),   # 毎日 03:00
    },
    "daily-summary-email": {
        "task": "blog.tasks.send_daily_summary",
        "schedule": crontab(hour=9, minute=0, day_of_week="mon-fri"),
    },
    "every-5-min-healthcheck": {
        "task": "blog.tasks.ping_external",
        "schedule": 300.0,   # 秒単位
    },
}

Admin で定義 (django-celery-beat の強み) #

DatabaseScheduler なら Django Admin から GUI でスケジュールを追加 / 修正 できます — コードのデプロイなしで運用中にスケジュールを変えられる仕組み。

ワーカーの実行 #

開発モード — ワーカー + Beat
# ワーカー
uv run celery -A mysite worker -l info

# 別のターミナル — Beat (スケジューラ)
uv run celery -A mysite beat -l info

# または 1 プロセスで (開発のみ、プロダクション非推奨)
uv run celery -A mysite worker -l info -B

同時性 #

オプション
celery -A mysite worker -l info \
  --concurrency=4 \           # ワーカープロセス 4 個
  --queues=default,emails \   # 処理するキュー
  --hostname=worker1@%h

デフォルトの並行性モデルは prefork (マルチプロセス)。I/O が多い task は --pool=gevent --concurrency=100 で非同期プールのほうが効率的なケースもあります。

キューの分離 #

task 別のキュー指定
@shared_task(queue="emails")
def send_welcome_email(user_id: int): ...

@shared_task(queue="reports")
def generate_pdf(...): ...

各キューにワーカーを別途起動すれば 重い作業が軽い作業をブロックしない ように分離できます。メールワーカーは並行性高く、レポートワーカーはメモリの多いマシンに別途。

モニタリング — Flower、Sentry #

Flower — リアルタイムダッシュボード #

インストール + 実行
uv add flower
celery -A mysite flower --port=5555

ブラウザで http://localhost:5555 を開くと:

  • ワーカーの状態 (online/offline、load)
  • 進行中 / 完了 / 失敗の task 一覧
  • task 別の平均実行時間
  • キュー別の積み上がり状況

Sentry 統合 #

settings.py
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.celery import CeleryIntegration

sentry_sdk.init(
    dsn=os.environ["SENTRY_DSN"],
    integrations=[DjangoIntegration(), CeleryIntegration()],
    traces_sample_rate=0.1,
)

CeleryIntegration が task の失敗を自動で Sentry に報告します。どの task がどの引数で失敗したかが一カ所に集まります。

docker-compose で一緒に立ち上げる #

#6 で深く扱いますが、最小の形はこの形です。

docker-compose.yml — 抜粋
services:
  web:
    build: .
    command: gunicorn mysite.wsgi --bind 0.0.0.0:8000
    depends_on: [db, redis]

  worker:
    build: .
    command: celery -A mysite worker -l info
    depends_on: [db, redis]

  beat:
    build: .
    command: celery -A mysite beat -l info
    depends_on: [db, redis]

  redis:
    image: redis:7-alpine

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_DB: blog
      POSTGRES_USER: blog
      POSTGRES_PASSWORD: blog

同じイメージで web / worker / beat の 3 つのコンテナを起動します。コードは同じで開始コマンドだけが違います。

よく出会う落とし穴 #

1) transaction.on_commit の漏れ #

たった今見たその落とし穴。トランザクションの中で task.delay() するとワーカーがデータを見つけられない可能性があります。常に on_commit

2) task の引数にモデルオブジェクト #

🚫 ORM オブジェクトを渡す
send_welcome_email.delay(user)

Celery は引数をシリアライズ (JSON) します。ORM オブジェクトはシリアライズできないか、シリアライズできてもワーカーが受け取るときには stale な状態の可能性があります。ID だけ渡し、ワーカーの中で再度クエリ。

✅ ID だけ
send_welcome_email.delay(user.id)

3) 結果を使わないならバックエンドの負荷を減らす #

ignore_result
@shared_task(ignore_result=True)
def fire_and_forget(...): ...

result backend に保存しません。結果取得が不要な task はこれで Redis の負担を減らしてください。

4) Beat が複数走ると task が N 回 #

celery beat必ず 1 インスタンスだけ 走るべきです。複数走ると同じスケジュールを N 回トリガー。コンテナで起動するときは replicas=1 を保証。

5) task が大きすぎる #

1 つの task が 1 万件を処理すると失敗時に全部やり直し。chunk に分けて各 chunk を別 task に (ファンアウトパターン) — Celery の groupchordchain がその出番。

group でファンアウト
from celery import group

job = group(process_one.s(uid) for uid in user_ids)
result = job.apply_async()

実戦例 — 記事公開後の通知 + インデックス更新 #

ブログ API でよくあるシナリオ。記事が published になると:

  1. 購読者に通知メール
  2. 検索インデックス更新
  3. RSS/Sitemap キャッシュの無効化

すべてレスポンスの流れから切り離します。

blog/tasks.py
@shared_task(autoretry_for=(Exception,), retry_backoff=True, max_retries=3)
def notify_subscribers(post_id: int):
    post = Post.objects.get(pk=post_id)
    subscribers = post.author.subscribers.all()
    for sub in subscribers:
        send_mail("新着記事", post.title, "noreply@example.com", [sub.email])

@shared_task
def reindex_post(post_id: int):
    # OpenSearch / Meilisearch にインデキシング
    ...

@shared_task(ignore_result=True)
def invalidate_caches(post_id: int):
    cache.delete_many(["rss:feed", "sitemap:posts"])
blog/views.py — publish アクション
from django.db import transaction
from .tasks import notify_subscribers, reindex_post, invalidate_caches


class PostViewSet(viewsets.ModelViewSet):
    @action(detail=True, methods=["post"])
    def publish(self, request, pk=None):
        post = self.get_object()
        post.published = True
        post.save()

        transaction.on_commit(lambda: notify_subscribers.delay(post.id))
        transaction.on_commit(lambda: reindex_post.delay(post.id))
        transaction.on_commit(lambda: invalidate_caches.delay(post.id))

        return Response({"published": True})

view は即座に 200 でレスポンス。3 つの作業はワーカーがバックグラウンドで処理。1 つの作業が失敗しても他の作業とレスポンス自体には影響なし。

まとめ #

今回つかんだもの:

  • Celery の位置づけ — 重い作業 / 外部 IO / スケジューリング / ファンアウト
  • ブローカー — Redis が一般的、RabbitMQ は複雑なルーティング
  • セットアップ — mysite/celery.py__init__.pycelery_appautodiscover_tasks
  • @shared_task — アプリ非依存の task
  • task.delay() (短縮) vs apply_async() (オプション)
  • transaction.on_commit でコミット後の呼び出し (上級 #5 の回帰)
  • 結果 — result.get()result.status、result backend
  • 再試行autoretry_forretry_backoffretry_jitter、idempotency
  • Celery Beatcrontab または DB ベースのスケジューラ (admin で管理)
  • ワーカー実行 — celery -A mysite worker、並行性、キューの分離
  • モニタリング — Flower (リアルタイム)、Sentry (失敗追跡)
  • docker-compose に web/worker/beat を一カ所に
  • 落とし穴 — on_commit、ORM オブジェクトを渡す、ignore_result、Beat インスタンス 1 個、大きな task の分割

Celery は 1 つの記事ではすべてを扱えない道具です — workflow (group/chord/chain)、task routing、優先度キュー、signals などまだ深さがあります。とりあえず上のパターンが 90% のケースを埋めてくれて、より深い部分は Celery 公式ドキュメント が親切です。

次回 (#5 OpenAPI ドキュメント (drf-spectacular)) ではここまで作った API を 自動ドキュメント化 — Swagger UI、ReDoc、クライアント SDK の自動生成まで — する方法を扱います。

X