모던 파이썬 고급 #4 비동기 깊이 — 이벤트 루프, gather/wait, async generator
중급 #7 비동기 입문에서 async/await의 의미와 asyncio.run, gather, TaskGroup까지 봤습니다. 이번 글은 그 다음 단계입니다. 이벤트 루프가 실제로 어떻게 동작하는지, Future와 Task의 차이, gather와 wait의 미세한 차이, async generator, async iteration까지를 정리합니다.
이벤트 루프 — 한 단계 깊이 #
이벤트 루프는 한 번에 하나의 코루틴만 실행합니다. 멀티스레드가 아닙니다. 그런데 어떻게 동시성이 생길까요?
def run_loop(coros):
queue = list(coros)
while queue:
coro = queue.pop(0)
try:
# 코루틴을 다음 await 까지 진행
future = coro.send(None)
except StopIteration as e:
continue # 코루틴 끝남
# future가 끝나면 코루틴을 큐에 다시 넣음
future.add_done_callback(lambda _: queue.append(coro))핵심:
- 코루틴을
send(None)으로 한 단계 진행시킵니다 await를 만나는 지점까지 실행되고 멈춥니다- 그 지점의
Future가 완료되면 코루틴을 다시 큐에 넣습니다 - 다음 코루틴으로 넘어갑니다 (다른 코루틴이 그 사이에 실행될 수 있습니다)
await가 협력적 양도점이라는 것이 이 그림의 핵심입니다. 양도가 일어나는 지점에서만 다른 코루틴이 끼어들 수 있습니다.
양도 안 되는 코드는 동시성을 막음 #
async def slow():
total = 0
for i in range(10_000_000): # CPU 바운드, await 없음
total += i
return total
async def main():
await asyncio.gather(slow(), slow(), slow()) # 세 개를 동시에?겉보기엔 셋이 동시에 도는 것 같지만 실제로는 직렬입니다. slow() 안에 await가 없어서 양도가 일어나지 않습니다. CPU 바운드는 to_thread 또는 multiprocessing으로 보내야 합니다 (#5).
Future — 미래에 채워질 결과
#
Future는 **“아직 결정되지 않은 결과”**를 표현하는 객체입니다.
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
# 1초 후에 결과를 채움
loop.call_later(1, fut.set_result, "done")
result = await fut
print(result) # done상태:
- pending — 결과 미결정
- done — 결과 결정 (
fut.set_result(x)또는fut.set_exception(e)) - cancelled — 취소됨
Future는 보통 직접 만들 일이 없습니다. 라이브러리 내부 (스레드 풀에서 결과 회수, 시그널 등)에서 쓰는 도구입니다. 일반 코드에서는 Task를 통해 간접적으로 만집니다.
Task — Future + 코루틴
#
async def main():
task = asyncio.create_task(work()) # Task 생성
print(isinstance(task, asyncio.Future)) # TrueTask는 Future의 서브클래스입니다. 코루틴을 감싸 이벤트 루프에 등록한 형태입니다. Future가 일반화된 “결과 약속"이고, Task는 “코루틴이 채울 약속"입니다.
Task의 done, result, cancel
#
task = asyncio.create_task(work())
await asyncio.sleep(0.1)
print(task.done()) # False / True
if task.done():
if task.exception():
print("실패:", task.exception())
else:
print("성공:", task.result())
task.cancel() # 취소 시도Task 참조 유지 — 함정 #
async def main():
asyncio.create_task(background_work()) # 결과 무시
# ...Task 객체에 대한 참조가 사라지면, 가비지 컬렉터가 Task를 정리할 수 있습니다. 백그라운드 작업이 갑자기 사라지는 사고가 발생합니다. 항상 어딘가에 참조를 유지하세요.
async def main():
background_tasks = set()
task = asyncio.create_task(background_work())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
# ...TaskGroup을 쓰면 이 골치가 자동으로 풀립니다. 그래서 3.11+에서 권장됩니다.
gather와 wait — 둘의 미세한 차이
#
gather — 결과 리스트, 순서 보존
#
results = await asyncio.gather(
fetch("a"), fetch("b"), fetch("c"),
return_exceptions=False,
)
# results: [a결과, b결과, c결과] ← 호출 순서대로return_exceptions=True 옵션을 주면 예외도 결과처럼 들어옵니다. 한 task가 실패해도 다른 task 결과를 받을 수 있습니다.
results = await asyncio.gather(
fetch("a"),
fetch_might_fail("b"),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print("실패:", r)
else:
print("성공:", r)wait — done/pending 두 셋, 순서 무보장
#
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.FIRST_COMPLETED,
)return_when 옵션:
ALL_COMPLETED— 기본, 모두 끝날 때까지FIRST_COMPLETED— 하나라도 끝나면 반환FIRST_EXCEPTION— 예외 하나라도 발생하면 반환
wait는 결과 리스트가 아니라 done/pending 두 셋을 반환합니다. 결과를 받으려면 task.result()를 직접 부르세요.
언제 무엇을? #
| 상황 | 도구 |
|---|---|
| 모두 끝날 때까지, 결과 순서대로 | gather |
| 일부만 끝나면 진행 / 첫 결과만 처리 | wait |
| 첫 결과부터 차례로 처리 | as_completed |
| 새 코드, 3.11+ 환경, cleanup 보장 | TaskGroup |
새 코드는 가능한 한 TaskGroup을 씁니다. gather/wait는 옛 코드나 일부 경우에만 사용합니다.
as_completed — 첫 결과부터 처리
#
async def main():
coros = [fetch(u) for u in urls]
for coro in asyncio.as_completed(coros):
result = await coro
print("받음:", result)각 작업이 끝나는 순서대로 결과를 받습니다. 모두 끝날 때까지 기다리지 않고 빨리 처리하고 싶을 때 씁니다.
동기 함수와 통합 — to_thread
#
CPU 바운드 또는 블로킹 I/O 동기 함수를 비동기 안에서 안전하게 호출합니다.
import asyncio
import requests # 동기 라이브러리
async def fetch(url):
response = await asyncio.to_thread(requests.get, url)
return response.textasyncio.to_thread(fn, *args)가 별도 스레드에서 실행하고 await 가능한 형태로 감싸줍니다. 내부적으로 loop.run_in_executor를 호출합니다.
run_in_executor — 더 세밀
#
from concurrent.futures import ProcessPoolExecutor
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_heavy_function, arg)ThreadPoolExecutor 또는 ProcessPoolExecutor를 직접 지정해 더 세밀한 제어가 가능합니다.
Async generator — 비동기 yield #
async def stream_chunks(url):
async with httpx.AsyncClient() as client:
async with client.stream("GET", url) as resp:
async for chunk in resp.aiter_bytes():
yield chunk
async def main():
async for chunk in stream_chunks("https://example.com"):
print(len(chunk))async def + yield = async generator. 호출 시 비동기 이터레이터를 반환합니다. 스트리밍 상황에 특히 잘 어울립니다.
동기 vs 비동기 generator #
| 동기 generator | async generator | |
|---|---|---|
| 정의 | def fn(): yield | async def fn(): yield |
| 소비 | for x in g: | async for x in g: |
| 안에서 await | ❌ | ✅ |
| 안에서 sleep/io | ❌ (또는 동기) | ✅ |
Async iteration — __aiter__ / __anext__
#
비동기 이터러블을 직접 만들고 싶을 때 사용합니다.
class Counter:
def __init__(self, limit):
self.limit = limit
self.n = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.n >= self.limit:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.n += 1
return self.n
async def main():
async for i in Counter(3):
print(i)
# 0.1초 간격으로 1, 2, 3대부분은 async generator가 더 짧고 직관적이라 클래스 형태는 잘 안 씁니다.
Async context manager — __aenter__ / __aexit__
#
중급 #3에서 짧게 본 도구입니다.
from contextlib import asynccontextmanager
@asynccontextmanager
async def open_db():
conn = await connect_async()
try:
yield conn
finally:
await conn.close()
async def main():
async with open_db() as conn:
await conn.execute(...)동시성 패턴 — 자주 만나는 형태 #
1) Semaphore로 동시성 제한 #
sem = asyncio.Semaphore(10)
async def fetch_one(url):
async with sem:
return await fetch(url)
async def fetch_all(urls):
return await asyncio.gather(*[fetch_one(u) for u in urls])2) Producer / Consumer — Queue
#
queue = asyncio.Queue(maxsize=10)
async def producer():
for i in range(100):
await queue.put(i)
await queue.put(None) # 종료 신호
async def consumer():
while True:
item = await queue.get()
if item is None:
break
process(item)
queue.task_done()
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(producer())
tg.create_task(consumer())Queue는 producer/consumer 간 백프레셔를 자연스럽게 처리해줍니다 (큐가 차면 producer가 자동 대기).
3) 첫 결과만 받고 나머지 취소 #
async def main():
tasks = [
asyncio.create_task(fetch_a()),
asyncio.create_task(fetch_b()),
]
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for t in pending:
t.cancel()
return done.pop().result()여러 백엔드에 같은 요청을 던지고 먼저 응답한 것을 채택하는 패턴입니다.
4) Timeout #
async def main():
try:
async with asyncio.timeout(5):
data = await slow_op()
except TimeoutError:
print("5초 초과")중급 #7에서 본 그것입니다. 3.11+ 표준입니다.
취소 — 깊은 주제 #
코루틴이 cancel되면 CancelledError가 await 지점에서 던져집니다.
async def work():
try:
await long_operation()
except asyncio.CancelledError:
await cleanup() # 정리는 비동기로도 가능
raise # ← 다시 던져야 함!중요: CancelledError를 잡고 다시 던지지 않으면 task가 정상 종료된 것으로 간주되어, 취소가 상위로 전파되지 않습니다. 항상 raise를 같이 두세요.
shielding — 일부만 취소 막기 #
async def critical():
return await asyncio.shield(important_work())shield로 감싼 부분은 외부 cancel의 영향을 받지 않습니다. 결제 마지막 단계처럼 중간에 끊어지면 안 되는 경우에 씁니다.
디버깅 — asyncio.run(debug=True)
#
PYTHONASYNCIODEBUG=1 python app.py또는
asyncio.run(main(), debug=True)디버그 모드에서:
- 양도 없이 너무 오래 도는 코루틴 경고 (블로킹 의심)
- 누락된
await경고 - task가 GC될 때 결과 미사용 경고
개발 중에는 항상 켜는 것이 좋습니다. 비동기 버그의 90%가 여기서 잡힙니다.
정리 #
이번 글에서 본 것:
- 이벤트 루프는 한 번에 한 코루틴만 —
await가 양도점 - 양도 없는 CPU 바운드는 동시성을 막음 →
to_thread/ProcessPoolExecutor Future(결과 약속) ⊂Task(코루틴 + Future)- 백그라운드 Task는 참조를 유지해야 GC 안 됨
gather(결과 리스트),wait(done/pending 셋),as_completed(순서대로),TaskGroup(cleanup 보장)to_thread/run_in_executor로 동기 함수 통합- async generator (
async def+yield),async for,async with - 패턴: Semaphore, Queue, first-wins, timeout
CancelledError는 잡고 정리 후 반드시raiseshield로 cancel 차단debug=True는 개발 중 필수
다음 글(#5 GIL과 동시성)에서는 이 글에서 짧게 비춘 CPU 바운드 동시성 — GIL의 정체, threading vs multiprocessing vs asyncio, 그리고 Python 3.13~3.14의 free-threaded 빌드까지 다룹니다.