← 返回学习台
D1 · 摄取与转换 34%

Ch04 · Step Functions 工作流编排

多步骤、有分支、要重试、要可视化——这就是 Step Functions 的甜区。也是 DEA 必考。

🎯 先试试

japanese-climb 的转写管道:(1) 下载 YouTube 音频 → (2) 调 Whisper 转写 → (3) 清洗字幕(去广告/语气词)→ (4) 写 S3 + 更新 DynamoDB。每步都可能失败需要重试;第 2 步偶尔卡 5 分钟;偶尔 Whisper 报"not Japanese"需要走人工审核分支。最合适的编排服务?

💡 讲透

🎣 钩子:明明 4 个 Lambda 一个调一个就完了,为啥要引入 Step Functions?

🏠 生活类比:拍一个 30 秒短视频。
A 链调:你叫摄影 → 摄影叫剪辑 → 剪辑叫调色——出问题导演(你)只看到"最后没产出",不知道卡在哪。
B EventBridge:每个工序完了发"喊话"通知下一个——松耦合,但没人手里有完整分镜表,"这条片子整体走到哪了"看不到。
C Step Functions Standard:你画了张分镜表(状态机),每步谁干啥、卡住怎么重拍、特殊情况走哪条分支——拍摄过程导演的监视器上能看到现在卡在哪。
D Express:同样的分镜表,但适合 5 分钟以内的小广告,不适合电影。

👶 深入浅出

  • Step Functions = 状态机。用 JSON(ASL, Amazon States Language)声明每个 state 干什么、出错怎么 Retry/Catch、要不要分支(Choice)、要不要并行(Parallel/Map)。
  • 可视化:控制台直接看到每次执行卡在哪 state、绿色 = 通过、红色 = 失败、详情点开看输入输出。
  • 声明式 RetryRetry: [{ErrorEquals:["States.TaskFailed"], MaxAttempts:3, BackoffRate:2}]——不用写代码。
  • 200+ AWS 直接集成:state 可直接调 DDB PutItem / S3 PutObject / SQS SendMessage,不用 Lambda 当胶水。
graph LR S([Start]) --> DL[Download
Lambda] DL --> W[Whisper
Lambda] W --> C{Choice
is Japanese?} C -->|no| HR[人工审核
SNS 邮件
waitForTaskToken] C -->|yes| CL[Cleanup
Lambda] CL --> ST[Store
S3 + DDB 直接集成] ST --> E([End]) DL -.Retry x3.-> DL W -.Retry x3.-> W W -.Catch all.-> HR style C fill:#fff4ef,stroke:#ff6f3c style HR fill:#fde0e0,stroke:#e03131

🎯 每个选项为什么对/错

❌ A · Lambda 链调
看着最简单——但没有可见性:哪步失败、走到哪、卡多久——只能从一堆 CloudWatch Log 里挖。没有内置重试每步要 try/except。第 2 步卡 5 分钟,第 1 个 Lambda 在 await 浪费计费,可能撞 15min 上限。
真实后果:能跑但调试地狱;要加"not Japanese 走人工审核"分支你要在代码里写一堆 if-else,最后变成手工状态机。
❌ B · EventBridge 串
每步发 event → 下一步订阅触发:松耦合很好。但它是事件驱动不是工作流:你看不到"端到端这次跑到哪"的视图。失败处理每步独立重试+DLQ;"整条管道失败回滚"做不到。
真实后果:高扇出+异构生产者场景超好用;你这种"明确顺序+分支+回滚"的场景,会自己慢慢造出一个 Step Functions。
❌ D · Express 状态机
Express 是为高吞吐+短时长 (< 5 分钟)+不要历史设计的(按时长计费而不是 transition)。你第 2 步可能卡 5 分钟、整条加起来可能 10 分钟——顶到 Express 上限。Express 不保留执行历史(只能 CloudWatch Logs 看)→ 调试难。
真实后果:能跑但碰到长视频就 timeout,丢失中间状态。Express 适合 IoT 数据处理、流式 ETL,不适合编排长任务。
✅ C · Standard 状态机
状态机模型:声明式画流程,可视化看执行,内置 Retry/Catch,Choice 实现分支,.waitForTaskToken 模式挂起等人工审核回调。Standard 最长 1 年,按 state transition 计费($0.025/1000)——你这场景一次执行 4-5 个 transition,极便宜

📌 考点速记

  • Standard vs Express 必考:
    · Standard:1 年,exactly-once,$0.025/1000 transitions,有可视化历史,关键业务流
    · Express:5 分钟,at-least-once (async) / at-most-once (sync),按时长+内存计费,高吞吐数据处理
  • Map state:数组 fan-out(Inline 模式 40 并发;Distributed 模式 10000 并发,适合 S3 manifest 大批量)
  • Parallel state:多分支固定并行(不是数组)
  • Choice state:基于 JSON Path 的条件分支
  • Wait state:等几秒/到某时间点
  • 直接服务集成:不用 Lambda 也能调 DDB/S3/SQS(省 Lambda 钱+延迟)
  • .waitForTaskToken:挂起等外部回调(人工审核必备);外部用 SendTaskSuccess/SendTaskFailure 唤醒
  • Standard 启动 ~50ms,Express ~30ms

🛠 回到 japanese-climb

对应代码:src/jclimb/transcript.py + src/jclimb/youtube_collector.py

当前实现:Python 函数链式调,失败 raise 中断。重跑要手动看日志、手动跳过已完成步骤。

搬 AWS:Step Functions Standard 状态机:

{
  "Comment": "japanese-climb transcript pipeline",
  "StartAt": "DownloadAudio",
  "States": {
    "DownloadAudio": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:youtube-download",
      "Retry": [{"ErrorEquals":["States.TaskFailed"], "MaxAttempts":3, "BackoffRate":2}],
      "Next": "CallWhisper"
    },
    "CallWhisper": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:whisper-transcribe",
      "Retry": [...],
      "Catch": [{"ErrorEquals":["States.ALL"], "Next":"HumanReview"}],
      "Next": "DetectLanguage"
    },
    "DetectLanguage": {
      "Type": "Choice",
      "Choices":[{"Variable":"$.lang", "StringEquals":"ja", "Next":"Cleanup"}],
      "Default": "HumanReview"
    },
    "Cleanup": { "Type":"Task", ..., "Next":"StoreFinal" },
    "HumanReview": { "Type":"Task", "Resource":"arn:aws:states:::sns:publish.waitForTaskToken", ... },
    "StoreFinal": { "Type":"Task", "Resource":"arn:aws:states:::aws-sdk:s3:putObject", "End":true }
  }
}
    

整条管道在控制台可视化:红线 = 失败点一眼可见。HumanReview 用 .waitForTaskToken — Lambda 发邮件附带 task token,审核者点链接回调 SendTaskSuccess 才继续。

📌 闪卡

Standard vs Express 最长执行时间?
Standard 1 年;Express 5 分钟。差异源于计费模型:Standard 按 state transition 数 ($0.025/1000);Express 按时长+内存 — 长跑 Express 会非常贵。
怎么让 Step Functions 等待外部人工操作?
.waitForTaskToken 集成模式:state 发任务后挂起,外部拿 task token 调 SendTaskSuccessSendTaskFailure 唤醒。常配 SNS(人工审核)/SQS(外部系统)。
Map state 默认最大并发?怎么扩大?
Inline mode 默认 40。要处理几万项时改 Distributed Map mode,并发 10000+,自动分批,适合 S3 manifest / 大数组场景。
不写 Lambda 直接从 state 读写 DynamoDB 怎么实现?
AWS SDK 直接集成arn:aws:states:::aws-sdk:dynamodb:putItem 这类 ARN 直接调用。免 Lambda 调用费 + 免冷启动 + 更快。Step Functions 集成模式总共三种:Request Response、Run a Job (.sync)、Wait for Callback (.waitForTaskToken)。

🔁 变体题

变体 1:你想把 100 万条 S3 文件并行用 Lambda 处理(每个 Lambda < 1 分钟),用 Step Functions。最合适?

✅ CDistributed Map 专为大批量并行设计,并发上万,自动分批(child workflows),适合 S3 manifest / list 大文件场景。Inline Map 顶多 40 并发。Parallel 不是数组迭代器(A 也不可能手列百万分支)。EventBridge fan-out 不能聚合 100 万个结果回主流程。

变体 2:Step Functions Standard 工作流跑到一半发现某 Lambda 间歇性 502。想 30 秒后重试,重试 3 次,每次间隔翻倍。怎么配?

✅ B声明式 Retry 是 Step Functions 的核心卖点之一。BackoffRate=2 表示间隔指数翻倍(30→60→120 秒)。这正是为什么选 Step Functions 而不是自己写——状态机帮你管重试逻辑。A 是反模式(Lambda 内重试占用 Lambda 计费时间)。