LLM アプリ開発 #3 ストリーミングで応答をリアルタイム出力

読了 6分

第2回までの呼び出しは、応答が全部できあがるまで待ってから一度に受け取っていました。短い答えならよいのですが、長い答えは数秒かかるあいだ画面が止まったままになります。今回は、生成されるトークンをそのつど受け取り、画面に流し込むストリーミングを扱います。

なぜストリーミングか #

messages.create は、Claude が答えを全部完成させてから結果を返します。答えが長いほどそれだけ待つことになり、そのあいだユーザーは空の画面を見ています。

ストリーミングは違います。Claude がトークンを一つずつ生成するそばから受け取り、すぐに画面へ出力します。ChatGPT や Claude の Web で文字がするすると現れる、あの方式です。答えを最後まで作るのにかかる全体の時間は同じくらいですが、最初の文字がはるかに早く見えるため、体感の遅延が大きく減ります。ユーザーから見れば、「止まっている」ではなく「今答えている」と感じられます。

基本のストリーミング #

ストリーミングは messages.create の代わりに messages.streamwith 文と一緒に使います。

streaming.py
import anthropic

client = anthropic.Anthropic()

with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    messages=[
        {"role": "user", "content": "宇宙についての短い話を聞かせてください。"}
    ],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

stream.text_stream は、生成されるテキストの断片を順に渡してくれます。これをループで受け取り、すぐに出力します。printend="" を与えて断片ごとに改行が入らないようにし、flush=True でバッファにためずにすぐ画面へ出します。すると文字が流れるように現れます。

受け取る答えそのものは messages.create と同じです。違いは受け取り方だけです。create は完成してから一度に、stream は作られるそばから少しずつ受け取ります。

終わったあとに完成した応答を受け取る #

ストリーミング中はテキストの断片だけが流れてきます。ところが答えが終わったあとに、たとえば使ったトークン数のような完成したメッセージ全体が必要なときがあります。そんなときは get_final_message を使います。

streaming_final.py
with client.messages.stream(
    model="claude-sonnet-4-6",
    max_tokens=2048,
    messages=[
        {"role": "user", "content": "宇宙についての短い話を聞かせてください。"}
    ],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

    final = stream.get_final_message()
    print(f"\n\n使った出力トークン: {final.usage.output_tokens}")

final は第1回と第2回で見た response と同じ構造です。content ブロックのリストや usage のような情報をそのまま持っています。画面にはリアルタイムで流しつつ、終わったあとには全体をもう一度手にできます。

注記
より細かく扱いたいときは、text_stream の代わりに stream を直接たどってイベントの種類(event.type)を確認する方法もあります。テキストブロックがどこで始まるか、推論ブロックかどうか、といったものを区別できます。ただしこの細かい制御が必要になるのはツール呼び出し(#6)やエージェント(#10)の段階で、今は text_stream だけで十分です。

マルチターン会話にストリーミングを適用する #

第2回で見たマルチターン会話にストリーミングを乗せると、実際のチャットボットに近い形になります。やり取りしたメッセージをリストに積みながら、答えはリアルタイムで流します。

streaming_chat.py
import anthropic

client = anthropic.Anthropic()
messages = []

def chat(user_input: str) -> None:
    messages.append({"role": "user", "content": user_input})

    answer = ""
    with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)
            answer += text
    print()

    messages.append({"role": "assistant", "content": answer})

chat("Python で星の三角形を描くコードを見せてください。")
chat("今のコードを関数にまとめてください。")

画面には文字がリアルタイムで流れ、同時に answer に答え全体をためておきます。ストリーミングが終わったら、その答えを assistant メッセージとしてリストに足します。こうすることで、次のターンで Claude が直前の答えを覚えています。第2回の積み上げパターンとまったく同じで、受け取り方がストリーミングに変わっただけです。

非同期でストリーミングする #

Web サーバーのように多くのリクエストを同時に処理する環境では、非同期の方式がより合います。SDK は AsyncAnthropic クライアントを提供しており、使い方は同期版とほとんど同じです。withasync with に、ループが async for に変わるだけです。

async_streaming.py
import asyncio
import anthropic

client = anthropic.AsyncAnthropic()

async def main():
    async with client.messages.stream(
        model="claude-sonnet-4-6",
        max_tokens=2048,
        messages=[
            {"role": "user", "content": "宇宙についての短い話を聞かせてください。"}
        ],
    ) as stream:
        async for text in stream.text_stream:
            print(text, end="", flush=True)

asyncio.run(main())

骨組みは同期版とまったく同じです。同時接続が多い Web サービスなら、最初から非同期で書いておくほうが後で楽です。

Web の画面まで流し込む #

ここまではサーバーでコンソールに出力していました。ユーザーのブラウザまでリアルタイムで流し込むには、サーバーからクライアントへ送るストリーミング応答に text_stream をつなぎます。FastAPI なら StreamingResponse を使います。

fastapi_streaming.py
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import anthropic

app = FastAPI()
client = anthropic.AsyncAnthropic()

@app.get("/chat")
async def chat(q: str):
    async def generate():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=[{"role": "user", "content": q}],
        ) as stream:
            async for text in stream.text_stream:
                yield text

    return StreamingResponse(generate(), media_type="text/plain")

generate がトークンを yield するたびに、その断片がブラウザへ流れます。実際のサービスではふつう SSE(Server-Sent Events)の形式で包みますが、骨組みはこのとおり単純です。フロントエンドでこの応答を断片ごとに受け取り、画面につなぎ合わせていきます。

よくつまずくところ #

  • with なしで使うmessages.stream はコンテキストマネージャです。with 文と一緒に使ってこそ、ストリームが終わったあとに正しく後始末されます。
  • 文字がリアルタイムで見えないflush=True を抜くと、出力がターミナルのバッファにたまって一度に出ることがあります。断片がまとまって出るなら、まず flush を確認します。
  • 非同期クライアントを同期コードで使うAsyncAnthropicasync/await の文脈でしか動きません。ふつうのスクリプトなら同期の Anthropic を使うか、上の例のように asyncio.run で包んだ async 関数の中で呼び出します。

まとめ #

今回は応答をリアルタイムで受け取って出力するストリーミングを扱いました。

  • ストリーミングは生成されるトークンをそのつど受け取り、最初の文字までの体感の遅延を減らします。
  • messages.streamwith と一緒に使い、text_stream を繰り返します。
  • マルチターン会話に乗せるときは、流しながらためた答えを assistant メッセージとして足し戻します。
  • 非同期の環境では AsyncAnthropicasync withasync for を使います。
  • 終わったあとに完成した応答全体が必要なら get_final_message を使います。
  • Web なら StreamingResponse のようなストリーミング応答に text_stream をつなぎます。

次回の「LLM アプリ開発 #4 プロンプトエンジニアリングの実務」では、同じ質問でも尋ね方によって答えの品質がどう変わるか、そして望む結果を安定して引き出すプロンプトの書き方を扱います。

X