目次
27 章

非同期とバックグラウンドジョブ

async ルートの使いどき、BackgroundTasks でレスポンス後処理、外部キュー(Celery、ARQ)が必要になる境界、そして同期ライブラリと安全に混ぜる方法をまとめます。

第14章 非同期入門第18章 非同期の深層 で見た非同期が FastAPI の中でどのように自然に収まるか — そして レスポンス後に処理すべき作業 (メール、重い変換、外部 API) を扱うパターンをまとめます。

本章で外部キュー (ARQ / Celery など) が必要になる境界を明示します。小さな後処理は BackgroundTasks で十分ですが、再試行 / 追跡 / 別スケールが必要になる瞬間が来ます。

async def vs def — FastAPI が自動で処理 #

FastAPI のルートは両方を受け取ります。

どちらも可能
@router.get("/sync")
def sync_handler():
    ...

@router.get("/async")
async def async_handler():
    ...

違い:

  • async def — イベントループの上で直接実行。await が必要なら譲渡可能
  • def — 自動的に スレッドプール に送られて実行。イベントループを止めない

FastAPI が 自動的に適切な方式で処理 します。なので同期ライブラリを使うルートはそのまま def で書いても他のリクエストを止めません。

絶対にやってはいけないこと — async def の中で同期ブロッキング #

🚫 最大の落とし穴
@router.get("/items")
async def list_items():
    response = requests.get("https://api.example.com")   # 同期、ブロッキング!
    return response.json()

async def の中で同期ブロッキング関数を呼ぶと — イベントループ全体が止まります。 他のリクエストも止まります。第19章 GIL と並行性 で見た GIL + 非同期の出会い。

解決は 2 つ:

✅ 1. 非同期ライブラリ
import httpx

@router.get("/items")
async def list_items():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
    return response.json()
✅ 2. 同期をスレッドへ
import asyncio
import requests

@router.get("/items")
async def list_items():
    response = await asyncio.to_thread(requests.get, "https://api.example.com")
    return response.json()

または そのまま def で書く — FastAPI が自動でスレッドに送ります。

✅ 3. def で書く
@router.get("/items")
def list_items():
    response = requests.get("https://api.example.com")
    return response.json()

実戦ガイド — いつ何を? #

状況最初の選択
非同期ライブラリだけ使用 (httpx、asyncpg、redis-py async)async def
同期ライブラリ (requests、psycopg2) だけ使用def
両方が混在async def + to_thread で同期部分を包む
CPU バウンド処理def または async def + ProcessPoolExecutor

本書の 4部は 第25章 で SQLAlchemy 非同期を使ったので async def が標準

BackgroundTasks — レスポンス後の処理 #

メール送信、通知、ログなどの作業を レスポンスが出た後に バックグラウンドで回したいとき。

基本的な使い方
from fastapi import BackgroundTasks

def send_email_sync(to: str, subject: str, body: str):
    # 実際のメール送信 (smtplib など)
    print(f"メール送信: {to}")

@router.post("/users")
async def create_user(
    payload: UserCreate,
    background_tasks: BackgroundTasks,
    db: DBSession,
) -> UserOut:
    user = await user_service.create_user(db, payload.email, payload.password)
    background_tasks.add_task(
        send_email_sync, user.email, "ようこそ", "ご登録ありがとうございます",
    )
    return user

流れ:

  1. ルート本体が正常終了
  2. レスポンスがクライアントに送信される
  3. その後 background_tasks の関数たちが 順番に 実行

非同期関数も OK #

async task
async def send_email_async(to: str, subject: str, body: str):
    async with httpx.AsyncClient() as client:
        await client.post("https://email-api/", json={...})

background_tasks.add_task(send_email_async, user.email, "ようこそ", "...")

add_task は同期・非同期の両方を受け取ります。

限界 — 同じプロセス内でのみ #

BackgroundTasks の作業は 現在のプロセスが生きていれば実行 されます。つまり:

  • 速い 後処理 (数秒以内)
  • 失敗しても再試行不要な 作業
  • ❌ 重い動画 / 画像の変換
  • ❌ 外部 API 呼び出し失敗時に再試行が必要
  • ❌ サーバー再起動時にも生き残る必要のある作業

このような場合は 外部キュー が必要です。

外部キュー — いつ導入? #

シグナル外部キュー必要?
作業が 1 秒未満いいえ、BackgroundTasks で十分
失敗時の再試行ポリシーが必要
作業結果を追跡 / 照会する必要
ワーカーを別途スケールしたい
スケジューリング (毎日 0 時など)
優先度キューが必要

該当すれば別のワーカーインフラを導入してください。

選択肢 #

道具評価
Celery最も古く広く使われる。複雑。Redis / RabbitMQ バックエンド
ARQasync 親和。Redis のみ使用。シンプル
Dramatiqシンプルなインターフェース。バックエンド多様
RQシンプル。Redis。同期中心
TaskiqFastAPI 風の依存性注入

非同期コードベースには ARQ または Taskiq が自然で、大きなプロジェクトは Celery が検証された標準です。

ARQ — async キューのミニガイド #

最もシンプルな導入を ARQ で見ていきます。

インストール
uv add arq
app/tasks/worker.py
from arq.connections import RedisSettings
from app.core.config import settings

async def send_email(ctx, to: str, subject: str, body: str):
    print(f"[worker] メール送信: {to}")
    # 実際の送信コード

class WorkerSettings:
    redis_settings = RedisSettings.from_dsn(settings.redis_url)
    functions = [send_email]
    max_jobs = 10
    job_timeout = 60
ルートから enqueue
from arq import create_pool
from arq.connections import RedisSettings

@router.post("/users")
async def create_user(payload: UserCreate, db: DBSession):
    user = await user_service.create_user(...)
    pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
    await pool.enqueue_job("send_email", user.email, "ようこそ", "...")
    return user

ワーカーの実行:

別プロセスでワーカー
uv run arq app.tasks.worker.WorkerSettings

Web サーバーとワーカーが 別プロセス で回ります。一方が落ちても他方は生き残り、ワーカーだけを別スケール可能。

依存性注入は直接 #

ARQ は FastAPI の Depends が動作しません。ワーカー関数の中で DB セッションなどを直接作る必要があります。

ワーカー内で DB
from app.db.session import AsyncSessionLocal

async def send_email(ctx, user_id: int):
    async with AsyncSessionLocal() as db:
        user = await db.get(User, user_id)
        # 送信ロジック

ストリーミングレスポンス — 長く送る #

ファイルダウンロード、大きな JSON、Server-Sent Events (SSE) のように レスポンスを一度に作らずに流したいとき

StreamingResponse
from fastapi.responses import StreamingResponse

@router.get("/csv")
async def export_csv():
    async def generate():
        yield "id,title\n"
        async for row in stream_rows():
            yield f"{row.id},{row.title}\n"

    return StreamingResponse(generate(), media_type="text/csv")

第11章 イテラブル、ジェネレータ、yield from の async generator がそのまま動作します。メモリにすべて載せず に流します。

Server-Sent Events (SSE) #

SSE — リアルタイム通知
@router.get("/events")
async def events():
    async def event_stream():
        for i in range(100):
            yield f"data: tick {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
    )

ブラウザの EventSource が自動で受け取ります。WebSocket ほど双方向ではありませんが、サーバー → クライアント片方向のリアルタイム には十分。

WebSocket #

双方向リアルタイムが必要なら WebSocket。

WebSocket 基本
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"[{room}] echo: {data}")
    except WebSocketDisconnect:
        print(f"クライアント接続終了")

チャット、リアルタイム協業、ゲームのような場合に使います。ただし 状態管理、認証、ルーム分離 などが追加で必要な領域なので、本格導入は別途学習。

並行性の制限 — 外部 API の保護 #

第18章 非同期の深層 で見た Semaphore がここでも有用です。1 つのルートで外部 API を 100 個呼ぶ必要があるなら、同時に N 個 までに制限:

同時呼び出しの制限
import asyncio

_external_sem = asyncio.Semaphore(10)

async def call_external(url: str) -> dict:
    async with _external_sem:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()

外部 API の rate limit、自サービスのメモリ保護の両方に役立ちます。

タイムアウト — 無限待ちの防止 #

外部呼び出しのタイムアウト
async with asyncio.timeout(5):
    response = await client.get(url)

外部 API が応答しないときに リクエストが無限にぶら下がる ことを防ぎます。第14章 非同期入門 で見た道具。

httpx 自体にもタイムアウトオプション:

httpx timeout
async with httpx.AsyncClient(timeout=10.0) as client:
    ...

ライフサイクルイベント — lifespan #

サーバー起動 / 終了時にリソースを準備 / 整理。

lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 起動時
    print("サーバー開始")
    redis = await create_redis_pool()
    app.state.redis = redis

    yield

    # 終了時
    print("サーバー終了")
    await redis.close()

app = FastAPI(lifespan=lifespan)

第10章 コンテキストマネージャ@asynccontextmanager パターンそのまま。旧 @app.on_event("startup") デコレータは deprecated。

よく出会う落とし穴 #

1) BackgroundTasks の例外無視 #

🚫 無視される例外
def task_that_might_fail():
    raise SomethingWrong()    # レスポンスには影響なし、ログだけが残る

BackgroundTasks は 例外をレスポンスに反映しません。 作業失敗がユーザーに見えません。重要な作業は 外部キュー + 再試行 が安全。

2) 依存性から受け取った DB セッションをバックグラウンドに渡す #

🚫 セッション漏れ
@router.post("/")
async def create(db: DBSession, bg: BackgroundTasks):
    bg.add_task(use_db_later, db)   # ← 危険
    return ...

ルートが終わると依存性 (DB セッション) は閉じられます。バックグラウンドでそれを使うと 既に閉じられたセッション 事故。バックグラウンド作業は セッションを新しく作って 使ってください。

async def use_db_later(user_id: int):
    async with AsyncSessionLocal() as db:
        ...

bg.add_task(use_db_later, user.id)

3) def ルートで非同期ライブラリを使う #

🚫
@router.get("/")
def handler():
    asyncio.run(something_async())   # イベントループのネスト — エラー

def ルートの中で非同期コードの実行が必要なら ルート自体を async def に変えてください。

練習問題 #

  1. BackgroundTasksPOST /users 登録後に歓迎メールをバックグラウンドで送るコードを書いてください (実 SMTP の代わりに print でシミュレーション)。レスポンスがクライアントに先に到達し、メールの print が後を追うかの時間順序を直接観察します。
  2. わざと BackgroundTasks 内の作業が例外を投げるようにしてから、レスポンスに影響なく作業が静かに失敗することを確認してください。その作業を外部キュー (ARQ) に移すコード変更を始めます (Redis がなければ ARQ のインストールまで)。
  3. StreamingResponse と async generator で GET /csv エンドポイントを書いてください。1 万行のダミーデータをメモリにすべて載せずに流します。curl -N で呼び出し、最初のバイトがすぐ来るか (ストリーミング) を確認します。

一行まとめ: async def はイベントループ、def はスレッドプール、FastAPI が自動。async def 内の同期ブロッキングは禁止 — 非同期ライブラリ / to_thread / def で解く。BackgroundTasks はレスポンス後・同一プロセスの速い後処理、外部キュー (ARQ / Celery など) は再試行 / 追跡 / 別スケールが必要なとき。StreamingResponse + async generator で大きなレスポンス / SSE。Semaphore で並行性制限、asyncio.timeout でタイムアウト、lifespan で起動 / 終了リソース。落とし穴はバックグラウンド例外無視 / 閉じたセッション / def-async 混乱。

次の章 #

次の 第28章 テストとデプロイ — pytest、Docker、Railway/Fly が 4部 (ソースから構成した 6 章) の最後です。pytest + httpx で統合テスト、依存性オーバーライドで外部依存性を隔離し、Docker とクラウドデプロイ までをまとめます。

X