モダンPython上級 #4 非同期の深さ — イベントループ、gather/wait、async generator

中級 #7 非同期入門async/await の意味と asyncio.rungatherTaskGroup まで見ました。今回はその次の段階です。イベントループが実際にどう動くのかFutureTask の違い、gatherwait の細かい違い、async generator、async iteration までを整理します。

イベントループ — もう一段の深さ #

イベントループは 一度に 1 つのコルーチンだけを実行 します。マルチスレッドではありません。それなのにどうやって並行性が生まれるのでしょうか?

イベントループの本質 — 擬似コード
def run_loop(coros):
    queue = list(coros)
    while queue:
        coro = queue.pop(0)
        try:
            # コルーチンを次の await まで進める
            future = coro.send(None)
        except StopIteration as e:
            continue   # コルーチン終了
        # future が終わったらコルーチンをキューに戻す
        future.add_done_callback(lambda _: queue.append(coro))

要点。

  1. コルーチンを send(None) で 1 段階進める
  2. await に到達した地点まで実行されて止まる
  3. その時点の Future が完了したら、コルーチンを再びキューに入れる
  4. 次のコルーチンに移る (他のコルーチンがその間に実行されることがある)

await が協調的な譲渡点 だというのがこの図のカギです。譲渡が起きるところでだけ他のコルーチンが割り込めます。

譲渡しないコードは並行性を妨げる #

🚫 譲渡なし
async def slow():
    total = 0
    for i in range(10_000_000):    # CPU バウンド、await なし
        total += i
    return total

async def main():
    await asyncio.gather(slow(), slow(), slow())   # 3 つを同時に?

見た目には 3 つが同時に回っているように見えますが、実際には直列 です。slow() の中に await がないので譲渡が起こりません。CPU バウンドは to_thread または multiprocessing に送らなければいけません (#5)。

Future — 未来に埋まる結果 #

Future「まだ決まっていない結果」 を表すオブジェクトです。

Future を直接
async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    # 1 秒後に結果を埋める
    loop.call_later(1, fut.set_result, "done")

    result = await fut
    print(result)   # done

状態。

  • pending — 結果未決
  • done — 結果が決まった (fut.set_result(x) または fut.set_exception(e))
  • cancelled — キャンセル済み

Future は通常 直接作ることはありません。 ライブラリ内部 (スレッドプールから結果を回収、シグナルなど) で使うものです。一般のコードでは Task を通じて間接的に触れます。

TaskFuture + コルーチン #

Task の正体
async def main():
    task = asyncio.create_task(work())   # Task の生成
    print(isinstance(task, asyncio.Future))   # True

TaskFuture のサブクラス です。コルーチンを包んでイベントループに登録した形です。Future が一般化された「結果の約束」で、Task は「コルーチンが埋める約束」。

Task の doneresultcancel #

Task のインタフェース
task = asyncio.create_task(work())

await asyncio.sleep(0.1)
print(task.done())     # False / True

if task.done():
    if task.exception():
        print("失敗:", task.exception())
    else:
        print("成功:", task.result())

task.cancel()           # キャンセル試行

Task 参照の保持 — 罠 #

🚫 よく陥る罠
async def main():
    asyncio.create_task(background_work())   # 結果を無視
    # ...

Task オブジェクトへの参照がなくなると、ガベージコレクタが Task を片付けてしまうことがあります。バックグラウンド作業が突然消える事故 が発生します。常にどこかで参照を保持 してください。

✅ 参照を保持
async def main():
    background_tasks = set()

    task = asyncio.create_task(background_work())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)

    # ...

TaskGroup を使えばこの厄介事は自動で解決します — だから 3.11+ では推奨されています。

gatherwait — 細かい違い #

gather — 結果のリスト、順序を保つ #

gather
results = await asyncio.gather(
    fetch("a"), fetch("b"), fetch("c"),
    return_exceptions=False,
)
# results: [a の結果, b の結果, c の結果]  ← 呼び出し順

return_exceptions=True オプションを与えると 例外も結果として 入ってきます — 1 つの task が失敗しても他の task の結果を受け取れます。

例外も結果として
results = await asyncio.gather(
    fetch("a"),
    fetch_might_fail("b"),
    return_exceptions=True,
)
for r in results:
    if isinstance(r, Exception):
        print("失敗:", r)
    else:
        print("成功:", r)

wait — done/pending の 2 つのセット、順序保証なし #

wait
done, pending = await asyncio.wait(
    [task1, task2, task3],
    return_when=asyncio.FIRST_COMPLETED,
)

return_when オプション。

  • ALL_COMPLETED — デフォルト、全部終わるまで
  • FIRST_COMPLETED — 1 つでも終わったら返す
  • FIRST_EXCEPTION — 例外が 1 つでも発生したら返す

wait結果のリストではなく done/pending の 2 つのセット を返します。結果を受け取るには task.result() を直接呼んでください。

いつどちらを? #

場面道具
全部終わるまで、結果を順序通りgather
一部だけ終わったら進める / 最初の結果だけ処理wait
最初の結果から順に処理as_completed
新しいコード、3.11+ 環境、cleanup を保証TaskGroup

新しいコードはできるだけ TaskGroup です。gather/wait は古いコード/一部の場面でだけ。

as_completed — 最初の結果から処理 #

as_completed
async def main():
    coros = [fetch(u) for u in urls]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print("受信:", result)

各作業が終わる 順に 結果を受け取ります。全部終わるのを待たずに早めに処理したいとき。

同期関数との統合 — to_thread #

CPU バウンドまたはブロッキング I/O の同期関数を非同期の中で安全に呼び出す。

to_thread
import asyncio
import requests   # 同期ライブラリ

async def fetch(url):
    response = await asyncio.to_thread(requests.get, url)
    return response.text

asyncio.to_thread(fn, *args)別スレッドで実行 し、await 可能な形に包んでくれます。内部的には loop.run_in_executor を呼びます。

run_in_executor — より細かく #

run_in_executor
from concurrent.futures import ProcessPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, cpu_heavy_function, arg)

ThreadPoolExecutor または ProcessPoolExecutor を直接指定して、より細かい制御が可能です。

Async generator — 非同期 yield #

async generator
async def stream_chunks(url):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as resp:
            async for chunk in resp.aiter_bytes():
                yield chunk

async def main():
    async for chunk in stream_chunks("https://example.com"):
        print(len(chunk))

async def + yield = async generator。呼び出し時に非同期イテレータを返します。ストリーミング の場面に本当によく合います。

同期 vs 非同期 generator #

同期 generatorasync generator
定義def fn(): yieldasync def fn(): yield
消費for x in g:async for x in g:
中で await
中で sleep/io❌ (または同期)

Async iteration — __aiter__ / __anext__ #

非同期イテラブルを直接作りたいとき。

非同期イテレータ — クラス
class Counter:
    def __init__(self, limit):
        self.limit = limit
        self.n = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.n >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        self.n += 1
        return self.n

async def main():
    async for i in Counter(3):
        print(i)
# 0.1 秒間隔で 1, 2, 3

ほとんどは async generator の方が短く直感的なので、クラス形式はあまり使いません。

Async context manager — __aenter__ / __aexit__ #

中級 #3 で短く見たところ。

@asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def open_db():
    conn = await connect_async()
    try:
        yield conn
    finally:
        await conn.close()

async def main():
    async with open_db() as conn:
        await conn.execute(...)

並行性パターン — よく出会う場面 #

1) Semaphore で並行性を制限 #

同時 N 個
sem = asyncio.Semaphore(10)

async def fetch_one(url):
    async with sem:
        return await fetch(url)

async def fetch_all(urls):
    return await asyncio.gather(*[fetch_one(u) for u in urls])

2) Producer / Consumer — Queue #

キューベースのパイプライン
queue = asyncio.Queue(maxsize=10)

async def producer():
    for i in range(100):
        await queue.put(i)
    await queue.put(None)    # 終了シグナル

async def consumer():
    while True:
        item = await queue.get()
        if item is None:
            break
        process(item)
        queue.task_done()

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(producer())
        tg.create_task(consumer())

Queue は producer/consumer 間のバックプレッシャを自然に処理してくれます (キューが埋まると producer が自動で待機)。

3) 最初の結果だけ受け取って残りをキャンセル #

first wins
async def main():
    tasks = [
        asyncio.create_task(fetch_a()),
        asyncio.create_task(fetch_b()),
    ]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    for t in pending:
        t.cancel()
    return done.pop().result()

複数のバックエンドに同じリクエストを投げ、最初に応答したものを採用するパターン。

4) Timeout #

timeout
async def main():
    try:
        async with asyncio.timeout(5):
            data = await slow_op()
    except TimeoutError:
        print("5 秒超過")

中級 #7 で見たそれ。3.11+ 標準。

キャンセル — 深いトピック #

コルーチンが cancel されると CancelledError が await の箇所で投げられます。

cancel の処理
async def work():
    try:
        await long_operation()
    except asyncio.CancelledError:
        await cleanup()    # 後片付けは非同期でも可能
        raise              # ← 必ず再送出!

重要: CancelledError を捕まえて再送出しないと — task が正常終了したと見なされて cleanup が断ち切られます。常に raise を一緒に 置いてください。

shielding — 一部だけキャンセルから守る #

asyncio.shield
async def critical():
    return await asyncio.shield(important_work())

shield で包んだ部分は 外部からの cancel の影響を受けません。 決済の最終段階のように、途中で切れてはいけない処理。

デバッグ — asyncio.run(debug=True) #

デバッグモード
PYTHONASYNCIODEBUG=1 python app.py

または

asyncio.run(main(), debug=True)

デバッグモードでは次のことが有効になります。

  • 譲渡なしで長すぎるコルーチンの警告 (ブロッキング疑い)
  • 抜けた await の警告
  • task が GC されるとき結果未使用の警告

開発中は常にオン が良いです。非同期バグの 90% がここで捕まります。

まとめ #

今回見たこと。

  • イベントループは 一度に 1 つのコルーチンだけawait が譲渡点
  • 譲渡のない CPU バウンドは並行性を妨げる → to_thread / ProcessPoolExecutor
  • Future (結果の約束) ⊂ Task (コルーチン + Future)
  • バックグラウンドの Task は 参照を保持 しないと GC される
  • gather (結果のリスト)、wait (done/pending のセット)、as_completed (順番に)、TaskGroup (cleanup 保証)
  • to_thread / run_in_executor で同期関数を統合
  • async generator (async def + yield)、async forasync with
  • パターン: Semaphore、Queue、first-wins、timeout
  • CancelledError は捕まえて後片付け後 必ず raise
  • shield で cancel をブロック
  • debug=True は開発中の必須

次回 (#5 GIL と並行性) では、今回短く触れた CPU バウンドの並行性 — GIL の正体、threading vs multiprocessing vs asyncio、そして Python 3.13〜3.14 の free-threaded ビルドまで扱います。

X