모던 파이썬 실전 #5 비동기와 백그라운드 작업

6 분 소요

중급 #7고급 #4에서 본 비동기가 FastAPI 안에서 어떻게 자연스럽게 풀리는지 — 그리고 응답 이후 처리해야 할 작업 (이메일, 무거운 변환, 외부 API)을 다루는 패턴들을 정리합니다.

async def vs def — FastAPI가 알아서 처리 #

FastAPI의 라우트는 둘 다 받습니다.

둘 다 가능
@router.get("/sync")
def sync_handler():
    ...

@router.get("/async")
async def async_handler():
    ...

차이:

  • async def — 이벤트 루프 위에서 직접 실행. await가 필요하면 양도 가능
  • def — 자동으로 스레드 풀로 보내져 실행. 이벤트 루프를 막지 않음

FastAPI가 자동으로 적절한 방식으로 처리 합니다. 그래서 동기 라이브러리를 쓰는 라우트는 그냥 def로 적어도 다른 요청을 막지 않습니다.

절대 하지 말 것 — async def 안에서 동기 블로킹 #

🚫 가장 큰 함정
@router.get("/items")
async def list_items():
    response = requests.get("https://api.example.com")   # 동기, 블로킹!
    return response.json()

async def 안에서 동기 블로킹 함수를 호출하면 — 이벤트 루프 전체가 멈춥니다. 다른 요청도 멈춥니다. 고급 #5에서 본 GIL + 비동기의 만남.

해결 두 가지:

✅ 1. 비동기 라이브러리
import httpx

@router.get("/items")
async def list_items():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
    return response.json()
✅ 2. 동기를 스레드로
import asyncio
import requests

@router.get("/items")
async def list_items():
    response = await asyncio.to_thread(requests.get, "https://api.example.com")
    return response.json()

또는 그냥 def로 적기 — FastAPI가 알아서 스레드로 보냅니다.

✅ 3. def로 적기
@router.get("/items")
def list_items():
    response = requests.get("https://api.example.com")
    return response.json()

실전 가이드 — 언제 무엇? #

상황첫 시도
비동기 라이브러리만 사용 (httpx, asyncpg, redis-py async)async def
동기 라이브러리 (requests, psycopg2)만 사용def
둘이 섞임async def + to_thread로 동기 부분 감싸기
CPU 바운드 처리def 또는 async def + ProcessPoolExecutor

이 시리즈는 #3에서 SQLAlchemy 비동기를 썼으니 async def가 표준.

BackgroundTasks — 응답 후 처리 #

이메일 발송, 알림, 로그 같은 작업을 응답이 나간 뒤 백그라운드로 돌리고 싶을 때.

기본 사용
from fastapi import BackgroundTasks

def send_email_sync(to: str, subject: str, body: str):
    # 실제 이메일 전송 (smtplib 등)
    print(f"이메일 보냄: {to}")

@router.post("/users")
async def create_user(
    payload: UserCreate,
    background_tasks: BackgroundTasks,
    db: DBSession,
) -> UserOut:
    user = await user_service.create_user(db, payload.email, payload.password)
    background_tasks.add_task(
        send_email_sync, user.email, "환영", "가입을 환영합니다",
    )
    return user

흐름:

  1. 라우트 본문이 정상 종료
  2. 응답이 클라이언트에 전송됨
  3. 그 후 background_tasks의 함수들이 순서대로 실행

비동기 함수도 OK #

async task
async def send_email_async(to: str, subject: str, body: str):
    async with httpx.AsyncClient() as client:
        await client.post("https://email-api/", json={...})

background_tasks.add_task(send_email_async, user.email, "환영", "...")

add_task는 동기,비동기 둘 다 받습니다.

한계 — 같은 프로세스 안에서만 #

BackgroundTasks의 작업은 현재 프로세스가 살아있어야 실행 됩니다. 즉:

  • 빠른 후처리 (수 초 이내)
  • 실패해도 재시도 안 해도 되는 작업
  • ❌ 무거운 영상/이미지 변환
  • ❌ 외부 API 호출 실패 시 재시도 필요
  • ❌ 서버 재시작 시 살아있어야 할 작업

이런 경우는 외부 큐가 필요합니다.

외부 큐 — 언제 도입? #

신호외부 큐 필요?
작업이 1초 미만아니, BackgroundTasks 충분
실패 시 재시도 정책 필요
작업 결과를 추적/조회해야
워커를 별도로 스케일하고 싶음
스케줄링 (매일 자정 작업 등)
우선순위 큐 필요

해당하면 별도 워커 인프라를 도입하세요.

선택지 #

도구평가
Celery가장 오래되고 널리 쓰임. 복잡함. Redis/RabbitMQ 백엔드
ARQasync 친화. Redis만 사용. 간단함
Dramatiq단순한 인터페이스. 백엔드 다양
RQ간단함. Redis. 동기 위주
TaskiqFastAPI 식 의존성 주입

비동기 코드 베이스에는 ARQ 또는 Taskiq가 자연스럽고, 큰 프로젝트는 Celery가 검증된 표준입니다.

ARQ — async 큐 미니 가이드 #

가장 간단한 도입을 ARQ로 보겠습니다.

설치
uv add arq
app/tasks/worker.py
from arq.connections import RedisSettings
from app.core.config import settings

async def send_email(ctx, to: str, subject: str, body: str):
    print(f"[worker] 이메일 보냄: {to}")
    # 실제 전송 코드

class WorkerSettings:
    redis_settings = RedisSettings.from_dsn(settings.redis_url)
    functions = [send_email]
    max_jobs = 10
    job_timeout = 60
라우트에서 enqueue
from arq import create_pool
from arq.connections import RedisSettings

@router.post("/users")
async def create_user(payload: UserCreate, db: DBSession):
    user = await user_service.create_user(...)
    pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
    await pool.enqueue_job("send_email", user.email, "환영", "...")
    return user

워커 실행:

별도 프로세스로 워커
uv run arq app.tasks.worker.WorkerSettings

웹 서버와 워커가 별도 프로세스로 동작합니다. 한쪽이 죽어도 다른 쪽은 살아있고, 워커만 따로 스케일 가능.

의존성 주입은 직접 #

ARQ는 FastAPI의 Depends가 동작하지 않습니다. 워커 함수 안에서 DB 세션 등을 직접 만들어야 합니다.

워커 안에서 DB
from app.db.session import AsyncSessionLocal

async def send_email(ctx, user_id: int):
    async with AsyncSessionLocal() as db:
        user = await db.get(User, user_id)
        # 발송 로직

스트리밍 응답 — 길게 보내기 #

파일 다운로드, 큰 JSON, Server-Sent Events (SSE)처럼 응답을 한 번에 만들지 말고 흘려보내고 싶을 때.

StreamingResponse
from fastapi.responses import StreamingResponse

@router.get("/csv")
async def export_csv():
    async def generate():
        yield "id,title\n"
        async for row in stream_rows():
            yield f"{row.id},{row.title}\n"

    return StreamingResponse(generate(), media_type="text/csv")

중급 #4의 async generator가 그대로 동작합니다. 메모리에 다 안 올리고 흘려보냅니다.

Server-Sent Events (SSE) #

SSE — 실시간 알림
@router.get("/events")
async def events():
    async def event_stream():
        for i in range(100):
            yield f"data: tick {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
    )

브라우저의 EventSource가 자동으로 받습니다. WebSocket만큼 양방향은 아니지만, 서버 → 클라이언트 단방향 실시간 에는 충분.

WebSocket #

양방향 실시간이 필요하면 WebSocket.

WebSocket 기본
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"[{room}] echo: {data}")
    except WebSocketDisconnect:
        print(f"클라이언트 연결 종료")

채팅, 실시간 협업, 게임 같은 경우에 사용합니다. 다만 상태 관리, 인증, 룸 분리 등이 추가로 필요한 영역이라 본격 도입은 별도 학습.

동시성 제한 — 외부 API 보호 #

고급 #4에서 본 Semaphore가 여기서도 유용합니다. 한 라우트에서 외부 API 100개를 호출해야 한다면, 동시에 N개까지만:

동시 호출 제한
import asyncio

_external_sem = asyncio.Semaphore(10)

async def call_external(url: str) -> dict:
    async with _external_sem:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()

외부 API의 rate limit, 자기 서비스의 메모리 보호 모두에 도움이 됩니다.

타임아웃 — 무한 대기 방지 #

외부 호출 타임아웃
async with asyncio.timeout(5):
    response = await client.get(url)

외부 API가 응답하지 않을 때 요청이 무한정 매달리는 것을 막습니다. 중급 #7에서 본 도구.

httpx 자체에도 타임아웃 옵션:

httpx timeout
async with httpx.AsyncClient(timeout=10.0) as client:
    ...

라이프사이클 이벤트 — lifespan #

서버 시작/종료 시점에 자원을 준비/정리.

lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 시작 시
    print("서버 시작")
    redis = await create_redis_pool()
    app.state.redis = redis

    yield

    # 종료 시
    print("서버 종료")
    await redis.close()

app = FastAPI(lifespan=lifespan)

중급 #3@asynccontextmanager 패턴 그대로. 옛 @app.on_event("startup") 데코레이터는 deprecated.

자주 만나는 함정 #

1) BackgroundTasks의 예외 무시 #

🚫 무시되는 예외
def task_that_might_fail():
    raise SomethingWrong()    # 응답엔 영향 없지만, 로그만 남음

BackgroundTasks는 예외를 응답에 반영하지 않습니다. 작업 실패가 사용자에게 안 보입니다. 중요한 작업은 외부 큐 + 재시도가 안전.

2) 의존성에서 받은 DB 세션을 백그라운드로 넘김 #

🚫 세션 누수
@router.post("/")
async def create(db: DBSession, bg: BackgroundTasks):
    bg.add_task(use_db_later, db)   # ← 위험
    return ...

라우트가 끝나면 의존성 (DB 세션)이 닫힙니다. 백그라운드에서 그것을 쓰면 이미 닫힌 세션 사고. 백그라운드 작업은 세션을 새로 만들어 쓰세요.

async def use_db_later(user_id: int):
    async with AsyncSessionLocal() as db:
        ...

bg.add_task(use_db_later, user.id)

3) def 라우트에서 비동기 라이브러리 쓰기 #

🚫
@router.get("/")
def handler():
    asyncio.run(something_async())   # 이벤트 루프 중첩 — 에러

def 라우트 안에서 비동기 코드 실행이 필요하면 **라우트 자체를 async def**로 바꾸세요.

정리 #

이번 글에서 잡은 것:

  • async def는 이벤트 루프, def는 스레드 풀 — FastAPI가 자동
  • async def 안에 동기 블로킹 = 이벤트 루프 정지
  • 풀기: 비동기 라이브러리 / to_thread / def로 바꾸기
  • BackgroundTasks — 응답 후 같은 프로세스에서 후처리, 빠른 일에 적합
  • 외부 큐 (ARQ/Celery/Dramatiq) — 재시도, 추적, 별도 스케일이 필요할 때
  • ARQ 미니 가이드 — async 친화, Redis만
  • StreamingResponse + async generator — 큰 응답, SSE
  • WebSocket으로 양방향 실시간
  • Semaphore로 동시성 제한, asyncio.timeout으로 타임아웃
  • **lifespan**으로 시작/종료 자원 관리 (@asynccontextmanager)
  • 함정: 백그라운드 예외 무시, 닫힌 세션 사용, def-async 혼선

다음 글(#6 테스트와 배포)이 시리즈의 마지막입니다. pytest + httpx로 통합 테스트, 의존성 오버라이드로 외부 의존성 격리, 그리고 Docker와 클라우드 배포까지 한곳에 정리합니다.

X