LLM アプリ運用 #4 バッチ処理 — 急がない仕事は半額で

読了 5分

第3回までの節約は、リアルタイムのリクエストの中での話でした。今回は問いを変えます。このリクエスト、本当に今すぐ答えが必要でしょうか。夜間に溜まった文書の分類、週次レポートの生成、評価セットの採点のような作業は、結果が数時間後に出ても何の問題もありません。Batches API はそうした作業を非同期で引き受ける代わりに、入力と出力のすべてのトークンを 50% 割引してくれます。条件なしの半額なので、該当する作業があれば最も手軽な節約です。

どんな作業がバッチ向きか #

基準はひとつ、遅延要件です。人が画面の前で待っているならリアルタイム、そうでなければバッチ候補です。

リアルタイムに残すものバッチに送るもの
チャットボットの応答、エージェント作業溜まった文書の分類・要約・埋め込み用前処理
ユーザーが押した「要約」ボタン夜間の一括レポート、週次ダイジェスト
検索時点のクエリ変換RAG 上級講座 第6回の評価セットの定期採点
新規データの事前ラベリング・タグ付け

バッチの契約条件も併せて押さえておきます。1バッチに最大 10 万件、ほとんどは 1 時間以内に終わりますが、保証は 24 時間以内で、結果は 29 日間保管されます。「ほとんどは速いが最悪は 24 時間」という幅があるため、締め切りがタイトな作業はバッチ向きではありません。

提出して、待って、回収する #

バッチの単位は、個々のリクエストに custom_id を付けた束です。流れは提出 → ポーリング → 結果回収の 3 段階です。

submit_batch.py
from anthropic.types.message_create_params import MessageCreateParamsNonStreaming
from anthropic.types.messages.batch_create_params import Request

batch = client.messages.batches.create(
    requests=[
        Request(
            custom_id=f"doc-{doc.id}",          # 結果を取り戻すための鍵
            params=MessageCreateParamsNonStreaming(
                model="claude-haiku-4-5",        # 第2回のルーティングはここでも適用される
                max_tokens=256,
                system=CLASSIFY_SYSTEM,
                messages=[{"role": "user", "content": doc.text}],
            ),
        )
        for doc in pending_docs
    ]
)
print(batch.id, batch.processing_status)
collect_batch.py
import time

while True:
    b = client.messages.batches.retrieve(batch.id)
    if b.processing_status == "ended":
        break
    time.sleep(60)

for result in client.messages.batches.results(batch.id):
    if result.result.type == "succeeded":
        msg = result.result.message
        text = next((blk.text for blk in msg.content if blk.type == "text"), "")
        save_classification(result.custom_id, text)    # custom_id で元データと結び付ける
    elif result.result.type == "errored":
        retry_later(result.custom_id)                   # サーバーエラーは再提出の対象

運用ポイントは三つです。

  • custom_id が生命線です。結果は提出順とは無関係に返ってきます。元データと結ぶ唯一の糸が custom_id なので、再実行しても安全に同じ ID になる規則(文書 ID ベースなど)で作ります。
  • 成否は件ごとに分かれます。バッチ全体が成功か失敗かではなく、結果には succeedederroredexpired が混ざって返ってきます。エラーの件だけ拾って次のバッチに再提出する流れを、最初から用意しておきます。
  • ほかの節約と重ねて適用されます。リクエストの params は通常の呼び出しと同じなので、第2回のモデルルーティングがそのまま適用され、同じ system プロンプトを使うリクエストが集まれば第3回のキャッシュも一緒に効きます。haiku とバッチの組み合わせなら、opus のリアルタイム比で 10 分の 1 以下の単価になります。

運用パターン — キューを間に挟む #

バッチを一回きりのスクリプトではなく常時運用にするときの標準形はキューです。

バッチパイプライン
イベント発生 → ジョブキュー(テーブル)に積む
                └─ (cron、例: 毎時) 溜まったものをまとめてバッチ提出
                       └─ (cron) 終わったバッチを回収して結果反映、失敗の件は再投入

アプリケーションは「分類して」という行をテーブルに入れるだけで、提出と回収は定期ジョブが受け持ちます。この構造の利点は自然な緩衝です。トラフィックが集中してもリアルタイム経路の制限(第5回のテーマ)に触れず、バッチサイズもまとめ方で調整できます。第1回の計測ラッパーと同じ形でバッチ結果の usage もログに残しておけば、機能別の費用ダッシュボードにバッチ作業も同じ基準で載ります。

リアルタイムをバッチに変える発想 #

すでにバッチである作業を移すだけでなく、プロダクト設計を変えることでバッチに送れるケースもあります。たとえば「アップロードしたら即座に要約」を「要約が準備できたら通知」に変えられるなら、その機能の費用は半額になり、トラフィックの急増にも強くなります。すべての機能で可能なわけではありませんが、第1回のログで費用上位の機能を見るとき「これは本当に同期でなければならないのか」と一度問うてみるのはタダです。

よくつまずくところ #

  • 締め切りのある作業をバッチにかける — 保証は 24 時間です。「朝 9 時までに必ず」という作業を前夜のバッチにかけると、ときどき約束を破ることになります。締め切りがあるなら余裕を持たせるか、リアルタイムに残します。
  • 結果の回収を忘れる — 提出はしたのに回収ジョブが止まっていると、費用だけ払って結果は 29 日後に消えます。回収 cron が動いているかどうかも監視対象です。
  • custom_id を連番にする — 再実行すると違う番号が付く ID では、結果を元データと結び付けられなくなります。元データから決定的に導かれる ID を使います。

まとめ #

今回は、非リアルタイムの作業を半額の経路に移しました。

  • 基準は遅延要件です。人が待っていない作業はバッチ候補で、すべてのトークンが 50% 割引になります。
  • custom_id の設計、件ごとの成否処理、回収の自動化がバッチ運用の三本柱です。
  • ルーティング・キャッシュと重ねて適用され、キューを間に挟んだパイプラインが常時運用の標準形です。

費用 3 部作(ルーティング、キャッシュ、バッチ処理)が終わりました。次のテーマはお金ではなく、止まらないことです。次回の「LLM アプリ運用 #5 信頼性 — レートリミット・リトライ・フォールバック」で、制限と障害の前で持ちこたえる構造を作ります。

X