장고 고급 #6 Django Channels — WebSocket
#1 ASGI에서 본 비동기 진영의 정점입니다. WebSocket으로 실시간 양방향 통신을 구현하는 도구가 Django Channels입니다.
왜 WebSocket 인가 #
HTTP 요청-응답 모델로 못 푸는 경우:
- 실시간 알림 — 새 메시지가 도착하면 즉시
- 채팅, 협업 편집 — 양방향, 저지연
- 라이브 대시보드 — 서버가 push로 갱신
- 게임, 화이트보드 — 빈번한 양방향
해결 후보들:
| 폴링 | Long polling | SSE | WebSocket | |
|---|---|---|---|---|
| 방향 | 클라 → 서버 (간격) | 클라 → 서버 (대기) | 서버 → 클라 단방향 | 양방향 |
| 연결 수명 | 짧음 | 긴 요청 | 긴 응답 | 항상 켜짐 |
| 프록시/방화벽 | 친화적 | 친화적 | 친화적 (HTTP) | 업그레이드 필요 |
| 구현 난이도 | 매우 쉬움 | 쉬움 | 쉬움 | 중간 |
| 구분 | 단순 갱신 | 알림 일부 | 일방향 push | 양방향, 채팅 |
서버 → 클라 단방향이면 SSE (Server-Sent Events)도 충분히 좋은 선택. 양방향이 필요할 때 WebSocket.
Channels의 쓰임 #
장고 자체는 ASGI 위에서 비동기 view를 지원하지만, WebSocket 프로토콜 처리는 기본 포함되지 않습니다. 그 역할을 담당하는 것이 Channels입니다.
Channels가 가져오는 것:
- WebSocket / 다른 프로토콜 다루기
- Channel Layer — 워커 간 메시지 교환 (보통 Redis 백엔드)
- 인증/세션 미들웨어 (HTTP와 같은 도구)
- ASGI 라우팅
설치와 셋업 #
pip install channels channels-redisINSTALLED_APPS = [
...,
"daphne", # ASGI 서버, runserver와 통합
"channels",
"myapp",
]
ASGI_APPLICATION = "myproject.asgi.application"
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("127.0.0.1", 6379)],
},
},
}daphne를 INSTALLED_APPS 맨 위 가까이에 두면 runserver가 daphne 기반으로 동작 — 개발 시 WebSocket도 같이.
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django.setup()
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from django.core.asgi import get_asgi_application
from myapp.routing import websocket_urlpatterns
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(websocket_urlpatterns)
),
})핵심:
ProtocolTypeRouter—http,websocket별로 라우팅- HTTP는 기존 장고 그대로
- WebSocket은 별도 라우터로
AuthMiddlewareStack— 세션 쿠키에서 사용자 인식
첫 Consumer #
Consumer는 view의 WebSocket 버전.
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class EchoConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
await self.send(text_data=json.dumps({"hello": "world"}))
async def disconnect(self, close_code):
pass
async def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
await self.send(text_data=json.dumps({"echo": data}))세 후크:
connect— 클라이언트가 연결 시도.accept()로 수락, 안 부르면 거절disconnect— 끊김receive— 메시지 수신
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r"ws/echo/$", consumers.EchoConsumer.as_asgi()),
]as_asgi()가 as_view()의 WebSocket 버전. 클래스 인스턴스를 ASGI 앱으로 만들어줍니다.
클라이언트 측 #
<script>
const ws = new WebSocket("ws://localhost:8000/ws/echo/");
ws.onopen = () => ws.send(JSON.stringify({msg: "hi"}));
ws.onmessage = (e) => console.log(JSON.parse(e.data));
</script>Group — broadcast #
여기가 진짜 가치. 여러 연결에 같은 메시지를 보내는 기능입니다.
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.room_name = self.scope["url_route"]["kwargs"]["room"]
self.group_name = f"chat_{self.room_name}"
# 그룹에 가입
await self.channel_layer.group_add(self.group_name, self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.group_name, self.channel_name)
async def receive(self, text_data=None, bytes_data=None):
data = json.loads(text_data)
message = data["message"]
user = self.scope["user"]
username = user.username if user.is_authenticated else "anon"
# 그룹 전체에 send
await self.channel_layer.group_send(
self.group_name,
{
"type": "chat.message", # 핸들러 이름과 매칭
"username": username,
"message": message,
},
)
async def chat_message(self, event):
# group_send의 type="chat.message" 가 이걸 호출
await self.send(text_data=json.dumps({
"username": event["username"],
"message": event["message"],
}))websocket_urlpatterns = [
re_path(r"ws/chat/(?P<room>\w+)/$", consumers.ChatConsumer.as_asgi()),
]핵심 메커니즘:
channel_name— 이 연결의 고유 ID (Channels가 자동 부여)group_name— 우리가 만든 논리 채널 (예:chat_general)group_add— 이 연결을 그룹에 가입group_send— 그룹의 모든 연결에 메시지- 메시지의
type필드 (chat.message) → 같은 Consumer의 메소드 (chat_message) 호출
type의 점(.)이 메소드 이름의 언더스코어(_)로 변환됩니다. 명명 규칙.
Channel Layer가 하는 일 #
channel_layer.group_send는 Redis의 list / pub-sub 위에 동작. 메시지가:
- Redis에 push
- 그룹에 가입한 모든 채널이 가져감
- 각자 자신의 Consumer에서 핸들러 호출
이게 여러 워커/서버에 흩어진 연결들에 같은 메시지를 보내는 길입니다. 단일 프로세스에서는 InMemoryChannelLayer도 가능하지만, 운영은 Redis.
HTTP view에서 push #
WebSocket만의 세계가 아니라, 일반 HTTP view 나 시그널, Celery task에서 WebSocket 연결로 메시지를 보낼 수 있어야 진짜 쓸모가 있습니다.
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
def create_notification(request):
notif = Notification.objects.create(
user=request.user,
text=request.POST["text"],
)
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f"user_{request.user.id}",
{
"type": "notify.message",
"id": notif.id,
"text": notif.text,
},
)
return JsonResponse({"ok": True})get_channel_layer()로 layer를 얻고, **동기 view에서는 async_to_sync**로 감쌉니다 (#1의 어댑터).
비동기 view / Celery (async 지원 시)에서는 그냥 await.
Celery / 시그널과 결합 #
#5의 transaction.on_commit 패턴이 그대로 살아납니다.
@receiver(post_save, sender=Notification)
def push_notification(sender, instance, created, **kwargs):
if not created:
return
def push():
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f"user_{instance.user_id}",
{"type": "notify.message", "id": instance.id, "text": instance.text},
)
transaction.on_commit(push)trasaction commit 후 push — 존재하지 않는 알림에 대한 push가 가지 않습니다.
인증 — AuthMiddlewareStack
#
asgi.py에서 본 그것.
"websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),이게 세션 쿠키를 읽어 scope["user"]에 채워줍니다. Consumer 안에서:
async def connect(self):
user = self.scope["user"]
if not user.is_authenticated:
await self.close()
return
await self.accept()JWT 등 토큰 인증 #
세션 쿠키 대신 토큰을 쓰면 커스텀 미들웨어를 만듭니다.
from urllib.parse import parse_qs
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
class TokenAuthMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
query = parse_qs(scope.get("query_string", b"").decode())
token = query.get("token", [None])[0]
scope["user"] = await self.authenticate(token)
return await self.app(scope, receive, send)
@database_sync_to_async
def authenticate(self, token):
from .models import AuthToken
if not token:
return AnonymousUser()
try:
return AuthToken.objects.select_related("user").get(value=token).user
except AuthToken.DoesNotExist:
return AnonymousUser()database_sync_to_async가 ORM 호출을 비동기 컨텍스트로 안전하게.
"websocket": TokenAuthMiddleware(URLRouter(websocket_urlpatterns)),배포 #
daphne 또는 uvicorn #
#1에서 본 ASGI 서버. WebSocket까지 다루므로 daphne / uvicorn / hypercorn 중 선택.
daphne -b 0.0.0.0 -p 8000 myproject.asgi:applicationgunicorn myproject.asgi:application \
-k uvicorn.workers.UvicornWorker \
--workers 4 --bind 0.0.0.0:8000nginx의 WebSocket 프록시 #
리버스 프록시가 nginx 라면 WebSocket 업그레이드를 명시해야 합니다.
upstream django_asgi {
server 127.0.0.1:8000;
}
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 443 ssl http2;
server_name myapp.com;
location / {
proxy_pass http://django_asgi;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# WebSocket 업그레이드
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
# 타임아웃 — WebSocket은 길게
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
}핵심:
proxy_http_version 1.1Upgrade,Connection헤더 전달proxy_read_timeout을 길게 (기본 60초면 1분만에 끊김)
워커 수 #
WebSocket 워커는 장수명 연결을 다루므로 동기 워커처럼 “요청 = 워커 점유” 가 아닙니다. 하나의 워커가 수천 연결을 다룰 수 있습니다. CPU가 아닌 메모리/연결 한도로 결정.
단순 사이트는 Channels 분리 #
HTTP와 WebSocket을 같은 ASGI 앱으로 띄우는 게 가장 단순. 다만 트래픽이 커지면:
- HTTP는 gunicorn + sync worker (빠름, 잘 알려진 운영)
- WebSocket만 별도 daphne 프로세스
이 분리 패턴이 흔합니다. nginx가 path로 라우팅 (/ws/* → daphne, 나머지 → gunicorn).
자주 만나는 함정 #
1) channel_layer.group_send이 안 가져옴
#
CHANNEL_LAYERS 설정 누락 또는 Redis 연결 실패. get_channel_layer()가 None을 반환하면 layer 미설정.
2) 다른 워커의 연결에 안 가져옴 #
InMemoryChannelLayer는 같은 프로세스만 공유. 다중 워커는 반드시 Redis (또는 다른 외부) 백엔드.
3) ORM 호출에서 SynchronousOnlyOperation
#
비동기 Consumer 안에서 동기 ORM 호출. a 메소드 또는 database_sync_to_async.
from channels.db import database_sync_to_async
@database_sync_to_async
def get_recent_messages(room):
return list(Message.objects.filter(room=room)[:50])
async def connect(self):
msgs = await get_recent_messages(self.room_name)4) 메시지 type과 메소드 이름 #
type="chat.message" → async def chat_message(self, event). 점이 언더스코어. 안 맞으면 메시지가 무시됩니다 (에러도 안 남).
5) close code 무시 #
disconnect(close_code)에서 close_code로 정상 종료 / 에러 / 만료 등을 구분할 수 있습니다. 로깅에 활용.
6) 인증 변경 추적 #
AuthMiddlewareStack은 연결 시점의 사용자 정보를 사용. 세션이 만료되거나 비밀번호가 바뀌어도 연결은 살아 있습니다. 민감한 경우는 주기적 재인증.
대안 — 짧게 #
장고 진영 외 옵션:
| 어울리는 경우 | |
|---|---|
| SSE | 서버 → 클라 단방향, HTTP만으로 충분 |
| Pusher / Ably | 매니지드 — 인프라 안 만들고 싶을 때 |
| Phoenix Channels (Elixir) | 수십만 연결 규모 |
| Socket.io (Node) | 폴백 자동, JS 생태계 친화 |
| Centrifugo | 별도 프로세스로 메시징, 언어 무관 |
장고 + Channels가 잘 어울리는 경우는 이미 장고 사이트가 있고, 거기에 실시간 기능을 더하는 상황입니다. 처음부터 수십만 동시 연결이 목표라면 다른 스택을 보는 게 정직합니다.
정리 #
이번 글에서 잡은 것:
- WebSocket의 쓰임 (양방향, 저지연), 단방향이면 SSE도 좋음
- Channels = 장고 위의 WebSocket/ASGI 진영, **channel layer (Redis)**가 핵심
ProtocolTypeRouter로 http / websocket 분리,AuthMiddlewareStackAsyncWebsocketConsumer—connect/disconnect/receive- Group:
group_add,group_send,group_discard,type/메소드 매핑 - HTTP view/시그널/Celery에서 push:
get_channel_layer+async_to_sync(group_send) - 시그널 +
transaction.on_commit으로 안전한 push - 인증: 세션은
AuthMiddlewareStack, 토큰은 커스텀 미들웨어 - 배포: daphne / uvicorn, nginx의 Upgrade/Connection, read_timeout
- 함정:
SynchronousOnlyOperation, type 명명, layer 미설정, 다중 워커 - 대안: SSE, Pusher/Ably, Centrifugo
다음 글(#7 배포 보안)에서는 시리즈를 마무리하면서 운영의 마지막 단계 — settings 분리, ALLOWED_HOSTS, CSRF, 쿠키 보안, secret 관리, manage.py check –deploy — 를 정리합니다.