장고 고급 #6 Django Channels — WebSocket

7 분 소요

#1 ASGI에서 본 비동기 진영의 정점입니다. WebSocket으로 실시간 양방향 통신을 구현하는 도구가 Django Channels입니다.

왜 WebSocket 인가 #

HTTP 요청-응답 모델로 못 푸는 경우:

  • 실시간 알림 — 새 메시지가 도착하면 즉시
  • 채팅, 협업 편집 — 양방향, 저지연
  • 라이브 대시보드 — 서버가 push로 갱신
  • 게임, 화이트보드 — 빈번한 양방향

해결 후보들:

폴링Long pollingSSEWebSocket
방향클라 → 서버 (간격)클라 → 서버 (대기)서버 → 클라 단방향양방향
연결 수명짧음긴 요청긴 응답항상 켜짐
프록시/방화벽친화적친화적친화적 (HTTP)업그레이드 필요
구현 난이도매우 쉬움쉬움쉬움중간
구분단순 갱신알림 일부일방향 push양방향, 채팅

서버 → 클라 단방향이면 SSE (Server-Sent Events)도 충분히 좋은 선택. 양방향이 필요할 때 WebSocket.

Channels의 쓰임 #

장고 자체는 ASGI 위에서 비동기 view를 지원하지만, WebSocket 프로토콜 처리는 기본 포함되지 않습니다. 그 역할을 담당하는 것이 Channels입니다.

Channels가 가져오는 것:

  • WebSocket / 다른 프로토콜 다루기
  • Channel Layer — 워커 간 메시지 교환 (보통 Redis 백엔드)
  • 인증/세션 미들웨어 (HTTP와 같은 도구)
  • ASGI 라우팅

설치와 셋업 #

설치
pip install channels channels-redis
settings.py
INSTALLED_APPS = [
    ...,
    "daphne",        # ASGI 서버, runserver와 통합
    "channels",
    "myapp",
]

ASGI_APPLICATION = "myproject.asgi.application"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("127.0.0.1", 6379)],
        },
    },
}

daphneINSTALLED_APPS 맨 위 가까이에 두면 runserver가 daphne 기반으로 동작 — 개발 시 WebSocket도 같이.

myproject/asgi.py
import os
import django

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django.setup()

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from django.core.asgi import get_asgi_application
from myapp.routing import websocket_urlpatterns

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(websocket_urlpatterns)
    ),
})

핵심:

  • ProtocolTypeRouterhttp, websocket 별로 라우팅
  • HTTP는 기존 장고 그대로
  • WebSocket은 별도 라우터로
  • AuthMiddlewareStack — 세션 쿠키에서 사용자 인식

첫 Consumer #

Consumer는 view의 WebSocket 버전.

myapp/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class EchoConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.accept()
        await self.send(text_data=json.dumps({"hello": "world"}))

    async def disconnect(self, close_code):
        pass

    async def receive(self, text_data=None, bytes_data=None):
        data = json.loads(text_data)
        await self.send(text_data=json.dumps({"echo": data}))

세 후크:

  • connect — 클라이언트가 연결 시도. accept()로 수락, 안 부르면 거절
  • disconnect — 끊김
  • receive — 메시지 수신
myapp/routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r"ws/echo/$", consumers.EchoConsumer.as_asgi()),
]

as_asgi()as_view()의 WebSocket 버전. 클래스 인스턴스를 ASGI 앱으로 만들어줍니다.

클라이언트 측 #

브라우저에서
<script>
const ws = new WebSocket("ws://localhost:8000/ws/echo/");
ws.onopen = () => ws.send(JSON.stringify({msg: "hi"}));
ws.onmessage = (e) => console.log(JSON.parse(e.data));
</script>

Group — broadcast #

여기가 진짜 가치. 여러 연결에 같은 메시지를 보내는 기능입니다.

채팅방 Consumer
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope["url_route"]["kwargs"]["room"]
        self.group_name = f"chat_{self.room_name}"

        # 그룹에 가입
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        data = json.loads(text_data)
        message = data["message"]
        user = self.scope["user"]
        username = user.username if user.is_authenticated else "anon"

        # 그룹 전체에 send
        await self.channel_layer.group_send(
            self.group_name,
            {
                "type": "chat.message",   # 핸들러 이름과 매칭
                "username": username,
                "message": message,
            },
        )

    async def chat_message(self, event):
        # group_send의 type="chat.message" 가 이걸 호출
        await self.send(text_data=json.dumps({
            "username": event["username"],
            "message": event["message"],
        }))
routing.py
websocket_urlpatterns = [
    re_path(r"ws/chat/(?P<room>\w+)/$", consumers.ChatConsumer.as_asgi()),
]

핵심 메커니즘:

  • channel_name — 이 연결의 고유 ID (Channels가 자동 부여)
  • group_name — 우리가 만든 논리 채널 (예: chat_general)
  • group_add — 이 연결을 그룹에 가입
  • group_send — 그룹의 모든 연결에 메시지
  • 메시지의 type 필드 (chat.message) → 같은 Consumer의 메소드 (chat_message) 호출

type의 점(.)이 메소드 이름의 언더스코어(_)로 변환됩니다. 명명 규칙.

Channel Layer가 하는 일 #

channel_layer.group_sendRedis의 list / pub-sub 위에 동작. 메시지가:

  1. Redis에 push
  2. 그룹에 가입한 모든 채널이 가져감
  3. 각자 자신의 Consumer에서 핸들러 호출

이게 여러 워커/서버에 흩어진 연결들에 같은 메시지를 보내는 길입니다. 단일 프로세스에서는 InMemoryChannelLayer도 가능하지만, 운영은 Redis.

HTTP view에서 push #

WebSocket만의 세계가 아니라, 일반 HTTP view 나 시그널, Celery task에서 WebSocket 연결로 메시지를 보낼 수 있어야 진짜 쓸모가 있습니다.

views.py — 알림 push
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

def create_notification(request):
    notif = Notification.objects.create(
        user=request.user,
        text=request.POST["text"],
    )

    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f"user_{request.user.id}",
        {
            "type": "notify.message",
            "id": notif.id,
            "text": notif.text,
        },
    )
    return JsonResponse({"ok": True})

get_channel_layer()로 layer를 얻고, **동기 view에서는 async_to_sync**로 감쌉니다 (#1의 어댑터).

비동기 view / Celery (async 지원 시)에서는 그냥 await.

Celery / 시그널과 결합 #

#5transaction.on_commit 패턴이 그대로 살아납니다.

시그널 → WebSocket
@receiver(post_save, sender=Notification)
def push_notification(sender, instance, created, **kwargs):
    if not created:
        return

    def push():
        channel_layer = get_channel_layer()
        async_to_sync(channel_layer.group_send)(
            f"user_{instance.user_id}",
            {"type": "notify.message", "id": instance.id, "text": instance.text},
        )

    transaction.on_commit(push)

trasaction commit 후 push — 존재하지 않는 알림에 대한 push가 가지 않습니다.

인증 — AuthMiddlewareStack #

asgi.py에서 본 그것.

자동으로
"websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),

이게 세션 쿠키를 읽어 scope["user"]에 채워줍니다. Consumer 안에서:

인증 확인
async def connect(self):
    user = self.scope["user"]
    if not user.is_authenticated:
        await self.close()
        return
    await self.accept()

JWT 등 토큰 인증 #

세션 쿠키 대신 토큰을 쓰면 커스텀 미들웨어를 만듭니다.

myapp/middleware.py
from urllib.parse import parse_qs
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser

class TokenAuthMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        query = parse_qs(scope.get("query_string", b"").decode())
        token = query.get("token", [None])[0]
        scope["user"] = await self.authenticate(token)
        return await self.app(scope, receive, send)

    @database_sync_to_async
    def authenticate(self, token):
        from .models import AuthToken
        if not token:
            return AnonymousUser()
        try:
            return AuthToken.objects.select_related("user").get(value=token).user
        except AuthToken.DoesNotExist:
            return AnonymousUser()

database_sync_to_async가 ORM 호출을 비동기 컨텍스트로 안전하게.

asgi.py — 적용
"websocket": TokenAuthMiddleware(URLRouter(websocket_urlpatterns)),

배포 #

daphne 또는 uvicorn #

#1에서 본 ASGI 서버. WebSocket까지 다루므로 daphne / uvicorn / hypercorn 중 선택.

daphne
daphne -b 0.0.0.0 -p 8000 myproject.asgi:application
gunicorn + uvicorn worker
gunicorn myproject.asgi:application \
  -k uvicorn.workers.UvicornWorker \
  --workers 4 --bind 0.0.0.0:8000

nginx의 WebSocket 프록시 #

리버스 프록시가 nginx 라면 WebSocket 업그레이드를 명시해야 합니다.

nginx.conf
upstream django_asgi {
    server 127.0.0.1:8000;
}

map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

server {
    listen 443 ssl http2;
    server_name myapp.com;

    location / {
        proxy_pass http://django_asgi;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;

        # WebSocket 업그레이드
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;

        # 타임아웃 — WebSocket은 길게
        proxy_read_timeout 3600s;
        proxy_send_timeout 3600s;
    }
}

핵심:

  • proxy_http_version 1.1
  • Upgrade, Connection 헤더 전달
  • proxy_read_timeout을 길게 (기본 60초면 1분만에 끊김)

워커 수 #

WebSocket 워커는 장수명 연결을 다루므로 동기 워커처럼 “요청 = 워커 점유” 가 아닙니다. 하나의 워커가 수천 연결을 다룰 수 있습니다. CPU가 아닌 메모리/연결 한도로 결정.

단순 사이트는 Channels 분리 #

HTTP와 WebSocket을 같은 ASGI 앱으로 띄우는 게 가장 단순. 다만 트래픽이 커지면:

  • HTTP는 gunicorn + sync worker (빠름, 잘 알려진 운영)
  • WebSocket만 별도 daphne 프로세스

이 분리 패턴이 흔합니다. nginx가 path로 라우팅 (/ws/* → daphne, 나머지 → gunicorn).

자주 만나는 함정 #

1) channel_layer.group_send이 안 가져옴 #

CHANNEL_LAYERS 설정 누락 또는 Redis 연결 실패. get_channel_layer()가 None을 반환하면 layer 미설정.

2) 다른 워커의 연결에 안 가져옴 #

InMemoryChannelLayer는 같은 프로세스만 공유. 다중 워커는 반드시 Redis (또는 다른 외부) 백엔드.

3) ORM 호출에서 SynchronousOnlyOperation #

비동기 Consumer 안에서 동기 ORM 호출. a 메소드 또는 database_sync_to_async.

from channels.db import database_sync_to_async

@database_sync_to_async
def get_recent_messages(room):
    return list(Message.objects.filter(room=room)[:50])

async def connect(self):
    msgs = await get_recent_messages(self.room_name)

4) 메시지 type과 메소드 이름 #

type="chat.message"async def chat_message(self, event). 점이 언더스코어. 안 맞으면 메시지가 무시됩니다 (에러도 안 남).

5) close code 무시 #

disconnect(close_code)에서 close_code로 정상 종료 / 에러 / 만료 등을 구분할 수 있습니다. 로깅에 활용.

6) 인증 변경 추적 #

AuthMiddlewareStack연결 시점의 사용자 정보를 사용. 세션이 만료되거나 비밀번호가 바뀌어도 연결은 살아 있습니다. 민감한 경우는 주기적 재인증.

대안 — 짧게 #

장고 진영 외 옵션:

어울리는 경우
SSE서버 → 클라 단방향, HTTP만으로 충분
Pusher / Ably매니지드 — 인프라 안 만들고 싶을 때
Phoenix Channels (Elixir)수십만 연결 규모
Socket.io (Node)폴백 자동, JS 생태계 친화
Centrifugo별도 프로세스로 메시징, 언어 무관

장고 + Channels가 잘 어울리는 경우는 이미 장고 사이트가 있고, 거기에 실시간 기능을 더하는 상황입니다. 처음부터 수십만 동시 연결이 목표라면 다른 스택을 보는 게 정직합니다.

정리 #

이번 글에서 잡은 것:

  • WebSocket의 쓰임 (양방향, 저지연), 단방향이면 SSE도 좋음
  • Channels = 장고 위의 WebSocket/ASGI 진영, **channel layer (Redis)**가 핵심
  • ProtocolTypeRouter로 http / websocket 분리, AuthMiddlewareStack
  • AsyncWebsocketConsumerconnect/disconnect/receive
  • Group: group_add, group_send, group_discard, type/메소드 매핑
  • HTTP view/시그널/Celery에서 push: get_channel_layer + async_to_sync(group_send)
  • 시그널 + transaction.on_commit으로 안전한 push
  • 인증: 세션은 AuthMiddlewareStack, 토큰은 커스텀 미들웨어
  • 배포: daphne / uvicorn, nginx의 Upgrade/Connection, read_timeout
  • 함정: SynchronousOnlyOperation, type 명명, layer 미설정, 다중 워커
  • 대안: SSE, Pusher/Ably, Centrifugo

다음 글(#7 배포 보안)에서는 시리즈를 마무리하면서 운영의 마지막 단계 — settings 분리, ALLOWED_HOSTS, CSRF, 쿠키 보안, secret 관리, manage.py check –deploy — 를 정리합니다.

X