목차
18 장

비동기 깊이 — 이벤트 루프, gather/wait, async generator

중급 입문의 다음 단계 — 이벤트 루프의 실제 동작, Future와 Task의 차이, gather vs wait, async generator와 async iteration까지 정리합니다.

14장 비동기 입문 (asyncio)에서 async/await의 의미와 asyncio.run, gather, TaskGroup까지 봤습니다. 본 챕터는 그 다음 단계입니다. 이벤트 루프가 실제로 어떻게 동작하는지, FutureTask의 차이, gatherwait의 미세한 차이, async generator, async iteration 까지를 정리합니다.

본 챕터의 끝에서 잠깐 언급되는 CPU 바운드 동시성은 다음 19장 GIL과 동시성 — threading vs multiprocessing vs asyncio에서 본격적으로 다룹니다. 그리고 본 챕터의 비동기 디버깅 / 성능 측정은 21장 성능 — cProfile, py-spy, 메모리 프로파일링와 짝을 이룹니다.

이벤트 루프 — 한 단계 깊이 #

이벤트 루프는 한 번에 하나의 코루틴만 실행 합니다. 멀티스레드가 아닙니다. 그런데 어떻게 동시성이 생기나?

이벤트 루프의 본질 — 의사 코드
def run_loop(coros):
    queue = list(coros)
    while queue:
        coro = queue.pop(0)
        try:
            # 코루틴을 다음 await 까지 진행
            future = coro.send(None)
        except StopIteration as e:
            continue   # 코루틴 끝남
        # future 가 끝나면 코루틴을 큐에 다시 넣음
        future.add_done_callback(lambda _: queue.append(coro))

핵심:

  1. 코루틴을 send(None)으로 한 단계 진행
  2. await가 만나는 지점까지 실행되고 멈춤
  3. 그 지점의 Future가 완료되면 코루틴을 다시 큐에 넣음
  4. 다음 코루틴으로 넘어감 (다른 코루틴이 그 사이에 실행될 수 있음)

await가 협력적 양도점이라는 게 본 그림의 핵심입니다. 양도가 일어나는 지점에서만 다른 코루틴이 끼어들 수 있습니다.

양도 안 되는 코드는 동시성을 막음 #

🚫 양도 없음
async def slow():
    total = 0
    for i in range(10_000_000):    # CPU 바운드, await 없음
        total += i
    return total

async def main():
    await asyncio.gather(slow(), slow(), slow())   # 세 개를 동시에?

겉보기엔 셋이 동시에 도는 것 같지만 실제로는 직렬입니다. slow() 안에 await가 없어서 양도가 일어나지 않습니다. CPU 바운드는 to_thread 또는 multiprocessing으로 보내야 합니다 (19장 GIL과 동시성).

Future — 미래에 채워질 결과 #

Future는 **“아직 결정되지 않은 결과”**를 표현하는 객체입니다.

Future 직접
async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # 1초 후에 결과를 채움
    loop.call_later(1, fut.set_result, "done")

    result = await fut
    print(result)   # done

상태:

  • pending — 결과 미결정
  • done — 결과 결정 (fut.set_result(x) 또는 fut.set_exception(e))
  • cancelled — 취소됨

Future는 보통 직접 만들 일이 없습니다. 라이브러리 내부 (스레드 풀에서 결과 회수, 시그널 등)에서 쓰는 도구입니다. 일반 코드에서는 Task를 통해 간접적으로 만집니다.

TaskFuture + 코루틴 #

Task의 정체
async def main():
    task = asyncio.create_task(work())   # Task 생성
    print(isinstance(task, asyncio.Future))   # True

TaskFuture의 서브클래스입니다. 코루틴을 감싸 이벤트 루프에 등록한 형태입니다. Future가 일반화된 “결과 약속"이고, Task는 “코루틴이 채울 약속”.

Task의 done, result, cancel #

Task 인터페이스
task = asyncio.create_task(work())

await asyncio.sleep(0.1)
print(task.done())     # False / True

if task.done():
    if task.exception():
        print("실패:", task.exception())
    else:
        print("성공:", task.result())

task.cancel()           # 취소 시도

Task 참조 유지 — 함정 #

🚫 자주 빠지는 함정
async def main():
    asyncio.create_task(background_work())   # 결과 무시
    # ...

Task 객체에 대한 참조가 사라지면, 가비지 컬렉터가 Task를 정리할 수 있습니다. 백그라운드 작업이 갑자기 사라지는 사고가 발생합니다. 항상 어딘가에 참조를 유지 하세요.

✅ 참조 유지
async def main():
    background_tasks = set()

    task = asyncio.create_task(background_work())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    # ...

TaskGroup을 쓰면 본 골치가 자동으로 풀립니다 — 그래서 3.11+ 에서 권장됩니다.

gatherwait — 둘의 미세한 차이 #

gather — 결과 리스트, 순서 보존 #

gather
results = await asyncio.gather(
    fetch("a"), fetch("b"), fetch("c"),
    return_exceptions=False,
)
# results: [a 결과, b 결과, c 결과]  ← 호출 순서대로

return_exceptions=True 옵션을 주면 예외도 결과처럼 들어옵니다 — 한 task 실패해도 다른 task 결과를 받을 수 있습니다.

예외도 결과로
results = await asyncio.gather(
    fetch("a"),
    fetch_might_fail("b"),
    return_exceptions=True,
)
for r in results:
    if isinstance(r, Exception):
        print("실패:", r)
    else:
        print("성공:", r)

wait — done / pending 두 셋, 순서 무보장 #

wait
done, pending = await asyncio.wait(
    [task1, task2, task3],
    return_when=asyncio.FIRST_COMPLETED,
)

return_when 옵션:

  • ALL_COMPLETED — 기본, 모두 끝날 때까지
  • FIRST_COMPLETED — 하나라도 끝나면 반환
  • FIRST_EXCEPTION — 예외 하나라도 발생하면 반환

wait결과 리스트가 아니라 done / pending 두 셋을 반환합니다. 결과를 받으려면 task.result()를 직접 부르세요.

언제 무엇을? #

상황도구
모두 끝날 때까지, 결과 순서대로gather
일부만 끝나면 진행 / 첫 결과만 처리wait
첫 결과부터 차례로 처리as_completed
새 코드, 3.11+ 환경, cleanup 보장TaskGroup

새 코드는 가능한 한 TaskGroup입니다. gather / wait는 옛 코드 / 일부 경우에만.

as_completed — 첫 결과부터 처리 #

as_completed
async def main():
    coros = [fetch(u) for u in urls]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print("받음:", result)

각 작업이 끝나는 순서대로 결과를 받습니다. 모두 끝날 때까지 기다리지 않고 빨리 처리하고 싶을 때.

동기 함수와 통합 — to_thread #

CPU 바운드 또는 블로킹 I/O 동기 함수를 비동기 안에서 안전하게 호출.

to_thread
import asyncio
import requests   # 동기 라이브러리

async def fetch(url):
    response = await asyncio.to_thread(requests.get, url)
    return response.text

asyncio.to_thread(fn, *args)별도 스레드에서 실행 하고 await 가능한 형태로 감싸줍니다. 내부적으로 loop.run_in_executor를 호출.

run_in_executor — 더 세밀 #

run_in_executor
from concurrent.futures import ProcessPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_function, arg)

ThreadPoolExecutor 또는 ProcessPoolExecutor를 직접 지정해 더 세밀한 제어가 가능합니다. 19장 GIL과 동시성에서 둘의 선택 기준을 다룹니다.

Async generator — 비동기 yield #

async generator
async def stream_chunks(url):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as resp:
            async for chunk in resp.aiter_bytes():
                yield chunk

async def main():
    async for chunk in stream_chunks("https://example.com"):
        print(len(chunk))

async def + yield = async generator. 호출 시 비동기 이터레이터를 반환합니다. 스트리밍 경우에 잘 어울립니다.

동기 vs 비동기 generator #

동기 generatorasync generator
정의def fn(): yieldasync def fn(): yield
소비for x in g:async for x in g:
안에서 await
안에서 sleep / io❌ (또는 동기)

Async iteration — __aiter__ / __anext__ #

비동기 이터러블을 직접 만들고 싶을 때.

비동기 이터레이터 — 클래스
class Counter:
    def __init__(self, limit):
        self.limit = limit
        self.n = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.n += 1
        return self.n

async def main():
    async for i in Counter(3):
        print(i)
# 0.1초 간격으로 1, 2, 3

대부분은 async generator가 더 짧고 직관적이라 클래스 형태는 잘 안 씁니다.

Async context manager — __aenter__ / __aexit__ #

10장 컨텍스트 매니저에서 짧게 본 도구.

@asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_db():
    conn = await connect_async()
    try:
        yield conn
    finally:
        await conn.close()

async def main():
    async with open_db() as conn:
        await conn.execute(...)

동시성 패턴 — 자주 만나는 형태 #

1) Semaphore로 동시성 제한 #

동시 N 개
sem = asyncio.Semaphore(10)

async def fetch_one(url):
    async with sem:
        return await fetch(url)

async def fetch_all(urls):
    return await asyncio.gather(*[fetch_one(u) for u in urls])

2) Producer / Consumer — Queue #

큐 기반 파이프라인
queue = asyncio.Queue(maxsize=10)

async def producer():
    for i in range(100):
        await queue.put(i)
    await queue.put(None)    # 종료 신호

async def consumer():
    while True:
        item = await queue.get()
        if item is None:
            break
        process(item)
        queue.task_done()

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        tg.create_task(consumer())

Queue는 producer / consumer 간 백프레셔를 자연스럽게 처리해줍니다 (큐가 차면 producer가 자동 대기). 27장 비동기와 백그라운드 작업에서 본 패턴을 다시 다룹니다.

3) 첫 결과만 받고 나머지 취소 #

first wins
async def main():
    tasks = [
        asyncio.create_task(fetch_a()),
        asyncio.create_task(fetch_b()),
    ]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for t in pending:
        t.cancel()
    return done.pop().result()

여러 백엔드에 같은 요청을 던지고 먼저 응답한 것을 채택하는 패턴.

4) Timeout #

timeout
async def main():
    try:
        async with asyncio.timeout(5):
            data = await slow_op()
    except TimeoutError:
        print("5초 초과")

14장에서 본 그것. 3.11+ 표준.

취소 — 깊은 주제 #

코루틴이 cancel 되면 CancelledError가 await 지점에서 던져집니다.

cancel 처리
async def work():
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()    # 정리는 비동기로도 가능
        raise              # ← 다시 던져야 함!

중요: CancelledError를 잡고 다시 던지지 않으면 — task가 정상 종료된 것으로 간주되어 cleanup이 끊깁니다. 항상 raise를 같이 두세요.

shielding — 일부만 취소 막기 #

asyncio.shield
async def critical():
    return await asyncio.shield(important_work())

shield로 감싼 부분은 외부 cancel의 영향을 받지 않습니다. 결제 마지막 단계 같이 중간에 끊어지면 안 되는 경우.

디버깅 — asyncio.run(debug=True) #

디버그 모드
PYTHONASYNCIODEBUG=1 python app.py

또는

asyncio.run(main(), debug=True)

디버그 모드에서:

  • 양도 없이 너무 오래 도는 코루틴 경고 (블로킹 의심)
  • 누락된 await 경고
  • task가 GC 될 때 결과 미사용 경고

개발 중에는 항상 켜는 게 좋습니다. 비동기 버그의 90%가 여기서 잡힙니다. 정식 프로파일링은 21장 성능 — cProfile, py-spy, 메모리 프로파일링.

연습문제 #

  1. 100개 URL을 동시에 fetch 하되 Semaphore(10)으로 동시성을 10으로 제한하는 함수를 작성하세요. 같은 일을 동시성 1 / 10 / 100으로 실행해 시간을 비교합니다. 너무 높이면 왜 오히려 느려지는지 직접 관찰합니다.
  2. asyncio.TaskGroup으로 producer / consumer 두 코루틴이 asyncio.Queue(maxsize=10)를 공유하며 100개 아이템을 흘리는 코드를 작성하세요. 큐가 차면 producer가 자동으로 대기하는 백프레셔가 동작하는지 확인합니다.
  3. asyncio.run(main(), debug=True)로 실행하고 일부러 양도 없는 무거운 동기 코드(for i in range(10_000_000): ...)를 코루틴 안에 두세요. 어떤 경고가 출력되는지 확인합니다.

한 줄 요약: 이벤트 루프는 한 번에 한 코루틴만, await가 양도점. CPU 바운드는 양도가 없어 동시성을 막음 → to_thread / ProcessPoolExecutor. FutureTask. 백그라운드 Task는 참조 유지 필수. gather (결과 리스트) / wait (done-pending) / as_completed (순서대로), 새 코드는 TaskGroup. async generator(async def + yield), async for, async with. CancelledError는 잡고 정리 후 raise. shield로 취소 차단. debug=True는 개발 중 필수.

다음 챕터 #

다음 19장 GIL과 동시성 — threading vs multiprocessing vs asyncio에서는 본 챕터에서 짧게 비춘 CPU 바운드 동시성 — GIL의 정체, threading vs multiprocessing vs asyncio, 그리고 Python 3.13~3.14의 free-threaded 빌드까지 다룹니다.

X