목차
27 장

비동기와 백그라운드 작업

async 라우트의 사용 시점, BackgroundTasks로 응답 후 처리, 외부 큐(Celery, ARQ)가 필요해지는 경계, 그리고 동기 라이브러리와 안전하게 섞는 법을 정리합니다.

14장 비동기 입문18장 비동기 깊이에서 본 비동기가 FastAPI 안에서 어떻게 자연스럽게 풀리는지 — 그리고 응답 이후 처리해야 할 작업 (이메일, 무거운 변환, 외부 API)을 다루는 패턴들을 정리합니다.

본 챕터에서 외부 큐 (ARQ / Celery 등)가 필요해지는 경계를 명시합니다. 작은 후처리는 BackgroundTasks로 충분하지만, 재시도 / 추적 / 별도 스케일이 필요해지는 순간이 와요.

async def vs def — FastAPI가 알아서 처리 #

FastAPI의 라우트는 둘 다 받습니다.

둘 다 가능
@router.get("/sync")
def sync_handler():
    ...

@router.get("/async")
async def async_handler():
    ...

차이:

  • async def — 이벤트 루프 위에서 직접 실행. await가 필요하면 양도 가능
  • def — 자동으로 스레드 풀로 보내져 실행. 이벤트 루프를 막지 않음

FastAPI가 자동으로 적절한 방식으로 처리 합니다. 그래서 동기 라이브러리를 쓰는 라우트는 그냥 def로 적어도 다른 요청을 막지 않습니다.

절대 하지 말 것 — async def 안에서 동기 블로킹 #

🚫 가장 큰 함정
@router.get("/items")
async def list_items():
    response = requests.get("https://api.example.com")   # 동기, 블로킹!
    return response.json()

async def 안에서 동기 블로킹 함수를 호출하면 — 이벤트 루프 전체가 멈춥니다. 다른 요청도 멈춥니다. 19장 GIL과 동시성에서 본 GIL + 비동기의 만남.

해결 두 가지:

✅ 1. 비동기 라이브러리
import httpx

@router.get("/items")
async def list_items():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
    return response.json()
✅ 2. 동기를 스레드로
import asyncio
import requests

@router.get("/items")
async def list_items():
    response = await asyncio.to_thread(requests.get, "https://api.example.com")
    return response.json()

또는 그냥 def로 적기 — FastAPI가 알아서 스레드로 보냅니다.

✅ 3. def로 적기
@router.get("/items")
def list_items():
    response = requests.get("https://api.example.com")
    return response.json()

실전 가이드 — 언제 무엇? #

상황첫 시도
비동기 라이브러리만 사용 (httpx, asyncpg, redis-py async)async def
동기 라이브러리 (requests, psycopg2)만 사용def
둘이 섞임async def + to_thread로 동기 부분 감싸기
CPU 바운드 처리def 또는 async def + ProcessPoolExecutor

본 책의 4부는 25장에서 SQLAlchemy 비동기를 썼으니 async def가 표준.

BackgroundTasks — 응답 후 처리 #

이메일 발송, 알림, 로그 같은 작업을 응답이 나간 뒤 백그라운드로 돌리고 싶을 때.

기본 사용
from fastapi import BackgroundTasks

def send_email_sync(to: str, subject: str, body: str):
    # 실제 이메일 전송 (smtplib 등)
    print(f"이메일 보냄: {to}")

@router.post("/users")
async def create_user(
    payload: UserCreate,
    background_tasks: BackgroundTasks,
    db: DBSession,
) -> UserOut:
    user = await user_service.create_user(db, payload.email, payload.password)
    background_tasks.add_task(
        send_email_sync, user.email, "환영", "가입을 환영합니다",
    )
    return user

흐름:

  1. 라우트 본문이 정상 종료
  2. 응답이 클라이언트에 전송됨
  3. 그 후 background_tasks의 함수들이 순서대로 실행

비동기 함수도 OK #

async task
async def send_email_async(to: str, subject: str, body: str):
    async with httpx.AsyncClient() as client:
        await client.post("https://email-api/", json={...})

background_tasks.add_task(send_email_async, user.email, "환영", "...")

add_task는 동기 · 비동기 둘 다 받습니다.

한계 — 같은 프로세스 안에서만 #

BackgroundTasks의 작업은 현재 프로세스가 살아있어야 실행 됩니다. 즉:

  • 빠른 후처리 (수 초 이내)
  • 실패해도 재시도 안 해도 되는 작업
  • ❌ 무거운 영상 / 이미지 변환
  • ❌ 외부 API 호출 실패 시 재시도 필요
  • ❌ 서버 재시작 시 살아있어야 할 작업

이런 경우는 외부 큐가 필요합니다.

외부 큐 — 언제 도입? #

신호외부 큐 필요?
작업이 1초 미만아니, BackgroundTasks 충분
실패 시 재시도 정책 필요
작업 결과를 추적 / 조회해야
워커를 별도로 스케일하고 싶음
스케줄링 (매일 자정 작업 등)
우선순위 큐 필요

해당하면 별도 워커 인프라를 도입하세요.

선택지 #

도구평가
Celery가장 오래되고 널리 쓰임. 복잡함. Redis / RabbitMQ 백엔드
ARQasync 친화. Redis만 사용. 간단함
Dramatiq단순한 인터페이스. 백엔드 다양
RQ간단함. Redis. 동기 위주
TaskiqFastAPI 식 의존성 주입

비동기 코드 베이스에는 ARQ 또는 Taskiq가 자연스럽고, 큰 프로젝트는 Celery가 검증된 표준입니다.

ARQ — async 큐 미니 가이드 #

가장 간단한 도입을 ARQ로 보겠습니다.

설치
uv add arq
app/tasks/worker.py
from arq.connections import RedisSettings
from app.core.config import settings

async def send_email(ctx, to: str, subject: str, body: str):
    print(f"[worker] 이메일 보냄: {to}")
    # 실제 전송 코드

class WorkerSettings:
    redis_settings = RedisSettings.from_dsn(settings.redis_url)
    functions = [send_email]
    max_jobs = 10
    job_timeout = 60
라우트에서 enqueue
from arq import create_pool
from arq.connections import RedisSettings

@router.post("/users")
async def create_user(payload: UserCreate, db: DBSession):
    user = await user_service.create_user(...)
    pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
    await pool.enqueue_job("send_email", user.email, "환영", "...")
    return user

워커 실행:

별도 프로세스로 워커
uv run arq app.tasks.worker.WorkerSettings

웹 서버와 워커가 별도 프로세스로 돕니다. 한쪽이 죽어도 다른 쪽은 살아있고, 워커만 따로 스케일 가능.

의존성 주입은 직접 #

ARQ는 FastAPI의 Depends가 동작하지 않습니다. 워커 함수 안에서 DB 세션 등을 직접 만들어야 합니다.

워커 안에서 DB
from app.db.session import AsyncSessionLocal

async def send_email(ctx, user_id: int):
    async with AsyncSessionLocal() as db:
        user = await db.get(User, user_id)
        # 발송 로직

스트리밍 응답 — 길게 보내기 #

파일 다운로드, 큰 JSON, Server-Sent Events (SSE)처럼 응답을 한 번에 만들지 말고 흘려보내고 싶을 때.

StreamingResponse
from fastapi.responses import StreamingResponse

@router.get("/csv")
async def export_csv():
    async def generate():
        yield "id,title\n"
        async for row in stream_rows():
            yield f"{row.id},{row.title}\n"

    return StreamingResponse(generate(), media_type="text/csv")

11장 이터러블, 제너레이터, yield from의 async generator가 그대로 동작합니다. 메모리에 다 안 올리고 흘려보냅니다.

Server-Sent Events (SSE) #

SSE — 실시간 알림
@router.get("/events")
async def events():
    async def event_stream():
        for i in range(100):
            yield f"data: tick {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
    )

브라우저의 EventSource가 자동으로 받습니다. WebSocket만큼 양방향은 아니지만, 서버 → 클라이언트 단방향 실시간에는 충분.

WebSocket #

양방향 실시간이 필요하면 WebSocket.

WebSocket 기본
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"[{room}] echo: {data}")
    except WebSocketDisconnect:
        print(f"클라이언트 연결 종료")

채팅, 실시간 협업, 게임 같은 경우에 사용합니다. 다만 상태 관리, 인증, 룸 분리 등이 추가로 필요한 영역이라 본격 도입은 별도 학습.

동시성 제한 — 외부 API 보호 #

18장 비동기 깊이에서 본 Semaphore가 여기서도 유용합니다. 한 라우트에서 외부 API 100개를 호출해야 한다면, 동시에 N 개까지만:

동시 호출 제한
import asyncio

_external_sem = asyncio.Semaphore(10)

async def call_external(url: str) -> dict:
    async with _external_sem:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()

외부 API의 rate limit, 자기 서비스의 메모리 보호 모두에 도움이 됩니다.

타임아웃 — 무한 대기 방지 #

외부 호출 타임아웃
async with asyncio.timeout(5):
    response = await client.get(url)

외부 API가 응답하지 않을 때 요청이 무한정 매달리는 것을 막습니다. 14장 비동기 입문에서 본 도구.

httpx 자체에도 타임아웃 옵션:

httpx timeout
async with httpx.AsyncClient(timeout=10.0) as client:
    ...

라이프사이클 이벤트 — lifespan #

서버 시작 / 종료 시점에 자원을 준비 / 정리.

lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 시작 시
    print("서버 시작")
    redis = await create_redis_pool()
    app.state.redis = redis

    yield

    # 종료 시
    print("서버 종료")
    await redis.close()

app = FastAPI(lifespan=lifespan)

10장 컨텍스트 매니저@asynccontextmanager 패턴 그대로. 옛 @app.on_event("startup") 데코레이터는 deprecated.

자주 만나는 함정 #

1) BackgroundTasks의 예외 무시 #

🚫 무시되는 예외
def task_that_might_fail():
    raise SomethingWrong()    # 응답엔 영향 없지만, 로그만 남음

BackgroundTasks는 예외를 응답에 반영하지 않습니다. 작업 실패가 사용자에게 안 보입니다. 중요한 작업은 외부 큐 + 재시도가 안전.

2) 의존성에서 받은 DB 세션을 백그라운드로 넘김 #

🚫 세션 누수
@router.post("/")
async def create(db: DBSession, bg: BackgroundTasks):
    bg.add_task(use_db_later, db)   # ← 위험
    return ...

라우트가 끝나면 의존성 (DB 세션)이 닫힙니다. 백그라운드에서 그것을 쓰면 이미 닫힌 세션 사고. 백그라운드 작업은 세션을 새로 만들어 쓰세요.

async def use_db_later(user_id: int):
    async with AsyncSessionLocal() as db:
        ...

bg.add_task(use_db_later, user.id)

3) def 라우트에서 비동기 라이브러리 쓰기 #

🚫
@router.get("/")
def handler():
    asyncio.run(something_async())   # 이벤트 루프 중첩 — 에러

def 라우트 안에서 비동기 코드 실행이 필요하면 **라우트 자체를 async def**로 바꾸세요.

연습문제 #

  1. BackgroundTasksPOST /users 가입 후 환영 이메일을 백그라운드로 보내는 코드를 작성하세요 (실제 SMTP 대신 print로 시뮬레이션). 응답이 클라이언트에 먼저 도달하고 이메일 print가 뒤따르는지 시간 순서를 직접 관찰합니다.
  2. 일부러 BackgroundTasks 안의 작업이 예외를 던지도록 만든 뒤, 응답에 영향 없이 작업이 조용히 실패하는 것을 확인하세요. 그 작업을 외부 큐 (ARQ)로 옮기는 코드 변경을 시작합니다 (Redis가 없으면 ARQ 설치까지만).
  3. StreamingResponse와 async generator로 GET /csv 엔드포인트를 작성하세요. 1만 행의 더미 데이터를 메모리에 다 안 올리고 흘려보냅니다. curl -N으로 호출해 첫 바이트가 즉시 오는지 (스트리밍) 확인합니다.

한 줄 요약: async def는 이벤트 루프, def는 스레드 풀, FastAPI가 자동. async def 안 동기 블로킹은 금지 — 비동기 라이브러리 / to_thread / def로 풀기. BackgroundTasks는 응답 후 같은 프로세스 빠른 후처리, 외부 큐 (ARQ / Celery 등)는 재시도 / 추적 / 별도 스케일 필요할 때. StreamingResponse + async generator로 큰 응답 / SSE. Semaphore로 동시성 제한, asyncio.timeout으로 타임아웃, lifespan으로 시작 / 종료 자원. 함정은 백그라운드 예외 무시 / 닫힌 세션 / def-async 혼선.

다음 챕터 #

다음 28장 테스트와 배포 — pytest, Docker, Railway/Fly가 4부의 마지막입니다. pytest + httpx로 통합 테스트, 의존성 오버라이드로 외부 의존성 격리, 그리고 Docker와 클라우드 배포까지 함께 정리합니다.

X