Django上級 #6 Django Channels — WebSocket
#1 ASGI で見た非同期陣営の頂点。WebSocket でリアルタイム双方向通信を作る場面です。道具は Django Channels。
なぜ WebSocket なのか #
HTTP のリクエスト - レスポンスモデルでは解けない場面:
- リアルタイム通知 — 新しいメッセージが届いたら即時
- チャット、共同編集 — 双方向、低レイテンシ
- ライブダッシュボード — サーバーが push で更新
- ゲーム、ホワイトボード — 頻繁な双方向
候補となる解決策:
| ポーリング | Long polling | SSE | WebSocket | |
|---|---|---|---|---|
| 方向 | クライアント → サーバー (間隔) | クライアント → サーバー (待機) | サーバー → クライアント単方向 | 双方向 |
| 接続寿命 | 短い | 長いリクエスト | 長いレスポンス | 常時接続 |
| プロキシ / ファイアウォール | フレンドリ | フレンドリ | フレンドリ (HTTP) | アップグレードが必要 |
| 実装難易度 | 非常に簡単 | 簡単 | 簡単 | 中間 |
| 場面 | 単純な更新 | 通知の一部 | 単方向 push | 双方向、チャット |
サーバー → クライアント単方向 なら SSE (Server-Sent Events) も十分よい選択肢。双方向が必要なときに WebSocket。
Channels の出番 #
Django 自体は ASGI の上で非同期 view をサポートしますが、WebSocket プロトコルの処理 はしません。そこに入るのが Channels。
Channels がもたらすもの:
- WebSocket / 別プロトコルの処理
- Channel Layer — ワーカー間のメッセージ交換 (普通は Redis バックエンド)
- 認証 / セッションミドルウェア (HTTP と同じ道具)
- ASGI ルーティング
インストールとセットアップ #
pip install channels channels-redisINSTALLED_APPS = [
...,
"daphne", # ASGI サーバー、runserver と統合
"channels",
"myapp",
]
ASGI_APPLICATION = "myproject.asgi.application"
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("127.0.0.1", 6379)],
},
},
}daphne を INSTALLED_APPS の 最上部に近い位置 に置けば runserver が daphne ベースで動作 — 開発時に WebSocket も一緒に。
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django.setup()
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from django.core.asgi import get_asgi_application
from myapp.routing import websocket_urlpatterns
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(websocket_urlpatterns)
),
})中心:
ProtocolTypeRouter—http、websocketごとにルーティング- HTTP は既存 Django のまま
- WebSocket は別ルーターへ
AuthMiddlewareStack— セッションクッキーからユーザーを認識
最初の Consumer #
Consumer は view の WebSocket 版。
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class EchoConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
await self.send(text_data=json.dumps({"hello": "world"}))
async def disconnect(self, close_code):
pass
async def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
await self.send(text_data=json.dumps({"echo": data}))3 つのフック:
connect— クライアントが接続を試みる。accept()で受諾、呼ばないと拒否disconnect— 切断receive— メッセージ受信
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r"ws/echo/$", consumers.EchoConsumer.as_asgi()),
]as_asgi() が as_view() の WebSocket 版。クラスインスタンスを ASGI アプリにしてくれます。
クライアント側 #
<script>
const ws = new WebSocket("ws://localhost:8000/ws/echo/");
ws.onopen = () => ws.send(JSON.stringify({msg: "hi"}));
ws.onmessage = (e) => console.log(JSON.parse(e.data));
</script>Group — broadcast #
ここが本当の価値。複数の接続に同じメッセージを送る場面。
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope["url_route"]["kwargs"]["room"]
self.group_name = f"chat_{self.room_name}"
# グループに参加
await self.channel_layer.group_add(self.group_name, self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.group_name, self.channel_name)
async def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
message = data["message"]
user = self.scope["user"]
username = user.username if user.is_authenticated else "anon"
# グループ全体に send
await self.channel_layer.group_send(
self.group_name,
{
"type": "chat.message", # ハンドラ名とマッチ
"username": username,
"message": message,
},
)
async def chat_message(self, event):
# group_send の type="chat.message" がこれを呼ぶ
await self.send(text_data=json.dumps({
"username": event["username"],
"message": event["message"],
}))websocket_urlpatterns = [
re_path(r"ws/chat/(?P<room>\w+)/$", consumers.ChatConsumer.as_asgi()),
]中心の仕組み:
channel_name— この接続の固有 ID (Channels が自動付与)group_name— 私たちが作った論理チャンネル (例:chat_general)group_add— この接続をグループに参加させるgroup_send— グループのすべての接続にメッセージ- メッセージの
typeフィールド (chat.message) → 同じ Consumer のメソッド (chat_message) を呼ぶ
type のドット (.) がメソッド名のアンダースコア (_) に変換されます。命名規約。
Channel Layer がすること #
channel_layer.group_send は Redis の list / pub-sub の上で動作。メッセージが:
- Redis に push
- グループに参加したすべてのチャンネルが取り出す
- それぞれ自分の Consumer のハンドラを呼ぶ
これが 複数のワーカー / サーバーに散らばった接続 に同じメッセージを送る道です。単一プロセスでは InMemoryChannelLayer も可能ですが、運用は Redis。
HTTP view からの push #
WebSocket だけの世界ではなく、通常の HTTP view やシグナル、Celery task から WebSocket 接続にメッセージを送れて初めて本当に役に立ちます。
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
def create_notification(request):
notif = Notification.objects.create(
user=request.user,
text=request.POST["text"],
)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f"user_{request.user.id}",
{
"type": "notify.message",
"id": notif.id,
"text": notif.text,
},
)
return JsonResponse({"ok": True})get_channel_layer() で layer を得て、同期 view では async_to_sync で包みます (#1 のアダプタ)。
非同期 view / Celery (async サポート時) ではそのまま await。
Celery / シグナルとの結合 #
#5 の transaction.on_commit パターンがそのまま生きます。
@receiver(post_save, sender=Notification)
def push_notification(sender, instance, created, **kwargs):
if not created:
return
def push():
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f"user_{instance.user_id}",
{"type": "notify.message", "id": instance.id, "text": instance.text},
)
transaction.on_commit(push)トランザクション commit 後に push — 存在しない通知に対する push は飛びません。
認証 — AuthMiddlewareStack
#
asgi.py で見たそれ。
"websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),これがセッションクッキーを読んで scope["user"] に詰めてくれます。Consumer の中で:
async def connect(self):
user = self.scope["user"]
if not user.is_authenticated:
await self.close()
return
await self.accept()JWT などのトークン認証 #
セッションクッキーの代わりにトークンを使うならカスタムミドルウェアを作ります。
from urllib.parse import parse_qs
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
class TokenAuthMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
query = parse_qs(scope.get("query_string", b"").decode())
token = query.get("token", [None])[0]
scope["user"] = await self.authenticate(token)
return await self.app(scope, receive, send)
@database_sync_to_async
def authenticate(self, token):
from .models import AuthToken
if not token:
return AnonymousUser()
try:
return AuthToken.objects.select_related("user").get(value=token).user
except AuthToken.DoesNotExist:
return AnonymousUser()database_sync_to_async が ORM 呼び出しを非同期コンテキストへ安全に。
"websocket": TokenAuthMiddleware(URLRouter(websocket_urlpatterns)),デプロイ #
daphne または uvicorn #
#1 で見た ASGI サーバー。WebSocket まで扱うので daphne / uvicorn / hypercorn から選択。
daphne -b 0.0.0.0 -p 8000 myproject.asgi:applicationgunicorn myproject.asgi:application \
-k uvicorn.workers.UvicornWorker \
--workers 4 --bind 0.0.0.0:8000nginx の WebSocket プロキシ #
リバースプロキシが nginx なら WebSocket のアップグレードを明示しなければなりません。
upstream django_asgi {
server 127.0.0.1:8000;
}
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 443 ssl http2;
server_name myapp.com;
location / {
proxy_pass http://django_asgi;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# WebSocket アップグレード
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
# タイムアウト — WebSocket は長く
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
}中心:
proxy_http_version 1.1Upgrade、Connectionヘッダーの伝達proxy_read_timeoutを長く (デフォルト 60 秒では 1 分で切れる)
ワーカー数 #
WebSocket ワーカーは 長寿命接続 を扱うので、同期ワーカーのように「リクエスト = ワーカー占有」 ではありません。1 つのワーカーが数千の接続を扱えます。CPU ではなく メモリ / 接続上限 で決まります。
単純なサイトは Channels 分離 #
HTTP と WebSocket を同じ ASGI アプリで立ち上げるのが最もシンプル。ただしトラフィックが大きくなると:
- HTTP は gunicorn + sync worker (速く、よく知られた運用)
- WebSocket だけ別の daphne プロセス
この分離パターンがよくあります。nginx が path でルーティング (/ws/* → daphne、それ以外 → gunicorn)。
よく出会う落とし穴 #
1) channel_layer.group_send が届かない
#
CHANNEL_LAYERS 設定漏れまたは Redis 接続失敗。get_channel_layer() が None を返したら layer 未設定。
2) 別のワーカーの接続に届かない #
InMemoryChannelLayer は 同じプロセスのみ 共有。多重ワーカーは必ず Redis (または別の外部) バックエンド。
3) ORM 呼び出しで SynchronousOnlyOperation
#
非同期 Consumer の中で同期 ORM 呼び出し。a メソッドまたは database_sync_to_async。
from channels.db import database_sync_to_async
@database_sync_to_async
def get_recent_messages(room):
return list(Message.objects.filter(room=room)[:50])
async def connect(self):
msgs = await get_recent_messages(self.room_name)4) メッセージ type とメソッド名 #
type="chat.message" → async def chat_message(self, event)。ドットがアンダースコアに変換されます。一致しない場合、メッセージが無視されます (エラーも出ない)。
5) close code を無視 #
disconnect(close_code) の close_code で正常終了 / エラー / 期限切れなどを区別できます。ロギングに活用。
6) 認証変更の追跡 #
AuthMiddlewareStack は 接続時点 のユーザー情報を使用。セッションが期限切れになってもパスワードが変わっても接続は生きています。機密の場面は定期的に再認証を。
代替 — 短く #
Django 陣営の外のオプション:
| 似合う場面 | |
|---|---|
| SSE | サーバー → クライアント単方向、HTTP だけで十分 |
| Pusher / Ably | マネージド — インフラを作りたくないとき |
| Phoenix Channels (Elixir) | 数十万接続規模 |
| Socket.io (Node) | フォールバック自動、JS エコシステムフレンドリ |
| Centrifugo | 別プロセスでメッセージング、言語に非依存 |
Django + Channels がよく似合う場面は すでに Django サイトがあって、そこにリアルタイム機能を加える 場面。最初から数十万の同時接続が目標なら別のスタックを見るのが正直です。
まとめ #
今回つかんだもの:
- WebSocket の場面 (双方向、低レイテンシ)、単方向なら SSE もよい
- Channels = Django の上の WebSocket/ASGI 陣営、channel layer (Redis) が中心
ProtocolTypeRouterで http / websocket 分離、AuthMiddlewareStackAsyncWebsocketConsumer—connect/disconnect/receive- Group:
group_add、group_send、group_discard、type/メソッドのマッピング - HTTP view / シグナル / Celery からの push:
get_channel_layer+async_to_sync(group_send) - シグナル +
transaction.on_commitで安全な push - 認証: セッションは
AuthMiddlewareStack、トークンはカスタムミドルウェア - デプロイ: daphne / uvicorn、nginx の Upgrade/Connection、read_timeout
- 落とし穴:
SynchronousOnlyOperation、type 命名、layer 未設定、多重ワーカー - 代替: SSE、Pusher/Ably、Centrifugo
次回 (#7 デプロイのセキュリティ) ではシリーズを締めくくりつつ運用の最後の場面 — settings 分離、ALLOWED_HOSTS、CSRF、クッキーセキュリティ、secret 管理、manage.py check –deploy — を整理します。