AWS上級 #7 Step Functions 入門

#3 Lambda#4 API Gateway#5 EventBridge / SQS / SNS#6 Secrets Manager まで一つの軸は関数 / メッセージ / シークレットでした。一段超えて — 複数段階の関数呼び出し / 分岐 / 並列を束ねて 1 つのワークフロー にする方式が残っています。

伝統的にこういうコードは 1 つの Lambda の中に try / except と if で書きました。でも段階が 5 個を超えると可視性もデバッグも大変になります。Step Functions がその役目を担います。

この記事が AWS 上級シリーズの最後。終わったら 実践 6 編 で — 本物のバックエンドを ECS Fargate で運用します。

Step Functions がすること #

AWS Step Functions はマネージドワークフロー (state machine) エンジン。複数段階を JSON で定義すると、AWS が段階の進行 / 再試行 / 失敗処理 / 可視化 を担当します。

1 つの Lambda がすべてをやっていた方式 #

モノリシック Lambda — 5 段階処理
def handler(event, context):
    user = fetch_user(event["userId"])
    if user.plan == "pro":
        send_pro_email(user)
    else:
        send_basic_email(user)

    try:
        run_billing(user)
    except RateLimitError:
        time.sleep(60)
        run_billing(user)

    notify_slack(user)
    update_dashboard(user)

問題:

  • 1 つの Lambda の 15 分上限の上で運用される
  • 1 段階が失敗するとどこで失敗したかログを彷徨う
  • 段階別 retry ポリシーがコードの中に刻まれている
  • 同じ流れの別バリエーションを追加すると if が暴走
  • 運用者が「今どこまで来てる?」を知るのが困難

Step Functions が解く問題 #

Step Functions の絵
入力
┌──────────────────┐
│ FetchUser        │  Lambda 呼び出し
└──────┬───────────┘
       ├ "pro" → SendProEmail
       └ "basic" → SendBasicEmail
┌──────────────────┐
│ RunBilling       │  retry: 3 回、backoff 60s
└──────┬───────────┘
┌─ Parallel ───────────────┐
│ NotifySlack │ UpdateDash │
└─────────────┴────────────┘
完了

各段階が可視化され、実行ごとにコンソールでどこで止まったかが一目でわかります。retry は宣言的、分岐はデータベース。

Standard vs Express #

Step Functions の 2 モード。

StandardExpress
実行時間最大 1 年最大 5 分
価格モデル段階遷移 (state transition) 当たり呼び出し + メモリ + 時間 (Lambda と類似)
実行履歴90 日保管、可視化短い、CloudWatch Logs のみ
処理量〜25,000 / 秒〜100,000 / 秒
At-least-once vs Exactly-onceExactly-onceAt-least-once
適した場合人間が追跡すべきビジネスワークフロー (注文、返金)短い / 高処理量 (イベント処理、データ変換)

最初は Standard。短い処理 + 頻繁な呼び出しが明確になれば Express。

Amazon States Language (ASL) #

ワークフローは JSON で定義。ASL と呼びます。

hello.asl.json
{
  "Comment": "最初の state machine",
  "StartAt": "SayHello",
  "States": {
    "SayHello": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-2:123456789012:function:hello-fn",
      "End": true
    }
  }
}

StartAt で始まり、States の中の各状態 / 段階を定義します。各状態は Type と次の状態 (Next または End: true) を持ちます。

作る + 実行 #

state machine 作成
SM_ARN=$(aws stepfunctions create-state-machine \
  --name hello-flow \
  --definition file://hello.asl.json \
  --role-arn arn:aws:iam::123456789012:role/stepfn-role \
  --type STANDARD \
  --query stateMachineArn --output text)

# 実行
aws stepfunctions start-execution \
  --state-machine-arn $SM_ARN \
  --input '{"name":"world"}'

コンソールの可視化 — ノードがグラフで描かれ、実行ごとにノードに色が塗られます。

4 つの核心状態 #

1) Task — 実際の作業 #

最もよく使う状態。Lambda / ECS Task / SDK / SNS / SQS 等の外部リソースを呼び出し。

Task — Lambda 呼び出し
{
  "Type": "Task",
  "Resource": "arn:aws:lambda:ap-northeast-2:123456789012:function:fetch-user",
  "InputPath": "$",
  "OutputPath": "$",
  "ResultPath": "$.user",
  "Next": "BranchByPlan"
}
  • InputPath: 入ってくるデータのどの部分を関数に送るか
  • OutputPath: 次の状態に渡す部分
  • ResultPath: 関数の結果を入力データのどの位置に合わせるか

Service Integration — 直接統合 #

Lambda 以外にも AWS SDK を直接 呼び出し。Lambda の中で boto3 で呼び出していた処理を ASL ですぐに:

DynamoDB に直接 put
{
  "Type": "Task",
  "Resource": "arn:aws:states:::aws-sdk:dynamodb:putItem",
  "Parameters": {
    "TableName": "users",
    "Item": {
      "id": {"S.$": "$.user.id"},
      "name": {"S.$": "$.user.name"}
    }
  },
  "Next": "Done"
}

Lambda が 1 つ減る — コード / デプロイ / コールドスタートが消えます。

Optimized Integration #

よく使われるパターンは短縮 ARN — .sync が最後の結果まで待つ:

ECS Task 同期実行
{
  "Type": "Task",
  "Resource": "arn:aws:states:::ecs:runTask.sync",
  "Parameters": {
    "Cluster": "prod-cluster",
    "TaskDefinition": "myapp:42",
    "LaunchType": "FARGATE",
    ...
  },
  "Next": "AfterEcs"
}

ECS Task が終わるまで (もしくは失敗するまで) 待機。Step Functions はポーリングを自動。

2) Choice — 分岐 #

データ値を見て次の状態を決定。

Choice
{
  "Type": "Choice",
  "Choices": [
    {
      "Variable": "$.user.plan",
      "StringEquals": "pro",
      "Next": "SendProEmail"
    },
    {
      "Variable": "$.user.plan",
      "StringEquals": "basic",
      "Next": "SendBasicEmail"
    }
  ],
  "Default": "SendDefaultEmail"
}

3) Parallel — 並列ブランチ #

複数のブランチを同時に実行し結果をすべて受け取って合わせる。

Parallel
{
  "Type": "Parallel",
  "Branches": [
    {
      "StartAt": "NotifySlack",
      "States": {
        "NotifySlack": { "Type": "Task", "Resource": "...", "End": true }
      }
    },
    {
      "StartAt": "UpdateDashboard",
      "States": {
        "UpdateDashboard": { "Type": "Task", "Resource": "...", "End": true }
      }
    }
  ],
  "Next": "Done"
}

各ブランチが独立で実行 + retry。1 つのブランチが失敗 (catch しなければ) すると全体が失敗。

4) Map — コレクション処理 #

配列の各アイテムに対して同じ流れを繰り返す。for-each の分散版。

Map
{
  "Type": "Map",
  "ItemsPath": "$.orders",
  "MaxConcurrency": 10,
  "ItemProcessor": {
    "ProcessorConfig": { "Mode": "INLINE" },
    "StartAt": "ProcessOrder",
    "States": {
      "ProcessOrder": { "Type": "Task", "Resource": "...", "End": true }
    }
  },
  "End": true
}

100 個の注文を同時 10 個ずつ処理、すべて終わると終了。Distributed Map モードは 1 万 〜 100 万アイテム規模でも可能 (S3 オブジェクト一括処理、ETL 等)。

補助状態 #

  • Pass — データ加工のみ、外部呼び出しなし
  • Wait — 一定時間 / 特定時刻まで待機
  • Succeed / Fail — 明示的な終了

エラー処理 — Retry / Catch #

ワークフローの価値の中で大きな部分です。段階ごとに宣言的に記述します。

Retry #

Retry — 3 回まで backoff で再試行
{
  "Type": "Task",
  "Resource": "arn:aws:lambda:...:run-billing",
  "Retry": [
    {
      "ErrorEquals": ["States.TaskFailed", "BillingRateLimitError"],
      "IntervalSeconds": 5,
      "MaxAttempts": 3,
      "BackoffRate": 2.0
    },
    {
      "ErrorEquals": ["States.Timeout"],
      "IntervalSeconds": 30,
      "MaxAttempts": 1
    }
  ],
  "Next": "AfterBilling"
}

States.TaskFailed は一般的な失敗、States.Timeout はタイムアウト、または Lambda が throw したユーザー定義エラー名。5 秒 → 10 秒 → 20 秒で backoff。

Catch #

Catch — 失敗時に別の流れ
{
  "Type": "Task",
  "Resource": "...",
  "Catch": [
    {
      "ErrorEquals": ["States.ALL"],
      "ResultPath": "$.error",
      "Next": "HandleFailure"
    }
  ],
  "Next": "AfterTask"
}

retry まですべて失敗すれば HandleFailure へ — 補償トランザクション (ロールバック) / 通知 / 人の介入キューへ。

よく使うパターン #

1) Saga — 補償トランザクション #

分散システムでトランザクションが効かないとき。各段階の正方向 + 失敗時の補償。

Saga
注文作成 → 決済 → 在庫減算 → 配送予約
   ↓        ↓        ↓        ↓ (失敗!)
   注文取消  決済返金  在庫戻し   X

各 Task に Catch を置き、失敗時に補償段階を逆順に実行。

2) Human-in-the-loop #

待機していて人が承認 / 拒否すれば進行:

Wait for callback
{
  "Type": "Task",
  "Resource": "arn:aws:states:::sns:publish.waitForTaskToken",
  "Parameters": {
    "TopicArn": "...",
    "Message": {
      "TaskToken.$": "$$.Task.Token",
      "OrderId.$": "$.orderId"
    }
  },
  "Next": "AfterApproval"
}

waitForTaskToken — トークンを外部 (メール、Slack ボット等) に送信し、誰かが SendTaskSuccess / SendTaskFailure API を呼び出すまで待機。最大 1 年。

3) Polling パターン #

長い外部作業の完了待ち:

StartJob → WaitState(30s) → CheckJob → Choice
                         (続行) WaitState に戻る
                         (完了) Next

4) Express ワークフロー — イベント処理 #

EventBridge / SQS がトリガー → 短い処理 (Lambda 1〜3 個) → 結果を DynamoDB / S3 に。

Express の速い処理量と短い時間上限が自然にマッチ。

Lambda と比較 — いつ何を #

Lambda 1 つで十分な場合 #

  • 段階 1〜2 個の短い処理
  • 可視化 / 人による追跡が不要
  • 非常に頻繁な呼び出し + 非常に短い (Step Functions の段階遷移コストが負担)

Step Functions がしっくりくる場合 #

  • 段階 3 個以上 + 分岐 / 再試行 / 並列
  • 失敗 / 進行状況を人が見る必要
  • 外部システムとの長い相互作用 (人の承認、外部 API)
  • ワークフロー自体がビジネス資産 (修正履歴の追跡)

一緒に使う方式 #

ほとんどは 2 つ一緒に使います。Step Functions が流れを統制し、各段階は Lambda / ECS / SDK 呼び出し。

よく出会う落とし穴 #

1) JSONPath タイポ #

"Variable": "$.user.plan" のドット / $ が 1 文字でも違えばマッチ 0。コンソールの input/output 検査機で 1 段階ずつ確認。

2) Lambda の出力が大きすぎる #

Step Functions の 1 つの状態の入力 / 出力ペイロード上限 256 KB。大きなデータは S3 に置いてキーだけ伝達。

良いパターン
{
  "s3Bucket": "myapp-pipeline",
  "s3Key": "jobs/abc123/input.json"
}

3) 各段階遷移毎に費用 #

Standard モードの段階遷移 1,000 回当たり $0.025。段階の多いワークフローの全体費用は段階数 × 呼び出し数。短い段階 (Pass) を多く置くと意外に大きい。

4) Lambda の cold start が段階毎 #

各 Task が別の Lambda なら Cold Start が段階毎。Express + Provisioned Concurrency、または 1 つの Lambda に複数段階を合わせる。

5) Retry の BackoffRate 暴走 #

MaxAttempts: 10BackoffRate: 3.0 なら 1 → 3 → 9 → 27 → 81 秒… ユーザーが待てない時間。合計が合理的か計算。

6) Catch が 1 度だけ実行 #

retry がすべて終わった後に catch が 1 度。catch の中でまた失敗すればワークフロー失敗。catch の中の task も retry オプションを検討。

7) 可視化に映らない外部呼び出し #

Lambda の中で boto3 で呼び出した部分は可視化 / トレースに引っかかりません。可能なら Service Integration で ASL の Task 状態に引き上げる。

まとめ #

今回つかんだもの:

  • Step Functions の役割 — 複数段階の関数 / SDK 呼び出しを JSON ワークフローに。可視化 / retry / 分岐 / 並列が宣言的
  • Standard vs Express — Standard は長く高価なビジネス / Express は短く高処理量のイベント
  • ASL — JSON で定義。StartAt + States + 各状態の Type / Next
  • 4 核心状態 — Task (作業) / Choice (分岐) / Parallel (並列) / Map (コレクション)
  • Service Integration — Lambda なしで SDK 直接。ECS .sync、DynamoDB、SNS 等
  • Retry / Catch — 段階毎に宣言的。backoff と catch の流れ
  • よく使うパターン — Saga (補償トランザクション)、Human-in-the-loop (waitForTaskToken)、Polling、Express イベント処理
  • Lambda vs Step Functions — 段階 1〜2 個 → Lambda 1 つ。3 個以上 + 分岐 / 再試行 / 可視化必要 → Step Functions
  • 落とし穴 — JSONPath タイポ、256KB payload (S3 回避)、段階遷移費用、Lambda 多段 cold start、BackoffRate 暴走、Catch 1 度、Service Integration を使わない外部呼び出し

シリーズを締めくくって #

#1 ECS / Fargate から 7 編 — コンテナ (ECS / ECR)、サーバーレス (Lambda / API Gateway)、メッセージ (EventBridge / SQS / SNS)、シークレット (Secrets Manager / Parameter Store)、ワークフロー (Step Functions) — AWS 運用の道具箱が集まりました。

基礎 7 編 の IAM / 費用 / セキュリティと 中級 7 編 の EC2 / VPC / S3 / RDS / ALB / CloudFront の上にこの 7 つを乗せれば、バックエンドを AWS に載せて運用するために必要なものの 90% が揃います。

次のシリーズ — AWS 実践 #

理論はすべて出ました。今度は 本物のバックエンドを 1 プロジェクトに する番です。

AWS 実践 #1 — ECS Fargate にバックエンドのデプロイ では モダン Python 実践 (FastAPI) / Django 実践 (DRF) で作った API を ECS Fargate の上に運用可能な形で載せます。RDS、ALB、ACM、Route 53、Secrets Manager が 1 つにまとまる 6 編トラックの始まりです。

X