Django上級 #5 Signals の深さとトランザクション後処理

読了 8分

中級 #3 Signals と Middleware でシグナルの基本を固めたなら、ここからは 運用でシグナルを安全に 使うための道具箱を整理します。中心キーワードは transaction.on_commit節制

シグナルの危険 — なぜ「節制」 か #

シグナルは強力ですが、大きなプロジェクトで最も非難される機能でもあります。理由:

  • 流れが見えません。 User.save() の 1 行のあとに何が起こるかコードを追っても見えない — シグナルハンドラが別のファイルにあるから。
  • 順序が曖昧。 同じシグナルに複数の receiver が付くと登録順に呼ばれますが、登録自体が import 順に依存。
  • トランザクションと微妙にズレる。 post_save がトランザクションの で実行される — これが外部呼び出しなら大問題。
  • テストが漏れる抽象化。 別のテストのシグナル登録が影響して flaky の原因になります。

Django 自身の推奨は 「可能なら明示的な呼び出し、シグナルは本当に分離が必要な場面」。良い場面:

  • 別のアプリが私たちのモデルの変更を知らなければならないとき (auth.User の変更に私たちのアプリが反応)
  • キャッシュ無効化
  • 検索インデキシング (Elasticsearch、Algolia など)

悪い場面:

  • 同じアプリ内の明示的な後処理 (そのまま save の次の行に書こう)
  • 決済、残高のようなビジネスロジックの中心 (シグナルが漏れるとデータが壊れる)

トランザクション — 短い復習 #

Django の view は基本的に AUTOCOMMIT モード。明示的に束ねるなら:

@transaction.atomic
from django.db import transaction

@transaction.atomic
def transfer(from_id, to_id, amount):
    src = Account.objects.select_for_update().get(pk=from_id)
    dst = Account.objects.select_for_update().get(pk=to_id)
    src.balance -= amount
    dst.balance += amount
    src.save()
    dst.save()

ブロック内のすべての SQL が 1 つのトランザクション。例外が出たら自動 rollback。

ATOMIC_REQUESTS #

settings.py
DATABASES = {
    "default": {
        "ENGINE": "...",
        "ATOMIC_REQUESTS": True,
        ...
    }
}

これを有効にすると すべての view が自動でトランザクション で囲まれます。5xx レスポンスなら rollback。便利ですが:

  • トランザクションが長くなりロック時間が増える
  • view の中で外部 API 呼び出しがあるとその間トランザクションを掴む

大きなトラフィックのサイトは 明示的な atomic の方が精密です。

コンテキストマネージャ + savepoint #

ネストした atomic = savepoint
@transaction.atomic
def outer():
    Order.objects.create(...)
    try:
        with transaction.atomic():    # 内側 — savepoint で動作
            risky_step()
    except SomeError:
        # outer トランザクションは生きている
        log_failure()
    Order.objects.filter(...).update(status="processed")

内側の atomicsavepoint — 部分 rollback が可能です。外側のトランザクションは影響を受けません。

明示的に無効化
with transaction.atomic(savepoint=False):
    ...

非常にまれな場面。普通はデフォルト (savepoint=True) で十分。

transaction.on_commit — 後処理の標準 #

最も重要な道具。トランザクションが 正常に commit された後 に実行するコールバックを登録します。

基本
from django.db import transaction

@transaction.atomic
def create_order(user, items):
    order = Order.objects.create(user=user, total=0)
    for item in items:
        OrderItem.objects.create(order=order, ...)

    transaction.on_commit(lambda: send_confirmation_email(order.id))
    return order

流れ:

  1. order 生成、items 生成
  2. commit される
  3. その後で初めて send_confirmation_email が実行される

もしトランザクションが rollback されたら? コールバックは 呼ばれません。 まさに私たちが望む動作。

なぜこれが必須なのか #

🚫 atomic の中で直接呼び出し
@transaction.atomic
def create_order(user, items):
    order = Order.objects.create(user=user, total=0)
    send_confirmation_email(order.id)   # ← 危険
    raise SomeError()

メールは送られたのにトランザクションは rollback。存在しない注文に対するメール がユーザーに届きます。似たパターンが決済、通知、外部インデキシングなどあらゆる場面で同じ危険を作ります。

on_commit がまさにこの問題を防ぎます。

シグナルの中で #

post_save シグナルは トランザクションの中 で呼ばれます。外部呼び出しは必ず on_commit で先送りしてください。

signals.py
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver
from .models import Order

@receiver(post_save, sender=Order)
def index_order(sender, instance, created, **kwargs):
    if not created:
        return
    transaction.on_commit(lambda: search_index.add(instance.id))

search_index.add は commit 後にだけ実行。rollback されればインデックスにも入りません。

Celery との結合 #

非同期タスクキューを使うなら:

Celery と
@receiver(post_save, sender=Order)
def queue_processing(sender, instance, created, **kwargs):
    if not created:
        return
    transaction.on_commit(lambda: process_order_task.delay(instance.id))

ここで on_commit が決定的。 そうでなければ Celery ワーカーが受け取って DB を照会したときに、まだ commit 前なので 注文が見えないかもしれません。 微妙な race condition のよくある原因。詳しい Celery 統合は DRF #4 で。

チェイニング / 複数のコールバック #

何度でも呼べます。登録順に実行されます。

複数のコールバック
@transaction.atomic
def f():
    ...
    transaction.on_commit(callback_a)
    transaction.on_commit(callback_b)
    transaction.on_commit(callback_c)
    # commit 後 a → b → c の順

トランザクションの外で呼ぶと #

on_commit をトランザクションの外で呼ぶと 即時実行 されます (すでに「commit されたもの」として扱われる)。なのでライブラリ / ユーティリティ関数に安全に入れられます — 呼び出し側が atomic で囲んでいてもいなくても。

シグナル — 深い使い方 #

dispatch_uid — 重複登録の防止 #

signals.py
@receiver(post_save, sender=Order, dispatch_uid="order_index_handler")
def index_order(...):
    ...

dispatch_uid唯一の ID。同じ ID で 2 回登録されたら無視されます。モジュールが 2 回 import されてもハンドラが 2 回登録されることはありません。

apps.py で登録 #

シグナルモジュールが 自動 import される よう推奨されるパターン。

myapp/apps.py
from django.apps import AppConfig

class MyAppConfig(AppConfig):
    default_auto_field = "django.db.models.BigAutoField"
    name = "myapp"

    def ready(self):
        from . import signals    # noqa
myapp/__init__.py
default_app_config = "myapp.apps.MyAppConfig"   # Django 3.2+ は自動検出

ready() の中で import するだけでデコレータが登録を自動で行います。

ready() の中で DB 照会は禁止。migration の途中に呼ばれることがあります。

よく使うシグナル #

シグナル発生タイミング
pre_save / post_savesave() の前 / 後
pre_delete / post_deletedelete() の前 / 後
pre_migrate / post_migratemigrate コマンドの前 / 後
pre_init / post_initモデルインスタンス生成
m2m_changedM2M 関係の変更 (add/remove/clear)
request_started / request_finishedリクエスト開始 / 終了
got_request_exceptionview 内で例外発生
user_logged_in / user_logged_out / user_login_failed認証

m2m_changed — 意外と厄介 #

m2m_changed
from django.db.models.signals import m2m_changed

@receiver(m2m_changed, sender=Post.tags.through)
def on_tags_changed(sender, instance, action, pk_set, **kwargs):
    if action in {"post_add", "post_remove", "post_clear"}:
        cache.delete(f"post_tags:{instance.id}")

action の値:

  • pre_addpost_add
  • pre_removepost_remove
  • pre_clearpost_clear

ほとんどは post_* だけ見れば十分です。pre_* + post_* の両方に同じハンドラを付けると 2 回呼ばれる よくあるミスがあります。

Custom signal — アプリのシグナルを定義 #

組み込みシグナルだけがあるわけではありません。自分のドメインイベント をシグナルに。

myapp/signals.py — 定義
from django.dispatch import Signal

order_paid = Signal()    # providing_args は 5.x で削除されたので docstring で代替
order_cancelled = Signal()
発行
from .signals import order_paid

def mark_paid(order):
    order.status = "paid"
    order.save()
    order_paid.send(sender=Order, order=order, paid_at=timezone.now())
購読
from django.dispatch import receiver
from myapp.signals import order_paid

@receiver(order_paid)
def reward_user(sender, order, paid_at, **kwargs):
    transaction.on_commit(lambda: give_points(order.user_id, 100))

@receiver(order_paid)
def notify_seller(sender, order, **kwargs):
    transaction.on_commit(lambda: notify_seller_task.delay(order.seller_id))

複数のモジュールが同じイベントに反応する場面に向きます。ただし上で見た「流れが見えない」短所がそのまま当てはまります — 明示的な関数呼び出しが可能ならその方がシンプル

send vs send_robust #

2 つの違い
order_paid.send(sender=Order, order=order)         # receiver の 1 つでも例外 → 伝播
order_paid.send_robust(sender=Order, order=order)  # 例外を受けてログだけ、別の receiver は続行

send_robust は結果として (receiver, response_or_exception) のリストを返します。レスポンスを検査したいならこちら。

メール通知のように 1 つの receiver が死んでも他が動かなければ ならない場面は send_robust。トランザクションの一部なら send (例外で rollback させたい)。

シグナルのテスト #

シグナル無効化 — mute_signals #

factory_boy のツールが標準です。

インストール
pip install factory_boy
tests
from factory.django import mute_signals
from django.db.models.signals import post_save

@mute_signals(post_save)
def test_create_without_signals():
    user = User.objects.create(email="a@b.com")
    # post_save ハンドラは呼ばれない

データシード、フィクスチャロードなど 副作用を避けたい場面 に決定的。クラスデコレータとしても使えます。

シグナルを直接 disconnect / reconnect #

setUp/tearDown
from django.test import TestCase
from django.db.models.signals import post_save
from myapp.signals import index_order

class MyTest(TestCase):
    def setUp(self):
        post_save.disconnect(index_order, sender=Order)

    def tearDown(self):
        post_save.connect(index_order, sender=Order)

mute_signals の方がきれいですが、特定の receiver だけ外したいなら直接。

シグナルが呼ばれたか検証 #

mock
from unittest.mock import MagicMock

def test_signal_fires():
    handler = MagicMock()
    order_paid.connect(handler, sender=Order)
    try:
        mark_paid(order)
        handler.assert_called_once()
    finally:
        order_paid.disconnect(handler, sender=Order)

bulk_* とシグナル #

#3 で見た bulk_createbulk_updateupdatedelete (queryset の) は シグナルを発生させません。 大きな落とし穴。

🚫 シグナル前提
Order.objects.filter(status="pending").update(status="cancelled")
# post_save は呼ばれない!

データマイグレーションのような場面で意図的にシグナルを迂回する場面なら OK。しかし普段のコードでシグナルが中心ロジックなら bulk_* 使用時に直接後処理 をコードでしなければなりません。

✅ 明示的な後処理
ids = list(qs.values_list("id", flat=True))
Order.objects.filter(id__in=ids).update(status="cancelled")
for order_id in ids:
    transaction.on_commit(lambda i=order_id: post_cancel_hook(i))

lambda i=order_id のデフォルト引数のトリックに注意 — そうしないとすべての lambda が最後の order_id をキャプチャします。クロージャの罠。

よく出会う落とし穴 #

1) post_save の中で instance.save() を再呼び出し #

無限再帰。update_fields で切るか、pre_save で処理するか、Model.objects.filter(pk=instance.pk).update(...) (シグナルは出ない)。

2) signal 内の外部 API 呼び出しがトランザクションを縛る #

上で見たそれ。必ず on_commit

3) m2m_changed が 2 回呼ばれる #

pre_*post_* の両方を受けると 2 回。1 つだけに。

4) 別アプリのシグナル登録漏れ #

apps.pyready() が import をトリガしないとシグナルが登録されません。静的解析ツールが拾えないよくある回帰。

5) async view でシグナルハンドラ #

ハンドラが同期なのに非同期コンテキストで呼ばれます。内部的に sync_to_async が適用されるとはいえ、ハンドラが重ければイベントループを塞ぐかもしれません。非同期 view がメインなら、シグナルではなく明示的呼び出し + await の方が正直。

まとめ #

今回つかんだもの:

  • シグナルは強力だが「流れが見えない」 短所 — 節制 推奨
  • @transaction.atomic の使いどころ、ATOMIC_REQUESTS のトレードオフ
  • ネストした atomic = savepoint、部分 rollback 可能
  • transaction.on_commit(callback) — シグナル / atomic の中での外部呼び出しの標準
  • dispatch_uid で重複登録防止、apps.pyready() で import
  • m2m_changed の action 種類と 2 回呼ばれる落とし穴
  • Custom Signal() — ドメインイベント、send vs send_robust
  • テスト: mute_signals、disconnect/reconnect、MagicMock で呼び出し検証
  • bulk_* / queryset の update/delete はシグナル発生せず — 明示的後処理
  • Celery と結合: 必ず on_commit の中で task.delay

次回 (#6 Django Channels — WebSocket) では #1 ASGI の上に WebSocket を載せます。AsyncWebsocketConsumer、group broadcast、HTTP view からの push、daphne デプロイまで — リアルタイム双方向通信の場面。

X