Async in depth — event loop, gather/wait, async generator
The next step from the intermediate intro — how the event loop actually works, the difference between Future and Task, gather vs wait, async generators and async iteration.
In Chapter 14 async intro (asyncio) we covered the meaning of async/await along with asyncio.run, gather, and TaskGroup. This chapter is the next step. It covers how the event loop actually works, the difference between Future and Task, the finer difference between gather and wait, and async generators and async iteration.
The CPU-bound concurrency briefly mentioned at the end of this chapter is covered in earnest in the next, Chapter 19 GIL and concurrency — threading vs multiprocessing vs asyncio. And the async debugging / performance measurement in this chapter pairs with Chapter 21 performance — cProfile, py-spy, memory profiling.
Event loop: one level deeper
The event loop runs only one coroutine at a time. It is not multi-threaded. So how does concurrency happen?
def run_loop(coros):
    # simplified sketch of the scheduling idea, not asyncio's real implementation
    queue = list(coros)
    while queue:
        coro = queue.pop(0)
        try:
            # advance the coroutine to its next await
            future = coro.send(None)
        except StopIteration:
            continue  # coroutine finished
        # when the future completes, put the coroutine back on the queue
        # (bind coro as a default argument so the callback re-queues the right one)
        future.add_done_callback(lambda _, c=coro: queue.append(c))

Key points:
- Advance the coroutine one step with send(None)
- It runs until it hits an await and stops
- When the Future at that point completes, put the coroutine back on the queue
- Move to the next coroutine (other coroutines may run in between)
The key to this picture is that await is a cooperative yield point. Other coroutines can only step in at points where a yield happens.
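To make the hand-off at each await visible, a small sketch can log the order in which two coroutines run. The names worker and log are made up for this illustration, and asyncio.sleep(0) is used only as a bare yield point.

```python
import asyncio


async def worker(name: str, log: list[str]) -> None:
    for step in range(3):
        log.append(f"{name}{step}")
        # sleep(0) does no waiting; it only hands control back to the loop
        await asyncio.sleep(0)


async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The log alternates between the two workers (a0, b0, a1, b1, a2, b2), because a switch can only happen at the await.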
Code that doesn’t yield blocks concurrency
import asyncio

async def slow():
    total = 0
    for i in range(10_000_000):  # CPU-bound, no await
        total += i
    return total

async def main():
    await asyncio.gather(slow(), slow(), slow())  # three concurrently?

It looks like three are running concurrently, but in reality they run one after another. There’s no await inside slow(), so no yield ever happens. CPU-bound work has to be moved off the event loop: to_thread keeps the loop responsive (on a standard GIL build the threads still don’t run Python code in parallel), while multiprocessing gives real parallelism (Chapter 19 GIL and concurrency).
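One way to observe this is to run a heartbeat task next to the computation and count how often it gets a turn. The sketch below uses made-up names (crunch, heartbeat, measure). The periodic asyncio.sleep(0) is there only to show that other coroutines run exactly at yield points; for real CPU-bound work, offloading remains the better fix.

```python
import asyncio


async def crunch(n: int, yield_every: int = 0) -> int:
    """Sum 0..n-1, optionally yielding to the loop every `yield_every` steps."""
    total = 0
    for i in range(n):
        total += i
        if yield_every and i % yield_every == 0:
            await asyncio.sleep(0)  # explicit yield point
    return total


async def heartbeat(beats: list[int]) -> None:
    while True:
        beats.append(1)
        await asyncio.sleep(0)


async def measure(yield_every: int) -> tuple[int, int]:
    beats: list[int] = []
    hb = asyncio.create_task(heartbeat(beats))
    total = await crunch(100_000, yield_every)
    hb.cancel()
    return total, len(beats)


if __name__ == "__main__":
    print("no yields:  ", asyncio.run(measure(0)))
    print("with yields:", asyncio.run(measure(10_000)))
```

Without yield points the heartbeat never runs before the computation finishes; with them it gets one turn per yield.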
Future: a result to be filled in the future
A Future is an object representing “a result not yet decided”.
async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # fill the result after 1 second
    loop.call_later(1, fut.set_result, "done")
    result = await fut
    print(result)  # done

States:
- pending: result undecided
- done: result decided (fut.set_result(x) or fut.set_exception(e))
- cancelled: cancelled
You usually don’t create a Future directly. It’s a tool used inside libraries (retrieving results from thread pools, signals, etc.). Regular code handles it indirectly through Task.
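As a rough illustration of the three states, the sketch below walks one Future through an exception and cancels another. The variable names (failing, abandoned, observed) are invented for this example.

```python
import asyncio


async def main() -> list[object]:
    loop = asyncio.get_running_loop()
    observed: list[object] = []

    failing = loop.create_future()
    observed.append(failing.done())           # still pending
    loop.call_soon(failing.set_exception, ValueError("boom"))
    try:
        await failing                         # re-raises the stored exception
    except ValueError as exc:
        observed.append(f"raised: {exc}")
    observed.append(failing.done())           # an exception also counts as done

    abandoned = loop.create_future()
    abandoned.cancel()
    observed.append(abandoned.cancelled())
    return observed


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Awaiting a Future either returns its result or raises whatever was stored with set_exception, which is why callers never need to inspect the state by hand.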
Task: Future + coroutine
async def main():
    task = asyncio.create_task(work())  # create a Task
    print(isinstance(task, asyncio.Future))  # True

Task is a subclass of Future. It wraps a coroutine and schedules it on the event loop. Future is the generalized “promise of a result,” and Task is “a promise that a coroutine will fill.”
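The practical difference from a bare coroutine object is that a Task is scheduled as soon as it is created. A minimal sketch, with a hypothetical work() that just records its label:

```python
import asyncio


async def work(log: list[str], label: str) -> str:
    log.append(label)
    return label


async def main() -> tuple[list[str], list[str]]:
    log: list[str] = []
    coro = work(log, "bare coroutine")              # nothing runs yet
    task = asyncio.create_task(work(log, "task"))   # scheduled right away
    await asyncio.sleep(0)                          # give the loop one turn
    after_one_turn = list(log)
    await coro                                      # the coroutine runs only now
    await task
    return after_one_turn, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

After a single turn of the loop only the Task has run; the coroutine object does nothing until something awaits it.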
done, result, cancel on Task
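task = asyncio.create_task(work())
await asyncio.sleep(0.1)
print(task.done())  # False / True
if task.done():
    if task.cancelled():  # exception() / result() would raise CancelledError here
        print("cancelled")
    elif task.exception():
        print("failed:", task.exception())
    else:
        print("succeeded:", task.result())
task.cancel()  # request cancellation (no effect if the task already finished)

Keep a reference to the Task: a trap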
async def main():
    asyncio.create_task(background_work())  # result ignored
    # ...

If the reference to the Task object disappears, the garbage collector may clean up the Task, because the event loop keeps only weak references to tasks. Background work can then vanish partway through. Always keep a reference somewhere.
async def main():
    background_tasks = set()
    task = asyncio.create_task(background_work())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    # ...

TaskGroup solves this headache automatically, which is why it’s recommended in 3.11+.
gather and wait: the fine difference
gather: result list, order preserved
results = await asyncio.gather(
    fetch("a"), fetch("b"), fetch("c"),
    return_exceptions=False,
)
# results: [result of a, result of b, result of c] ← in call order

With the default return_exceptions=False, the first exception propagates to the caller while the remaining awaitables keep running. Pass return_exceptions=True and exceptions come back as results: even if one task fails, you still receive the results of the others.
results = await asyncio.gather(
    fetch("a"),
    fetch_might_fail("b"),
    return_exceptions=True,
)
for r in results:
    if isinstance(r, Exception):
        print("failed:", r)
    else:
        print("succeeded:", r)

wait: two sets done / pending, no order guarantee
done, pending = await asyncio.wait(
    [task1, task2, task3],
    return_when=asyncio.FIRST_COMPLETED,
)

return_when options:
- ALL_COMPLETED: default, wait until all are done
- FIRST_COMPLETED: return as soon as one finishes
- FIRST_EXCEPTION: return as soon as any task raises an exception (same as ALL_COMPLETED if none does)
wait returns two sets done / pending instead of a result list. To get results, call task.result() directly.
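A short sketch of reading results out of the two sets. It uses the timeout parameter of asyncio.wait, which the text above does not mention, and a made-up job() helper; treat it as one possible shape rather than a recommended recipe.

```python
import asyncio


async def job(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return name


async def main() -> tuple[list[str], int]:
    tasks = [
        asyncio.create_task(job("fast", 0.01)),
        asyncio.create_task(job("slow", 5)),
    ]
    # with timeout=, wait returns whatever has finished; it does not raise
    done, pending = await asyncio.wait(tasks, timeout=0.2)
    for task in pending:
        task.cancel()  # wait never cancels anything on its own
    return sorted(t.result() for t in done), len(pending)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because done is a set, the results are sorted here to get a stable order; wait itself promises none.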
When to use which?
| Situation | Tool |
|---|---|
| Until all done, results in order | gather |
| Proceed when some are done / handle only the first result | wait |
| Process from the first finished onward | as_completed |
| New code, 3.11+ environment, cleanup guarantee | TaskGroup |
For new code, use TaskGroup whenever possible. Reserve gather / wait for legacy code and special cases.
as_completed: process from the first finished
async def main():
    coros = [fetch(u) for u in urls]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print("received:", result)

Results arrive in the order the jobs finish. Use it when you want to start processing early results without waiting for everything to finish.
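A self-contained variant with artificial delays shows the effect. Here fetch() is a stand-in that only sleeps, not a real network call.

```python
import asyncio


async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for network latency
    return name


async def main() -> list[str]:
    coros = [fetch("slow", 0.15), fetch("fast", 0.05), fetch("medium", 0.10)]
    order = []
    for next_done in asyncio.as_completed(coros):
        order.append(await next_done)
    return order


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The list comes back in completion order (fast, medium, slow), not in the order the coroutines were passed in.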
Integrating with sync functions: to_thread
Call a blocking synchronous function (blocking I/O, or CPU-bound work that has to stay off the event loop) from async code without stalling the loop.
import asyncio
import requests  # synchronous library

async def fetch(url):
    response = await asyncio.to_thread(requests.get, url)
    return response.text

asyncio.to_thread(fn, *args) runs fn in a separate thread and returns an awaitable for its result. Internally it calls loop.run_in_executor with the default executor.
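To check that the loop really stays responsive, a ticker coroutine can count how many times it runs while a blocking call is in progress. This is a sketch with invented names; time.sleep stands in for any blocking library call.

```python
import asyncio
import time


def blocking_work(seconds: float) -> str:
    time.sleep(seconds)  # stand-in for a call that never awaits
    return "done"


async def count_ticks(stop: asyncio.Event) -> int:
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def ticks_during(offload: bool) -> int:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # let the ticker start
    if offload:
        await asyncio.to_thread(blocking_work, 0.2)
    else:
        blocking_work(0.2)  # blocks the whole event loop
    stop.set()
    return await ticker


if __name__ == "__main__":
    print("inline:   ", asyncio.run(ticks_during(offload=False)))
    print("to_thread:", asyncio.run(ticks_during(offload=True)))
```

Called inline, the ticker manages at most one tick (after the blocking call returns); with to_thread it keeps ticking throughout.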
run_in_executor: finer control
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_function, arg)

By passing a ThreadPoolExecutor or ProcessPoolExecutor explicitly, you get finer control. Chapter 19 GIL and concurrency covers the criteria for choosing between the two.
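As an example of that finer control, the sketch below caps a thread pool at two workers and gives the threads a recognizable name. blocking_job is a hypothetical stand-in for blocking I/O; a thread pool is used here instead of a process pool only to keep the example small.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_job(n: int) -> tuple[int, str]:
    time.sleep(0.02)  # stand-in for blocking I/O
    return n * n, threading.current_thread().name


async def main() -> tuple[list[int], set[str]]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="io") as pool:
        futures = [loop.run_in_executor(pool, blocking_job, n) for n in range(4)]
        pairs = await asyncio.gather(*futures)
    squares = [square for square, _ in pairs]
    threads = {name for _, name in pairs}
    return squares, threads


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Four jobs are submitted, but at most two threads ever run them, which is the kind of limit to_thread alone does not let you set.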
Async generator: async yield
import httpx  # third-party async HTTP client

async def stream_chunks(url):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as resp:
            async for chunk in resp.aiter_bytes():
                yield chunk

async def main():
    async for chunk in stream_chunks("https://example.com"):
        print(len(chunk))

async def + yield = async generator. Calling it returns an async iterator. It fits streaming cases well.
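For experimenting without a network dependency, a tiny async generator is enough. The countdown() function below is invented for this sketch; it also shows that async generators work in comprehensions.

```python
import asyncio


async def countdown(start: int, delay: float = 0.001):
    for n in range(start, 0, -1):
        await asyncio.sleep(delay)  # awaiting inside the generator is allowed
        yield n


async def main() -> list[int]:
    seen = []
    async for n in countdown(3):
        seen.append(n)
    doubled = [n * 2 async for n in countdown(3)]  # async comprehension
    return seen + doubled


if __name__ == "__main__":
    print(asyncio.run(main()))
```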
Sync vs async generator
| | sync generator | async generator |
|---|---|---|
| Definition | def fn(): yield | async def fn(): yield |
| Consumption | for x in g: | async for x in g: |
| await inside | ❌ | ✅ |
| sleep / io inside | ❌ (or sync) | ✅ |
Async iteration: __aiter__ / __anext__
Use these when you want to build an async iterable by hand.
class Counter:
    def __init__(self, limit):
        self.limit = limit
        self.n = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.n += 1
        return self.n

async def main():
    async for i in Counter(3):
        print(i)
    # 1, 2, 3 at 0.1s intervals

Most of the time async generators are shorter and more intuitive, so the class form isn’t used much.
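To compare the two forms side by side, here is the same counter written both ways. The delay parameter is an addition for this sketch so that it runs quickly.

```python
import asyncio


class Counter:
    """Class form: implements the async iteration protocol by hand."""

    def __init__(self, limit: int, delay: float = 0.001):
        self.limit, self.delay, self.n = limit, delay, 0

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.n >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(self.delay)
        self.n += 1
        return self.n


async def counter(limit: int, delay: float = 0.001):
    """Generator form: the same behavior in four lines."""
    for n in range(1, limit + 1):
        await asyncio.sleep(delay)
        yield n


async def main() -> tuple[list[int], list[int]]:
    from_class = [i async for i in Counter(3)]
    from_generator = [i async for i in counter(3)]
    return from_class, from_generator


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Both produce 1, 2, 3; the generator version needs no explicit state or StopAsyncIteration.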
Async context manager: __aenter__ / __aexit__
A tool seen briefly in Chapter 10 context managers.
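The two methods named in the heading can be written by hand on a class. The sketch below uses FakeConnection, a hypothetical stand-in for a real async database connection, to show when each method runs.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a real async database connection."""

    def __init__(self, log: list[str]):
        self.log = log

    async def __aenter__(self):
        await asyncio.sleep(0)  # pretend to connect
        self.log.append("open")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await asyncio.sleep(0)  # pretend to close
        self.log.append("close")
        return False  # do not swallow exceptions

    async def execute(self, sql: str) -> None:
        self.log.append(f"run {sql}")


async def main() -> list[str]:
    log: list[str] = []
    try:
        async with FakeConnection(log) as conn:
            await conn.execute("SELECT 1")
            raise RuntimeError("query failed")
    except RuntimeError:
        log.append("error propagated")
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

__aexit__ runs even when the body raises, so the connection is closed before the error reaches the caller. The contextlib.asynccontextmanager decorator expresses the same thing with try / finally around a single yield.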
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_db():
    conn = await connect_async()
    try:
        yield conn
    finally:
        await conn.close()

async def main():
    async with open_db() as conn:
        await conn.execute(...)

Concurrency patterns: common shapes
1) Semaphore to limit concurrency
sem = asyncio.Semaphore(10)

async def fetch_one(url):
    async with sem:
        return await fetch(url)

async def fetch_all(urls):
    return await asyncio.gather(*[fetch_one(u) for u in urls])

2) Producer / Consumer: Queue
queue = asyncio.Queue(maxsize=10)

async def producer():
    for i in range(100):
        await queue.put(i)
    await queue.put(None)  # termination signal

async def consumer():
    while True:
        item = await queue.get()
        if item is None:
            break
        process(item)
        queue.task_done()

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        tg.create_task(consumer())

Queue handles backpressure between producer / consumer naturally (when the queue fills, the producer automatically waits). Chapter 27 async and background jobs covers this pattern again.
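Backpressure can be observed by recording the queue size after every put. The sketch below shrinks the queue to maxsize=2 and uses gather instead of TaskGroup so it also runs on older Python versions; the sizes and received lists exist only for the measurement.

```python
import asyncio


async def producer(queue: asyncio.Queue, count: int, sizes: list[int]) -> None:
    for i in range(count):
        await queue.put(i)  # suspends here while the queue is full
        sizes.append(queue.qsize())
    await queue.put(None)  # termination signal


async def consumer(queue: asyncio.Queue, received: list[int]) -> None:
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.001)  # simulate slow processing
        received.append(item)


async def main() -> tuple[list[int], int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)
    sizes: list[int] = []
    received: list[int] = []
    await asyncio.gather(producer(queue, 10, sizes), consumer(queue, received))
    return received, max(sizes)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Even though the producer is much faster than the consumer, the recorded queue size never exceeds maxsize, and every item arrives in order.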
3) Take only the first result and cancel the rest
async def main():
    tasks = [
        asyncio.create_task(fetch_a()),
        asyncio.create_task(fetch_b()),
    ]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for t in pending:
        t.cancel()
    return done.pop().result()

A pattern for firing the same request to multiple backends and picking whichever responds first.
4) Timeout
async def main():
    try:
        async with asyncio.timeout(5):
            data = await slow_op()
    except TimeoutError:
        print("over 5 seconds")

This is the same asyncio.timeout seen in Chapter 14, available from Python 3.11.
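On interpreters older than 3.11, asyncio.wait_for covers the same need for a single awaitable. This is a sketch of that alternative, not the pattern shown above; slow_op and fetch_with_deadline are illustrative names.

```python
import asyncio


async def slow_op(delay: float) -> str:
    await asyncio.sleep(delay)
    return "data"


async def fetch_with_deadline(delay: float, limit: float):
    try:
        # wait_for cancels slow_op if it exceeds the limit
        return await asyncio.wait_for(slow_op(delay), timeout=limit)
    except asyncio.TimeoutError:  # alias of the built-in TimeoutError on 3.11+
        return None


if __name__ == "__main__":
    print(asyncio.run(fetch_with_deadline(delay=1, limit=0.01)))
    print(asyncio.run(fetch_with_deadline(delay=0.01, limit=1)))
```

The first call gives up and returns None; the second finishes inside the limit and returns its data.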
Cancellation: a deep topic
When a coroutine is cancelled, CancelledError is raised at the await point.
async def work():
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()  # cleanup can also be async
        raise  # ← must re-raise!

Important: if you catch CancelledError and don’t re-raise it, the task is treated as having completed normally, and whatever requested the cancellation (a TaskGroup, a timeout) can no longer rely on it. Always end the handler with raise.
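The difference is visible from the outside through task.cancelled(). A minimal sketch, with a swallow flag added purely for the comparison:

```python
import asyncio


async def worker(log: list[str], swallow: bool) -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup")
        if not swallow:
            raise  # correct: let the cancellation continue


async def cancel_and_inspect(swallow: bool) -> tuple[bool, list[str]]:
    log: list[str] = []
    task = asyncio.create_task(worker(log, swallow))
    await asyncio.sleep(0)  # let the worker reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # this error belongs to the task we cancelled ourselves
    return task.cancelled(), log


if __name__ == "__main__":
    print("re-raised:", asyncio.run(cancel_and_inspect(swallow=False)))
    print("swallowed:", asyncio.run(cancel_and_inspect(swallow=True)))
```

Cleanup runs in both cases, but only the re-raising worker ends up in the cancelled state; the swallowing one looks like an ordinary success.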
shielding: block partial cancellation
async def critical():
    return await asyncio.shield(important_work())

The work wrapped by shield keeps running when the outer task is cancelled; the coroutine awaiting shield still receives CancelledError. Use it for steps that mustn’t be interrupted, such as the final step of a payment.
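A sketch of what that looks like in practice. Here the inner work is created as a Task up front so a reference to it survives the cancellation; the function signatures differ slightly from the snippet above for that reason.

```python
import asyncio


async def important_work(log: list[str]) -> None:
    await asyncio.sleep(0.05)
    log.append("committed")


async def critical(inner: asyncio.Task) -> None:
    await asyncio.shield(inner)


async def main() -> list[str]:
    log: list[str] = []
    inner = asyncio.create_task(important_work(log))  # keep a reference
    outer = asyncio.create_task(critical(inner))
    await asyncio.sleep(0.01)
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        log.append("outer cancelled")
    await inner  # the shielded work is still running and finishes normally
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The outer task is cancelled immediately, yet the shielded work still reaches its commit step.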
Debugging: asyncio.run(debug=True)
PYTHONASYNCIODEBUG=1 python app.py

Or

asyncio.run(main(), debug=True)

In debug mode:
- Warns about coroutines running too long without yielding (suspected blocking)
- Warns about missing await
- Warns when a task is GC’d with its result unused
Always keep it on during development. 90% of async bugs are caught here. Proper profiling is in Chapter 21 performance — cProfile, py-spy, memory profiling.
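The slow-callback warning is emitted through the standard "asyncio" logger, so it can be captured like any other log record. The sketch below assumes the default threshold of 0.1 seconds (loop.slow_callback_duration); ListHandler is a small helper written for this example.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects formatted log messages in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocks_the_loop() -> None:
    time.sleep(0.2)  # synchronous sleep: no yield for 0.2 seconds


def run_with_debug() -> list[str]:
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    logger.addHandler(handler)
    try:
        asyncio.run(blocks_the_loop(), debug=True)
    finally:
        logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    for message in run_with_debug():
        print(message)
```

The captured message names the task and how long it held the loop, which usually points straight at the blocking call.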
Exercises
- Write a function that fetches 100 URLs concurrently while limiting concurrency to 10 with Semaphore(10). Run the same job with concurrency 1 / 10 / 100 and compare times. Observe firsthand why going too high actually gets slower.
- Write code using asyncio.TaskGroup where producer / consumer coroutines share an asyncio.Queue(maxsize=10) and flow 100 items. Verify that backpressure works (when the queue fills, the producer waits automatically).
- Run with asyncio.run(main(), debug=True) and deliberately put heavy synchronous code (for i in range(10_000_000): ...) without yielding inside a coroutine. Check what warnings are printed.
In one line: The event loop runs only one coroutine at a time, with await as the yield point. CPU-bound work has no yield and blocks concurrency → to_thread / ProcessPoolExecutor. Task ⊂ Future. Keep references to background Tasks. gather (result list) / wait (done-pending) / as_completed (in completion order); new code uses TaskGroup. async generator (async def + yield), async for, async with. Catch CancelledError, clean up, and raise. Block cancellation with shield. debug=True is essential during development.
Next chapter
Next, Chapter 19 GIL and concurrency — threading vs multiprocessing vs asyncio covers CPU-bound concurrency briefly hinted at in this chapter — the identity of the GIL, threading vs multiprocessing vs asyncio, and the free-threaded builds of Python 3.13 ~ 3.14.