Django上級 #6 Django Channels — WebSocket

読了 7分

#1 ASGI で見た非同期陣営の頂点。WebSocket でリアルタイム双方向通信を作る場面です。道具は Django Channels

なぜ WebSocket なのか #

HTTP のリクエスト - レスポンスモデルでは解けない場面:

  • リアルタイム通知 — 新しいメッセージが届いたら即時
  • チャット、共同編集 — 双方向、低レイテンシ
  • ライブダッシュボード — サーバーが push で更新
  • ゲーム、ホワイトボード — 頻繁な双方向

候補となる解決策:

ポーリングLong pollingSSEWebSocket
方向クライアント → サーバー (間隔)クライアント → サーバー (待機)サーバー → クライアント単方向双方向
接続寿命短い長いリクエスト長いレスポンス常時接続
プロキシ / ファイアウォールフレンドリフレンドリフレンドリ (HTTP)アップグレードが必要
実装難易度非常に簡単簡単簡単中間
場面単純な更新通知の一部単方向 push双方向、チャット

サーバー → クライアント単方向 なら SSE (Server-Sent Events) も十分よい選択肢。双方向が必要なときに WebSocket。

Channels の出番 #

Django 自体は ASGI の上で非同期 view をサポートしますが、WebSocket プロトコルの処理 はしません。そこに入るのが Channels。

Channels がもたらすもの:

  • WebSocket / 別プロトコルの処理
  • Channel Layer — ワーカー間のメッセージ交換 (普通は Redis バックエンド)
  • 認証 / セッションミドルウェア (HTTP と同じ道具)
  • ASGI ルーティング

インストールとセットアップ #

インストール
pip install channels channels-redis
settings.py
INSTALLED_APPS = [
    ...,
    "daphne",        # ASGI サーバー、runserver と統合
    "channels",
    "myapp",
]

ASGI_APPLICATION = "myproject.asgi.application"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}

daphneINSTALLED_APPS最上部に近い位置 に置けば runserver が daphne ベースで動作 — 開発時に WebSocket も一緒に。

myproject/asgi.py
import os
import django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django.setup()

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from django.core.asgi import get_asgi_application
from myapp.routing import websocket_urlpatterns

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(websocket_urlpatterns)
    ),
})

中心:

  • ProtocolTypeRouterhttpwebsocket ごとにルーティング
  • HTTP は既存 Django のまま
  • WebSocket は別ルーターへ
  • AuthMiddlewareStack — セッションクッキーからユーザーを認識

最初の Consumer #

Consumer は view の WebSocket 版。

myapp/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class EchoConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()
        await self.send(text_data=json.dumps({"hello": "world"}))

    async def disconnect(self, close_code):
        pass

    async def receive(self, text_data=None, bytes_data=None):
        data = json.loads(text_data)
        await self.send(text_data=json.dumps({"echo": data}))

3 つのフック:

  • connect — クライアントが接続を試みる。accept() で受諾、呼ばないと拒否
  • disconnect — 切断
  • receive — メッセージ受信
myapp/routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r"ws/echo/$", consumers.EchoConsumer.as_asgi()),
]

as_asgi()as_view() の WebSocket 版。クラスインスタンスを ASGI アプリにしてくれます。

クライアント側 #

ブラウザで
<script>
const ws = new WebSocket("ws://localhost:8000/ws/echo/");
ws.onopen = () => ws.send(JSON.stringify({msg: "hi"}));
ws.onmessage = (e) => console.log(JSON.parse(e.data));
</script>

Group — broadcast #

ここが本当の価値。複数の接続に同じメッセージを送る場面。

チャットルーム Consumer
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["room"]
        self.group_name = f"chat_{self.room_name}"

        # グループに参加
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        data = json.loads(text_data)
        message = data["message"]
        user = self.scope["user"]
        username = user.username if user.is_authenticated else "anon"

        # グループ全体に send
        await self.channel_layer.group_send(
            self.group_name,
            {
                "type": "chat.message",   # ハンドラ名とマッチ
                "username": username,
                "message": message,
            },
        )

    async def chat_message(self, event):
        # group_send の type="chat.message" がこれを呼ぶ
        await self.send(text_data=json.dumps({
            "username": event["username"],
            "message": event["message"],
        }))
routing.py
websocket_urlpatterns = [
    re_path(r"ws/chat/(?P<room>\w+)/$", consumers.ChatConsumer.as_asgi()),
]

中心の仕組み:

  • channel_name — この接続の固有 ID (Channels が自動付与)
  • group_name — 私たちが作った論理チャンネル (例: chat_general)
  • group_add — この接続をグループに参加させる
  • group_send — グループのすべての接続にメッセージ
  • メッセージの type フィールド (chat.message) → 同じ Consumer のメソッド (chat_message) を呼ぶ

type のドット (.) がメソッド名のアンダースコア (_) に変換されます。命名規約。

Channel Layer がすること #

channel_layer.group_sendRedis の list / pub-sub の上で動作。メッセージが:

  1. Redis に push
  2. グループに参加したすべてのチャンネルが取り出す
  3. それぞれ自分の Consumer のハンドラを呼ぶ

これが 複数のワーカー / サーバーに散らばった接続 に同じメッセージを送る道です。単一プロセスでは InMemoryChannelLayer も可能ですが、運用は Redis。

HTTP view からの push #

WebSocket だけの世界ではなく、通常の HTTP view やシグナル、Celery task から WebSocket 接続にメッセージを送れて初めて本当に役に立ちます。

views.py — 通知 push
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

def create_notification(request):
    notif = Notification.objects.create(
        user=request.user,
        text=request.POST["text"],
    )

    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f"user_{request.user.id}",
        {
            "type": "notify.message",
            "id": notif.id,
            "text": notif.text,
        },
    )
    return JsonResponse({"ok": True})

get_channel_layer() で layer を得て、同期 view では async_to_sync で包みます (#1 のアダプタ)。

非同期 view / Celery (async サポート時) ではそのまま await

Celery / シグナルとの結合 #

#5transaction.on_commit パターンがそのまま生きます。

シグナル → WebSocket
@receiver(post_save, sender=Notification)
def push_notification(sender, instance, created, **kwargs):
    if not created:
        return

    def push():
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            f"user_{instance.user_id}",
            {"type": "notify.message", "id": instance.id, "text": instance.text},
        )

    transaction.on_commit(push)

トランザクション commit 後に push — 存在しない通知に対する push は飛びません。

認証 — AuthMiddlewareStack #

asgi.py で見たそれ。

自動で
"websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),

これがセッションクッキーを読んで scope["user"] に詰めてくれます。Consumer の中で:

認証確認
async def connect(self):
    user = self.scope["user"]
    if not user.is_authenticated:
        await self.close()
        return
    await self.accept()

JWT などのトークン認証 #

セッションクッキーの代わりにトークンを使うならカスタムミドルウェアを作ります。

myapp/middleware.py
from urllib.parse import parse_qs
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser

class TokenAuthMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        query = parse_qs(scope.get("query_string", b"").decode())
        token = query.get("token", [None])[0]
        scope["user"] = await self.authenticate(token)
        return await self.app(scope, receive, send)

    @database_sync_to_async
    def authenticate(self, token):
        from .models import AuthToken
        if not token:
            return AnonymousUser()
        try:
            return AuthToken.objects.select_related("user").get(value=token).user
        except AuthToken.DoesNotExist:
            return AnonymousUser()

database_sync_to_async が ORM 呼び出しを非同期コンテキストへ安全に。

asgi.py — 適用
"websocket": TokenAuthMiddleware(URLRouter(websocket_urlpatterns)),

デプロイ #

daphne または uvicorn #

#1 で見た ASGI サーバー。WebSocket まで扱うので daphne / uvicorn / hypercorn から選択。

daphne
daphne -b 0.0.0.0 -p 8000 myproject.asgi:application
gunicorn + uvicorn worker
gunicorn myproject.asgi:application \
  -k uvicorn.workers.UvicornWorker \
  --workers 4 --bind 0.0.0.0:8000

nginx の WebSocket プロキシ #

リバースプロキシが nginx なら WebSocket のアップグレードを明示しなければなりません。

nginx.conf
upstream django_asgi {
    server 127.0.0.1:8000;
}

map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

server {
    listen 443 ssl http2;
    server_name myapp.com;

    location / {
        proxy_pass http://django_asgi;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # WebSocket アップグレード
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;

        # タイムアウト — WebSocket は長く
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
    }
}

中心:

  • proxy_http_version 1.1
  • UpgradeConnection ヘッダーの伝達
  • proxy_read_timeout を長く (デフォルト 60 秒では 1 分で切れる)

ワーカー数 #

WebSocket ワーカーは 長寿命接続 を扱うので、同期ワーカーのように「リクエスト = ワーカー占有」 ではありません。1 つのワーカーが数千の接続を扱えます。CPU ではなく メモリ / 接続上限 で決まります。

単純なサイトは Channels 分離 #

HTTP と WebSocket を同じ ASGI アプリで立ち上げるのが最もシンプル。ただしトラフィックが大きくなると:

  • HTTP は gunicorn + sync worker (速く、よく知られた運用)
  • WebSocket だけ別の daphne プロセス

この分離パターンがよくあります。nginx が path でルーティング (/ws/* → daphne、それ以外 → gunicorn)。

よく出会う落とし穴 #

1) channel_layer.group_send が届かない #

CHANNEL_LAYERS 設定漏れまたは Redis 接続失敗。get_channel_layer() が None を返したら layer 未設定。

2) 別のワーカーの接続に届かない #

InMemoryChannelLayer は 同じプロセスのみ 共有。多重ワーカーは必ず Redis (または別の外部) バックエンド。

3) ORM 呼び出しで SynchronousOnlyOperation #

非同期 Consumer の中で同期 ORM 呼び出し。a メソッドまたは database_sync_to_async

from channels.db import database_sync_to_async

@database_sync_to_async
def get_recent_messages(room):
    return list(Message.objects.filter(room=room)[:50])

async def connect(self):
    msgs = await get_recent_messages(self.room_name)

4) メッセージ type とメソッド名 #

type="chat.message"async def chat_message(self, event)。ドットがアンダースコアに変換されます。一致しない場合、メッセージが無視されます (エラーも出ない)。

5) close code を無視 #

disconnect(close_code) の close_code で正常終了 / エラー / 期限切れなどを区別できます。ロギングに活用。

6) 認証変更の追跡 #

AuthMiddlewareStack接続時点 のユーザー情報を使用。セッションが期限切れになってもパスワードが変わっても接続は生きています。機密の場面は定期的に再認証を。

代替 — 短く #

Django 陣営の外のオプション:

似合う場面
SSEサーバー → クライアント単方向、HTTP だけで十分
Pusher / Ablyマネージド — インフラを作りたくないとき
Phoenix Channels (Elixir)数十万接続規模
Socket.io (Node)フォールバック自動、JS エコシステムフレンドリ
Centrifugo別プロセスでメッセージング、言語に非依存

Django + Channels がよく似合う場面は すでに Django サイトがあって、そこにリアルタイム機能を加える 場面。最初から数十万の同時接続が目標なら別のスタックを見るのが正直です。

まとめ #

今回つかんだもの:

  • WebSocket の場面 (双方向、低レイテンシ)、単方向なら SSE もよい
  • Channels = Django の上の WebSocket/ASGI 陣営、channel layer (Redis) が中心
  • ProtocolTypeRouter で http / websocket 分離、AuthMiddlewareStack
  • AsyncWebsocketConsumerconnect/disconnect/receive
  • Group: group_addgroup_sendgroup_discardtype/メソッドのマッピング
  • HTTP view / シグナル / Celery からの push: get_channel_layer + async_to_sync(group_send)
  • シグナル + transaction.on_commit で安全な push
  • 認証: セッションは AuthMiddlewareStack、トークンはカスタムミドルウェア
  • デプロイ: daphne / uvicorn、nginx の Upgrade/Connection、read_timeout
  • 落とし穴: SynchronousOnlyOperation、type 命名、layer 未設定、多重ワーカー
  • 代替: SSE、Pusher/Ably、Centrifugo

次回 (#7 デプロイのセキュリティ) ではシリーズを締めくくりつつ運用の最後の場面 — settings 分離、ALLOWED_HOSTS、CSRF、クッキーセキュリティ、secret 管理、manage.py check –deploy — を整理します。

X