← 返回学习台
D1 · 摄取与转换 34%

Ch01 · Kinesis Data Streams vs Firehose

实时流到底"实时"在哪?60 秒延迟还算实时吗?这章把"流"的两种 AWS 化身分清楚。

🎯 先试试

japanese-climb 想把用户每次产出的段落实时送到 AWS 做评估,要求评估结果 < 5 秒内返回给用户。同时要把所有原始产出归档到 S3 当 ML 训练料(不要求实时)。你选哪个服务来接收并分发产出?

💡 讲透

🎣 钩子:为什么需要"流"?我们已经有 SQS 队列、有 API 直接调用,为什么还要 Kinesis?

🏠 生活类比:想象一条 7-11 关东煮传送带。
SQS 队列像取号机:每个号只发一次,谁抢到归谁——拿完就消失了。
KDS像一条流动的关东煮传送带:东西一直在带上跑(保留 24 小时),多个员工(消费者)能同时看同一盘菜,回头查"五分钟前那盘谁拿走的"也行。
Firehose像传送带末端的打包机:东西攒满一盒(或定时)才送出去,专门往冰箱(S3)送,不让你中途取。
MSK是工厂级的重型传送线:能扛每秒几十万盘,但要专人调维护。

👶 深入浅出

  • KDS = "分片(shard) + 游标(cursor)"模型。1 个 shard = 1 条并行小水管,写入 1MB/s 或 1000 record/s,读出 2MB/s(共享)。同一 partitionKey 永远进同一 shard——这就是顺序保证的秘密。
  • Firehose = "你只管塞数据,我管送到目的地"。无 shard 概念、自动 scale、自动转 Parquet、自动按时间分区。代价:必须 buffer 60 秒 / 1MB(哪个先到)才下沉。
graph LR U[用户提交段落] -->|API GW| FE[Lambda: PutRecord] FE -->|partitionKey=userId| KDS[Kinesis Data Stream
4 shards] KDS -->|实时 <1s| L1[Lambda
core_evaluator.py] KDS -->|同一 stream
tee 副本| KDF[Firehose
buffer 60s] KDF -->|自动 Parquet| S3[S3 archive
训练料] L1 --> DDB[DynamoDB
user_cluster_state] style KDS fill:#fff4ef,stroke:#ff6f3c style KDF fill:#eff8ff,stroke:#3b82f6

🎯 每个选项为什么对/错

❌ A · SQS Standard
看着也能"接收+分发"——但 SQS 是队列不是:① 一个消息被一个消费者拿走就消失,第二个消费者(Firehose 归档)看不到;要双消费就得生产端发两次。② 标准队列不保序,同一用户的两次产出可能乱序。③ 没法重放历史。
真实后果:评估器能跑,但 S3 归档要么漏要么写两遍代码;如果评估器挂半小时,期间消息全没了。
❌ C · Firehose 直接接 Lambda
Firehose 听起来"实时",其实是个误称——它最小 buffer 60 秒,到达下游肯定 > 60s。
真实后果:用户提交后等一分多钟才看到评估反馈,体验崩。Firehose 是给"流→S3/Redshift/OpenSearch 归档"用的零代码工具,不是实时分发器。
❌ D · MSK
托管 Apache Kafka,吞吐 >> KDS,但运维复杂度 >> KDS。适合:你已有 Kafka 生态、每秒几十万消息、要 exactly-once。
真实后果:japanese-climb 一开始每秒几条产出,用 MSK 月账单多几百刀、配集群一周才跑通——高射炮打蚊子。
✅ B · KDS + Firehose 副本
KDS 是"一鱼多吃":一份数据写进 stream,多个独立消费者可同时读:
评估 Lambda(事件源映射)拿到记录立刻调 Bedrock,延迟 < 5s ✓
Firehose作为另一个消费者,攒 60s 落 S3,写 Parquet ✓
partitionKey 用 userId 保证同用户顺序。数据默认保留 24 小时(最长 7 天/365 天)→ 评估器挂了也能重放。

📌 考点速记

  • 题里出现"near real-time / < 1s 延迟" → KDS(Firehose 是 60s+)
  • "无代码把流落到 S3 / Redshift / OpenSearch" → Firehose
  • 需要 replay 历史数据 → KDS(Firehose 不存)
  • 需要 多个消费者同时读 → KDS(普通模式吞吐共享 2MB/s;Enhanced Fan-Out 每消费者独立 2MB/s push)
  • "高吞吐 + Kafka 生态" → MSK
  • 顺序保证按 userId → partitionKey = userId(同 key 进同 shard,shard 内有序)
  • Shard 上限:写 1 MB/s 或 1000 rec/s;读 2 MB/s(共享)/ 2 MB/s(EFO 每消费者)

🛠 回到 japanese-climb

对应代码:src/jclimb/core_evaluator.py + data/users/u1/paragraph_attempts/

当前实现:FastAPI 同步调用 evaluator——HTTP 请求阻塞在那里,evaluator 调完 OpenAI 才返回。本地单进程没问题,并发上来就崩。

搬 AWS 的样子:

  1. API Gateway 接收用户产出 → Lambda PutRecord 到 KDS(partitionKey=userId 保证同用户顺序)
  2. Lambda 事件源映射 → 触发 core_evaluator 改造的 Lambda,调 Bedrock 拿 verdict
  3. 同一个 stream 再加一个 Firehose 消费者,每 60s 把所有产出落 S3 Parquet 当训练料——你现在 data/users/u1/paragraph_attempts/ 那些 JSON 就是这样攒出来的
  4. Verdict 入 DynamoDB → signal_mapper.py 跑确定性映射 → 更新 user_cluster_state

关键:评估是异步流式的,HTTP 接口立刻返回"已收到";前端轮询或 WebSocket 拿结果。

📌 闪卡 · 进 Leitner 队列

KDS 默认保留多久?最长?
默认 24 小时;最长 7 天,进一步开 long-term retention 可到 365 天(按容量加钱)。Firehose 不存数据。
Firehose 的最小 buffer?
buffer hint:60 秒 或 1 MiB(哪个先到)。下游最少要等这么久。所以 Firehose 永远不是"实时"。
KDS shard 的写入上限?读取?
写:1 MB/s 或 1000 records/s(per shard,先到为准)。
读:默认共享 2 MB/s;启用 Enhanced Fan-Out(EFO)后每个消费者独占 2 MB/s 推送。
保证同一用户的产出顺序处理,partitionKey 用什么?
partitionKey = userId。KDS 按 partitionKey 的 hash 映射到 shard,同 key 进同 shard,shard 内严格有序。

🔁 变体题 · 当场巩固

变体 1:开发者要把 CloudWatch Logs 实时归档到 S3 做长期审计,不需要实时分析、不需要重放。最省事的选法?

✅ B:Firehose 是 zero-code 从流到 S3/Redshift/OpenSearch 的工具。CloudWatch Logs 可以 subscription filter 直接接 Firehose,全程不用写代码。不要写 Lambda 当胶水。

变体 2:你给 KDS 配了 4 个 shard,单个用户突发提交,所有记录都到了同一个 shard,写超 1 MB/s 报 ProvisionedThroughputExceededException。最直接的修法?

✅ B:经典陷阱。加 shard 只解决吞吐瓶颈,但同一个 partitionKey 永远只能进一个 shard(KDS 设计如此)。要分散同用户突发,必须改 key 策略(如 userId + 时间桶)。代价:失去单用户顺序保证——这就是题目想测的权衡。EFO 改善读取不影响写入瓶颈。