实时流到底"实时"在哪?60 秒延迟还算实时吗?这章把"流"的两种 AWS 化身分清楚。
japanese-climb 想把用户每次产出的段落实时送到 AWS 做评估,要求评估结果 < 5 秒内返回给用户。同时要把所有原始产出归档到 S3 当 ML 训练料(不要求实时)。你选哪个服务来接收并分发产出?
🎣 钩子:为什么需要"流"?我们已经有 SQS 队列、有 API 直接调用,为什么还要 Kinesis?
👶 深入浅出 :
userId 保证同用户顺序。数据默认保留 24 小时(最长 7 天/365 天)→ 评估器挂了也能重放。
对应代码:src/jclimb/core_evaluator.py + data/users/u1/paragraph_attempts/
当前实现:FastAPI 同步调用 evaluator——HTTP 请求阻塞在那里,evaluator 调完 OpenAI 才返回。本地单进程没问题,并发上来就崩。
搬 AWS 的样子:
PutRecord 到 KDS(partitionKey=userId 保证同用户顺序)core_evaluator 改造的 Lambda,调 Bedrock 拿 verdictdata/users/u1/paragraph_attempts/ 那些 JSON 就是这样攒出来的signal_mapper.py 跑确定性映射 → 更新 user_cluster_state关键:评估是异步流式的,HTTP 接口立刻返回"已收到";前端轮询或 WebSocket 拿结果。
变体 1:开发者要把 CloudWatch Logs 实时归档到 S3 做长期审计,不需要实时分析、不需要重放。最省事的选法?
✅ B:Firehose 是 zero-code 从流到 S3/Redshift/OpenSearch 的工具。CloudWatch Logs 可以 subscription filter 直接接 Firehose,全程不用写代码。不要写 Lambda 当胶水。
变体 2:你给 KDS 配了 4 个 shard,单个用户突发提交,所有记录都到了同一个 shard,写超 1 MB/s 报 ProvisionedThroughputExceededException。最直接的修法?
✅ B:经典陷阱。加 shard 只解决总吞吐瓶颈,但同一个 partitionKey 永远只能进一个 shard(KDS 设计如此)。要分散同用户突发,必须改 key 策略(如 userId + 时间桶)。代价:失去单用户顺序保证——这就是题目想测的权衡。EFO 改善读取不影响写入瓶颈。