モダンPython上級 #4 非同期の深さ — イベントループ、gather/wait、async generator
中級 #7 非同期入門 で async/await の意味と asyncio.run、gather、TaskGroup まで見ました。今回はその次の段階です。イベントループが実際にどう動くのか、Future と Task の違い、gather と wait の細かい違い、async generator、async iteration までを整理します。
イベントループ — もう一段の深さ #
イベントループは 一度に 1 つのコルーチンだけを実行 します。マルチスレッドではありません。それなのにどうやって並行性が生まれるのでしょうか?
def run_loop(coros):
queue = list(coros)
while queue:
coro = queue.pop(0)
try:
# コルーチンを次の await まで進める
future = coro.send(None)
except StopIteration as e:
continue # コルーチン終了
# future が終わったらコルーチンをキューに戻す
future.add_done_callback(lambda _: queue.append(coro))要点。
- コルーチンを
send(None)で 1 段階進める awaitに到達した地点まで実行されて止まる- その時点の
Futureが完了したら、コルーチンを再びキューに入れる - 次のコルーチンに移る (他のコルーチンがその間に実行されることがある)
await が協調的な譲渡点 だというのがこの図のカギです。譲渡が起きるところでだけ他のコルーチンが割り込めます。
譲渡しないコードは並行性を妨げる #
async def slow():
total = 0
for i in range(10_000_000): # CPU バウンド、await なし
total += i
return total
async def main():
await asyncio.gather(slow(), slow(), slow()) # 3 つを同時に?見た目には 3 つが同時に回っているように見えますが、実際には直列 です。slow() の中に await がないので譲渡が起こりません。CPU バウンドは to_thread または multiprocessing に送らなければいけません (#5)。
Future — 未来に埋まる結果
#
Future は 「まだ決まっていない結果」 を表すオブジェクトです。
async def main():
loop = asyncio.get_running_loop()
fut = loop.create_future()
# 1 秒後に結果を埋める
loop.call_later(1, fut.set_result, "done")
result = await fut
print(result) # done状態。
- pending — 結果未決
- done — 結果が決まった (
fut.set_result(x)またはfut.set_exception(e)) - cancelled — キャンセル済み
Future は通常 直接作ることはありません。 ライブラリ内部 (スレッドプールから結果を回収、シグナルなど) で使うものです。一般のコードでは Task を通じて間接的に触れます。
Task — Future + コルーチン
#
async def main():
task = asyncio.create_task(work()) # Task の生成
print(isinstance(task, asyncio.Future)) # TrueTask は Future のサブクラス です。コルーチンを包んでイベントループに登録した形です。Future が一般化された「結果の約束」で、Task は「コルーチンが埋める約束」。
Task の done、result、cancel
#
task = asyncio.create_task(work())
await asyncio.sleep(0.1)
print(task.done()) # False / True
if task.done():
if task.exception():
print("失敗:", task.exception())
else:
print("成功:", task.result())
task.cancel() # キャンセル試行Task 参照の保持 — 罠 #
async def main():
asyncio.create_task(background_work()) # 結果を無視
# ...Task オブジェクトへの参照がなくなると、ガベージコレクタが Task を片付けてしまうことがあります。バックグラウンド作業が突然消える事故 が発生します。常にどこかで参照を保持 してください。
async def main():
background_tasks = set()
task = asyncio.create_task(background_work())
background_tasks.add(task)
task.add_done_callback(background_tasks.discard)
# ...TaskGroup を使えばこの厄介事は自動で解決します — だから 3.11+ では推奨されています。
gather と wait — 細かい違い
#
gather — 結果のリスト、順序を保つ
#
results = await asyncio.gather(
fetch("a"), fetch("b"), fetch("c"),
return_exceptions=False,
)
# results: [a の結果, b の結果, c の結果] ← 呼び出し順return_exceptions=True オプションを与えると 例外も結果として 入ってきます — 1 つの task が失敗しても他の task の結果を受け取れます。
results = await asyncio.gather(
fetch("a"),
fetch_might_fail("b"),
return_exceptions=True,
)
for r in results:
if isinstance(r, Exception):
print("失敗:", r)
else:
print("成功:", r)wait — done/pending の 2 つのセット、順序保証なし
#
done, pending = await asyncio.wait(
[task1, task2, task3],
return_when=asyncio.FIRST_COMPLETED,
)return_when オプション。
ALL_COMPLETED— デフォルト、全部終わるまでFIRST_COMPLETED— 1 つでも終わったら返すFIRST_EXCEPTION— 例外が 1 つでも発生したら返す
wait は 結果のリストではなく done/pending の 2 つのセット を返します。結果を受け取るには task.result() を直接呼んでください。
いつどちらを? #
| 場面 | 道具 |
|---|---|
| 全部終わるまで、結果を順序通り | gather |
| 一部だけ終わったら進める / 最初の結果だけ処理 | wait |
| 最初の結果から順に処理 | as_completed |
| 新しいコード、3.11+ 環境、cleanup を保証 | TaskGroup |
新しいコードはできるだけ TaskGroup です。gather/wait は古いコード/一部の場面でだけ。
as_completed — 最初の結果から処理
#
async def main():
coros = [fetch(u) for u in urls]
for coro in asyncio.as_completed(coros):
result = await coro
print("受信:", result)各作業が終わる 順に 結果を受け取ります。全部終わるのを待たずに早めに処理したいとき。
同期関数との統合 — to_thread
#
CPU バウンドまたはブロッキング I/O の同期関数を非同期の中で安全に呼び出す。
import asyncio
import requests # 同期ライブラリ
async def fetch(url):
response = await asyncio.to_thread(requests.get, url)
return response.textasyncio.to_thread(fn, *args) が 別スレッドで実行 し、await 可能な形に包んでくれます。内部的には loop.run_in_executor を呼びます。
run_in_executor — より細かく
#
from concurrent.futures import ProcessPoolExecutor
async def main():
loop = asyncio.get_running_loop()
with ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_heavy_function, arg)ThreadPoolExecutor または ProcessPoolExecutor を直接指定して、より細かい制御が可能です。
Async generator — 非同期 yield #
async def stream_chunks(url):
async with httpx.AsyncClient() as client:
async with client.stream("GET", url) as resp:
async for chunk in resp.aiter_bytes():
yield chunk
async def main():
async for chunk in stream_chunks("https://example.com"):
print(len(chunk))async def + yield = async generator。呼び出し時に非同期イテレータを返します。ストリーミング の場面に本当によく合います。
同期 vs 非同期 generator #
| 同期 generator | async generator | |
|---|---|---|
| 定義 | def fn(): yield | async def fn(): yield |
| 消費 | for x in g: | async for x in g: |
| 中で await | ❌ | ✅ |
| 中で sleep/io | ❌ (または同期) | ✅ |
Async iteration — __aiter__ / __anext__
#
非同期イテラブルを直接作りたいとき。
class Counter:
def __init__(self, limit):
self.limit = limit
self.n = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.n >= self.limit:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.n += 1
return self.n
async def main():
async for i in Counter(3):
print(i)
# 0.1 秒間隔で 1, 2, 3ほとんどは async generator の方が短く直感的なので、クラス形式はあまり使いません。
Async context manager — __aenter__ / __aexit__
#
中級 #3 で短く見たところ。
from contextlib import asynccontextmanager
@asynccontextmanager
async def open_db():
conn = await connect_async()
try:
yield conn
finally:
await conn.close()
async def main():
async with open_db() as conn:
await conn.execute(...)並行性パターン — よく出会う場面 #
1) Semaphore で並行性を制限 #
sem = asyncio.Semaphore(10)
async def fetch_one(url):
async with sem:
return await fetch(url)
async def fetch_all(urls):
return await asyncio.gather(*[fetch_one(u) for u in urls])2) Producer / Consumer — Queue
#
queue = asyncio.Queue(maxsize=10)
async def producer():
for i in range(100):
await queue.put(i)
await queue.put(None) # 終了シグナル
async def consumer():
while True:
item = await queue.get()
if item is None:
break
process(item)
queue.task_done()
async def main():
async with asyncio.TaskGroup() as tg:
tg.create_task(producer())
tg.create_task(consumer())Queue は producer/consumer 間のバックプレッシャを自然に処理してくれます (キューが埋まると producer が自動で待機)。
3) 最初の結果だけ受け取って残りをキャンセル #
async def main():
tasks = [
asyncio.create_task(fetch_a()),
asyncio.create_task(fetch_b()),
]
done, pending = await asyncio.wait(
tasks, return_when=asyncio.FIRST_COMPLETED
)
for t in pending:
t.cancel()
return done.pop().result()複数のバックエンドに同じリクエストを投げ、最初に応答したものを採用するパターン。
4) Timeout #
async def main():
try:
async with asyncio.timeout(5):
data = await slow_op()
except TimeoutError:
print("5 秒超過")中級 #7 で見たそれ。3.11+ 標準。
キャンセル — 深いトピック #
コルーチンが cancel されると CancelledError が await の箇所で投げられます。
async def work():
try:
await long_operation()
except asyncio.CancelledError:
await cleanup() # 後片付けは非同期でも可能
raise # ← 必ず再送出!重要: CancelledError を捕まえて再送出しないと — task が正常終了したと見なされて cleanup が断ち切られます。常に raise を一緒に 置いてください。
shielding — 一部だけキャンセルから守る #
async def critical():
return await asyncio.shield(important_work())shield で包んだ部分は 外部からの cancel の影響を受けません。 決済の最終段階のように、途中で切れてはいけない処理。
デバッグ — asyncio.run(debug=True)
#
PYTHONASYNCIODEBUG=1 python app.pyまたは
asyncio.run(main(), debug=True)デバッグモードでは次のことが有効になります。
- 譲渡なしで長すぎるコルーチンの警告 (ブロッキング疑い)
- 抜けた
awaitの警告 - task が GC されるとき結果未使用の警告
開発中は常にオン が良いです。非同期バグの 90% がここで捕まります。
まとめ #
今回見たこと。
- イベントループは 一度に 1 つのコルーチンだけ —
awaitが譲渡点 - 譲渡のない CPU バウンドは並行性を妨げる →
to_thread/ProcessPoolExecutor Future(結果の約束) ⊂Task(コルーチン + Future)- バックグラウンドの Task は 参照を保持 しないと GC される
gather(結果のリスト)、wait(done/pending のセット)、as_completed(順番に)、TaskGroup(cleanup 保証)to_thread/run_in_executorで同期関数を統合- async generator (
async def+yield)、async for、async with - パターン: Semaphore、Queue、first-wins、timeout
CancelledErrorは捕まえて後片付け後 必ずraiseshieldで cancel をブロックdebug=Trueは開発中の必須
次回 (#5 GIL と並行性) では、今回短く触れた CPU バウンドの並行性 — GIL の正体、threading vs multiprocessing vs asyncio、そして Python 3.13〜3.14 の free-threaded ビルドまで扱います。