Modern Python Advanced #4: Async in depth — event loop, gather/wait, async generator


In Intermediate #7 Async intro we saw the meaning of async/await, asyncio.run, gather, and TaskGroup. This post is the next step. We organize how the event loop actually works, the difference between Future and Task, the subtle differences between gather and wait, async generators, and async iteration.

Event loop — one level deeper #

The event loop runs only one coroutine at a time. It’s not multithreaded. So how does concurrency happen?

The essence of the event loop — pseudocode
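def run_loop(coros):
    queue = list(coros)              # coroutines that are ready to run
    while queue:                     # a real loop also waits here for I/O and timers
        coro = queue.pop(0)
        try:
            # advance the coroutine to the next await
            future = coro.send(None)
        except StopIteration:
            continue   # coroutine finished
        # when the future completes, put the coroutine back in the queue
        # (bind coro as a default argument so each callback keeps its own coroutine)
        future.add_done_callback(lambda _, c=coro: queue.append(c))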

Core:

  1. Advance the coroutine one step with send(None)
  2. Run until the next await, then pause
  3. When the Future at that point completes, put the coroutine back on the queue
  4. Move to the next coroutine (others can run in between)

The key idea is that await is a cooperative yield point. Other coroutines can step in only where yielding happens.
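
The cooperative hand-off can be made concrete with a runnable toy. This is a minimal sketch and not how asyncio is implemented: Yield, worker, and this version of run_loop are hypothetical names, and instead of waiting on a Future each coroutine is simply re-queued right after it yields.

```python
from collections import deque

class Yield:
    """Hypothetical awaitable: suspends the coroutine once and returns control to the loop."""
    def __await__(self):
        yield  # this bare yield is what coro.send(None) hands back to the loop

async def worker(name, steps, log):
    for i in range(steps):
        log.append(f"{name}{i}")
        await Yield()          # cooperative yield point

def run_loop(coros):
    ready = deque(coros)
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)    # run until the next await
        except StopIteration:
            continue           # coroutine finished
        ready.append(coro)     # toy simplification: immediately ready again

log = []
run_loop([worker("a", 2, log), worker("b", 2, log)])
print(log)  # ['a0', 'b0', 'a1', 'b1']
```

The two workers interleave only because each one awaits between steps; remove the await and "a" would run to completion before "b" starts.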

Code without yields blocks concurrency #

🚫 No yields
async def slow():
    total = 0
    for i in range(10_000_000):    # CPU-bound, no await
        total += i
    return total

async def main():
    await asyncio.gather(slow(), slow(), slow())   # three concurrently?

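It looks like all three run together, but they actually run serially. There is no await inside slow(), so it never yields and the other coroutines cannot step in. Move blocking work off the event loop with to_thread, which keeps the loop responsive, and use multiprocessing when CPU-bound work needs real parallelism (#5).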
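
The effect of a missing yield point is easy to observe with a log. This is a small sketch, assuming a hypothetical busy() coroutine that optionally calls asyncio.sleep(0), which is the conventional way to yield to the loop without actually waiting.

```python
import asyncio

async def busy(name, log, cooperative):
    for i in range(3):
        log.append(f"{name}{i}")
        if cooperative:
            await asyncio.sleep(0)   # explicit yield point, no real delay

async def run(cooperative):
    log = []
    await asyncio.gather(
        busy("a", log, cooperative),
        busy("b", log, cooperative),
    )
    return log

serial = asyncio.run(run(cooperative=False))
interleaved = asyncio.run(run(cooperative=True))
print(serial)       # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
print(interleaved)  # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```

Sprinkling sleep(0) into a long loop only restores fairness; it does not make CPU-bound work faster.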

Future — a result that will be filled later #

A Future is an object representing “a not-yet-decided result”.

Future directly
async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # fill the result after 1 second
    loop.call_later(1, fut.set_result, "done")

    result = await fut
    print(result)   # done

States:

  • pending — result undecided
  • done — result decided (fut.set_result(x) or fut.set_exception(e))
  • cancelled — cancelled

You usually don’t build Future yourself. It’s used inside libraries (collecting results from a thread pool, signals, etc.). In regular code you touch it indirectly through a Task.
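
One place where building a Future by hand is reasonable is adapting a callback-style API. The sketch below assumes a hypothetical legacy_lookup(key, on_done) function that reports its result through a callback; the Future turns that callback into something you can await.

```python
import asyncio

def legacy_lookup(key, on_done):
    """Hypothetical callback-style API: reports its result through on_done."""
    on_done(key.upper())

async def lookup(key):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # Schedule the callback API on the loop; it fills the Future when it runs.
    loop.call_soon(legacy_lookup, key, fut.set_result)
    states = [fut.done()]      # still pending: the callback has not run yet
    value = await fut          # suspends until set_result is called
    states.append(fut.done())
    return value, states

value, states = asyncio.run(lookup("abc"))
print(value, states)  # ABC [False, True]
```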

Task = Future + coroutine #

What Task really is
async def main():
    task = asyncio.create_task(work())   # create Task
    print(isinstance(task, asyncio.Future))   # True

A Task is a subclass of Future. It wraps a coroutine and registers it with the event loop. Future is the generic “result promise”; Task is “a promise filled by a coroutine.”

Task’s done, result, cancel #

Task interface
task = asyncio.create_task(work())

await asyncio.sleep(0.1)
print(task.done())     # False / True

if task.done() and not task.cancelled():   # result()/exception() raise on a cancelled task
    if task.exception():
        print("failed:", task.exception())
    else:
        print("succeeded:", task.result())

task.cancel()           # try to cancel

Keep a reference to the Task — pitfall #

🚫 Common pitfall
async def main():
    asyncio.create_task(background_work())   # discard the result
    # ...

The event loop keeps only weak references to tasks. Once your own reference to the Task object disappears, the garbage collector may collect it, even mid-execution, and the background work silently vanishes. Always keep a strong reference somewhere.

✅ Keep the reference
async def main():
    background_tasks = set()

    task = asyncio.create_task(background_work())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    # ...

TaskGroup handles this automatically — that’s why it’s recommended in 3.11+.

gather and wait — the subtle differences #

gather — list of results, order preserved #

gather
results = await asyncio.gather(
    fetch("a"), fetch("b"), fetch("c"),
    return_exceptions=False,
)
# results: [a result, b result, c result]  ← in call order

With return_exceptions=True, exceptions come back as results — even if one task fails, you can receive the others.

Exceptions as results
results = await asyncio.gather(
    fetch("a"),
    fetch_might_fail("b"),
    return_exceptions=True,
)
for r in results:
    if isinstance(r, BaseException):   # CancelledError is a BaseException, not an Exception
        print("failed:", r)
    else:
        print("succeeded:", r)
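
A self-contained version makes the ordering guarantee visible. In this sketch, fetch is a hypothetical stand-in that sleeps for a given delay; the slowest call is listed first, yet its result still comes back first.

```python
import asyncio

async def fetch(name, delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(name)
    return name.upper()

async def main():
    return await asyncio.gather(
        fetch("a", 0.03),              # finishes last
        fetch("b", 0.01, fail=True),   # finishes first, with an exception
        fetch("c", 0.02),
        return_exceptions=True,
    )

results = asyncio.run(main())
print(results)  # ['A', ValueError('b'), 'C']
```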

wait — two sets done/pending, no order guarantee #

wait
done, pending = await asyncio.wait(
    [task1, task2, task3],
    return_when=asyncio.FIRST_COMPLETED,
)

return_when options:

  • ALL_COMPLETED — default, until everything finishes
  • FIRST_COMPLETED — return as soon as one finishes
  • FIRST_EXCEPTION — return as soon as any exception fires

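wait returns two sets, done and pending, not a result list, and it never raises a task's exception itself. Call task.result() on each done task to get its value (or re-raise its exception). If no task raises, FIRST_EXCEPTION behaves like ALL_COMPLETED.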
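
Another difference from gather shows up with the timeout argument: when the timeout expires, wait neither raises nor cancels anything. The sketch below uses a hypothetical job() coroutine to show that the slow task is simply reported as pending and must be cleaned up by the caller.

```python
import asyncio

async def job(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    fast = asyncio.create_task(job("fast", 0.01))
    slow = asyncio.create_task(job("slow", 5))
    # Timeout expiry does not raise and does not cancel the unfinished task.
    done, pending = await asyncio.wait([fast, slow], timeout=0.2)
    finished = [t.result() for t in done]
    still_running = slow in pending and not slow.done()
    for t in pending:                  # cleanup is our responsibility
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return finished, still_running

finished, still_running = asyncio.run(main())
print(finished, still_running)  # ['fast'] True
```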

When to use which? #

Case | Tool
Until everything finishes, results in order | gather
Proceed once some finish / handle the first | wait
Process results as they finish | as_completed
New code, 3.11+, cleanup guaranteed | TaskGroup

For new code, prefer TaskGroup whenever possible. Reserve gather/wait for legacy code and special cases.

as_completed — process results as they arrive #

as_completed
async def main():
    coros = [fetch(u) for u in urls]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print("received:", result)

Results arrive in the order the tasks finish, so you can start processing each one without waiting for the rest.
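
A runnable variant with artificial delays shows the completion order. Here fetch is a hypothetical stand-in that just sleeps, and the names describe how long each call takes.

```python
import asyncio

async def fetch(name, delay):
    await asyncio.sleep(delay)
    return name

async def main():
    coros = [fetch("slow", 0.03), fetch("fast", 0.01), fetch("mid", 0.02)]
    order = []
    for next_done in asyncio.as_completed(coros):
        order.append(await next_done)   # yields whichever finishes next
    return order

order = asyncio.run(main())
print(order)  # ['fast', 'mid', 'slow']
```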

Integrating sync functions — to_thread #

Safely call CPU-bound or blocking I/O sync functions from inside async.

to_thread
import asyncio
import requests   # sync library

async def fetch(url):
    response = await asyncio.to_thread(requests.get, url)
    return response.text

asyncio.to_thread(fn, *args) runs fn in a separate thread and returns an awaitable for its result. Internally it calls loop.run_in_executor with the default thread pool.
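
To see the effect without a network dependency, the sketch below replaces requests.get with a hypothetical blocking_call that sleeps using time.sleep. Three blocking calls overlap in worker threads, so the total time is close to one call rather than three.

```python
import asyncio
import time

def blocking_call(n):
    time.sleep(0.2)          # stands in for blocking I/O
    return n * 2

async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_call, n) for n in range(3))
    )
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(main())
print(results)            # [0, 2, 4]
print(f"{elapsed:.2f}s")  # close to 0.2s rather than 0.6s
```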

run_in_executor — finer control #

run_in_executor
from concurrent.futures import ProcessPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_function, arg)

You can specify a ThreadPoolExecutor or ProcessPoolExecutor directly for finer control.
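
As an illustration of that finer control, this sketch passes a dedicated ThreadPoolExecutor with a worker limit and a thread name prefix. The blocking_io function and the "io" prefix are assumptions made for the example.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

def blocking_io(n):
    # Return the worker thread's name so the caller can see where this ran.
    return n * 2, threading.current_thread().name

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="io") as pool:
        futures = [loop.run_in_executor(pool, blocking_io, n) for n in range(4)]
        return await asyncio.gather(*futures)

pairs = asyncio.run(main())
values = [value for value, _ in pairs]
thread_names = {name for _, name in pairs}
print(values)  # [0, 2, 4, 6]
```

A dedicated pool keeps a burst of slow calls from occupying the default executor that to_thread and other library code share.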

Async generator — async yield #

async generator
import httpx   # third-party async HTTP client

async def stream_chunks(url):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as resp:
            async for chunk in resp.aiter_bytes():
                yield chunk

async def main():
    async for chunk in stream_chunks("https://example.com"):
        print(len(chunk))

async def + yield = async generator. Calling it returns an async iterator. Fits streaming very well.
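
The same mechanics work without any network library. This is a minimal sketch with a hypothetical countdown() generator; it also shows that an async generator can feed an async comprehension.

```python
import asyncio

async def countdown(start):
    for n in range(start, 0, -1):
        await asyncio.sleep(0)   # awaiting is allowed between yields
        yield n

async def main():
    # Async comprehension: consumes the generator with an implicit async for.
    return [n async for n in countdown(3)]

seen = asyncio.run(main())
print(seen)  # [3, 2, 1]
```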

Sync vs async generator #

Aspect | Sync generator | Async generator
Definition | def fn(): yield | async def fn(): yield
Consume | for x in g: | async for x in g:
await inside | ❌ | ✅
sleep/io inside | ❌ (or sync) | ✅

Async iteration — __aiter__ / __anext__ #

When you want to build an async iterable yourself.

Async iterator — class
class Counter:
    def __init__(self, limit):
        self.limit = limit
        self.n = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.n += 1
        return self.n

async def main():
    async for i in Counter(3):
        print(i)
# 1, 2, 3 at 0.1s intervals

For most cases, async generators are shorter and more intuitive — class form is rarely used.

Async context manager — __aenter__ / __aexit__ #

Briefly seen in Intermediate #3.

@asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_db():
    conn = await connect_async()
    try:
        yield conn
    finally:
        await conn.close()

async def main():
    async with open_db() as conn:
        await conn.execute(...)
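
The decorator is shorthand for a class that implements __aenter__ and __aexit__. The sketch below spells that out with a hypothetical Connection class that records what happens, including cleanup when the body raises.

```python
import asyncio

class Connection:
    """Hypothetical resource that logs its lifecycle."""
    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        await asyncio.sleep(0)        # setup may await
        self.log.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)        # teardown may await too
        self.log.append("close")
        return False                  # do not swallow the exception

async def main():
    log = []
    try:
        async with Connection(log):
            log.append("use")
            raise RuntimeError("boom")
    except RuntimeError:
        log.append("handled")
    return log

log = asyncio.run(main())
print(log)  # ['open', 'use', 'close', 'handled']
```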

Concurrency patterns — common cases #

1) Semaphore for concurrency limit #

N concurrent
sem = asyncio.Semaphore(10)

async def fetch_one(url):
    async with sem:
        return await fetch(url)

async def fetch_all(urls):
    return await asyncio.gather(*[fetch_one(u) for u in urls])
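
To check that the limit really holds, the sketch below counts how many coroutines are inside the semaphore at once. The limit of 2 and the fetch_one body (a short sleep) are assumptions chosen to keep the example small.

```python
import asyncio

async def main():
    sem = asyncio.Semaphore(2)
    running = 0
    peak = 0

    async def fetch_one(i):
        nonlocal running, peak
        async with sem:               # at most 2 coroutines get past this line
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)
            running -= 1
            return i

    results = await asyncio.gather(*(fetch_one(i) for i in range(6)))
    return results, peak

results, peak = asyncio.run(main())
print(results, peak)  # [0, 1, 2, 3, 4, 5] 2
```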

2) Producer / Consumer — Queue #

Queue-based pipeline
queue = asyncio.Queue(maxsize=10)

async def producer():
    for i in range(100):
        await queue.put(i)
    await queue.put(None)    # termination signal

async def consumer():
    while True:
        item = await queue.get()
        try:
            if item is None:     # termination signal
                break
            process(item)
        finally:
            queue.task_done()    # mark every item, including the sentinel, as handled

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        tg.create_task(consumer())

Queue naturally handles backpressure between producer and consumer (when the queue is full, producers wait automatically).
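
An alternative to the None sentinel is queue.join(), which waits until every item has been marked with task_done(). The sketch below uses two hypothetical workers that square numbers and are cancelled once the queue is drained.

```python
import asyncio

async def worker(queue, out):
    while True:
        item = await queue.get()
        try:
            out.append(item * item)
        finally:
            queue.task_done()        # lets queue.join() make progress

async def main():
    queue = asyncio.Queue(maxsize=2)
    out = []
    workers = [asyncio.create_task(worker(queue, out)) for _ in range(2)]
    for i in range(5):
        await queue.put(i)           # waits whenever the queue is full
    await queue.join()               # every item has been processed
    for w in workers:
        w.cancel()                   # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return sorted(out)

squares = asyncio.run(main())
print(squares)  # [0, 1, 4, 9, 16]
```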

3) Take only the first result, cancel the rest #

first wins
async def main():
    tasks = [
        asyncio.create_task(fetch_a()),
        asyncio.create_task(fetch_b()),
    ]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
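    for t in pending:
        t.cancel()
    # wait for the cancelled tasks to actually finish unwinding
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()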

A pattern for sending the same request to multiple backends and using whichever responds first.

4) Timeout #

timeout
async def main():
    try:
        async with asyncio.timeout(5):
            data = await slow_op()
    except TimeoutError:
        print("exceeded 5 seconds")

The one we saw in Intermediate #7. 3.11+ standard.
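
On interpreters older than 3.11, asyncio.wait_for is the long-standing way to get the same effect. This is a minimal sketch with a hypothetical slow_op that sleeps for one second; the timed-out call is cancelled for you.

```python
import asyncio

async def slow_op():
    await asyncio.sleep(1)
    return "data"

async def main():
    try:
        return await asyncio.wait_for(slow_op(), timeout=0.05)
    except asyncio.TimeoutError:     # alias of the built-in TimeoutError on 3.11+
        return "timed out"

outcome = asyncio.run(main())
print(outcome)  # timed out
```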

Cancellation — a deep topic #

When a coroutine is cancelled, CancelledError is raised at the await.

Handling cancel
async def work():
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()    # cleanup can also be async
        raise              # ← you must re-raise!

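Important: if you catch CancelledError and don’t re-raise, the cancellation is swallowed. The task is treated as having completed normally, and code that relies on cancellation propagating, such as TaskGroup and asyncio.timeout, can misbehave. Always include raise.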
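
The whole round trip looks like this in a runnable form. It is a small sketch in which work() sleeps in place of long_operation() and appends to a log in place of real cleanup.

```python
import asyncio

async def work(log):
    try:
        await asyncio.sleep(10)          # stands in for long_operation()
    except asyncio.CancelledError:
        log.append("cleanup")
        raise                            # keep the cancellation propagating

async def main():
    log = []
    task = asyncio.create_task(work(log))
    await asyncio.sleep(0)               # let the task start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()

log, was_cancelled = asyncio.run(main())
print(log, was_cancelled)  # ['cleanup', 'cancelled'] True
```

Because work() re-raises, task.cancelled() reports True; without the raise it would report False and the caller would see a normal return.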

Shielding — protect part from cancellation #

asyncio.shield
async def critical():
    return await asyncio.shield(important_work())

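The awaitable wrapped by shield keeps running even if the task awaiting it is cancelled. The caller still receives CancelledError at the await; only the inner work is protected. Use it for places that mustn’t be interrupted, like the final step of a payment.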
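
The following sketch shows both halves of that behavior. The important_work body and the log are assumptions for the example; note that the inner task is created separately so a reference to it survives the outer cancellation.

```python
import asyncio

async def important_work(log):
    await asyncio.sleep(0.05)
    log.append("committed")
    return "ok"

async def critical(inner):
    return await asyncio.shield(inner)

async def main():
    log = []
    inner = asyncio.create_task(important_work(log))   # keep a reference
    outer = asyncio.create_task(critical(inner))
    await asyncio.sleep(0)          # let both tasks start
    outer.cancel()                  # cancels the waiter, not the shielded work
    try:
        await outer
    except asyncio.CancelledError:
        log.append("outer cancelled")
    result = await inner            # the shielded work still completes
    return log, result

log, result = asyncio.run(main())
print(log, result)  # ['outer cancelled', 'committed'] ok
```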

Debugging — asyncio.run(debug=True) #

Debug mode
PYTHONASYNCIODEBUG=1 python app.py

Or:

asyncio.run(main(), debug=True)

In debug mode:

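  • Logs a warning when a callback or coroutine step runs longer than 100 ms without yielding (suspect blocking)
  • Shows where a never-awaited coroutine was created, alongside the usual "never awaited" warning
  • Shows where a Task was created when its exception is never retrieved

Turn this on during development; many common async mistakes surface here first.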
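
The slow-step warning can be observed directly. This sketch attaches a small hypothetical ListHandler to the "asyncio" logger and blocks the loop with time.sleep for longer than the default 100 ms threshold; the exact wording of the log message is an implementation detail, so the code only looks for the word "took".

```python
import asyncio
import logging
import time

class ListHandler(logging.Handler):
    """Hypothetical handler that keeps formatted log messages in a list."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())

async def blocks_loop():
    time.sleep(0.2)       # blocking call: the loop cannot run anything else

handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
try:
    asyncio.run(blocks_loop(), debug=True)
finally:
    asyncio_logger.removeHandler(handler)

slow_warnings = [m for m in handler.messages if "took" in m]
print(len(slow_warnings) > 0)
```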

Wrap-up #

What this post covered:

  • Event loop runs only one coroutine at a time; await is the yield point
  • CPU-bound code without yields blocks concurrency → use to_thread / ProcessPoolExecutor
  • Task (coroutine + Future) ⊂ Future (result promise)
  • Background Tasks must keep a reference to avoid GC
  • gather (result list), wait (done/pending sets), as_completed (in completion order), TaskGroup (cleanup guaranteed)
  • Integrate sync functions with to_thread / run_in_executor
  • Async generator (async def + yield), async for, async with
  • Patterns: Semaphore, Queue, first-wins, timeout
  • Catch CancelledError, do cleanup, always raise
  • shield blocks cancellation
  • debug=True is essential during development

In the next post (#5 GIL and concurrency) we cover CPU-bound concurrency briefly mentioned here — the GIL, threading vs multiprocessing vs asyncio, and the free-threaded build of Python 3.13–3.14.
