目次
14 章

非同期入門 (asyncio)

async/await の意味、イベントループ、asyncio.gather と TaskGroup、同期コードとの混在まで、asyncio の最初の一歩を整理します。

2部の最後の章です。第11章 イテラブル、ジェネレータ、yield from で少し見た send / yield メカニズムの上に作られた道具が asyncioasync/await です。本章はその 最初の一歩 を整理します。

本章は3部の第18章 非同期深掘り — イベントループ、gather/wait、async generator と第19章 GIL と並行性 の前提となるモデルを敷きます。そして4部 FastAPI のほぼすべてのコードが async def の上に乗るので、本章をきちんと押さえておくことが本書の後半全体に役立ちます。

同期 vs 非同期 — 一枚の絵 #

同期 — 順次
import time

def fetch(url: str) -> str:
    time.sleep(1)             # ネットワーク応答を待つ (1 秒)
    return f"data from {url}"

def main():
    a = fetch("a")            # 1 秒
    b = fetch("b")            # 1 秒
    c = fetch("c")            # 1 秒
    print(a, b, c)            # 計 3 秒

main()

3つのリクエストを順番に行います。それぞれ1秒ずつ、計 3秒

非同期で同じことをすると:

非同期 — 同時
import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(1)
    return f"data from {url}"

async def main():
    results = await asyncio.gather(
        fetch("a"),
        fetch("b"),
        fetch("c"),
    )
    print(results)            # 計 1 秒

asyncio.run(main())

3つのリクエストが 同時に 進みます。計 1秒

これが非同期の価値です。CPU ではなく待ち時間がボトルネックの場合 — ネットワーク、DB、ファイル — に並行性で時間を稼ぎます。CPU がボトルネックの場合は第19章 GIL と並行性threading / multiprocessing と比較します。

核心概念 — 一か所に #

1) コルーチン関数とコルーチンオブジェクト #

用語
async def fetch(url):    # コルーチン関数 (coroutine function)
    return ...

c = fetch("a")            # c はコルーチンオブジェクト (coroutine object)
                          # 関数本体はまだ実行されない!

async def で定義すると 呼び出し時に本体を実行せずコルーチンオブジェクトを返します。ジェネレータ関数と同じパターンです — 第11章で見た一時停止 / 再開モデルそのもの。

コルーチンが実際に実行されるには イベントループ がそれを回さなければなりません。

2) await — 結果を待つ #

await
async def main():
    data = await fetch("a")    # fetch が終わるまで待つ
    print(data)

await がする仕事:

  • 式の値(コルーチン、Task など)を イベントループに譲る
  • それが終わると結果を受け取り次の行に進む
  • 譲っている間 他のコルーチンが実行されうる

awaitasync def の中でのみ 使えます。

3) イベントループ #

コルーチンたちを 交互に実行するスケジューラ。一度に1つのコルーチンしか回しませんが、それが await で譲ると別のコルーチンが実行を引き継ぎます。単一スレッドで並行性を作るメカニズムです。深さは第18章 非同期深掘り で。

asyncio.run(coro) がイベントループを作り coro を最後まで回したあとに後片付けをしてくれます。プログラムのエントリポイントで一度だけ呼び出す のが一般的です。

同時実行の2つの標準 — gatherTaskGroup #

asyncio.gather — クラシックな形 #

gather
async def main():
    a, b, c = await asyncio.gather(
        fetch("a"),
        fetch("b"),
        fetch("c"),
    )

gather は複数のコルーチンを同時に開始し、すべて終わったら結果をリストで 渡します。昔からある標準ツールです。

TaskGroup (3.11+) — 新標準 #

TaskGroup
async def main():
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(fetch("a"))
        b = tg.create_task(fetch("b"))
        c = tg.create_task(fetch("c"))

    print(a.result(), b.result(), c.result())

同じ動作ですが2つの面でより安全です。

  1. 途中で例外が出ると他の task も cleanup されるgather はオプションなしだとそのまま進む
  2. 複数の task が同時に失敗すると 第6章 例外処理ExceptionGroup にまとめてくれる

3.11+ の環境なら TaskGroup がより推奨される形 です。

Task — コルーチンをバックグラウンドへ #

コルーチンを すぐに開始してバックグラウンドで回したいとき に Task にします。

create_task
async def main():
    task = asyncio.create_task(fetch("a"))
    # ここで task はすでに実行中

    other_work()              # 別の仕事を進める

    result = await task        # 必要なときに結果を回収

コルーチンオブジェクト自体は 誰かが await してくれるまで回らない ただの値です。Task で包むことでイベントループに登録されて実行が始まります。

よくある落とし穴 — 忘れてはいけないこと #

1) 同期関数の中で await は使えない #

🚫
def main():
    data = await fetch("a")    # ✗ SyntaxError

awaitasync def の中でのみ。エントリポイントは asyncio.run(...) で同期コードから非同期を開始。

2) time.sleep は非同期を止める #

🚫 同期 sleep
async def fetch(url):
    time.sleep(1)              # イベントループ全体が止まる
    return ...
✅ asyncio.sleep
async def fetch(url):
    await asyncio.sleep(1)
    return ...

time.sleepOS スレッド全体 を眠らせます。非同期の中で使うと並行性が崩れます。同期ブロッキング関数(DB ドライバ、requests、ファイル I/O の一部)も同じ問題があります — 非同期用のライブラリ (asyncpghttpxaiofiles) を使わなければなりません。

3) コルーチンを作って await しない #

🚫 回らないコルーチン
async def main():
    fetch("a")                # 作って await しない — 回らない
    print("done")

これをやると警告が出ます (coroutine 'fetch' was never awaited)。コルーチンは値にすぎない ので、誰かが await または Task で作ってくれなければ絶対に回りません。

4) 同期コードから同期のように呼び出してはいけない #

🚫
async def fetch(url): ...

result = fetch("a")    # コルーチンオブジェクト。結果ではない。

これを解消するにはエントリポイントで asyncio.run(...)

同期コードと混ぜる #

同期関数が長くかかるなら — to_thread #

同期 → 非同期変換
import asyncio

def heavy_calc(n: int) -> int:
    # CPU バウンドでもブロッキング I/O でも
    ...

async def main():
    result = await asyncio.to_thread(heavy_calc, 100)

asyncio.to_thread(fn, *args)同期関数を別スレッドで実行 し、結果を await 可能な形にしてくれます。非同期コードの中で同期ライブラリをやむなく使わなければならないときによく登場。

非同期コンテキストマネージャ — async with #

第10章 コンテキストマネージャ で短く見た道具です。

async with
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com")
        data = resp.json()

__enter__ / __exit__ の代わりに __aenter__ / __aexit__ が定義されたオブジェクト。@asynccontextmanager で関数からも作れます。

@asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def db_transaction(conn):
    tx = await conn.begin()
    try:
        yield tx
        await tx.commit()
    except Exception:
        await tx.rollback()
        raise

非同期イテレータ — async for #

async for
async def stream_lines(url: str):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as resp:
            async for line in resp.aiter_lines():
                yield line

__iter__ / __next__ の代わりに __aiter__ / __anext__ — 非同期で次の値を受け取るパターン。ストリーミングによく登場します。async generator の深さは第18章 非同期深掘り で。

タイムアウト — asyncio.timeout (3.11+) #

timeout
async def main():
    try:
        async with asyncio.timeout(5):
            data = await fetch("https://slow.example.com")
    except TimeoutError:
        print("5 秒以内に受け取れず")

3.10 以前は asyncio.wait_for(coro, timeout=5) が標準でした。新コードは asyncio.timeout のコンテキストマネージャが推奨されます。

よく出会うパターン集 #

同時 N 個に制限 — Semaphore #

並行性制限
async def fetch_all(urls: list[str], concurrency: int = 10):
    sem = asyncio.Semaphore(concurrency)

    async def fetch_one(url):
        async with sem:
            return await fetch(url)

    return await asyncio.gather(*[fetch_one(u) for u in urls])

URL 1000個を一度に同時リクエストするとサーバーが怒るか、メモリ / コネクションが枯渇します。Semaphore で 同時に N 個まで 流れるように止めます。

最初に終わったものだけ受け取る — as_completed #

as_completed
async def race(tasks):
    for coro in asyncio.as_completed(tasks):
        result = await coro
        if result.is_winner:
            return result

複数の同時作業の中で 先に終わったものから 処理したいとき。

キャンセル #

cancel
task = asyncio.create_task(fetch("a"))
await asyncio.sleep(0.5)
task.cancel()    # CancelledError が task の中で投げられる

タイムアウト、ユーザーキャンセル、親キャンセルなどがすべて本メカニズムの上に作られます。非同期コードは キャンセルされうるという前提 で書く方がよいです — try/except CancelledError で整理コードを置くパターンがよく登場します。

どこまでが入門か #

本章は 概念紹介 までです。実戦では次のテーマが追加で入ってきます。

  • イベントループの内部動作とポリシー → 第18章 非同期深掘り
  • Future vs Task の正確な違い → 第18章
  • 同期 / 非同期ライブラリの選択と統合 → 第19章 GIL と並行性
  • パフォーマンスのデバッグ(どこで await が長くなるか) → 第21章 パフォーマンス
  • バックグラウンドジョブ、ワーカーパターン → 第27章 非同期とバックグラウンドジョブ
  • ウェブフレームワーク (FastAPI) との統合 → 4部全体

練習問題 #

  1. async def fetch(url: str) -> str:await asyncio.sleep(1) 後にダミー結果を返すと仮定してください。3つの URL を (1) 順番に await する版と (2) asyncio.gather で同時に実行する版の2つを書いて、実際にかかる時間を time.perf_counter() で測定します。
  2. asyncio.TaskGroup で同じ仕事を書き直してください。1つの task でわざと例外を投げるようにして、残りの task がどのように cleanup されるか、最終的な例外が ExceptionGroup にまとめられるかを確認します。
  3. Semaphore(5) で並行性を5に制限して URL 100個を処理する関数を書いてください。並行性を 1 / 5 / 100 と変えながら総時間の変化を比較します。実運用ではどの値を選ぶか、自分なりに仮説を立ててみます。

一行まとめ: 非同期は CPU ではなく待ちがボトルネックの I/O 並行性ツール。async def = コルーチン関数、await がイベントループに譲る。エントリポイントは asyncio.run。同時実行は gather(旧) / TaskGroup(3.11+ 推奨)。create_task でバックグラウンド、to_thread で同期関数の変換、asyncio.timeout でタイムアウト、Semaphore で並行性制限、cancel() でキャンセル。time.sleep 禁止。

2 部のまとめ #

2部の7章を経て コード構造化の道具 が揃いました。

  • データの形 — @dataclass__slots__
  • 型 — Generic、Protocol、TypedDict、Literal
  • リソース管理 — withcontextlib
  • 流れの表現 — イテラブル、ジェネレータ、yield from
  • 関数を包む — デコレータのすべてのパターン
  • 分岐の表現 — match-case 深掘り
  • 並行性 — asyncio 入門

次の3部 深さ・並行性 は、ライブラリ / フレームワークで出会う深さ — マジックメソッド、ディスクリプタ、メタクラス、非同期深掘り、GIL / 並行性、typing 上級、パフォーマンス — に入っていきます。

次の章 #

次の 第15章 マジックメソッド深掘りとプロトコル では、オブジェクトが言語機能と統合されるすべてのフック — __call____getitem____hash____format____getattr__ などを扱います。第8章 dataclass が自動で作ってくれていたマジックメソッドを 直接作るレベル まで入っていきます。

X