非同期とバックグラウンドジョブ
async ルートの使いどき、BackgroundTasks でレスポンス後処理、外部キュー(Celery、ARQ)が必要になる境界、そして同期ライブラリと安全に混ぜる方法をまとめます。
第14章 非同期入門 と 第18章 非同期の深層 で見た非同期が FastAPI の中でどのように自然に収まるか — そして レスポンス後に処理すべき作業 (メール、重い変換、外部 API) を扱うパターンをまとめます。
本章で外部キュー (ARQ / Celery など) が必要になる境界を明示します。小さな後処理は BackgroundTasks で十分ですが、再試行 / 追跡 / 別スケールが必要になる瞬間が来ます。
async def vs def — FastAPI が自動で処理
#
FastAPI のルートは両方を受け取ります。
@router.get("/sync")
def sync_handler():
...
@router.get("/async")
async def async_handler():
...違い:
async def— イベントループの上で直接実行。await が必要なら譲渡可能def— 自動的に スレッドプール に送られて実行。イベントループを止めない
FastAPI が 自動的に適切な方式で処理 します。なので同期ライブラリを使うルートはそのまま def で書いても他のリクエストを止めません。
絶対にやってはいけないこと — async def の中で同期ブロッキング
#
@router.get("/items")
async def list_items():
response = requests.get("https://api.example.com") # 同期、ブロッキング!
return response.json()async def の中で同期ブロッキング関数を呼ぶと — イベントループ全体が止まります。 他のリクエストも止まります。第19章 GIL と並行性 で見た GIL + 非同期の出会い。
解決は 2 つ:
import httpx
@router.get("/items")
async def list_items():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com")
return response.json()import asyncio
import requests
@router.get("/items")
async def list_items():
response = await asyncio.to_thread(requests.get, "https://api.example.com")
return response.json()または そのまま def で書く — FastAPI が自動でスレッドに送ります。
@router.get("/items")
def list_items():
response = requests.get("https://api.example.com")
return response.json()実戦ガイド — いつ何を? #
| 状況 | 最初の選択 |
|---|---|
| 非同期ライブラリだけ使用 (httpx、asyncpg、redis-py async) | async def |
| 同期ライブラリ (requests、psycopg2) だけ使用 | def |
| 両方が混在 | async def + to_thread で同期部分を包む |
| CPU バウンド処理 | def または async def + ProcessPoolExecutor |
本書の 4部は 第25章 で SQLAlchemy 非同期を使ったので async def が標準。
BackgroundTasks — レスポンス後の処理
#
メール送信、通知、ログなどの作業を レスポンスが出た後に バックグラウンドで回したいとき。
from fastapi import BackgroundTasks
def send_email_sync(to: str, subject: str, body: str):
# 実際のメール送信 (smtplib など)
print(f"メール送信: {to}")
@router.post("/users")
async def create_user(
payload: UserCreate,
background_tasks: BackgroundTasks,
db: DBSession,
) -> UserOut:
user = await user_service.create_user(db, payload.email, payload.password)
background_tasks.add_task(
send_email_sync, user.email, "ようこそ", "ご登録ありがとうございます",
)
return user流れ:
- ルート本体が正常終了
- レスポンスがクライアントに送信される
- その後
background_tasksの関数たちが 順番に 実行
非同期関数も OK #
async def send_email_async(to: str, subject: str, body: str):
async with httpx.AsyncClient() as client:
await client.post("https://email-api/", json={...})
background_tasks.add_task(send_email_async, user.email, "ようこそ", "...")add_task は同期・非同期の両方を受け取ります。
限界 — 同じプロセス内でのみ #
BackgroundTasks の作業は 現在のプロセスが生きていれば実行 されます。つまり:
- ✅ 速い 後処理 (数秒以内)
- ✅ 失敗しても再試行不要な 作業
- ❌ 重い動画 / 画像の変換
- ❌ 外部 API 呼び出し失敗時に再試行が必要
- ❌ サーバー再起動時にも生き残る必要のある作業
このような場合は 外部キュー が必要です。
外部キュー — いつ導入? #
| シグナル | 外部キュー必要? |
|---|---|
| 作業が 1 秒未満 | いいえ、BackgroundTasks で十分 |
| 失敗時の再試行ポリシーが必要 | ✅ |
| 作業結果を追跡 / 照会する必要 | ✅ |
| ワーカーを別途スケールしたい | ✅ |
| スケジューリング (毎日 0 時など) | ✅ |
| 優先度キューが必要 | ✅ |
該当すれば別のワーカーインフラを導入してください。
選択肢 #
| 道具 | 評価 |
|---|---|
| Celery | 最も古く広く使われる。複雑。Redis / RabbitMQ バックエンド |
| ARQ | async 親和。Redis のみ使用。シンプル |
| Dramatiq | シンプルなインターフェース。バックエンド多様 |
| RQ | シンプル。Redis。同期中心 |
| Taskiq | FastAPI 風の依存性注入 |
非同期コードベースには ARQ または Taskiq が自然で、大きなプロジェクトは Celery が検証された標準です。
ARQ — async キューのミニガイド #
最もシンプルな導入を ARQ で見ていきます。
uv add arqfrom arq.connections import RedisSettings
from app.core.config import settings
async def send_email(ctx, to: str, subject: str, body: str):
print(f"[worker] メール送信: {to}")
# 実際の送信コード
class WorkerSettings:
redis_settings = RedisSettings.from_dsn(settings.redis_url)
functions = [send_email]
max_jobs = 10
job_timeout = 60from arq import create_pool
from arq.connections import RedisSettings
@router.post("/users")
async def create_user(payload: UserCreate, db: DBSession):
user = await user_service.create_user(...)
pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
await pool.enqueue_job("send_email", user.email, "ようこそ", "...")
return userワーカーの実行:
uv run arq app.tasks.worker.WorkerSettingsWeb サーバーとワーカーが 別プロセス で回ります。一方が落ちても他方は生き残り、ワーカーだけを別スケール可能。
依存性注入は直接 #
ARQ は FastAPI の Depends が動作しません。ワーカー関数の中で DB セッションなどを直接作る必要があります。
from app.db.session import AsyncSessionLocal
async def send_email(ctx, user_id: int):
async with AsyncSessionLocal() as db:
user = await db.get(User, user_id)
# 送信ロジックストリーミングレスポンス — 長く送る #
ファイルダウンロード、大きな JSON、Server-Sent Events (SSE) のように レスポンスを一度に作らずに流したいとき。
from fastapi.responses import StreamingResponse
@router.get("/csv")
async def export_csv():
async def generate():
yield "id,title\n"
async for row in stream_rows():
yield f"{row.id},{row.title}\n"
return StreamingResponse(generate(), media_type="text/csv")第11章 イテラブル、ジェネレータ、yield from の async generator がそのまま動作します。メモリにすべて載せず に流します。
Server-Sent Events (SSE) #
@router.get("/events")
async def events():
async def event_stream():
for i in range(100):
yield f"data: tick {i}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
event_stream(),
media_type="text/event-stream",
)ブラウザの EventSource が自動で受け取ります。WebSocket ほど双方向ではありませんが、サーバー → クライアント片方向のリアルタイム には十分。
WebSocket #
双方向リアルタイムが必要なら WebSocket。
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"[{room}] echo: {data}")
except WebSocketDisconnect:
print(f"クライアント接続終了")チャット、リアルタイム協業、ゲームのような場合に使います。ただし 状態管理、認証、ルーム分離 などが追加で必要な領域なので、本格導入は別途学習。
並行性の制限 — 外部 API の保護 #
第18章 非同期の深層 で見た Semaphore がここでも有用です。1 つのルートで外部 API を 100 個呼ぶ必要があるなら、同時に N 個 までに制限:
import asyncio
_external_sem = asyncio.Semaphore(10)
async def call_external(url: str) -> dict:
async with _external_sem:
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()外部 API の rate limit、自サービスのメモリ保護の両方に役立ちます。
タイムアウト — 無限待ちの防止 #
async with asyncio.timeout(5):
response = await client.get(url)外部 API が応答しないときに リクエストが無限にぶら下がる ことを防ぎます。第14章 非同期入門 で見た道具。
httpx 自体にもタイムアウトオプション:
async with httpx.AsyncClient(timeout=10.0) as client:
...ライフサイクルイベント — lifespan
#
サーバー起動 / 終了時にリソースを準備 / 整理。
from contextlib import asynccontextmanager
from fastapi import FastAPI
@asynccontextmanager
async def lifespan(app: FastAPI):
# 起動時
print("サーバー開始")
redis = await create_redis_pool()
app.state.redis = redis
yield
# 終了時
print("サーバー終了")
await redis.close()
app = FastAPI(lifespan=lifespan)第10章 コンテキストマネージャ の @asynccontextmanager パターンそのまま。旧 @app.on_event("startup") デコレータは deprecated。
よく出会う落とし穴 #
1) BackgroundTasks の例外無視 #
def task_that_might_fail():
raise SomethingWrong() # レスポンスには影響なし、ログだけが残るBackgroundTasks は 例外をレスポンスに反映しません。 作業失敗がユーザーに見えません。重要な作業は 外部キュー + 再試行 が安全。
2) 依存性から受け取った DB セッションをバックグラウンドに渡す #
@router.post("/")
async def create(db: DBSession, bg: BackgroundTasks):
bg.add_task(use_db_later, db) # ← 危険
return ...ルートが終わると依存性 (DB セッション) は閉じられます。バックグラウンドでそれを使うと 既に閉じられたセッション 事故。バックグラウンド作業は セッションを新しく作って 使ってください。
async def use_db_later(user_id: int):
async with AsyncSessionLocal() as db:
...
bg.add_task(use_db_later, user.id)3) def ルートで非同期ライブラリを使う
#
@router.get("/")
def handler():
asyncio.run(something_async()) # イベントループのネスト — エラーdef ルートの中で非同期コードの実行が必要なら ルート自体を async def に変えてください。
練習問題 #
BackgroundTasksでPOST /users登録後に歓迎メールをバックグラウンドで送るコードを書いてください (実 SMTP の代わりにprintでシミュレーション)。レスポンスがクライアントに先に到達し、メールの print が後を追うかの時間順序を直接観察します。- わざと BackgroundTasks 内の作業が例外を投げるようにしてから、レスポンスに影響なく作業が静かに失敗することを確認してください。その作業を外部キュー (ARQ) に移すコード変更を始めます (Redis がなければ ARQ のインストールまで)。
StreamingResponseと async generator でGET /csvエンドポイントを書いてください。1 万行のダミーデータをメモリにすべて載せずに流します。curl -Nで呼び出し、最初のバイトがすぐ来るか (ストリーミング) を確認します。
一行まとめ:
async defはイベントループ、defはスレッドプール、FastAPI が自動。async def内の同期ブロッキングは禁止 — 非同期ライブラリ /to_thread/defで解く。BackgroundTasksはレスポンス後・同一プロセスの速い後処理、外部キュー (ARQ / Celery など) は再試行 / 追跡 / 別スケールが必要なとき。StreamingResponse+ async generator で大きなレスポンス / SSE。Semaphoreで並行性制限、asyncio.timeoutでタイムアウト、lifespanで起動 / 終了リソース。落とし穴はバックグラウンド例外無視 / 閉じたセッション / def-async 混乱。
次の章 #
次の 第28章 テストとデプロイ — pytest、Docker、Railway/Fly が 4部 (ソースから構成した 6 章) の最後です。pytest + httpx で統合テスト、依存性オーバーライドで外部依存性を隔離し、Docker とクラウドデプロイ までをまとめます。