Chapter 18

Async in depth — event loop, gather/wait, async generator

The next step from the intermediate intro — how the event loop actually works, the difference between Future and Task, gather vs wait, async generators and async iteration.

In Chapter 14 async intro (asyncio) we covered the meaning of async/await along with asyncio.run, gather, and TaskGroup. This chapter is the next step. It covers how the event loop actually works, the difference between Future and Task, the finer difference between gather and wait, and async generators and async iteration.

The CPU-bound concurrency briefly mentioned at the end of this chapter is covered in earnest in the next, Chapter 19 GIL and concurrency — threading vs multiprocessing vs asyncio. And the async debugging / performance measurement in this chapter pairs with Chapter 21 performance — cProfile, py-spy, memory profiling.

Event loop — one level deeper #

The event loop runs only one coroutine at a time. It is not multi-threaded. So how does concurrency happen?

Essence of the event loop — pseudocode
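def run_loop(coros):
    queue = list(coros)
    while queue:
        coro = queue.pop(0)
        try:
            # advance the coroutine to its next await
            future = coro.send(None)
        except StopIteration:
            continue   # coroutine finished
        # when the future completes, put the coroutine back on the queue
        # (c=coro binds the current coroutine; a bare closure would see a later one)
        future.add_done_callback(lambda _, c=coro: queue.append(c))
    # a real loop does not exit here: it waits for I/O and timers while futures are pending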

Key points:

  1. Advance the coroutine one step with send(None)
  2. It runs until it hits an await and stops
  3. When the Future at that point completes, put the coroutine back on the queue
  4. Move to the next coroutine (other coroutines may run in between)

The key to this picture is that await is a cooperative yield point. Other coroutines can only step in at points where a yield happens.
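
The four steps above can be tried out with a toy round-robin scheduler. This is only a sketch of the idea, not how asyncio is implemented: `YieldOnce`, `worker`, and `run_round_robin` are hypothetical names, and the Future bookkeeping is replaced by simply re-queuing the coroutine after every yield.

```python
from collections import deque


class YieldOnce:
    """Hypothetical awaitable that suspends the awaiting coroutine exactly once."""

    def __await__(self):
        yield  # hand control back to whoever called send()


async def worker(name, steps, log):
    for i in range(steps):
        log.append(f"{name}:{i}")
        await YieldOnce()  # cooperative yield point


def run_round_robin(coros):
    """Advance each coroutine to its next await, then move on to the next one."""
    queue = deque(coros)
    while queue:
        coro = queue.popleft()
        try:
            coro.send(None)      # runs until the next await that yields
        except StopIteration:
            continue             # coroutine finished
        queue.append(coro)       # not finished: schedule it again


log = []
run_round_robin([worker("a", 2, log), worker("b", 2, log)])
print(log)  # ['a:0', 'b:0', 'a:1', 'b:1']
```

The two workers interleave only because each one awaits between steps; remove the `await` and the first worker would run to completion before the second starts.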

Code that doesn’t yield blocks concurrency #

🚫 No yield
async def slow():
    total = 0
    for i in range(10_000_000):    # CPU-bound, no await
        total += i
    return total

async def main():
    await asyncio.gather(slow(), slow(), slow())   # three concurrently?

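It looks as if three run concurrently, but they actually run one after another. There is no await inside slow(), so it never yields. Blocking work has to leave the event loop thread: asyncio.to_thread keeps the loop responsive, and a process pool is what gives CPU-bound Python code real parallelism (Chapter 19 GIL and concurrency).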
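
The effect is easy to observe with a small experiment. The sketch below uses a hypothetical `count` coroutine that either never awaits or awaits `asyncio.sleep(0)` after each step; the only difference between the two runs is that yield point.

```python
import asyncio


async def count(name, n, log, cooperative):
    for i in range(n):
        log.append(f"{name}{i}")
        if cooperative:
            await asyncio.sleep(0)   # explicit yield point


async def run(cooperative):
    log = []
    await asyncio.gather(
        count("a", 2, log, cooperative),
        count("b", 2, log, cooperative),
    )
    return log


serial = asyncio.run(run(cooperative=False))
interleaved = asyncio.run(run(cooperative=True))
print(serial)       # ['a0', 'a1', 'b0', 'b1']
print(interleaved)  # ['a0', 'b0', 'a1', 'b1']
```

Sprinkling `sleep(0)` into a CPU-heavy loop only shares the single thread more fairly; it does not make the work itself any faster.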

Future — a result to be filled in the future #

A Future is an object representing “a result not yet decided”.

Future directly
async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # fill the result after 1 second
    loop.call_later(1, fut.set_result, "done")

    result = await fut
    print(result)   # done

States:

  • pending — result undecided
  • done — result decided (fut.set_result(x) or fut.set_exception(e))
  • cancelled — cancelled

You usually don’t create a Future directly. It’s a tool used inside libraries (retrieving results from thread pools, signals, etc.). Regular code handles it indirectly through Task.
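
To make the three states concrete, here is a small sketch that drives three Futures by hand. The variable names are illustrative; the point to notice is that a cancelled Future also reports `done()` as True.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    ok = loop.create_future()
    bad = loop.create_future()
    gone = loop.create_future()

    states = {"before": ok.done()}           # still pending
    loop.call_soon(ok.set_result, 42)        # filled on the next loop iteration
    loop.call_soon(bad.set_exception, ValueError("boom"))
    gone.cancel()

    states["result"] = await ok              # awaiting returns the result
    try:
        await bad                            # awaiting re-raises the stored exception
    except ValueError as exc:
        states["error"] = str(exc)
    states["cancelled"] = gone.cancelled()
    states["cancelled_is_done"] = gone.done()
    return states


states = asyncio.run(main())
print(states)
```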

Task = Future + coroutine #

The identity of Task
async def main():
    task = asyncio.create_task(work())   # create a Task
    print(isinstance(task, asyncio.Future))   # True

Task is a subclass of Future. It wraps a coroutine and registers it with the event loop. Future is the generalized “promise of a result,” and Task is “a promise that a coroutine will fill.”

done, result, cancel on Task #

Task interface
task = asyncio.create_task(work())

await asyncio.sleep(0.1)
print(task.done())     # False / True

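if task.done():
    if task.cancelled():
        print("cancelled")   # result() / exception() would raise CancelledError
    elif task.exception():
        print("failed:", task.exception())
    else:
        print("succeeded:", task.result())

task.cancel()           # request cancellation (no-op if the task is already done)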
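
A runnable version of the same inspection, as a sketch: `ok`, `boom`, and `describe` are hypothetical helpers, and `asyncio.wait` is used only because it waits for the tasks without re-raising their exceptions.

```python
import asyncio


async def ok():
    await asyncio.sleep(0)
    return 1


async def boom():
    await asyncio.sleep(0)
    raise ValueError("bad input")


def describe(task):
    """Summarize a Task without ever raising."""
    if not task.done():
        return "pending"
    if task.cancelled():                     # check first: exception() would raise
        return "cancelled"
    if task.exception() is not None:
        return f"failed: {task.exception()!r}"
    return f"succeeded: {task.result()!r}"


async def main():
    slow = asyncio.create_task(asyncio.sleep(10))
    tasks = [asyncio.create_task(ok()), asyncio.create_task(boom()), slow]
    await asyncio.sleep(0.01)
    before = describe(slow)                  # still sleeping
    slow.cancel()
    await asyncio.wait(tasks)                # wait for all three to settle
    return before, [describe(t) for t in tasks]


before, summary = asyncio.run(main())
print(before)
print(summary)
```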

Keep a reference to the Task — a trap #

🚫 A common trap
async def main():
    asyncio.create_task(background_work())   # result ignored
    # ...

The event loop keeps only weak references to Tasks. If your own reference to the Task object disappears, the garbage collector may clean up the Task, even in the middle of its execution, and background work silently vanishes. Always keep a reference somewhere.

✅ Keep a reference
async def main():
    background_tasks = set()

    task = asyncio.create_task(background_work())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    # ...

TaskGroup holds the references for you and waits for its tasks when the block exits, which is why it's recommended in 3.11+.

gather and wait — the fine difference #

gather — result list, order preserved #

gather
results = await asyncio.gather(
    fetch("a"), fetch("b"), fetch("c"),
    return_exceptions=False,
)
# results: [result of a, result of b, result of c]  ← in call order

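With the default return_exceptions=False, the first exception propagates to the caller awaiting gather, while the other awaitables keep running. Pass return_exceptions=True and exceptions are returned in the result list instead, so even if one task fails you still receive the results of the others.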

Exceptions as results
results = await asyncio.gather(
    fetch("a"),
    fetch_might_fail("b"),
    return_exceptions=True,
)
for r in results:
    if isinstance(r, BaseException):   # also catches CancelledError, which is not an Exception
        print("failed:", r)
    else:
        print("succeeded:", r)

wait — two sets done / pending, no order guarantee #

wait
done, pending = await asyncio.wait(
    [task1, task2, task3],
    return_when=asyncio.FIRST_COMPLETED,
)

return_when options:

  • ALL_COMPLETED — default, until all are done
  • FIRST_COMPLETED — return as soon as one finishes
  • FIRST_EXCEPTION — return as soon as any task raises an exception; if none does, it behaves like ALL_COMPLETED

wait returns two sets done / pending instead of a result list. To get results, call task.result() directly.
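
One consequence of the done / pending split shows up with a timeout: `asyncio.wait` neither raises nor cancels when time runs out, it just reports what is still pending. A small sketch, with a hypothetical `job` coroutine standing in for real work:

```python
import asyncio


async def job(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    tasks = [
        asyncio.create_task(job("fast", 0.01)),
        asyncio.create_task(job("slow", 5)),
    ]
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    finished = sorted(t.result() for t in done)
    still_running = len(pending)

    # wait() leaves pending tasks alone, so cleaning them up is our job
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return finished, still_running


finished, still_running = asyncio.run(main())
print(finished, still_running)  # ['fast'] 1
```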

When which? #

Situation | Tool
Until all done, results in order | gather
Proceed when some are done / handle only the first result | wait
Process from the first finished onward | as_completed
New code, 3.11+ environment, cleanup guarantee | TaskGroup

For new code, prefer TaskGroup whenever possible. Keep gather / wait for legacy code and the specific cases in the table above.

as_completed — process from the first finished #

as_completed
async def main():
    coros = [fetch(u) for u in urls]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print("received:", result)

Results arrive in the order the jobs finish. Use it when you want to start processing early instead of waiting for everything to finish.

Integrating with sync functions — to_thread #

Safely call a synchronous function that’s CPU-bound or blocking I/O from inside async code.

to_thread
import asyncio
import requests   # synchronous library

async def fetch(url):
    response = await asyncio.to_thread(requests.get, url)
    return response.text

asyncio.to_thread(fn, *args, **kwargs) runs fn in a separate thread and returns an awaitable for its result. Internally it calls loop.run_in_executor with the default thread pool.
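
The following sketch avoids the network by using `time.sleep` as a stand-in for a blocking call; `blocking_io` is a hypothetical function. Three 0.2-second calls overlap in worker threads, so the total is close to 0.2 seconds rather than 0.6.

```python
import asyncio
import time


def blocking_io(seconds):
    time.sleep(seconds)          # blocks its own thread, not the event loop
    return seconds


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
        asyncio.to_thread(blocking_io, 0.2),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

This works because `time.sleep` releases the GIL while waiting; pure-Python number crunching in threads would still take turns on one core.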

run_in_executor — finer control #

run_in_executor
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_function, arg)

By specifying a ThreadPoolExecutor or ProcessPoolExecutor directly, you get finer control. Chapter 19 GIL and concurrency covers the criteria for choosing between the two.
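
As one example of that finer control, the sketch below caps a dedicated thread pool at two workers and names its threads. `which_thread` is a hypothetical function, and the pool size and the `io` prefix are arbitrary choices for illustration.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def which_thread(_job_id):
    time.sleep(0.01)                          # pretend to do blocking work
    return threading.current_thread().name


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="io") as pool:
        futures = [loop.run_in_executor(pool, which_thread, i) for i in range(6)]
        return await asyncio.gather(*futures)


names = asyncio.run(main())
print(sorted(set(names)))   # at most two distinct worker names
```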

Async generator — async yield #

async generator
async def stream_chunks(url):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as resp:
            async for chunk in resp.aiter_bytes():
                yield chunk

async def main():
    async for chunk in stream_chunks("https://example.com"):
        print(len(chunk))

async def + yield = async generator. Calling it returns an async iterator. Fits well with streaming cases.
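
The same shape can be run without a network. In this sketch `stream_chunks` slices an in-memory bytes object, and `asyncio.sleep(0)` stands in for the await on real I/O; both are assumptions made only to keep the example self-contained.

```python
import asyncio


async def stream_chunks(data, size):
    for start in range(0, len(data), size):
        await asyncio.sleep(0)               # stand-in for awaiting network I/O
        yield data[start:start + size]


async def main():
    sizes = []
    async for chunk in stream_chunks(b"abcdefghij", 4):
        sizes.append(len(chunk))
    return sizes


sizes = asyncio.run(main())
print(sizes)  # [4, 4, 2]
```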

Sync vs async generator #

Aspect | sync generator | async generator
Definition | def fn(): yield | async def fn(): yield
Consumption | for x in g: | async for x in g:
await inside | ❌ | ✅
sleep / io inside | ❌ (or sync) | ✅

Async iteration — __aiter__ / __anext__ #

When you want to build an async iterable directly.

Async iterator — class
class Counter:
    def __init__(self, limit):
        self.limit = limit
        self.n = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.n += 1
        return self.n

async def main():
    async for i in Counter(3):
        print(i)
# 1, 2, 3 at 0.1s intervals

Most of the time async generators are shorter and more intuitive, so the class form isn’t used much.
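
For comparison, here is a sketch of the same counter written as an async generator. The lowercase `counter` is a hypothetical function name, and the delay is shortened to keep the run quick.

```python
import asyncio


async def counter(limit):
    for n in range(1, limit + 1):
        await asyncio.sleep(0.01)
        yield n                              # no __aiter__ / __anext__ boilerplate


async def main():
    return [i async for i in counter(3)]     # async comprehension


values = asyncio.run(main())
print(values)  # [1, 2, 3]
```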

Async context manager — __aenter__ / __aexit__ #

A tool seen briefly in Chapter 10 context managers.

@asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_db():
    conn = await connect_async()
    try:
        yield conn
    finally:
        await conn.close()

async def main():
    async with open_db() as conn:
        await conn.execute(...)
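
The decorator hides the two methods named in the heading. A class-based sketch makes them visible; `Connection` is a hypothetical class that only records what happens, and the awaits on `asyncio.sleep(0)` stand in for real connect and close calls.

```python
import asyncio


class Connection:
    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        await asyncio.sleep(0)               # stand-in for an async connect
        self.log.append("open")
        return self                          # bound by "as conn"

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)               # stand-in for an async close
        self.log.append("close")
        return False                         # do not swallow the exception


async def main():
    log = []
    try:
        async with Connection(log) as conn:
            log.append("use")
            raise RuntimeError("query failed")
    except RuntimeError:
        log.append("error seen by caller")
    return log


log = asyncio.run(main())
print(log)  # ['open', 'use', 'close', 'error seen by caller']
```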

Concurrency patterns — common shapes #

1) Semaphore to limit concurrency #

N concurrent
sem = asyncio.Semaphore(10)

async def fetch_one(url):
    async with sem:
        return await fetch(url)

async def fetch_all(urls):
    return await asyncio.gather(*[fetch_one(u) for u in urls])
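
To check that the limit really holds, the sketch below counts how many jobs are inside the semaphore at once. The job body is a short sleep instead of a real fetch, and the counters `active` and `peak` exist only for the measurement.

```python
import asyncio


async def fetch_all(n_jobs, limit):
    sem = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def one(i):
        nonlocal active, peak
        async with sem:                      # at most `limit` jobs get past here
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)        # stand-in for the real request
            active -= 1
            return i

    results = await asyncio.gather(*(one(i) for i in range(n_jobs)))
    return results, peak


results, peak = asyncio.run(fetch_all(20, 3))
print(peak)  # 3
```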

2) Producer / Consumer — Queue #

Queue-based pipeline
queue = asyncio.Queue(maxsize=10)

async def producer():
    for i in range(100):
        await queue.put(i)
    await queue.put(None)    # termination signal

async def consumer():
    while True:
        item = await queue.get()
        if item is None:
            break
        process(item)
        queue.task_done()

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        tg.create_task(consumer())

Queue handles backpressure between producer / consumer naturally (when the queue fills, the producer automatically waits). Chapter 27 async and background jobs covers this pattern again.
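
Backpressure can be observed by recording the queue size after every put. This sketch passes the queue as an argument and uses a slower consumer; the sizes list and the sleep are there only to make the effect measurable.

```python
import asyncio


async def producer(queue, n, sizes):
    for i in range(n):
        await queue.put(i)                   # waits here while the queue is full
        sizes.append(queue.qsize())
    await queue.put(None)                    # termination signal


async def consumer(queue, out):
    while True:
        item = await queue.get()
        if item is None:
            break
        await asyncio.sleep(0.001)           # consumer is slower than producer
        out.append(item)


async def main():
    queue = asyncio.Queue(maxsize=3)
    out, sizes = [], []
    await asyncio.gather(producer(queue, 20, sizes), consumer(queue, out))
    return out, max(sizes)


out, max_size = asyncio.run(main())
print(max_size)  # never more than 3
```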

3) Take only the first result and cancel the rest #

first wins
async def main():
    tasks = [
        asyncio.create_task(fetch_a()),
        asyncio.create_task(fetch_b()),
    ]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for t in pending:
        t.cancel()
    return done.pop().result()

A pattern for firing the same request to multiple backends and picking whichever responds first.

4) Timeout #

timeout
async def main():
    try:
        async with asyncio.timeout(5):
            data = await slow_op()
    except TimeoutError:
        print("over 5 seconds")

The same asyncio.timeout seen in Chapter 14. It is available in Python 3.11+.
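
On interpreters older than 3.11, `asyncio.wait_for` covers the same case. A sketch, with a hypothetical `slow_op` that sleeps for one second and a deliberately short timeout:

```python
import asyncio


async def slow_op():
    await asyncio.sleep(1)
    return "data"


async def main():
    try:
        return await asyncio.wait_for(slow_op(), timeout=0.05)
    except asyncio.TimeoutError:             # same class as TimeoutError on 3.11+
        return "timed out"


outcome = asyncio.run(main())
print(outcome)  # timed out
```

`wait_for` cancels the inner coroutine when the timeout expires, so `slow_op` does not keep running in the background.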

Cancellation — a deep topic #

When a task is cancelled, CancelledError is raised inside its coroutine at the await where it is currently suspended.

Handling cancel
async def work():
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()    # cleanup can also be async
        raise              # ← must re-raise!

Important: if you catch CancelledError and don’t re-raise it, the task finishes as if it had completed normally, and code built on cancellation (TaskGroup, asyncio.timeout) can misbehave. Always re-raise after cleanup.
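
A runnable sketch of the pattern, with a long sleep standing in for `long_operation` and a log entry standing in for `cleanup`. Because the error is re-raised, the task ends up in the cancelled state.

```python
import asyncio


async def work(log):
    try:
        await asyncio.sleep(10)              # stand-in for long_operation()
    except asyncio.CancelledError:
        log.append("cleanup")                # stand-in for await cleanup()
        raise                                # keep the cancellation visible


async def main():
    log = []
    task = asyncio.create_task(work(log))
    await asyncio.sleep(0)                   # let the task reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()


log, was_cancelled = asyncio.run(main())
print(log, was_cancelled)  # ['cleanup', 'cancelled'] True
```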

shielding — block partial cancellation #

asyncio.shield
async def critical():
    return await asyncio.shield(important_work())

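The work wrapped by shield keeps running when the outer coroutine is cancelled. The await on shield() itself still raises CancelledError in the caller; only the inner work is protected. Useful for cases like the final step of a payment that mustn’t be interrupted.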
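
The sketch below shows both halves of that behavior. `important_work` is a hypothetical coroutine, and the inner task is created explicitly so that a reference to it survives the cancellation of the caller.

```python
import asyncio


async def important_work(log):
    await asyncio.sleep(0.05)
    log.append("committed")


async def critical(inner):
    await asyncio.shield(inner)              # cancellation stops here


async def main():
    log = []
    inner = asyncio.create_task(important_work(log))   # keep a reference
    outer = asyncio.create_task(critical(inner))
    await asyncio.sleep(0)                   # let both tasks start
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        log.append("outer cancelled")        # the caller still sees the cancel
    await inner                              # the shielded work runs to the end
    return log, inner.cancelled()


log, inner_cancelled = asyncio.run(main())
print(log, inner_cancelled)  # ['outer cancelled', 'committed'] False
```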

Debugging — asyncio.run(debug=True) #

Debug mode
PYTHONASYNCIODEBUG=1 python app.py

Or

asyncio.run(main(), debug=True)

In debug mode:

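  • Logs a warning when a callback or coroutine step runs longer than 100 ms without yielding (suspected blocking; the threshold is loop.slow_callback_duration)
  • Reports coroutines that were never awaited, including the traceback of where they were created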
  • Warns when a task is GC’d with its result unused

Always keep it on during development; many common async bugs surface here first. Proper profiling is in Chapter 21 performance — cProfile, py-spy, memory profiling.
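
The blocking warning can be captured programmatically, since debug mode reports through the standard `asyncio` logger. In this sketch `ListHandler` and `blocks_the_loop` are hypothetical names, and a 0.2-second `time.sleep` plays the part of heavy synchronous code.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects log messages in a list instead of printing them."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def blocks_the_loop():
    time.sleep(0.2)                          # synchronous sleep: no yield


handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
try:
    asyncio.run(blocks_the_loop(), debug=True)
finally:
    asyncio_logger.removeHandler(handler)

slow_warnings = [m for m in handler.messages if "took" in m]
print(slow_warnings)
```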

Exercises #

  1. Write a function that fetches 100 URLs concurrently while limiting concurrency to 10 with Semaphore(10). Run the same job with concurrency 1 / 10 / 100 and compare times. Observe firsthand why going too high actually gets slower.
  2. Write code using asyncio.TaskGroup where producer / consumer coroutines share an asyncio.Queue(maxsize=10) and flow 100 items. Verify that backpressure works (when the queue fills, the producer waits automatically).
  3. Run with asyncio.run(main(), debug=True) and deliberately put heavy synchronous code (for i in range(10_000_000): ...) without yielding inside a coroutine. Check what warnings are printed.

In one line: The event loop runs only one coroutine at a time, with await as the yield point. CPU-bound work has no yield and blocks concurrency → to_thread / ProcessPoolExecutor. Task is a subclass of Future. Background Tasks must keep references. gather (result list) / wait (done-pending) / as_completed (in completion order); new code uses TaskGroup. async generator (async def + yield), async for, async with. Catch CancelledError, clean up, and raise. Block cancellation with shield. debug=True is essential during development.

Next chapter #

Next, Chapter 19 GIL and concurrency — threading vs multiprocessing vs asyncio covers CPU-bound concurrency briefly hinted at in this chapter — the identity of the GIL, threading vs multiprocessing vs asyncio, and the free-threaded builds of Python 3.13 ~ 3.14.
