Step Functions 入門
AWS のワークフローエンジンを整理します。State machine の役割、Task / Choice / Parallel / Map の4つの状態、Standard vs Express、Amazon States Language(ASL)、Lambda / ECS / SDK 統合、Retry / Catch のエラー処理、Saga · Human-in-the-loop のようなパターンまでを扱います。
第17章 Lambda 基礎、第18章 API Gateway + Lambda、第19章 EventBridge / SQS / SNS、第20章 Secrets Manager / Parameter Store まで、中心にあったのは関数 / メッセージ / シークレットでした。その上では、複数段階の関数呼び出し / 分岐 / 並列を1つのワークフローにまとめる方法が必要になります。
伝統的にこういうコードは1つの Lambda の中に try / except と if で書きました。ところが段階が5個を超えると可視性もデバッグも難しくなります。Step Functions がその方式を代わりに担います。
本章は3部 コンテナ · サーバーレスの最後です。終われば4部 第22章 ECS Fargate デプロイの骨格 に進み — コンソールで固めたメンタルモデルを Terraform コードに移し、本物のバックエンドを ECS Fargate 上に運用し始めます。
Step Functions がすること #
AWS Step Functions はマネージドなワークフロー (state machine) エンジンです。複数の段階を JSON で定義すれば、AWS が段階の進行 / 再試行 / 失敗処理 / 可視化を担います。
1つの Lambda がすべてをやっていた方式 #
def handler(event, context):
user = fetch_user(event["userId"])
if user.plan == "pro":
send_pro_email(user)
else:
send_basic_email(user)
try:
run_billing(user)
except RateLimitError:
time.sleep(60)
run_billing(user)
notify_slack(user)
update_dashboard(user)問題は次のとおりです。
- 1つの Lambda の 15分の上限の上で運用されます。
- ある段階が失敗するとどこで失敗したのかログをさまよいます。
- 段階ごとの retry ポリシーがコードの中に含まれています。
- 同じ流れの別のバリエーションを追加すると if が暴増します。
- 運用者が「今どこまで進んだ?」を知るのが難しいです。
Step Functions が解く問題 #
入力
│
▼
┌──────────────────┐
│ FetchUser │ Lambda 呼び出し
└──────┬───────────┘
├ "pro" → SendProEmail
└ "basic" → SendBasicEmail
│
▼
┌──────────────────┐
│ RunBilling │ retry: 3 回, backoff 60s
└──────┬───────────┘
▼
┌─ Parallel ───────────────┐
│ NotifySlack │ UpdateDash │
└─────────────┴────────────┘
│
▼
完了各段階が可視化され、実行ごとにコンソールでどこで止まったかがひと目で分かります。retry は宣言的で、分岐は入力データの内容で決まります。
Standard vs Express #
Step Functions の2つのモードです。
| Standard | Express | |
|---|---|---|
| 実行時間 | 最大 1 年 | 最大 5 分 |
| 価格モデル | 段階遷移 (state transition) あたり | 呼び出し + メモリ + 時間 (Lambda と似ている) |
| 実行履歴 | 90 日保管、可視化 | 短い、CloudWatch Logs のみ |
| 処理量 | ~25,000 / 秒 | ~100,000 / 秒 |
| At-least-once vs Exactly-once | Exactly-once | At-least-once |
| 適した場合 | 人間が追跡すべきビジネスワークフロー (注文、返金) | 短い / 高処理量 (イベント処理、データ変換) |
最初は Standard を使います。短い処理 + 頻繁な呼び出しが明確になれば Express に行きます。
Amazon States Language (ASL) #
ワークフローは JSON で定義します。ASL と呼びます。
{
"Comment": "最初の state machine",
"StartAt": "SayHello",
"States": {
"SayHello": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-2:123456789012:function:hello-fn",
"End": true
}
}
}StartAt で始まり、States の中の各状態 / 段階を定義します。各状態は Type と次の状態 (Next または End: true) を持ちます。
作る + 実行 #
SM_ARN=$(aws stepfunctions create-state-machine \
--name hello-flow \
--definition file://hello.asl.json \
--role-arn arn:aws:iam::123456789012:role/stepfn-role \
--type STANDARD \
--query stateMachineArn --output text)
# 実行
aws stepfunctions start-execution \
--state-machine-arn $SM_ARN \
--input '{"name":"world"}'コンソールの可視化でノードたちがグラフで描かれ、実行ごとにノードに色が付きます。
4つの核心の状態 #
1) Task — 実際の仕事 #
最もよく使う状態です。Lambda / ECS Task / SDK / SNS / SQS など外部の資源を呼び出します。
{
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-2:123456789012:function:fetch-user",
"InputPath": "$",
"OutputPath": "$",
"ResultPath": "$.user",
"Next": "BranchByPlan"
}InputPath: 入ってくるデータのどの部分を関数に送るかOutputPath: 次の状態に渡す部分ResultPath: 関数の結果を入力データのどの位置に合流させるか
Service Integration — 直接統合 #
Lambda 以外にも AWS SDK を直接呼び出します。Lambda の中で boto3 で呼び出していたことを ASL ですぐにやります。
{
"Type": "Task",
"Resource": "arn:aws:states:::aws-sdk:dynamodb:putItem",
"Parameters": {
"TableName": "users",
"Item": {
"id": {"S.$": "$.user.id"},
"name": {"S.$": "$.user.name"}
}
},
"Next": "Done"
}Lambda が1個減ります — コード / デプロイ / コールドスタートが消えます。
Optimized Integration #
よく使われるパターンは短縮 ARN です — .sync が終わりの結果まで待ちます。
{
"Type": "Task",
"Resource": "arn:aws:states:::ecs:runTask.sync",
"Parameters": {
"Cluster": "prod-cluster",
"TaskDefinition": "myapp:42",
"LaunchType": "FARGATE",
...
},
"Next": "AfterEcs"
}ECS Task が終わるまで (あるいは失敗するまで) 待機します。Step Functions がポーリングを自動でします (第15章 ECS と Fargate と連携します)。
2) Choice — 分岐 #
データの値を見て次の状態を決定します。
{
"Type": "Choice",
"Choices": [
{
"Variable": "$.user.plan",
"StringEquals": "pro",
"Next": "SendProEmail"
},
{
"Variable": "$.user.plan",
"StringEquals": "basic",
"Next": "SendBasicEmail"
}
],
"Default": "SendDefaultEmail"
}3) Parallel — 並列の枝 #
複数の枝を同時に実行し、結果をすべて受け取って合流します。
{
"Type": "Parallel",
"Branches": [
{
"StartAt": "NotifySlack",
"States": {
"NotifySlack": { "Type": "Task", "Resource": "...", "End": true }
}
},
{
"StartAt": "UpdateDashboard",
"States": {
"UpdateDashboard": { "Type": "Task", "Resource": "...", "End": true }
}
}
],
"Next": "Done"
}各枝が独立で実行され retry されます。ある枝が失敗すると (catch しなければ) 全体が失敗します。
4) Map — コレクション処理 #
配列の各アイテムに対して同じ流れを繰り返します。for-each の分散バージョンです。
{
"Type": "Map",
"ItemsPath": "$.orders",
"MaxConcurrency": 10,
"ItemProcessor": {
"ProcessorConfig": { "Mode": "INLINE" },
"StartAt": "ProcessOrder",
"States": {
"ProcessOrder": { "Type": "Task", "Resource": "...", "End": true }
}
},
"End": true
}100個の注文を同時 10個ずつ処理し、すべて終わったら終了します。Distributed Map モードは 1万 ~ 100万アイテム規模も可能です (S3 オブジェクトの一括処理、ETL など)。
補助の状態たち #
- Pass — データ加工のみ、外部呼び出しなし
- Wait — 一定時間 / 特定の時刻まで待機
- Succeed / Fail — 明示的な終了
エラー処理 — Retry / Catch #
ワークフローの価値のうち大きな部分です。段階ごとに宣言的に置きます。
Retry #
{
"Type": "Task",
"Resource": "arn:aws:lambda:...:run-billing",
"Retry": [
{
"ErrorEquals": ["States.TaskFailed", "BillingRateLimitError"],
"IntervalSeconds": 5,
"MaxAttempts": 3,
"BackoffRate": 2.0
},
{
"ErrorEquals": ["States.Timeout"],
"IntervalSeconds": 30,
"MaxAttempts": 1
}
],
"Next": "AfterBilling"
}States.TaskFailed は一般的な失敗、States.Timeout はタイムアウト、または Lambda が throw したユーザー定義のエラー名です。5秒 → 10秒 → 20秒 と backoff します。
Catch #
{
"Type": "Task",
"Resource": "...",
"Catch": [
{
"ErrorEquals": ["States.ALL"],
"ResultPath": "$.error",
"Next": "HandleFailure"
}
],
"Next": "AfterTask"
}retry まですべて失敗すると HandleFailure に行きます — 補償トランザクション (ロールバック) / 通知 / 人の介入キューに送ります。
よく使うパターン #
1) Saga — 補償トランザクション #
分散システムでトランザクションが通らないときに使います。各段階の順方向 + 失敗時の補償です。
注文作成 → 決済 → 在庫差引 → 配送予約
↓ ↓ ↓ ↓ (失敗!)
注文取消 決済返金 在庫復元 X各 Task に Catch を置き、失敗時に補償段階たちを逆順で実行します。
2) Human-in-the-loop #
待機していて人が承認 / 拒否したら進行します。
{
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish.waitForTaskToken",
"Parameters": {
"TopicArn": "...",
"Message": {
"TaskToken.$": "$$.Task.Token",
"OrderId.$": "$.orderId"
}
},
"Next": "AfterApproval"
}waitForTaskToken はトークンを外部 (メール、Slack ボットなど) に送り、誰かが SendTaskSuccess / SendTaskFailure API を呼び出すまで待機します。最大 1年です。
3) Polling パターン #
長い外部作業の完了を待機します。
StartJob → WaitState(30s) → CheckJob → Choice
↓
(継続) WaitState に回帰
(完了) Next4) Express ワークフロー — イベント処理 #
EventBridge / SQS がトリガーし → 短い処理 (Lambda 1 ~ 3 個) → 結果を DynamoDB / S3 に置きます。
Express の速い処理量と短い時間の上限が自然に合います (第19章 EventBridge / SQS / SNS と連携します)。
Lambda との比較 — いつ何を #
Lambda 1個で十分な場合 #
- 段階 1 ~ 2 個の短い処理
- 可視化 / 人の追跡が不要
- 非常に頻繁な呼び出し + 非常に短い (Step Functions の段階遷移コストが負担)
Step Functions が合う場合 #
- 段階 3 個以上 + 分岐 / 再試行 / 並列
- 失敗 / 進行状況を人が見るべき
- 外部システムとの長い相互作用 (人の承認、外部 API)
- ワークフロー自体がビジネス資産 (修正履歴の追跡)
一緒に使う方式 #
ほとんどは2つを一緒に使います。Step Functions が流れを統制し、各段階は Lambda / ECS / SDK 呼び出しを担います。
よく出会う落とし穴 #
1) JSONPath の打ち間違い #
"Variable": "$.user.plan" のドット / $ が一文字でも違うとマッチが 0 です。コンソールの input/output 検査器で1段階ずつ確認します。
2) Lambda の出力が大きすぎる #
Step Functions の1状態の入力 / 出力ペイロードの上限は 256 KB です。大きなデータは S3 に置いてキーだけ伝えます。
{
"s3Bucket": "myapp-pipeline",
"s3Key": "jobs/abc123/input.json"
}3) 段階遷移ごとにコスト #
Standard モードは段階遷移 1,000 回あたり $0.025 です。段階が多いワークフローの全体コストは段階数 × 呼び出し数です。短い段階 (Pass) を多く置きすぎると意外に大きいです。
4) Lambda の cold start が段階ごとに #
各 Task が別の Lambda だと Cold Start が段階ごとに発生します。Express + Provisioned Concurrency、または1つの Lambda に複数の段階を合わせます (第17章 Lambda 基礎 のコールドスタート参照)。
5) Retry の BackoffRate の暴走 #
MaxAttempts: 10、BackoffRate: 3.0 だと 1 → 3 → 9 → 27 → 81 秒… ユーザーが待てない時間です。合計が合理的か計算します。
6) Catch が一度だけ実行 #
retry がすべて終わったあと catch が一度実行されます。catch の中でまた失敗するとワークフローが失敗します。catch の中の task にも retry オプションを検討します。
7) 可視化に見えない外部呼び出し #
Lambda の中で boto3 で呼び出した部分は可視化 / 追跡に捉えられません。可能なら Service Integration で ASL の Task 状態に引き出します。
練習問題 #
- 自分が思い浮かべた多段階の処理(例: 注文 → 決済 → 配送)を §「Lambda との比較 — いつ何を」の基準に照らして、1つの Lambda で十分か Step Functions が必要かを判断し、理由を1行で書いてみましょう。
- Saga パターンを自分のワークフローに適用すると仮定し、各順方向の段階とそれに対応する補償段階を §「よく使うパターン」の絵のように表で描いてみましょう。どの段階に
Catchを付けるべきかも併せて表示します。 - §「よく出会う落とし穴」で段階遷移コストと段階ごとのコールドスタートがともに段階数に比例する理由を説明し、段階を減らすために 第17章 Lambda 基礎 のどの選択(1つの Lambda に合わせる / Service Integration)を使えるかを1段落で整理してみましょう。
一行まとめ: Step Functions は複数段階の関数 · SDK 呼び出しを JSON(ASL)でまとめるマネージドなワークフローエンジンで、可視化 · retry · 分岐 · 並列が宣言的。長くて追跡が必要なビジネスワークフローは Standard、短くて高処理量のイベントは Express を使います。核心の状態は Task · Choice · Parallel · Map で、Service Integration で Lambda なしに SDK を直接呼びます。段階 1 ~ 2 個は Lambda 1個で十分で、3個以上に分岐 · 再試行 · 可視化が必要なら Step Functions が合い、256KB payload · 段階遷移コスト · 多段階のコールドスタートがよくある落とし穴です。
次の章 #
理論はすべて出ました。3部でコンテナ(ECS / ECR)、サーバーレス(Lambda / API Gateway)、メッセージ(EventBridge / SQS / SNS)、シークレット(Secrets Manager / Parameter Store)、ワークフロー(Step Functions)まで AWS 運用の道具箱を集めました。次の4部 第22章 ECS Fargate デプロイの骨格 からは、コンソールで固めたメンタルモデルを Terraform コードに移し、本物のバックエンドを ECS Fargate 上に運用可能な形で載せ始めます。