目次
31 章

logging と観測性

標準 logging の落とし穴、structlog などの構造化ロギング、OpenTelemetry での分散トレース、Sentry でのエラートラッキングまで。

第30章 でコードの静的な正確性を自動化しました。本章はさらに一歩進んで、運用中に何が起きているかを見えるようにする ツールを扱います — 標準 logging を安全に使う方法、構造化ロギング、OpenTelemetry での分散トレース、Sentry でのエラートラッキングまで。

「見えないものは直せない」が本章の一行モチベーションです。第28章 テストとデプロイ の次の段階が本章です。

print ではなく logging を使うべき理由 #

小さなスクリプトでは print で十分です。しかし運用に乗せると次のものが必要になります。

  • レベル — すべてのメッセージが同じ重要度ではありません。
  • タイムスタンプと位置 — いつどこで発生したか。
  • 送り先の分岐 — stdout / ファイル / 外部システム(例: Datadog、CloudWatch)。
  • フォーマット統一 — 人が読むか、機械がパースするか。
  • オンオフ — 運用では DEBUG を切り、デバッグ時は付ける。

標準の logging モジュールがこれらすべてを提供します。ただしメンタルモデルが複雑で、落とし穴も多いです。

標準 logging のメンタルモデル #

logging は 4 つのオブジェクトで構成されます。

オブジェクト役割
Logger呼び出し地点。logger.info(...) のあの logger
Handler出力先。StreamHandler / FileHandler / RotatingFileHandler
Formatterメッセージの形。時刻・レベル・メッセージを 1 行に
Filter条件付き通過。特定モジュールだけ、特定レベルだけ

流れはこうです。

logger.info("hello") を呼び出す
  → Logger が自分のハンドラ群に LogRecord を渡す
  → 各 Handler が自分の Formatter でフォーマット
  → Handler が最終出力 (stdout / ファイル / ネットワーク)

初めての使用 #

log_hello.py
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s")
)
logger.addHandler(handler)

logger.info("hello")
logger.warning("something off")
2026-05-17 12:00:00,123 [INFO] __main__: hello
2026-05-17 12:00:00,124 [WARNING] __main__: something off

落とし穴 1 — propagate と重複出力 #

logger は 階層構造 です。app.services.userapp.servicesapp の子で、メッセージを親にも伝播(propagate)します。

# main.py
import logging
logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("app.services.user")
logger.addHandler(logging.StreamHandler())  # ハンドラ追加

logger.info("hello")
INFO:app.services.user:hello   ← root logger が出力
hello                          ← 自分で追加した handler が出力

同じメッセージが 2 回出ます。親 root logger のハンドラと自分のハンドラの両方が出力したからです。

解決: 子 logger の propagate = False にするか、ハンドラを一箇所(root)だけに置きます。

logger.propagate = False

原則: アプリケーションコードでは logger = logging.getLogger(__name__) だけを呼び、ハンドラ設定は アプリ起動地点の一箇所だけ で行います。

落とし穴 2 — % フォーマットのコスト #

logger.info("user %s logged in", user_id)logger.info(f"user {user_id} logged in") には違いがあります。

前者は ロギングレベルが INFO 以上のときだけフォーマットが実行 されます。後者は呼び出し時点で常に f-string が評価されます。DEBUG メッセージに重いオブジェクトを入れると、運用(DEBUG オフ)でもコストを払い続けることになります。

# 推奨
logger.debug("user info: %s", expensive_serialize(user))

# 非推奨 (DEBUG オフでも expensive_serialize が呼ばれる)
logger.debug(f"user info: {expensive_serialize(user)}")

落とし穴 3 — 例外とトレースバック #

例外をロギングするときは logger.exception または exc_info=True

try:
    do_something()
except Exception:
    logger.exception("do_something failed")  # traceback が自動で含まれる

logger.error(str(e)) はトレースバックを失ってしまいます。

構造化ロギング — JSON 出力の価値 #

人が一行ずつ読むログは、検索・集計・アラートが困難です。JSON で出力 すれば、ログ収集器(Datadog、Loki、CloudWatch Logs Insights)がフィールド単位でインデックスを作り、クエリできます。

{"ts": "2026-05-17T12:00:00Z", "level": "info", "logger": "app.services.user", "msg": "user logged in", "user_id": 42, "ip": "1.2.3.4"}

これを標準 logging で直接作るには Formatter をカスタマイズする必要があり、煩雑です。structlog が標準解答です。

structlog 設定 #

uv add structlog
app/core/logging.py
import logging
import sys

import structlog


def configure_logging(level: str = "INFO") -> None:
    """アプリ起動時に一度呼ぶ。"""
    timestamper = structlog.processors.TimeStamper(fmt="iso")

    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        timestamper,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
    ]

    structlog.configure(
        processors=[
            *shared_processors,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, level)
        ),
        logger_factory=structlog.PrintLoggerFactory(file=sys.stdout),
        cache_logger_on_first_use=True,
    )

使い方 #

app/services/user.py
import structlog

logger = structlog.get_logger(__name__)


async def login(user_id: int, ip: str) -> None:
    logger.info("user logged in", user_id=user_id, ip=ip)

出力:

{"event": "user logged in", "user_id": 42, "ip": "1.2.3.4", "level": "info", "logger": "app.services.user", "timestamp": "2026-05-17T12:00:00.123Z"}

核心: メッセージは イベント名(変わらない短いラベル)で、変動データは kwargs に分離します。f"user {id} logged in" のように変数を本文に埋め込むと、同じイベントのインスタンスを集計できません。

ログレベル — 実戦基準 #

レベルいつ
DEBUG開発中のみ見る。運用では切る。変数 dump、トレース情報
INFO正常な流れの中の意味あるイベント。ログイン、決済、ジョブ完了
WARNING異常だが自動回復。リトライ成功、外部 API 遅延、代替経路使用
ERROR失敗。処理中のリクエスト/ジョブが失敗。アラート対象
CRITICALシステム全体が危険。DB ダウン、ディスクフル。即時オンコール対象

基準線: 運用は INFO から有効にし、ERROR からアラートを受けます。WARNING はまとめて見てパターンを観察します。

correlation ID — リクエスト単位の追跡 #

一人のユーザーの一リクエストが複数のモジュール・複数のサービスを経由します。すべてのログに同じ ID を付ければ、そのリクエストに何が起きたかをまとめて見られます。

structlog の contextvars がこれを解決します。

app/api/middleware.py
import uuid

import structlog
from starlette.middleware.base import BaseHTTPMiddleware


class RequestIdMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        request_id = request.headers.get("X-Request-ID", str(uuid.uuid4()))
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(request_id=request_id)
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id
        return response

これで同じリクエスト処理中に呼ばれたすべての logger.info(...) 出力に request_id が自動で含まれます。

app/main.py
from fastapi import FastAPI

from app.api.middleware import RequestIdMiddleware
from app.core.logging import configure_logging

configure_logging()
app = FastAPI()
app.add_middleware(RequestIdMiddleware)

OpenTelemetry — 分散トレースの標準 #

複数のサービスが一つのリクエストを処理する(例: web → API → DB → 外部 API)とき、各段階がどれだけかかったかどこで失敗したか を見るには trace が必要です。

OpenTelemetry(以下 OTel)がこの分野の標準です。一度 instrument すれば Jaeger、Tempo、Datadog、Honeycomb など、どこにでも送れます。

メンタルモデル #

  • Trace — 一リクエストの全体の流れ。固有 ID。
  • Span — Trace 内の一つの単位作業。開始時刻、終了時刻、属性を持ちます。
  • Span の階層 — 親子関係で呼び出しツリーを形成。
  • Metric — 時間に沿った数値(リクエスト数、レイテンシのパーセンタイル、CPU)。
  • Log — 上で見たあのログ。trace と紐づくことで価値が大きい。

FastAPI 自動 instrumentation #

uv add opentelemetry-distro opentelemetry-exporter-otlp \
       opentelemetry-instrumentation-fastapi \
       opentelemetry-instrumentation-sqlalchemy \
       opentelemetry-instrumentation-httpx
app/core/telemetry.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter,
)
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


def configure_telemetry(service_name: str, app, engine) -> None:
    resource = Resource.create({"service.name": service_name})
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(provider)

    FastAPIInstrumentor.instrument_app(app)
    SQLAlchemyInstrumentor().instrument(engine=engine)
    HTTPXClientInstrumentor().instrument()

OTEL_EXPORTER_OTLP_ENDPOINT 環境変数で収集器のアドレスを指定します(http://localhost:4317 のような形)。ローカルで Jaeger を立ててみるのが出発点です。

docker run --rm -p 16686:16686 -p 4317:4317 jaegertracing/all-in-one

ブラウザで http://localhost:16686 から trace を見ます。

手動 span #

自動 instrumentation が捕まえられないビジネスロジックは、自分で包みます。

from opentelemetry import trace

tracer = trace.get_tracer(__name__)


async def calculate_invoice(user_id: int) -> Invoice:
    with tracer.start_as_current_span("calculate_invoice") as span:
        span.set_attribute("user.id", user_id)
        # ... 計算
        span.set_attribute("invoice.total", total)
        return invoice

Sentry — エラートラッキング #

OTel が「全体の流れと性能」だとすれば、Sentry は 「発生したエラー」そのものに深く集中 します。

  • 同じエラーを自動でグルーピング(スタックトレースのシグネチャベース)。
  • 頻度、初回発生、影響を受けたユーザー数を自動集計。
  • リリース単位の追跡 — 「このエラーは v1.2.3 で新しく入った」。
  • breadcrumb — エラー直前に何が起きていたか。

インストール #

uv add sentry-sdk
app/core/sentry.py
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration


def configure_sentry(dsn: str, environment: str, release: str) -> None:
    sentry_sdk.init(
        dsn=dsn,
        environment=environment,
        release=release,
        traces_sample_rate=0.1,         # 10% だけ trace
        profiles_sample_rate=0.1,       # 10% だけ profile
        integrations=[
            FastApiIntegration(),
            SqlalchemyIntegration(),
        ],
        send_default_pii=False,         # デフォルト PII を送信しない
    )

release を Git SHA やバージョンタグで渡すと「このコミット以降の新エラー」を自動で見せてくれます。CI の環境変数から受け取るのが一般的です。

export SENTRY_RELEASE=$(git rev-parse HEAD)

絶対にロギングしてはいけないもの #

これはセキュリティ・法的責任に直結する領域なので、自動化されたブロック が必須です。

PII(個人識別情報) #

  • メールアドレス全体、住民番号、電話番号、カード番号
  • 住所、生年月日、位置座標
  • 写真 / 音声 / 映像の原本

内部 ID(DB の primary key)は通常 PII ではないので大丈夫です。外部識別子(メールアドレスなど)が危険です。

Secret #

  • API トークン、パスワード(ハッシュであっても)
  • TLS の秘密鍵
  • データベース接続文字列(パスワード含む)
  • AWS access key

大きなペイロード #

リクエスト/レスポンス本文全体をロギングすると、2 つの問題が同時に発生します。

  • ログコストが急増。
  • PII / secret が本文に混ざっていれば一緒に漏れる。

自動ブロックのパターン #

構造化ロギングの利点がここで生きます。機微キーを自動マスキング する processor を登録します。

app/core/logging.py 拡張
SENSITIVE_KEYS = {"password", "token", "authorization", "secret", "card_number"}


def mask_sensitive(logger, method_name, event_dict):
    for key in list(event_dict.keys()):
        if key.lower() in SENSITIVE_KEYS:
            event_dict[key] = "***"
    return event_dict
structlog.configure(
    processors=[
        *shared_processors,
        mask_sensitive,
        structlog.processors.JSONRenderer(),
    ],
    ...
)

これで logger.info("login", password="hunter2") のようなミスがマスキングされて出力されます。

ローカル開発 vs 運用の設定差 #

項目ローカル運用
出力フォーマット人が読みやすいカラーテキストJSON
レベルDEBUGINFO(または WARNING)
出力先stdoutstdout(コンテナログ → 収集器)
Sentrydev DSN または無効運用 DSN、release タグ
OTel sample rate100%1〜10%

ENV 環境変数で分岐します。

app/core/logging.py
import os

def configure_logging() -> None:
    env = os.getenv("ENV", "local")
    renderer = (
        structlog.dev.ConsoleRenderer(colors=True)
        if env == "local"
        else structlog.processors.JSONRenderer()
    )
    level = "DEBUG" if env == "local" else "INFO"
    # ... structlog.configure(processors=[..., renderer], ...)

第29章の総合実習に適用する #

第29章 総合実習 — TODO API を完成させる のヘルスチェックと main に本章のツールを統合します。

app/main.py 拡張
from contextlib import asynccontextmanager
from fastapi import FastAPI

from app.core.config import settings
from app.core.logging import configure_logging
from app.core.telemetry import configure_telemetry
from app.core.sentry import configure_sentry
from app.api.middleware import RequestIdMiddleware
from app.db import engine


@asynccontextmanager
async def lifespan(app: FastAPI):
    configure_logging()
    if settings.sentry_dsn:
        configure_sentry(settings.sentry_dsn, settings.env, settings.release)
    yield


app = FastAPI(lifespan=lifespan)
app.add_middleware(RequestIdMiddleware)

if settings.otel_enabled:
    configure_telemetry("todo-api", app, engine.sync_engine)

これですべてのリクエストに request_id が付き、エラーは Sentry に、trace は Jaeger / Tempo に自動で流れていきます。

練習問題 #

  1. structlog マスキングの強化 — 本章の mask_sensitive は最上位のキーだけを検査します。user = {"email": "...", "password": "..."} のようなネストした dict のキーもマスキングするにはどう再帰処理しますか? そしてその処理が大きな dict で性能問題になりそうな閾値はどこだと考えますか?
  2. OTel 自動 instrumentation の限界 — FastAPI 自動 instrumentation はルート単位の span を作りますが、ルート内部のビジネスロジックは捕まえません。自身のプロジェクトで最も遅いルートを 1 つ選び、手動 span を 3〜4 箇所追加して、Jaeger でその span の時間比重を確認してみてください。どの段階が想定より長くかかっていましたか?
  3. Sentry release と回帰 — Sentry に release を登録すると「この release で新しく登場したエラー」を見せてくれます。GitHub Actions の release job に SENTRY_RELEASE=$(git rev-parse HEAD) を渡し、わざと一つのバージョンだけに入るエラーを作ってみてください。Sentry がそのエラーを「new in release X」と正確に分類しますか?
注記
一行まとめ — print ではなく structlog で構造化 JSON ログを出し、request_id で一リクエストを束ね、OpenTelemetry で流れと性能を、Sentry でエラーを見ます。そして PII・secret・大きなペイロードはマスキング processor で自動ブロックします。

次の章は uv でのライブラリ公開 です。運用ツールを整えたので、今度は作ったコードを他の人が使えるよう PyPI にリリースする流れを扱います。

X