モダンPython実践 #5 非同期とバックグラウンドタスク
中級 #7 と 上級 #4 で見た非同期が FastAPI の中でどう自然に 解かれていくのか — そして レスポンス後に処理すべき作業 (メール、重い変換、外部 API) を扱うパターンを整理します。
async def vs def — FastAPI が適切に処理
#
FastAPI のルートはどちらも受け付けます。
@router.get("/sync")
def sync_handler():
...
@router.get("/async")
async def async_handler():
...違い:
async def— イベントループの上で直接実行。await が必要なら譲渡可能def— 自動的に スレッドプール に送られて実行。イベントループをブロックしない
FastAPI が 自動で適切な方法で処理 します。なので同期ライブラリを使うルートは単に def で書いても他のリクエストをブロックしません。
絶対にやってはいけないこと — async def の中で同期ブロッキング
#
@router.get("/items")
async def list_items():
response = requests.get("https://api.example.com") # 同期、ブロッキング!
return response.json()async def の中で同期ブロッキング関数を呼ぶと — イベントループ全体が止まります。 他のリクエストも止まります。上級 #5 で見た GIL + 非同期の出会い。
解決は 2 つ:
import httpx
@router.get("/items")
async def list_items():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com")
return response.json()import asyncio
import requests
@router.get("/items")
async def list_items():
response = await asyncio.to_thread(requests.get, "https://api.example.com")
return response.json()または 単に def で書く — FastAPI が適切にスレッドへ送ります。
@router.get("/items")
def list_items():
response = requests.get("https://api.example.com")
return response.json()実践ガイド — いつ何を? #
| ポジション | 最初に試す |
|---|---|
| 非同期ライブラリだけ使用 (httpx、asyncpg、redis-py async) | async def |
| 同期ライブラリ (requests、psycopg2) だけ使用 | def |
| 両者が混在 | async def + to_thread で同期部分を包む |
| CPU バウンド処理 | def または async def + ProcessPoolExecutor |
このシリーズは #3 で SQLAlchemy 非同期を使ったので async def が標準。
BackgroundTasks — レスポンス後の処理
#
メール送信、通知、ログのような作業を レスポンスが出た後 バックグラウンドで回したいとき。
from fastapi import BackgroundTasks
def send_email_sync(to: str, subject: str, body: str):
# 実際のメール送信 (smtplib など)
print(f"メール送信: {to}")
@router.post("/users")
async def create_user(
payload: UserCreate,
background_tasks: BackgroundTasks,
db: DBSession,
) -> UserOut:
user = await user_service.create_user(db, payload.email, payload.password)
background_tasks.add_task(
send_email_sync, user.email, "ようこそ", "登録ありがとうございます",
)
return user流れ:
- ルート本体が正常終了
- レスポンスがクライアントに送信される
- その後
background_tasksの関数たちが 順番に 実行される
非同期関数も OK #
async def send_email_async(to: str, subject: str, body: str):
async with httpx.AsyncClient() as client:
await client.post("https://email-api/", json={...})
background_tasks.add_task(send_email_async, user.email, "ようこそ", "...")add_task は同期 / 非同期どちらも受けます。
限界 — 同じプロセスの中でだけ #
BackgroundTasks の作業は 現在のプロセスが生きていなければ実行されません。 つまり:
- ✅ 速い 後処理 (数秒以内)
- ✅ 失敗しても再試行不要 な作業
- ❌ 重い動画 / 画像変換
- ❌ 外部 API 呼び出し失敗時に再試行が必要
- ❌ サーバー再起動時に生き残るべき作業
こういう場面は 外部キュー が必要になります。
外部キュー — いつ導入? #
| シグナル | 外部キューが必要? |
|---|---|
| 作業が 1 秒未満 | いいえ、BackgroundTasks で十分 |
| 失敗時の再試行ポリシーが必要 | ✅ |
| 作業結果を追跡 / 照会する必要 | ✅ |
| ワーカーを別途スケールしたい | ✅ |
| スケジューリング (毎日 0 時の作業など) | ✅ |
| 優先度キューが必要 | ✅ |
該当するなら別のワーカーインフラを導入してください。
選択肢 #
| ツール | 評価 |
|---|---|
| Celery | 最も古く広く使われる。複雑。Redis/RabbitMQ バックエンド |
| ARQ | async フレンドリー。Redis のみ。シンプル |
| Dramatiq | シンプルなインターフェース。バックエンド多様 |
| RQ | シンプル。Redis。同期中心 |
| Taskiq | FastAPI 風の依存性注入 |
非同期コードベースには ARQ または Taskiq が自然で、大きなプロジェクトは Celery が検証された標準です。
ARQ — async キューのミニガイド #
最もシンプルな導入を ARQ で見ていきます。
uv add arqfrom arq.connections import RedisSettings
from app.core.config import settings
async def send_email(ctx, to: str, subject: str, body: str):
print(f"[worker] メール送信: {to}")
# 実際の送信コード
class WorkerSettings:
redis_settings = RedisSettings.from_dsn(settings.redis_url)
functions = [send_email]
max_jobs = 10
job_timeout = 60from arq import create_pool
from arq.connections import RedisSettings
@router.post("/users")
async def create_user(payload: UserCreate, db: DBSession):
user = await user_service.create_user(...)
pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
await pool.enqueue_job("send_email", user.email, "ようこそ", "...")
return userワーカーの実行:
uv run arq app.tasks.worker.WorkerSettingsWeb サーバーとワーカーが 別プロセス で動きます。片方が死んでも他方は生きており、ワーカーだけ別途スケール可能。
依存性注入は自分で #
ARQ では FastAPI の Depends が動きません。ワーカー関数の中で DB セッションなどを自分で作る必要があります。
from app.db.session import AsyncSessionLocal
async def send_email(ctx, user_id: int):
async with AsyncSessionLocal() as db:
user = await db.get(User, user_id)
# 送信ロジックストリーミングレスポンス — 長く流す #
ファイルダウンロード、大きな JSON、Server-Sent Events (SSE) のように レスポンスを一度に作らず流したい とき。
from fastapi.responses import StreamingResponse
@router.get("/csv")
async def export_csv():
async def generate():
yield "id,title\n"
async for row in stream_rows():
yield f"{row.id},{row.title}\n"
return StreamingResponse(generate(), media_type="text/csv")中級 #4 の async generator がそのまま動作します。メモリにすべて載せず 流します。
Server-Sent Events (SSE) #
@router.get("/events")
async def events():
async def event_stream():
for i in range(100):
yield f"data: tick {i}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
)ブラウザの EventSource が自動で受け取ります。WebSocket ほど双方向ではありませんが、サーバー → クライアントの一方向リアルタイム には十分。
WebSocket #
双方向リアルタイムが必要なら WebSocket。
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"[{room}] echo: {data}")
except WebSocketDisconnect:
print(f"クライアント接続終了")チャット、リアルタイムコラボ、ゲームのような場面。ただし 状態管理、認証、ルーム分離 などが追加で必要な領域なので、本格導入は別の学習で。
並行性の制限 — 外部 API の保護 #
上級 #4 で見た Semaphore がここでも有用です。1 つのルートで外部 API を 100 個呼ばなければならないなら、同時に N 個 だけ:
import asyncio
_external_sem = asyncio.Semaphore(10)
async def call_external(url: str) -> dict:
async with _external_sem:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()外部 API のレートリミット、自分のサービスのメモリ保護の両方に役立ちます。
タイムアウト — 無限待機の防止 #
async with asyncio.timeout(5):
response = await client.get(url)外部 API が応答しないときに リクエストが無限に待たされる のを防ぎます。中級 #7 で見た道具。
httpx 自体にもタイムアウトオプション:
async with httpx.AsyncClient(timeout=10.0) as client:
...ライフサイクルイベント — lifespan
#
サーバー起動 / 終了時にリソースを準備 / 整理。
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
# 起動時
print("サーバー起動")
redis = await create_redis_pool()
app.state.redis = redis
yield
# 終了時
print("サーバー終了")
await redis.close()
app = FastAPI(lifespan=lifespan)中級 #3 の @asynccontextmanager パターンそのまま。古い @app.on_event("startup") デコレータは deprecated。
よく出会う落とし穴 #
1) BackgroundTasks の例外無視 #
def task_that_might_fail():
raise SomethingWrong() # レスポンスには影響しないが、ログだけ残るBackgroundTasks は 例外をレスポンスに反映しません。 作業の失敗がユーザーに見えません。重要な作業は 外部キュー + 再試行 が安全。
2) 依存性で受け取った DB セッションをバックグラウンドへ渡す #
@router.post("/")
async def create(db: DBSession, bg: BackgroundTasks):
bg.add_task(use_db_later, db) # ← 危険
return ...ルートが終わると依存性 (DB セッション) がクローズされます。バックグラウンドでそれを使うと 既にクローズ済みのセッション 事故。バックグラウンドの作業は セッションを新しく作って 使ってください。
async def use_db_later(user_id: int):
async with AsyncSessionLocal() as db:
...
bg.add_task(use_db_later, user.id)3) def ルートで非同期ライブラリを使う
#
@router.get("/")
def handler():
asyncio.run(something_async()) # イベントループの入れ子 — エラーdef ルートの中で非同期コードの実行が必要なら ルート自体を async def に変えてください。
まとめ #
今回つかんだもの:
async defはイベントループ、defはスレッドプール — FastAPI が自動async defの中で同期ブロッキング = イベントループ停止- 解決: 非同期ライブラリ /
to_thread/defに変える BackgroundTasks— レスポンス後に同じプロセスで後処理、速い仕事に向く- 外部キュー (ARQ/Celery/Dramatiq) — 再試行、追跡、別途スケールが必要なとき
- ARQ ミニガイド — async フレンドリー、Redis のみ
StreamingResponse+ async generator — 大きなレスポンス、SSE- WebSocket で双方向リアルタイム
Semaphoreで並行性制限、asyncio.timeoutでタイムアウトlifespanで起動 / 終了のリソース管理 (@asynccontextmanager)- 落とし穴: バックグラウンドの例外無視、クローズ済みセッションの使用、def-async の混乱
次回(#6 テストとデプロイ)がシリーズの最後です。pytest + httpx で統合テスト、依存性オーバーライドで外部依存を分離、そして Docker とクラウドデプロイ までを一カ所に整理します。