Chapter 14

Asyncio intro

What async/await means, the event loop, asyncio.gather and TaskGroup, and mixing with sync code — your first steps into asyncio in one place.

The final chapter of Part 2. asyncio and async/await are tools built on top of the send / yield mechanism we briefly saw in Chapter 11 iterables, generators, yield from. This chapter gathers the first steps in one place.

This chapter lays out the model that underpins Chapter 18 asyncio in depth — event loop, gather/wait, async generator and Chapter 19 GIL and concurrency in Part 3. Almost every line of Part 4’s FastAPI sits on top of async def, so getting this chapter down helps with the entire back half of the book.

Sync vs async — one picture #

Sync — sequential
import time

def fetch(url: str) -> str:
    time.sleep(1)             # wait for network response (1 second)
    return f"data from {url}"

def main():
    a = fetch("a")            # 1 second
    b = fetch("b")            # 1 second
    c = fetch("c")            # 1 second
    print(a, b, c)            # total 3 seconds

main()

Three requests, one after another. One second each, 3 seconds total.

The same thing in async:

Async — concurrent
import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(1)
    return f"data from {url}"

async def main():
    results = await asyncio.gather(
        fetch("a"),
        fetch("b"),
        fetch("c"),
    )
    print(results)            # total 1 second

asyncio.run(main())

Three requests proceed at the same time. Total 1 second.

That’s the value of async. When the bottleneck is waiting, not the CPU — network, DB, files — you buy time with concurrency. The CPU-bound case is compared with threading / multiprocessing in Chapter 19 GIL and concurrency.

Core concepts — all in one place #

1) Coroutine function and coroutine object #

Terminology
async def fetch(url):    # coroutine function
    return ...

c = fetch("a")            # c is a coroutine object
                          # the function body has NOT run yet!

Defining with async def makes the call return a coroutine object without executing the body. Same pattern as a generator function — exactly the pause / resume model we saw in Chapter 11.

For the coroutine to actually run, the event loop has to drive it.
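A minimal sketch of that point, assuming a hypothetical greet coroutine that records when its body actually runs. The names greet, log, and demo are illustrative only.

```python
import asyncio

log: list[str] = []

async def greet(name: str) -> str:
    log.append(f"body ran for {name}")   # happens only once the loop drives it
    return f"hello {name}"

def demo() -> tuple[list[str], str]:
    log.clear()
    coro = greet("a")               # coroutine object; the body has not run
    seen_before = list(log)         # snapshot taken before anything drives it
    result = asyncio.run(coro)      # the event loop runs it to completion
    return seen_before, result

if __name__ == "__main__":
    print(demo())
```

The snapshot taken before asyncio.run is empty, which shows that calling greet("a") by itself executed nothing.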

2) await — wait for a result #

await
async def main():
    data = await fetch("a")    # wait for fetch to complete
    print(data)

What await does:

  • Runs the awaitable on the right-hand side (coroutine, Task, etc.); if it has to wait, control goes back to the event loop
  • When the awaitable completes, await evaluates to its result and execution continues on the next line
  • While this coroutine is suspended, other coroutines may run

await is only usable inside async def.
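The hand-off described above can be made visible with a small sketch. It assumes two hypothetical workers that each yield once with asyncio.sleep(0). The names worker and log are illustrative.

```python
import asyncio

async def worker(name: str, log: list[str]) -> None:
    log.append(f"{name} step 1")
    await asyncio.sleep(0)          # suspend once so the event loop can switch
    log.append(f"{name} step 2")

async def main() -> list[str]:
    log: list[str] = []
    await asyncio.gather(worker("A", log), worker("B", log))
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each worker runs up to its await, and the other one takes over at exactly that point, so the steps interleave rather than running A to completion first.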

3) The event loop #

A scheduler that runs coroutines in rotation. It runs only one coroutine at a time, but when one suspends at an await, another takes over. It’s the mechanism for concurrency on a single thread. The details are in Chapter 18 asyncio in depth.

asyncio.run(coro) creates an event loop, runs coro to completion, and tears it down. Typically called once at the program entry point.

Two standard ways to run concurrently — gather and TaskGroup #

asyncio.gather — classic form #

gather
async def main():
    a, b, c = await asyncio.gather(
        fetch("a"),
        fetch("b"),
        fetch("c"),
    )

gather starts multiple coroutines concurrently and, once they all finish, returns their results as a list in the same order the coroutines were passed in. It is the long-standing standard tool.
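A short sketch of the result ordering, assuming a variant of fetch with an extra delay parameter (added here purely for illustration) so that the second call finishes first.

```python
import asyncio

async def fetch(url: str, delay: float) -> str:
    await asyncio.sleep(delay)      # stands in for network latency
    return f"data from {url}"

async def main() -> list[str]:
    # "fast" completes first, but results follow the argument order.
    return await asyncio.gather(
        fetch("slow", 0.05),
        fetch("fast", 0.01),
    )

if __name__ == "__main__":
    print(asyncio.run(main()))
```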

TaskGroup (3.11+) — the new standard #

TaskGroup
async def main():
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(fetch("a"))
        b = tg.create_task(fetch("b"))
        c = tg.create_task(fetch("c"))

    print(a.result(), b.result(), c.result())

Same behavior, but safer in two ways.

  1. If one task fails midway, the others get cancelled and cleaned up. gather (without options) lets them keep running.
  2. If multiple tasks fail at once, they’re bundled into the ExceptionGroup from Chapter 6 exceptions.

On 3.11+, TaskGroup is the recommended form.

Task — running a coroutine in the background #

When you want to start a coroutine right away and let it run in the background, wrap it as a Task.

create_task
async def main():
    task = asyncio.create_task(fetch("a"))
    # task is scheduled here; it starts running once main() yields at an await

    await other_work()        # do other async work; the task runs meanwhile

    result = await task        # collect the result when needed

A coroutine object on its own is just a plain value that doesn’t run until someone awaits it. You need to wrap it as a Task to register it with the event loop and start execution.
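A small sketch of when a Task actually starts, assuming a hypothetical background coroutine. create_task only schedules the work. The task gets its first turn when the creating coroutine next yields to the event loop.

```python
import asyncio

async def background(log: list[str]) -> int:
    log.append("task started")
    await asyncio.sleep(0.01)
    log.append("task finished")
    return 42

async def main() -> list[str]:
    log: list[str] = []
    task = asyncio.create_task(background(log))
    log.append("task created")      # the task has been scheduled, not run
    await asyncio.sleep(0)          # yield once; the task gets its first turn
    log.append("main resumed")
    result = await task             # wait for the rest and collect the result
    log.append(f"result {result}")
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```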

Common pitfalls — don’t forget #

1) Can’t await inside a sync function #

🚫
def main():
    data = await fetch("a")    # ✗ SyntaxError

await works only inside async def. To start async code from sync code, call asyncio.run(...) at the entry point.

2) time.sleep blocks async #

🚫 sync sleep
async def fetch(url):
    time.sleep(1)              # the entire event loop stops
    return ...
✅ asyncio.sleep
async def fetch(url):
    await asyncio.sleep(1)
    return ...

time.sleep puts the entire OS thread to sleep. Use it inside async and concurrency breaks. Other sync blocking functions (DB drivers, requests, some file I/O) have the same problem — you need async libraries (asyncpg, httpx, aiofiles).
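A rough way to see the difference is to time both versions under gather. This is a sketch with hypothetical helper names (blocking_fetch, cooperative_fetch, timed) and short delays so it finishes quickly.

```python
import asyncio
import time

async def blocking_fetch(delay: float) -> None:
    time.sleep(delay)               # blocks the thread, and the event loop with it

async def cooperative_fetch(delay: float) -> None:
    await asyncio.sleep(delay)      # suspends only this coroutine

async def timed(fetch, delay: float, n: int = 3) -> float:
    start = time.perf_counter()
    await asyncio.gather(*(fetch(delay) for _ in range(n)))
    return time.perf_counter() - start

async def main() -> tuple[float, float]:
    blocked = await timed(blocking_fetch, 0.05)         # about 3 * 0.05 s
    cooperative = await timed(cooperative_fetch, 0.05)  # about 0.05 s
    return blocked, cooperative

if __name__ == "__main__":
    blocked, cooperative = asyncio.run(main())
    print(f"time.sleep: {blocked:.2f}s, asyncio.sleep: {cooperative:.2f}s")
```

With time.sleep the three calls run back to back even though they were handed to gather, because none of them ever yields to the loop.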

3) Creating a coroutine without awaiting it #

🚫 Coroutine that doesn't run
async def main():
    fetch("a")                # created and not awaited — never runs
    print("done")

This emits a RuntimeWarning (coroutine 'fetch' was never awaited). A coroutine is just a value, so unless someone awaits it or wraps it as a Task, it never runs.

4) Don’t call async from sync as if it were sync #

🚫
async def fetch(url): ...

result = fetch("a")    # a coroutine object. Not the result.

To unwrap this, use asyncio.run(...) at the entry point.

Mixing with sync code #

When a sync function takes a long time — to_thread #

Sync → async conversion
import asyncio

def heavy_calc(n: int) -> int:
    # CPU-bound or blocking I/O
    ...

async def main():
    result = await asyncio.to_thread(heavy_calc, 100)

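asyncio.to_thread(fn, *args) (3.9+) runs a sync function in a separate thread and makes the result awaitable. Comes up often when you have to use a sync library from async code. It pays off mainly for blocking I/O: on a standard build with the GIL, pure-Python CPU-bound work in a thread still competes with the event loop for the interpreter, which is the comparison Chapter 19 GIL and concurrency makes.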
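A sketch of the effect, assuming a hypothetical blocking_call that stands in for a sync library and a heartbeat coroutine that only exists to show the event loop staying responsive.

```python
import asyncio
import time

def blocking_call(seconds: float) -> str:
    time.sleep(seconds)             # stands in for a sync library call
    return "done"

async def heartbeat(stop: asyncio.Event, beats: list[float]) -> None:
    while not stop.is_set():
        beats.append(time.perf_counter())
        await asyncio.sleep(0.01)

async def main() -> tuple[str, int]:
    beats: list[float] = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(stop, beats))
    # The blocking call runs in a worker thread; the loop keeps ticking.
    result = await asyncio.to_thread(blocking_call, 0.1)
    stop.set()
    await hb
    return result, len(beats)

if __name__ == "__main__":
    print(asyncio.run(main()))
```

If blocking_call were called directly inside main, the heartbeat would record nothing until it returned.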

Async context managers — async with #

We saw this briefly in Chapter 10 context managers.

async with
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com")
        data = resp.json()

async with works with objects that define __aenter__ / __aexit__ instead of __enter__ / __exit__. You can also make them from a function with @asynccontextmanager.

@asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def db_transaction(conn):
    tx = await conn.begin()
    try:
        yield tx
        await tx.commit()
    except Exception:
        await tx.rollback()
        raise
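To see both exit paths of db_transaction, here is a sketch that drives it with stand-in objects. FakeConn and FakeTx are hypothetical and only record calls. A real driver's API may differ.

```python
import asyncio
from contextlib import asynccontextmanager

class FakeTx:
    """Hypothetical transaction that records what was called on it."""
    def __init__(self, log: list[str]) -> None:
        self.log = log
    async def commit(self) -> None:
        self.log.append("commit")
    async def rollback(self) -> None:
        self.log.append("rollback")

class FakeConn:
    """Hypothetical connection; begin() hands out a FakeTx."""
    def __init__(self) -> None:
        self.log: list[str] = []
    async def begin(self) -> FakeTx:
        self.log.append("begin")
        return FakeTx(self.log)

@asynccontextmanager
async def db_transaction(conn):
    tx = await conn.begin()
    try:
        yield tx
        await tx.commit()           # reached only if the body did not raise
    except Exception:
        await tx.rollback()
        raise

async def main() -> tuple[list[str], list[str]]:
    ok, failing = FakeConn(), FakeConn()
    async with db_transaction(ok):
        pass                        # body succeeds, so commit runs
    try:
        async with db_transaction(failing):
            raise RuntimeError("boom")   # body fails, so rollback runs
    except RuntimeError:
        pass
    return ok.log, failing.log

if __name__ == "__main__":
    print(asyncio.run(main()))
```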

Async iterators — async for #

async for
async def stream_lines(url: str):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as resp:
            async for line in resp.aiter_lines():
                yield line

async for works with objects that define __aiter__ / __anext__ instead of __iter__ / __next__: the pattern for receiving the next value asynchronously. Common in streaming. Async generators are covered in Chapter 18 asyncio in depth.
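For reference, the same protocol written by hand, without a generator. Countdown is a hypothetical class used only to show where __aiter__ and __anext__ fit.

```python
import asyncio

class Countdown:
    """Hypothetical async iterator that yields start, start - 1, ..., 1."""
    def __init__(self, start: int) -> None:
        self.current = start

    def __aiter__(self) -> "Countdown":
        return self

    async def __anext__(self) -> int:
        if self.current <= 0:
            raise StopAsyncIteration    # ends the async for loop
        await asyncio.sleep(0)          # a real source would await I/O here
        value = self.current
        self.current -= 1
        return value

async def main() -> list[int]:
    return [n async for n in Countdown(3)]

if __name__ == "__main__":
    print(asyncio.run(main()))
```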

Timeouts — asyncio.timeout (3.11+) #

timeout
async def main():
    try:
        async with asyncio.timeout(5):
            data = await fetch("https://slow.example.com")
    except TimeoutError:
        print("didn't make it in 5 seconds")

Up through 3.10, asyncio.wait_for(coro, timeout=5) was the standard. On 3.11+, new code prefers the asyncio.timeout context manager.
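For code that still has to run on older versions, a sketch of the wait_for form. The coroutine slow and the log list are hypothetical. Catching asyncio.TimeoutError works on both sides of 3.11, where it became an alias of the built-in TimeoutError.

```python
import asyncio

async def slow(log: list[str]) -> str:
    try:
        await asyncio.sleep(10)
        return "finished"
    except asyncio.CancelledError:
        log.append("slow() was cancelled")   # wait_for cancels it on timeout
        raise

async def main() -> list[str]:
    log: list[str] = []
    try:
        await asyncio.wait_for(slow(log), timeout=0.05)
    except asyncio.TimeoutError:
        log.append("timed out")
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```

The timed-out coroutine is cancelled rather than left running, which is why it gets a chance to clean up before the caller sees the timeout.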

Common patterns #

Cap concurrency at N — Semaphore #

Concurrency limit
async def fetch_all(urls: list[str], concurrency: int = 10):
    sem = asyncio.Semaphore(concurrency)

    async def fetch_one(url):
        async with sem:
            return await fetch(url)

    return await asyncio.gather(*[fetch_one(u) for u in urls])

If you fire off 1000 URLs at once, the server gets angry or memory / connections run out. A Semaphore keeps at most N requests in flight at a time.
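One way to check that the cap holds is to count how many jobs are inside the semaphore at once. This is a sketch in which job, in_flight, and peak are illustrative names and a short sleep stands in for the request.

```python
import asyncio

async def main(n_jobs: int = 10, limit: int = 3) -> tuple[int, int]:
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job(i: int) -> int:
        nonlocal in_flight, peak
        async with sem:                 # at most `limit` jobs get past here
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)   # stands in for the real request
            in_flight -= 1
            return i

    results = await asyncio.gather(*(job(i) for i in range(n_jobs)))
    return len(results), peak

if __name__ == "__main__":
    print(asyncio.run(main()))
```

All ten jobs complete, but the recorded peak never exceeds the limit of 3.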

Take only the first to finish — as_completed #

as_completed
async def race(tasks):
    for coro in asyncio.as_completed(tasks):
        result = await coro
        if result.is_winner:
            return result

When you want to process multiple concurrent tasks as they finish, earliest first.
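A self-contained sketch of the completion order, assuming a fetch variant with a delay parameter (added for illustration) so the jobs finish in a known sequence.

```python
import asyncio

async def fetch(name: str, delay: float) -> str:
    await asyncio.sleep(delay)      # stands in for network latency
    return name

async def main() -> list[str]:
    coros = [fetch("slow", 0.1), fetch("fast", 0.01), fetch("medium", 0.05)]
    finished: list[str] = []
    for next_done in asyncio.as_completed(coros):
        finished.append(await next_done)   # yields results as they complete
    return finished

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Unlike gather, the results arrive in completion order, not in the order the coroutines were listed.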

Cancellation #

cancel
task = asyncio.create_task(fetch("a"))
await asyncio.sleep(0.5)
task.cancel()    # CancelledError is raised inside the task

Timeouts, user cancellations, and parent cancellation are all built on this mechanism. Async code should assume it can be cancelled. The pattern of catching CancelledError with try/except to clean up and then re-raising it (or using try/finally) comes up often.
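A sketch of the cleanup pattern, using a hypothetical worker. The important detail is the bare raise after cleanup, so the task ends up marked as cancelled.

```python
import asyncio

async def worker(log: list[str]) -> None:
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleanup")       # release resources here
        raise                       # re-raise so the cancellation completes

async def main() -> list:
    log: list = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0.01)       # let the worker reach its await
    task.cancel()                   # request cancellation
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")     # awaiting a cancelled task raises too
    log.append(task.cancelled())
    return log

if __name__ == "__main__":
    print(asyncio.run(main()))
```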

What counts as intro? #

This chapter stops at introducing the concepts. In practice, the following build on top of it:

  • The internals and policies of the event loop → Chapter 18 asyncio in depth
  • The exact difference between Future and Task → Chapter 18
  • Picking and integrating sync / async libraries → Chapter 19 GIL and concurrency
  • Performance debugging (where awaits get long) → Chapter 21 performance
  • Background tasks and worker patterns → Chapter 27 async and background jobs
  • Integration with web frameworks (FastAPI) → all of Part 4

Exercises #

  1. Assume async def fetch(url: str) -> str: returns a dummy result after await asyncio.sleep(1). Write two versions running three URLs: (1) sequentially with await and (2) concurrently with asyncio.gather. Measure actual elapsed time with time.perf_counter().
  2. Rewrite the same thing with asyncio.TaskGroup. Make one task deliberately raise an exception, and verify how the other tasks get cleaned up and how the final exception is bundled into an ExceptionGroup.
  3. Write a function that handles 100 URLs with concurrency capped at 5 via Semaphore(5). Vary the concurrency to 1 / 5 / 100 and compare total time. Form your own hypothesis about what value you’d pick in real operations.

In one line: Async is a concurrency tool for I/O where the bottleneck is waiting, not CPU. async def = coroutine function, await yields to the event loop. Entry point is asyncio.run. Concurrent execution: gather (old) / TaskGroup (3.11+ recommended). create_task for background, to_thread to run sync functions from async code, asyncio.timeout for timeouts, Semaphore to cap concurrency, cancel() for cancellation. Do not use time.sleep inside async code.

Wrapping up Part 2 #

Through seven chapters in Part 2, we’ve filled in the code-structuring tools.

  • Data shape — @dataclass, __slots__
  • Types — Generic, Protocol, TypedDict, Literal
  • Resource management — with, contextlib
  • Flow expression — iterables, generators, yield from
  • Wrapping functions — every pattern of decorators
  • Branch expression — match-case in depth
  • Concurrency — asyncio intro

Next is Part 3, Depth · Concurrency — the depths you meet inside libraries and frameworks: magic methods, descriptors, metaclasses, asyncio in depth, GIL / concurrency, advanced typing, and performance.

Next chapter #

In Chapter 15 magic methods in depth and protocols we cover every hook by which objects integrate with language features — __call__, __getitem__, __hash__, __format__, __getattr__, and so on. We go to the level of building those magic methods directly — the very ones that Chapter 8 dataclass was auto-generating for us.
