モダンPython実践 #5 非同期とバックグラウンドタスク

中級 #7上級 #4 で見た非同期が FastAPI の中でどう自然に 解かれていくのか — そして レスポンス後に処理すべき作業 (メール、重い変換、外部 API) を扱うパターンを整理します。

async def vs def — FastAPI が適切に処理 #

FastAPI のルートはどちらも受け付けます。

どちらも可能
@router.get("/sync")
def sync_handler():
    ...

@router.get("/async")
async def async_handler():
    ...

違い:

  • async def — イベントループの上で直接実行。await が必要なら譲渡可能
  • def — 自動的に スレッドプール に送られて実行。イベントループをブロックしない

FastAPI が 自動で適切な方法で処理 します。なので同期ライブラリを使うルートは単に def で書いても他のリクエストをブロックしません。

絶対にやってはいけないこと — async def の中で同期ブロッキング #

🚫 最大の落とし穴
@router.get("/items")
async def list_items():
    response = requests.get("https://api.example.com")   # 同期、ブロッキング!
    return response.json()

async def の中で同期ブロッキング関数を呼ぶと — イベントループ全体が止まります。 他のリクエストも止まります。上級 #5 で見た GIL + 非同期の出会い。

解決は 2 つ:

✅ 1. 非同期ライブラリ
import httpx

@router.get("/items")
async def list_items():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
    return response.json()
✅ 2. 同期をスレッドで
import asyncio
import requests

@router.get("/items")
async def list_items():
    response = await asyncio.to_thread(requests.get, "https://api.example.com")
    return response.json()

または 単に def で書く — FastAPI が適切にスレッドへ送ります。

✅ 3. def で書く
@router.get("/items")
def list_items():
    response = requests.get("https://api.example.com")
    return response.json()

実践ガイド — いつ何を? #

ポジション最初に試す
非同期ライブラリだけ使用 (httpx、asyncpg、redis-py async)async def
同期ライブラリ (requests、psycopg2) だけ使用def
両者が混在async def + to_thread で同期部分を包む
CPU バウンド処理def または async def + ProcessPoolExecutor

このシリーズは #3 で SQLAlchemy 非同期を使ったので async def が標準

BackgroundTasks — レスポンス後の処理 #

メール送信、通知、ログのような作業を レスポンスが出た後 バックグラウンドで回したいとき。

基本の使い方
from fastapi import BackgroundTasks

def send_email_sync(to: str, subject: str, body: str):
    # 実際のメール送信 (smtplib など)
    print(f"メール送信: {to}")

@router.post("/users")
async def create_user(
    payload: UserCreate,
    background_tasks: BackgroundTasks,
    db: DBSession,
) -> UserOut:
    user = await user_service.create_user(db, payload.email, payload.password)
    background_tasks.add_task(
        send_email_sync, user.email, "ようこそ", "登録ありがとうございます",
    )
    return user

流れ:

  1. ルート本体が正常終了
  2. レスポンスがクライアントに送信される
  3. その後 background_tasks の関数たちが 順番に 実行される

非同期関数も OK #

async task
async def send_email_async(to: str, subject: str, body: str):
    async with httpx.AsyncClient() as client:
        await client.post("https://email-api/", json={...})

background_tasks.add_task(send_email_async, user.email, "ようこそ", "...")

add_task は同期 / 非同期どちらも受けます。

限界 — 同じプロセスの中でだけ #

BackgroundTasks の作業は 現在のプロセスが生きていなければ実行されません。 つまり:

  • 速い 後処理 (数秒以内)
  • 失敗しても再試行不要 な作業
  • ❌ 重い動画 / 画像変換
  • ❌ 外部 API 呼び出し失敗時に再試行が必要
  • ❌ サーバー再起動時に生き残るべき作業

こういう場面は 外部キュー が必要になります。

外部キュー — いつ導入? #

シグナル外部キューが必要?
作業が 1 秒未満いいえ、BackgroundTasks で十分
失敗時の再試行ポリシーが必要
作業結果を追跡 / 照会する必要
ワーカーを別途スケールしたい
スケジューリング (毎日 0 時の作業など)
優先度キューが必要

該当するなら別のワーカーインフラを導入してください。

選択肢 #

ツール評価
Celery最も古く広く使われる。複雑。Redis/RabbitMQ バックエンド
ARQasync フレンドリー。Redis のみ。シンプル
Dramatiqシンプルなインターフェース。バックエンド多様
RQシンプル。Redis。同期中心
TaskiqFastAPI 風の依存性注入

非同期コードベースには ARQ または Taskiq が自然で、大きなプロジェクトは Celery が検証された標準です。

ARQ — async キューのミニガイド #

最もシンプルな導入を ARQ で見ていきます。

インストール
uv add arq
app/tasks/worker.py
from arq.connections import RedisSettings
from app.core.config import settings

async def send_email(ctx, to: str, subject: str, body: str):
    print(f"[worker] メール送信: {to}")
    # 実際の送信コード

class WorkerSettings:
    redis_settings = RedisSettings.from_dsn(settings.redis_url)
    functions = [send_email]
    max_jobs = 10
    job_timeout = 60
ルートで enqueue
from arq import create_pool
from arq.connections import RedisSettings

@router.post("/users")
async def create_user(payload: UserCreate, db: DBSession):
    user = await user_service.create_user(...)
    pool = await create_pool(RedisSettings.from_dsn(settings.redis_url))
    await pool.enqueue_job("send_email", user.email, "ようこそ", "...")
    return user

ワーカーの実行:

別プロセスでワーカー
uv run arq app.tasks.worker.WorkerSettings

Web サーバーとワーカーが 別プロセス で動きます。片方が死んでも他方は生きており、ワーカーだけ別途スケール可能。

依存性注入は自分で #

ARQ では FastAPI の Depends が動きません。ワーカー関数の中で DB セッションなどを自分で作る必要があります。

ワーカーの中で DB
from app.db.session import AsyncSessionLocal

async def send_email(ctx, user_id: int):
    async with AsyncSessionLocal() as db:
        user = await db.get(User, user_id)
        # 送信ロジック

ストリーミングレスポンス — 長く流す #

ファイルダウンロード、大きな JSON、Server-Sent Events (SSE) のように レスポンスを一度に作らず流したい とき。

StreamingResponse
from fastapi.responses import StreamingResponse

@router.get("/csv")
async def export_csv():
    async def generate():
        yield "id,title\n"
        async for row in stream_rows():
            yield f"{row.id},{row.title}\n"

    return StreamingResponse(generate(), media_type="text/csv")

中級 #4 の async generator がそのまま動作します。メモリにすべて載せず 流します。

Server-Sent Events (SSE) #

SSE — リアルタイム通知
@router.get("/events")
async def events():
    async def event_stream():
        for i in range(100):
            yield f"data: tick {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
    )

ブラウザの EventSource が自動で受け取ります。WebSocket ほど双方向ではありませんが、サーバー → クライアントの一方向リアルタイム には十分。

WebSocket #

双方向リアルタイムが必要なら WebSocket。

WebSocket の基本
from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws/{room}")
async def websocket_endpoint(websocket: WebSocket, room: str):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"[{room}] echo: {data}")
    except WebSocketDisconnect:
        print(f"クライアント接続終了")

チャット、リアルタイムコラボ、ゲームのような場面。ただし 状態管理、認証、ルーム分離 などが追加で必要な領域なので、本格導入は別の学習で。

並行性の制限 — 外部 API の保護 #

上級 #4 で見た Semaphore がここでも有用です。1 つのルートで外部 API を 100 個呼ばなければならないなら、同時に N 個 だけ:

同時呼び出しの制限
import asyncio

_external_sem = asyncio.Semaphore(10)

async def call_external(url: str) -> dict:
    async with _external_sem:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.json()

外部 API のレートリミット、自分のサービスのメモリ保護の両方に役立ちます。

タイムアウト — 無限待機の防止 #

外部呼び出しのタイムアウト
async with asyncio.timeout(5):
    response = await client.get(url)

外部 API が応答しないときに リクエストが無限に待たされる のを防ぎます。中級 #7 で見た道具。

httpx 自体にもタイムアウトオプション:

httpx timeout
async with httpx.AsyncClient(timeout=10.0) as client:
    ...

ライフサイクルイベント — lifespan #

サーバー起動 / 終了時にリソースを準備 / 整理。

lifespan
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # 起動時
    print("サーバー起動")
    redis = await create_redis_pool()
    app.state.redis = redis

    yield

    # 終了時
    print("サーバー終了")
    await redis.close()

app = FastAPI(lifespan=lifespan)

中級 #3@asynccontextmanager パターンそのまま。古い @app.on_event("startup") デコレータは deprecated。

よく出会う落とし穴 #

1) BackgroundTasks の例外無視 #

🚫 無視される例外
def task_that_might_fail():
    raise SomethingWrong()    # レスポンスには影響しないが、ログだけ残る

BackgroundTasks は 例外をレスポンスに反映しません。 作業の失敗がユーザーに見えません。重要な作業は 外部キュー + 再試行 が安全。

2) 依存性で受け取った DB セッションをバックグラウンドへ渡す #

🚫 セッションリーク
@router.post("/")
async def create(db: DBSession, bg: BackgroundTasks):
    bg.add_task(use_db_later, db)   # ← 危険
    return ...

ルートが終わると依存性 (DB セッション) がクローズされます。バックグラウンドでそれを使うと 既にクローズ済みのセッション 事故。バックグラウンドの作業は セッションを新しく作って 使ってください。

async def use_db_later(user_id: int):
    async with AsyncSessionLocal() as db:
        ...

bg.add_task(use_db_later, user.id)

3) def ルートで非同期ライブラリを使う #

🚫
@router.get("/")
def handler():
    asyncio.run(something_async())   # イベントループの入れ子 — エラー

def ルートの中で非同期コードの実行が必要なら ルート自体を async def に変えてください。

まとめ #

今回つかんだもの:

  • async def はイベントループ、def はスレッドプール — FastAPI が自動
  • async def の中で同期ブロッキング = イベントループ停止
  • 解決: 非同期ライブラリ / to_thread / def に変える
  • BackgroundTasks — レスポンス後に同じプロセスで後処理、速い仕事に向く
  • 外部キュー (ARQ/Celery/Dramatiq) — 再試行、追跡、別途スケールが必要なとき
  • ARQ ミニガイド — async フレンドリー、Redis のみ
  • StreamingResponse + async generator — 大きなレスポンス、SSE
  • WebSocket で双方向リアルタイム
  • Semaphore で並行性制限、asyncio.timeout でタイムアウト
  • lifespan で起動 / 終了のリソース管理 (@asynccontextmanager)
  • 落とし穴: バックグラウンドの例外無視、クローズ済みセッションの使用、def-async の混乱

次回(#6 テストとデプロイ)がシリーズの最後です。pytest + httpx で統合テスト、依存性オーバーライドで外部依存を分離、そして Docker とクラウドデプロイ までを一カ所に整理します。

X