Modern Python Advanced #4: Async in depth — event loop, gather/wait, async generator
In Intermediate #7 Async intro we saw the meaning of async/await, asyncio.run, gather, and TaskGroup. This post is the next step. We organize how the event loop actually works, the difference between Future and Task, the subtle differences between gather and wait, async generators, and async iteration.
Event loop — one level deeper #
The event loop runs only one coroutine at a time. It’s not multithreaded. So how does concurrency happen?
def run_loop(coros):
queue = list(coros)
while queue:
coro = queue.pop(0)
try:
# advance the coroutine to the next await
future = coro.send(None)
except StopIteration as e:
continue # coroutine finished
# when the future completes, put the coroutine back in the queue
future.add_done_callback(lambda _: queue.append(coro))Core:
- Advance the coroutine one step with
send(None) - Run until the next
await, then pause - When the
Futureat that point completes, put the coroutine back on the queue - Move to the next coroutine (others can run in between)
The key idea is that await is a cooperative yield point. Other coroutines can step in only where yielding happens.
Code without yields blocks concurrency #
async def slow():
total = 0
for i in range(10_000_000): # CPU-bound, no await
total += i
return total
async def main():
await asyncio.gather(slow(), slow(), slow()) # three concurrently?It looks like all three run together but it’s actually serial. There’s no await inside slow() — no yield happens. CPU-bound work should go through to_thread or multiprocessing (#5).
Future — a result that will be filled later
#
A Future is an object representing “a not-yet-decided result”.
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
# fill the result after 1 second
loop.call_later(1, fut.set_result, "done")
result = await fut
print(result) # doneStates:
- pending — result undecided
- done — result decided (
fut.set_result(x)orfut.set_exception(e)) - cancelled — cancelled
You usually don’t build Future yourself. It’s used inside libraries (collecting results from a thread pool, signals, etc.). In regular code you touch it indirectly through a Task.
Task — Future + coroutine
#
async def main():
task = asyncio.create_task(work()) # create Task
print(isinstance(task, asyncio.Future)) # TrueA Task is a subclass of Future. It wraps a coroutine and registers it with the event loop. Future is the generic “result promise”; Task is “a promise filled by a coroutine.”
Task’s done, result, cancel
#
task = asyncio.create_task(work())
await asyncio.sleep(0.1)
print(task.done()) # False / True
if task.done():
if task.exception():
print("실패:", task.exception())
else:
print("성공:", task.result())
task.cancel() # try to cancelKeep a reference to the Task — pitfall #
async def main():
asyncio.create_task(background_work()) # discard the result
# ...When the reference to the Task object disappears, the garbage collector may clean up the Task. The background task can suddenly vanish. Always keep a reference somewhere.
async def main():
background_tasks = set()
task = asyncio.create_task(background_work())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
# ...TaskGroup handles this automatically — that’s why it’s recommended in 3.11+.
gather and wait — the subtle differences
#
gather — list of results, order preserved
#
results = await asyncio.gather(
fetch("a"), fetch("b"), fetch("c"),
return_exceptions=False,
)
# results: [a result, b result, c result] ← in call orderWith return_exceptions=True, exceptions come back as results — even if one task fails, you can receive the others.
results = await asyncio.gather(
fetch("a"),
fetch_might_fail("b"),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print("실패:", r)
else:
print("성공:", r)wait — two sets done/pending, no order guarantee
#
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.FIRST_COMPLETED,
)return_when options:
ALL_COMPLETED— default, until everything finishesFIRST_COMPLETED— return as soon as one finishesFIRST_EXCEPTION— return as soon as any exception fires
wait returns two sets done/pending, not a result list. Call task.result() directly to get values.
When to use which? #
| Case | Tool |
|---|---|
| Until everything finishes, results in order | gather |
| Proceed once some finish / handle the first | wait |
| Process results as they finish | as_completed |
| New code, 3.11+, cleanup guaranteed | TaskGroup |
For new code, prefer TaskGroup whenever possible. gather/wait for legacy/special cases.
as_completed — process results as they arrive
#
async def main():
coros = [fetch(u) for u in urls]
for coro in asyncio.as_completed(coros):
result = await coro
print("받음:", result)Receive results in the order each finishes. For when you want to process quickly without waiting for all to finish.
Integrating sync functions — to_thread
#
Safely call CPU-bound or blocking I/O sync functions from inside async.
import asyncio
import requests # sync library
async def fetch(url):
response = await asyncio.to_thread(requests.get, url)
return response.textasyncio.to_thread(fn, *args) runs in a separate thread and wraps it in an awaitable. Internally it calls loop.run_in_executor.
run_in_executor — finer control
#
from concurrent.futures import ProcessPoolExecutor
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_heavy_function, arg)You can specify a ThreadPoolExecutor or ProcessPoolExecutor directly for finer control.
Async generator — async yield #
async def stream_chunks(url):
async with httpx.AsyncClient() as client:
async with client.stream("GET", url) as resp:
async for chunk in resp.aiter_bytes():
yield chunk
async def main():
async for chunk in stream_chunks("https://example.com"):
print(len(chunk))async def + yield = async generator. Calling it returns an async iterator. Fits streaming very well.
Sync vs async generator #
| Sync generator | Async generator | |
|---|---|---|
| Definition | def fn(): yield | async def fn(): yield |
| Consume | for x in g: | async for x in g: |
| await inside | ❌ | ✅ |
| sleep/io inside | ❌ (or sync) | ✅ |
Async iteration — __aiter__ / __anext__
#
When you want to build an async iterable yourself.
class Counter:
def __init__(self, limit):
self.limit = limit
self.n = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.n >= self.limit:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.n += 1
return self.n
async def main():
async for i in Counter(3):
print(i)
# 1, 2, 3 at 0.1s intervalsFor most cases, async generators are shorter and more intuitive — class form is rarely used.
Async context manager — __aenter__ / __aexit__
#
Briefly seen in Intermediate #3.
from contextlib import asynccontextmanager
@asynccontextmanager
async def open_db():
conn = await connect_async()
try:
yield conn
finally:
await conn.close()
async def main():
async with open_db() as conn:
await conn.execute(...)Concurrency patterns — common cases #
1) Semaphore for concurrency limit #
sem = asyncio.Semaphore(10)
async def fetch_one(url):
async with sem:
return await fetch(url)
async def fetch_all(urls):
return await asyncio.gather(*[fetch_one(u) for u in urls])2) Producer / Consumer — Queue
#
queue = asyncio.Queue(maxsize=10)
async def producer():
for i in range(100):
await queue.put(i)
await queue.put(None) # termination signal
async def consumer():
while True:
item = await queue.get()
if item is None:
break
process(item)
queue.task_done()
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(producer())
tg.create_task(consumer())Queue naturally handles backpressure between producer and consumer (when the queue is full, producers wait automatically).
3) Take only the first result, cancel the rest #
async def main():
tasks = [
asyncio.create_task(fetch_a()),
asyncio.create_task(fetch_b()),
]
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for t in pending:
t.cancel()
return done.pop().result()A pattern for sending the same request to multiple backends and using whichever responds first.
4) Timeout #
async def main():
try:
async with asyncio.timeout(5):
data = await slow_op()
except TimeoutError:
print("5초 초과")The one we saw in Intermediate #7. 3.11+ standard.
Cancellation — a deep topic #
When a coroutine is cancelled, CancelledError is raised at the await.
async def work():
try:
await long_operation()
except asyncio.CancelledError:
await cleanup() # cleanup can also be async
raise # ← you must re-raise!Important: If you catch CancelledError and don’t re-raise, the task is treated as having completed normally and cleanup is severed. Always include raise.
Shielding — protect part from cancellation #
async def critical():
return await asyncio.shield(important_work())The part wrapped by shield is immune to outside cancellation. For places that mustn’t be interrupted, like the final step of a payment.
Debugging — asyncio.run(debug=True)
#
PYTHONASYNCIODEBUG=1 python app.pyOr:
asyncio.run(main(), debug=True)In debug mode:
- Warnings for coroutines that run too long without yielding (suspect blocking)
- Warnings for missing
await - Warnings for unused results when a Task is GC’d
Always turn this on during development. 90% of async bugs are caught here.
Wrap-up #
What this post covered:
- Event loop runs only one coroutine at a time —
awaitis the yield point - CPU-bound code without yields blocks concurrency → use
to_thread/ProcessPoolExecutor Future(result promise) ⊂Task(coroutine + Future)- Background Tasks must keep a reference to avoid GC
gather(result list),wait(done/pending sets),as_completed(in completion order),TaskGroup(cleanup guaranteed)- Integrate sync functions with
to_thread/run_in_executor - Async generator (
async def+yield),async for,async with - Patterns: Semaphore, Queue, first-wins, timeout
- Catch
CancelledError, do cleanup, alwaysraise shieldblocks cancellationdebug=Trueis essential during development
In the next post (#5 GIL and concurrency) we cover CPU-bound concurrency briefly mentioned here — the GIL, threading vs multiprocessing vs asyncio, and the free-threaded build of Python 3.13–3.14.