非同期の深さ — イベントループ、gather/wait、async generator
中級入門の次の段階として、イベントループの実際の動作、Future と Task の違い、gather vs wait、async generator と async iteration までまとめます。
第14章 非同期入門 (asyncio) で async/await の意味と asyncio.run、gather、TaskGroup まで見ました。本章はその続きとして、イベントループが実際にどう動くのか、Future と Task の違い、gather と wait の微妙な違い、async generator、async iteration までを整理します。
本章の終わりで簡単に言及する CPU バウンド並行性は、次の第19章 GIL と並行性 — threading vs multiprocessing vs asyncio で詳しく扱います。そして本章の非同期デバッグ / 性能測定は第21章 パフォーマンス — cProfile、py-spy、メモリプロファイリング と対をなします。
イベントループ — もう一段深く #
イベントループは 一度に 1 つのコルーチンだけを実行 します。マルチスレッドではありません。それなのにどうやって並行性が生まれるのか?
def run_loop(coros):
queue = list(coros)
while queue:
coro = queue.pop(0)
try:
# コルーチンを次の await まで進める
future = coro.send(None)
except StopIteration as e:
continue # コルーチン終了
# future が終わったらコルーチンをキューに戻す
future.add_done_callback(lambda _: queue.append(coro))ポイント。
- コルーチンを
send(None)で 1 段階進める awaitに出会う地点まで実行されて止まる- その地点の
Futureが完了したらコルーチンをキューに戻す - 次のコルーチンに移る (他のコルーチンがその間に実行される可能性がある)
await が協力的な譲渡点 だというのが本図の核心です。譲渡が起きる地点でのみ他のコルーチンが割り込めます。
譲渡しないコードは並行性を阻む #
async def slow():
total = 0
for i in range(10_000_000): # CPU バウンド、await なし
total += i
return total
async def main():
await asyncio.gather(slow(), slow(), slow()) # 3 つを同時に?見かけは 3 つが同時に回るようですが 実際には直列 です。slow() の中に await がないので譲渡が起きません。CPU バウンドは to_thread または multiprocessing に送らなければなりません (第19章 GIL と並行性)。
Future — 未来に埋められる結果
#
Future は 「まだ決定されていない結果」 を表現するオブジェクトです。
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
# 1 秒後に結果を埋める
loop.call_later(1, fut.set_result, "done")
result = await fut
print(result) # done状態。
- pending — 結果未決定
- done — 結果決定 (
fut.set_result(x)またはfut.set_exception(e)) - cancelled — 取り消し
Future は通常 直接作ることはありません。 ライブラリ内部 (スレッドプールから結果回収、シグナルなど) で使う道具です。一般のコードでは Task を介して間接的に触ります。
Task — Future + コルーチン
#
async def main():
task = asyncio.create_task(work()) # Task 生成
print(isinstance(task, asyncio.Future)) # TrueTask は Future のサブクラス です。コルーチンを包んでイベントループに登録した形です。Future が一般化された「結果の約束」で、Task は「コルーチンが埋める約束」。
Task の done、result、cancel
#
task = asyncio.create_task(work())
await asyncio.sleep(0.1)
print(task.done()) # False / True
if task.done():
if task.exception():
print("失敗:", task.exception())
else:
print("成功:", task.result())
task.cancel() # 取り消し試行Task 参照の維持 — 罠 #
async def main():
asyncio.create_task(background_work()) # 結果を無視
# ...Task オブジェクトへの参照が消えると、ガベージコレクタが Task を片付ける可能性があります。バックグラウンド作業が突然消える事故 が発生します。常にどこかに参照を維持 してください。
async def main():
background_tasks = set()
task = asyncio.create_task(background_work())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
# ...TaskGroup を使えば本頭痛が自動で解消されます — だから 3.11+ で推奨されます。
gather と wait — 両者の微妙な違い
#
gather — 結果リスト、順序保存
#
results = await asyncio.gather(
fetch("a"), fetch("b"), fetch("c"),
return_exceptions=False,
)
# results: [a の結果, b の結果, c の結果] ← 呼び出し順return_exceptions=True オプションを与えると 例外も結果のように 入ってきます — 1 つの task が失敗しても他の task の結果を受け取れます。
results = await asyncio.gather(
fetch("a"),
fetch_might_fail("b"),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print("失敗:", r)
else:
print("成功:", r)wait — done / pending の 2 セット、順序保証なし
#
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.FIRST_COMPLETED,
)return_when オプション。
ALL_COMPLETED— デフォルト、すべて終わるまでFIRST_COMPLETED— 1 つでも終われば返すFIRST_EXCEPTION— 例外が 1 つでも発生すれば返す
wait は 結果リストではなく done / pending の 2 セット を返します。結果を受け取るには task.result() を直接呼んでください。
いつ何を? #
| 場面 | 道具 |
|---|---|
| すべて終わるまで、結果は順序通り | gather |
| 一部だけ終われば進む / 最初の結果だけ処理 | wait |
| 最初の結果から順に処理 | as_completed |
| 新コード、3.11+ 環境、cleanup 保証 | TaskGroup |
新しいコードは可能な限り TaskGroup です。gather / wait は古いコード / 一部の場合だけ。
as_completed — 最初の結果から処理
#
async def main():
coros = [fetch(u) for u in urls]
for coro in asyncio.as_completed(coros):
result = await coro
print("受信:", result)各作業が終わる 順番に 結果を受け取ります。すべてが終わるのを待たずに早く処理したいとき。
同期関数との統合 — to_thread
#
CPU バウンドまたはブロッキング I/O の同期関数を非同期の中で安全に呼び出す。
import asyncio
import requests # 同期ライブラリ
async def fetch(url):
response = await asyncio.to_thread(requests.get, url)
return response.textasyncio.to_thread(fn, *args) が 別のスレッドで実行 して、await 可能な形に包んでくれます。内部的に loop.run_in_executor を呼び出します。
run_in_executor — より細かく
#
from concurrent.futures import ProcessPoolExecutor
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_heavy_function, arg)ThreadPoolExecutor または ProcessPoolExecutor を直接指定してより細かい制御が可能です。第19章 GIL と並行性 で両者の選択基準を扱います。
Async generator — 非同期 yield #
async def stream_chunks(url):
async with httpx.AsyncClient() as client:
async with client.stream("GET", url) as resp:
async for chunk in resp.aiter_bytes():
yield chunk
async def main():
async for chunk in stream_chunks("https://example.com"):
print(len(chunk))async def + yield = async generator。呼び出すと非同期イテレータを返します。ストリーミング の場面によく合います。
同期 vs 非同期 generator #
| 同期 generator | async generator | |
|---|---|---|
| 定義 | def fn(): yield | async def fn(): yield |
| 消費 | for x in g: | async for x in g: |
| 中で await | ❌ | ✅ |
| 中で sleep / io | ❌ (または同期) | ✅ |
Async iteration — __aiter__ / __anext__
#
非同期イテラブルを自作したいとき。
class Counter:
def __init__(self, limit):
self.limit = limit
self.n = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.n >= self.limit:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.n += 1
return self.n
async def main():
async for i in Counter(3):
print(i)
# 0.1 秒間隔で 1, 2, 3ほとんどは async generator の方が短くて直感的なので、クラス形式はあまり使いません。
Async context manager — __aenter__ / __aexit__
#
第10章 コンテキストマネージャ で簡単に見た道具。
from contextlib import asynccontextmanager
@asynccontextmanager
async def open_db():
conn = await connect_async()
try:
yield conn
finally:
await conn.close()
async def main():
async with open_db() as conn:
await conn.execute(...)並行性パターン — よく出会う形 #
1) Semaphore で並行性制限 #
sem = asyncio.Semaphore(10)
async def fetch_one(url):
async with sem:
return await fetch(url)
async def fetch_all(urls):
return await asyncio.gather(*[fetch_one(u) for u in urls])2) Producer / Consumer — Queue
#
queue = asyncio.Queue(maxsize=10)
async def producer():
for i in range(100):
await queue.put(i)
await queue.put(None) # 終了シグナル
async def consumer():
while True:
item = await queue.get()
if item is None:
break
process(item)
queue.task_done()
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(producer())
tg.create_task(consumer())Queue は producer / consumer 間のバックプレッシャを自然に処理してくれます (キューが満杯になると producer が自動で待機)。第27章 非同期とバックグラウンドジョブ で本パターンを再び扱います。
3) 最初の結果だけ受けて残りを取り消し #
async def main():
tasks = [
asyncio.create_task(fetch_a()),
asyncio.create_task(fetch_b()),
]
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for t in pending:
t.cancel()
return done.pop().result()複数のバックエンドに同じリクエストを投げて、先に応答したものを採用するパターン。
4) Timeout #
async def main():
try:
async with asyncio.timeout(5):
data = await slow_op()
except TimeoutError:
print("5 秒超過")第14章 で見たもの。3.11+ 標準。
取り消し — 深いテーマ #
コルーチンが cancel されると CancelledError が await 地点で投げられます。
async def work():
try:
await long_operation()
except asyncio.CancelledError:
await cleanup() # クリーンアップは非同期でも可能
raise # ← 再度投げる必要あり!重要: CancelledError を捕まえて再度投げないと — task が正常終了したと見なされて cleanup が途切れます。常に raise を一緒に 置いてください。
shielding — 一部だけ取り消しを阻止 #
async def critical():
return await asyncio.shield(important_work())shield で包んだ部分は 外部の cancel の影響を受けません。 決済の最終段階のように、途中で切れてはいけない場合。
デバッグ — asyncio.run(debug=True)
#
PYTHONASYNCIODEBUG=1 python app.pyまたは
asyncio.run(main(), debug=True)デバッグモードでは。
- 譲渡なしに長く回るコルーチンの警告 (ブロッキング疑い)
- 漏れた
awaitの警告 - task が GC されるときに結果未使用の警告
開発中は常にオンにする のが良いです。非同期バグの 90% がここで捕まります。本格的なプロファイリングは第21章 パフォーマンス — cProfile、py-spy、メモリプロファイリング。
練習問題 #
- 100 個の URL を同時に fetch しつつ
Semaphore(10)で並行性を 10 に制限する関数を作成してください。同じ仕事を並行性 1 / 10 / 100 で実行して時間を比較します。高すぎるとなぜむしろ遅くなるのかを直接観察します。 asyncio.TaskGroupで producer / consumer の 2 つのコルーチンがasyncio.Queue(maxsize=10)を共有しながら 100 個のアイテムを流すコードを作成してください。キューが満杯になると producer が自動で待機するバックプレッシャが動作するか確認します。asyncio.run(main(), debug=True)で実行し、わざと譲渡のない重い同期コード (for i in range(10_000_000): ...) をコルーチンの中に置いてください。どんな警告が出力されるか確認します。
一行まとめ: イベントループは一度に 1 つのコルーチンだけ、
awaitが譲渡点。CPU バウンドは譲渡がないので並行性を阻む →to_thread/ProcessPoolExecutor。Future⊂Task。バックグラウンド Task は参照の維持が必須。gather(結果リスト) /wait(done-pending) /as_completed(順番通り)、新コードはTaskGroup。async generator (async def+yield)、async for、async with。CancelledErrorは捕まえて片付けてからraise。shieldで取り消しを遮断。debug=Trueは開発中の必須。
次の章 #
次の 第19章 GIL と並行性 — threading vs multiprocessing vs asyncio では、本章で簡単に触れた CPU バウンド並行性 — GIL の正体、threading vs multiprocessing vs asyncio、そして Python 3.13 ~ 3.14 の free-threaded ビルドまでを扱います。