비동기 깊이 — 이벤트 루프, gather/wait, async generator
중급 입문의 다음 단계 — 이벤트 루프의 실제 동작, Future와 Task의 차이, gather vs wait, async generator와 async iteration까지 정리합니다.
14장 비동기 입문 (asyncio)에서 async/await의 의미와 asyncio.run, gather, TaskGroup까지 봤습니다. 본 챕터는 그 다음 단계입니다. 이벤트 루프가 실제로 어떻게 동작하는지, Future와 Task의 차이, gather와 wait의 미세한 차이, async generator, async iteration 까지를 정리합니다.
본 챕터의 끝에서 잠깐 언급되는 CPU 바운드 동시성은 다음 19장 GIL과 동시성 — threading vs multiprocessing vs asyncio에서 본격적으로 다룹니다. 그리고 본 챕터의 비동기 디버깅 / 성능 측정은 21장 성능 — cProfile, py-spy, 메모리 프로파일링와 짝을 이룹니다.
이벤트 루프 — 한 단계 깊이 #
이벤트 루프는 한 번에 하나의 코루틴만 실행 합니다. 멀티스레드가 아닙니다. 그런데 어떻게 동시성이 생기나?
def run_loop(coros):
queue = list(coros)
while queue:
coro = queue.pop(0)
try:
# 코루틴을 다음 await 까지 진행
future = coro.send(None)
except StopIteration as e:
continue # 코루틴 끝남
# future 가 끝나면 코루틴을 큐에 다시 넣음
future.add_done_callback(lambda _: queue.append(coro))핵심:
- 코루틴을
send(None)으로 한 단계 진행 await가 만나는 지점까지 실행되고 멈춤- 그 지점의
Future가 완료되면 코루틴을 다시 큐에 넣음 - 다음 코루틴으로 넘어감 (다른 코루틴이 그 사이에 실행될 수 있음)
await가 협력적 양도점이라는 게 본 그림의 핵심입니다. 양도가 일어나는 지점에서만 다른 코루틴이 끼어들 수 있습니다.
양도 안 되는 코드는 동시성을 막음 #
async def slow():
total = 0
for i in range(10_000_000): # CPU 바운드, await 없음
total += i
return total
async def main():
await asyncio.gather(slow(), slow(), slow()) # 세 개를 동시에?겉보기엔 셋이 동시에 도는 것 같지만 실제로는 직렬입니다. slow() 안에 await가 없어서 양도가 일어나지 않습니다. CPU 바운드는 to_thread 또는 multiprocessing으로 보내야 합니다 (19장 GIL과 동시성).
Future — 미래에 채워질 결과
#
Future는 **“아직 결정되지 않은 결과”**를 표현하는 객체입니다.
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
# 1초 후에 결과를 채움
loop.call_later(1, fut.set_result, "done")
result = await fut
print(result) # done상태:
- pending — 결과 미결정
- done — 결과 결정 (
fut.set_result(x)또는fut.set_exception(e)) - cancelled — 취소됨
Future는 보통 직접 만들 일이 없습니다. 라이브러리 내부 (스레드 풀에서 결과 회수, 시그널 등)에서 쓰는 도구입니다. 일반 코드에서는 Task를 통해 간접적으로 만집니다.
Task — Future + 코루틴
#
async def main():
task = asyncio.create_task(work()) # Task 생성
print(isinstance(task, asyncio.Future)) # TrueTask는 Future의 서브클래스입니다. 코루틴을 감싸 이벤트 루프에 등록한 형태입니다. Future가 일반화된 “결과 약속"이고, Task는 “코루틴이 채울 약속”.
Task의 done, result, cancel
#
task = asyncio.create_task(work())
await asyncio.sleep(0.1)
print(task.done()) # False / True
if task.done():
if task.exception():
print("실패:", task.exception())
else:
print("성공:", task.result())
task.cancel() # 취소 시도Task 참조 유지 — 함정 #
async def main():
asyncio.create_task(background_work()) # 결과 무시
# ...Task 객체에 대한 참조가 사라지면, 가비지 컬렉터가 Task를 정리할 수 있습니다. 백그라운드 작업이 갑자기 사라지는 사고가 발생합니다. 항상 어딘가에 참조를 유지 하세요.
async def main():
background_tasks = set()
task = asyncio.create_task(background_work())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
# ...TaskGroup을 쓰면 본 골치가 자동으로 풀립니다 — 그래서 3.11+ 에서 권장됩니다.
gather와 wait — 둘의 미세한 차이
#
gather — 결과 리스트, 순서 보존
#
results = await asyncio.gather(
fetch("a"), fetch("b"), fetch("c"),
return_exceptions=False,
)
# results: [a 결과, b 결과, c 결과] ← 호출 순서대로return_exceptions=True 옵션을 주면 예외도 결과처럼 들어옵니다 — 한 task 실패해도 다른 task 결과를 받을 수 있습니다.
results = await asyncio.gather(
fetch("a"),
fetch_might_fail("b"),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print("실패:", r)
else:
print("성공:", r)wait — done / pending 두 셋, 순서 무보장
#
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.FIRST_COMPLETED,
)return_when 옵션:
ALL_COMPLETED— 기본, 모두 끝날 때까지FIRST_COMPLETED— 하나라도 끝나면 반환FIRST_EXCEPTION— 예외 하나라도 발생하면 반환
wait는 결과 리스트가 아니라 done / pending 두 셋을 반환합니다. 결과를 받으려면 task.result()를 직접 부르세요.
언제 무엇을? #
| 상황 | 도구 |
|---|---|
| 모두 끝날 때까지, 결과 순서대로 | gather |
| 일부만 끝나면 진행 / 첫 결과만 처리 | wait |
| 첫 결과부터 차례로 처리 | as_completed |
| 새 코드, 3.11+ 환경, cleanup 보장 | TaskGroup |
새 코드는 가능한 한 TaskGroup입니다. gather / wait는 옛 코드 / 일부 경우에만.
as_completed — 첫 결과부터 처리
#
async def main():
coros = [fetch(u) for u in urls]
for coro in asyncio.as_completed(coros):
result = await coro
print("받음:", result)각 작업이 끝나는 순서대로 결과를 받습니다. 모두 끝날 때까지 기다리지 않고 빨리 처리하고 싶을 때.
동기 함수와 통합 — to_thread
#
CPU 바운드 또는 블로킹 I/O 동기 함수를 비동기 안에서 안전하게 호출.
import asyncio
import requests # 동기 라이브러리
async def fetch(url):
response = await asyncio.to_thread(requests.get, url)
return response.textasyncio.to_thread(fn, *args)가 별도 스레드에서 실행 하고 await 가능한 형태로 감싸줍니다. 내부적으로 loop.run_in_executor를 호출.
run_in_executor — 더 세밀
#
from concurrent.futures import ProcessPoolExecutor
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_heavy_function, arg)ThreadPoolExecutor 또는 ProcessPoolExecutor를 직접 지정해 더 세밀한 제어가 가능합니다. 19장 GIL과 동시성에서 둘의 선택 기준을 다룹니다.
Async generator — 비동기 yield #
async def stream_chunks(url):
async with httpx.AsyncClient() as client:
async with client.stream("GET", url) as resp:
async for chunk in resp.aiter_bytes():
yield chunk
async def main():
async for chunk in stream_chunks("https://example.com"):
print(len(chunk))async def + yield = async generator. 호출 시 비동기 이터레이터를 반환합니다. 스트리밍 경우에 잘 어울립니다.
동기 vs 비동기 generator #
| 동기 generator | async generator | |
|---|---|---|
| 정의 | def fn(): yield | async def fn(): yield |
| 소비 | for x in g: | async for x in g: |
| 안에서 await | ❌ | ✅ |
| 안에서 sleep / io | ❌ (또는 동기) | ✅ |
Async iteration — __aiter__ / __anext__
#
비동기 이터러블을 직접 만들고 싶을 때.
class Counter:
def __init__(self, limit):
self.limit = limit
self.n = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.n >= self.limit:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.n += 1
return self.n
async def main():
async for i in Counter(3):
print(i)
# 0.1초 간격으로 1, 2, 3대부분은 async generator가 더 짧고 직관적이라 클래스 형태는 잘 안 씁니다.
Async context manager — __aenter__ / __aexit__
#
10장 컨텍스트 매니저에서 짧게 본 도구.
from contextlib import asynccontextmanager
@asynccontextmanager
async def open_db():
conn = await connect_async()
try:
yield conn
finally:
await conn.close()
async def main():
async with open_db() as conn:
await conn.execute(...)동시성 패턴 — 자주 만나는 형태 #
1) Semaphore로 동시성 제한 #
sem = asyncio.Semaphore(10)
async def fetch_one(url):
async with sem:
return await fetch(url)
async def fetch_all(urls):
return await asyncio.gather(*[fetch_one(u) for u in urls])2) Producer / Consumer — Queue
#
queue = asyncio.Queue(maxsize=10)
async def producer():
for i in range(100):
await queue.put(i)
await queue.put(None) # 종료 신호
async def consumer():
while True:
item = await queue.get()
if item is None:
break
process(item)
queue.task_done()
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(producer())
tg.create_task(consumer())Queue는 producer / consumer 간 백프레셔를 자연스럽게 처리해줍니다 (큐가 차면 producer가 자동 대기). 27장 비동기와 백그라운드 작업에서 본 패턴을 다시 다룹니다.
3) 첫 결과만 받고 나머지 취소 #
async def main():
tasks = [
asyncio.create_task(fetch_a()),
asyncio.create_task(fetch_b()),
]
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for t in pending:
t.cancel()
return done.pop().result()여러 백엔드에 같은 요청을 던지고 먼저 응답한 것을 채택하는 패턴.
4) Timeout #
async def main():
try:
async with asyncio.timeout(5):
data = await slow_op()
except TimeoutError:
print("5초 초과")14장에서 본 그것. 3.11+ 표준.
취소 — 깊은 주제 #
코루틴이 cancel 되면 CancelledError가 await 지점에서 던져집니다.
async def work():
try:
await long_operation()
except asyncio.CancelledError:
await cleanup() # 정리는 비동기로도 가능
raise # ← 다시 던져야 함!중요: CancelledError를 잡고 다시 던지지 않으면 — task가 정상 종료된 것으로 간주되어 cleanup이 끊깁니다. 항상 raise를 같이 두세요.
shielding — 일부만 취소 막기 #
async def critical():
return await asyncio.shield(important_work())shield로 감싼 부분은 외부 cancel의 영향을 받지 않습니다. 결제 마지막 단계 같이 중간에 끊어지면 안 되는 경우.
디버깅 — asyncio.run(debug=True)
#
PYTHONASYNCIODEBUG=1 python app.py또는
asyncio.run(main(), debug=True)디버그 모드에서:
- 양도 없이 너무 오래 도는 코루틴 경고 (블로킹 의심)
- 누락된
await경고 - task가 GC 될 때 결과 미사용 경고
개발 중에는 항상 켜는 게 좋습니다. 비동기 버그의 90%가 여기서 잡힙니다. 정식 프로파일링은 21장 성능 — cProfile, py-spy, 메모리 프로파일링.
연습문제 #
- 100개 URL을 동시에 fetch 하되
Semaphore(10)으로 동시성을 10으로 제한하는 함수를 작성하세요. 같은 일을 동시성 1 / 10 / 100으로 실행해 시간을 비교합니다. 너무 높이면 왜 오히려 느려지는지 직접 관찰합니다. asyncio.TaskGroup으로 producer / consumer 두 코루틴이asyncio.Queue(maxsize=10)를 공유하며 100개 아이템을 흘리는 코드를 작성하세요. 큐가 차면 producer가 자동으로 대기하는 백프레셔가 동작하는지 확인합니다.asyncio.run(main(), debug=True)로 실행하고 일부러 양도 없는 무거운 동기 코드(for i in range(10_000_000): ...)를 코루틴 안에 두세요. 어떤 경고가 출력되는지 확인합니다.
한 줄 요약: 이벤트 루프는 한 번에 한 코루틴만,
await가 양도점. CPU 바운드는 양도가 없어 동시성을 막음 →to_thread/ProcessPoolExecutor.Future⊂Task. 백그라운드 Task는 참조 유지 필수.gather(결과 리스트) /wait(done-pending) /as_completed(순서대로), 새 코드는TaskGroup. async generator(async def+yield),async for,async with.CancelledError는 잡고 정리 후raise.shield로 취소 차단.debug=True는 개발 중 필수.
다음 챕터 #
다음 19장 GIL과 동시성 — threading vs multiprocessing vs asyncio에서는 본 챕터에서 짧게 비춘 CPU 바운드 동시성 — GIL의 정체, threading vs multiprocessing vs asyncio, 그리고 Python 3.13~3.14의 free-threaded 빌드까지 다룹니다.