モダンPython中級 #7 非同期入門 (asyncio)

読了 7分

中級シリーズの最後 — 非同期 です。#4 イテラブルとジェネレータ で軽く見た send / yield メカニズムの上に作られたツールが asyncioasync/await です。今回はその 最初の一歩 を一ヶ所に整理します。

同期 vs 非同期 — 一枚の絵 #

同期 — 順次
import time

def fetch(url: str) -> str:
    time.sleep(1)             # ネットワーク応答待ち (1 秒)
    return f"data from {url}"

def main():
    a = fetch("a")            # 1 秒
    b = fetch("b")            # 1 秒
    c = fetch("c")            # 1 秒
    print(a, b, c)            # 合計 3 秒

main()

3 つのリクエストを順番に行います。それぞれ 1 秒ずつ、合計 3 秒

非同期で同じことをすると:

非同期 — 同時
import asyncio

async def fetch(url: str) -> str:
    await asyncio.sleep(1)
    return f"data from {url}"

async def main():
    results = await asyncio.gather(
        fetch("a"),
        fetch("b"),
        fetch("c"),
    )
    print(results)            # 合計 1 秒

asyncio.run(main())

3 つのリクエストが 同時に 進みます。合計 1 秒

これが非同期の価値です。CPU ではなく待機がボトルネックになるところ — ネットワーク、DB、ファイル — で並行性で時間を稼ぎます。

核心となる概念 — 一ヶ所に #

1) コルーチン関数とコルーチンオブジェクト #

用語
async def fetch(url):    # コルーチン関数 (coroutine function)
    return ...

c = fetch("a")            # c はコルーチンオブジェクト (coroutine object)
                          # 関数本体はまだ実行されていない!

async def で定義すると 呼び出しても本体を実行せずコルーチンオブジェクトを返します。ジェネレータ関数と同じパターンです。

コルーチンが実際に実行されるには イベントループ がそれを回さなければなりません。

2) await — 結果を待つ #

await
async def main():
    data = await fetch("a")    # fetch が終わるまで待つ
    print(data)

await がやること:

  • 式の値 (コルーチン、Task など) を イベントループに譲る
  • それが終わったら結果を受け取って次の行に進む
  • 譲っている間 他のコルーチンが実行されうる

awaitasync def の中でのみ 使えます。

3) イベントループ #

コルーチンたちを 交互に実行するスケジューラ。一度に一つのコルーチンだけ回しますが、それが await で譲ると別のコルーチンがそれを引き継ぎます。シングルスレッドで並行性を作るメカニズムです。

asyncio.run(coro) がイベントループを作って coro を最後まで回したあと後片付けしてくれます。プログラムのエントリポイントで一度だけ呼ぶ のが一般的です。

同時実行の二つの標準 — gatherTaskGroup #

asyncio.gather — クラシックな形 #

gather
async def main():
    a, b, c = await asyncio.gather(
        fetch("a"),
        fetch("b"),
        fetch("c"),
    )

gather は複数のコルーチンを同時に開始し、すべて終わったら結果をリストで 返します。昔からある標準ツールです。

TaskGroup (3.11+) — 新しい標準 #

TaskGroup
async def main():
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(fetch("a"))
        b = tg.create_task(fetch("b"))
        c = tg.create_task(fetch("c"))

    print(a.result(), b.result(), c.result())

同じ動作ですが 2 つの面でより安全です。

  1. 途中で例外が出ると他の task たちも cleanupgather はオプションがなければそのまま進む
  2. 複数の task が同時に失敗したら 基礎 #6ExceptionGroup でまとめてくれる

3.11+ の環境なら TaskGroup がより推奨される形 です。

Task — コルーチンをバックグラウンドへ #

コルーチンを 即座に開始してバックグラウンドで回したいとき Task にします。

create_task
async def main():
    task = asyncio.create_task(fetch("a"))
    # ここで task はすでに実行中

    other_work()              # 他の作業を進める

    result = await task        # 必要なときに結果を回収

コルーチンオブジェクト自体は 誰かが await してくれるまで回らない 単純な値です。Task で包まないとイベントループに登録されず実行が始まりません。

よくある罠 — 忘れてはいけないこと #

1) 同期関数の中で await は使えない #

🚫
def main():
    data = await fetch("a")    # ✗ SyntaxError

awaitasync def の中でのみ。エントリポイントは asyncio.run(...) で同期コードから非同期を開始。

2) time.sleep は非同期を止める #

🚫 同期 sleep
async def fetch(url):
    time.sleep(1)              # イベントループ全体が止まる
    return ...
✅ asyncio.sleep
async def fetch(url):
    await asyncio.sleep(1)
    return ...

time.sleepOS スレッド全体 を眠らせます。非同期の中で使うと並行性が壊れます。同期ブロッキング関数 (DB ドライバ、requests、ファイル I/O の一部) も同じ問題があります — 非同期用のライブラリ (asyncpghttpxaiofiles) を使う必要があります。

3) コルーチンを作って await しない #

🚫 回らないコルーチン
async def main():
    fetch("a")                # 作って await しない — 回らない
    print("done")

これだと警告が出ます (coroutine 'fetch' was never awaited)。コルーチンは値にすぎない ので、誰かが await または Task にしてくれないと絶対に回りません。

4) 同期コードから同期のように呼んではいけない #

🚫
async def fetch(url): ...

result = fetch("a")    # コルーチンオブジェクト。結果ではありません。

これを解くにはエントリポイントで asyncio.run(...)

同期コードと混ぜる #

同期関数が長くかかるなら — to_thread #

同期 → 非同期変換
import asyncio

def heavy_calc(n: int) -> int:
    # CPU バウンドでもブロッキング I/O でも
    ...

async def main():
    result = await asyncio.to_thread(heavy_calc, 100)

asyncio.to_thread(fn, *args)同期関数を別スレッドで実行 して結果を await 可能な形にしてくれます。非同期コードの中で同期ライブラリをやむを得ず使わなければならないときによく登場します。

非同期コンテキストマネージャー — async with #

#3 で軽く見たところです。

async with
import httpx

async def main():
    async with httpx.AsyncClient() as client:
        resp = await client.get("https://api.example.com")
        data = resp.json()

__enter__ / __exit__ の代わりに __aenter__ / __aexit__ が定義されたオブジェクト。@asynccontextmanager で関数から作ることもできます。

@asynccontextmanager
from contextlib import asynccontextmanager

@asynccontextmanager
async def db_transaction(conn):
    tx = await conn.begin()
    try:
        yield tx
        await tx.commit()
    except Exception:
        await tx.rollback()
        raise

非同期イテレータ — async for #

async for
async def stream_lines(url: str):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as resp:
            async for line in resp.aiter_lines():
                yield line

__iter__ / __next__ の代わりに __aiter__ / __anext__ — 非同期で次の値を受け取るパターン。ストリーミングによく登場します。

タイムアウト — asyncio.timeout (3.11+) #

timeout
async def main():
    try:
        async with asyncio.timeout(5):
            data = await fetch("https://slow.example.com")
    except TimeoutError:
        print("5 秒以内に受け取れず")

3.10 以前は asyncio.wait_for(coro, timeout=5) が標準でした。新コードは asyncio.timeout コンテキストマネージャーが推奨です。

よく出会うパターン集 #

同時 N 個に制限 — Semaphore #

並行性の制限
async def fetch_all(urls: list[str], concurrency: int = 10):
    sem = asyncio.Semaphore(concurrency)

    async def fetch_one(url):
        async with sem:
            return await fetch(url)

    return await asyncio.gather(*[fetch_one(u) for u in urls])

URL 1000 個を一度に同時リクエストするとサーバーが怒ったりメモリ / 接続が枯渇します。Semaphore で 同時に N 個まで に制限します。

最初に終わったものだけ受け取る — as_completed #

as_completed
async def race(tasks):
    for coro in asyncio.as_completed(tasks):
        result = await coro
        if result.is_winner:
            return result

複数の同時作業のうち 先に終わったものから 処理したいとき。

キャンセル #

cancel
task = asyncio.create_task(fetch("a"))
await asyncio.sleep(0.5)
task.cancel()    # CancelledError が task の中で投げられる

タイムアウト、ユーザーキャンセル、親のキャンセルなどがすべてこのメカニズムの上に作られています。非同期コードは キャンセルされうるという仮定 で書く方がよく、try/except CancelledError で後片付けコードを置く場面がよく登場します。

どこまでが入門か #

この記事は 概念紹介 までです。実戦では次のテーマがさらに加わります。

  • イベントループの内部動作とポリシー
  • Future vs Task の正確な違い
  • 同期 / 非同期ライブラリの選択と統合
  • パフォーマンスデバッグ (どこで await が長くなるか)
  • バックグラウンドジョブ、ワーカーパターン
  • Web フレームワーク (FastAPI、Starlette) との統合

これらは次のシリーズ モダンPython上級 の非同期の深さ編と、最後のシリーズ モダン Python 実戦 (FastAPI) で本格的に扱います。

まとめ #

今回押さえたこと:

  • 非同期は CPU ではなく待機がボトルネックになるところ のツール — I/O 並行性
  • async def = コルーチン関数、呼び出しだけでは回らず await または Task が必要
  • await はイベントループに譲り、結果を受け取ったら再開
  • asyncio.run(coro) — エントリポイントで一度
  • 同時実行: gather (旧標準)、TaskGroup (3.11+ 推奨)
  • create_task — バックグラウンドで即座に開始
  • 罠: time.sleep 禁止 (asyncio.sleep)、コルーチンを作って await しない、同期関数で await 禁止
  • 同期関数は asyncio.to_thread で変換
  • async withasync for — 非同期コンテキスト / イテレータ
  • asyncio.timeout (3.11+) — コンテキストマネージャーでタイムアウト
  • 並行性制限は Semaphore、最初の結果のみは as_completed、終了は cancel()

シリーズの振り返り #

7 編を経て モダン Python 中級 のツールボックスが満たされました。

  • データの形 — @dataclass__slots__
  • 型 — Generic、Protocol、TypedDict、Literal
  • リソース管理 — withcontextlib
  • 流れの表現 — イテラブル、ジェネレータ、yield from
  • 関数を包む — デコレータのあらゆるパターン
  • 分岐の表現 — match-case の深さ
  • 並行性 — asyncio 入門

次のシリーズ モダン Python 上級 はライブラリ / フレームワークで出会う深さ — マジックメソッド、ディスクリプタ、メタクラス、非同期の深さ、GIL / 並行性、typing 上級、パフォーマンス — に入ります。その上に最後の 実戦 (FastAPI) シリーズがツールを一つのプロジェクトに集めます。

X