0
0

把事件总线交给 NATS JetStream 前,我会先分清 stream 和 consumer 边界

很多小团队最早的异步处理都长在业务代码旁边:接口里写一段后台线程,定时脚本顺手扫一张表,失败后靠日志和人工补跑。量不大时可以撑住,一旦涉及订单状态、通知投递、AI 任务回调或文件处理,问题就会集中爆发:消息到底进没进队列,谁消费过,失败能不能重放,消费者扩容会不会互相抢状态。NATS JetStream 适合放在这个阶段评估。官方文档把 JetStream 描述为 NATS 内置的持久化能力,用 stream 存消息,用 consumer 提供消费视图,同时保留 NATS 原本轻量发布订阅的体验。

NATS JetStream stream consumer ack replay 流程示意图

从 publisher 到 subject、stream、pull consumer、ack 和 replay 的边界示意。 来源:Codex image generation

先把 subject 当成入口契约

NATS 的主题模型很轻,服务只要向 subject 发布事件,订阅者就能收到。但引入 JetStream 时,我会先限制 subject 命名范围,避免一开始就把所有事件都塞进持久层。比如 orders.createdjobs.completedfiles.converted 这类事实型事件适合进入 stream,因为它们后续可能需要补投、回放或审计。像临时心跳、实时进度条和可丢弃通知,可以继续走普通 NATS pub/sub,减少存储压力。

这个边界一旦写清楚,团队就能在代码评审里讨论事件契约,避免只盯某个消费者实现。事件名、payload 版本、幂等键和保留时间都应该跟 subject 一起进入文档。

stream 负责保存事实

JetStream 的 stream 更像一段可配置的消息日志,职责比普通队列更明确。官方 stream 文档强调,stream 可以绑定一组 subject,并配置保留策略、存储方式、消息大小和时间限制。小团队落地时,我会从一条业务主线开始,例如文件转码任务或 AI 批处理任务,把输入事件写入一个 stream,再设定保留窗口。保留窗口不要凭感觉拉满,先按排障需要决定,比如保留 7 天足够覆盖多数工单回溯。

这里的关键是把 stream 当成事实存档。业务数据库记录当前状态,JetStream 记录事件发生和可重放线索。两者职责分开后,修复失败任务时就不用从数据库里猜历史输入。

consumer 负责不同处理视角

consumer 是 JetStream 里很重要的抽象。官方 consumer 文档把它描述为 stream 上的消费视图,可以是 durable 或 ephemeral,也可以用 push 或 pull 模式。我的默认选择通常是 pull consumer,因为 worker 主动拉取消息,更容易配合批量大小、并发数和本地限流。一个 stream 可以服务多个 consumer,例如一个负责实际处理,一个负责审计抽样,一个负责低优先级重算。

ack 也要提前定规矩。处理完成后再确认消息,失败时让消息进入重试或后续人工处理路径。消费者代码必须具备幂等能力,因为网络抖动、进程重启或超时都可能让同一条消息再次出现。这个要求应当写进消费者契约,而不能只靠代码注释提醒。

replay 先服务排障,再服务产品能力

JetStream 最值得小团队先用起来的能力,是把失败现场变成可复盘对象。一次任务失败后,可以从 stream 里定位原始事件,重新投给测试 consumer,观察处理链路和日志输出。等排障流程稳定后,再考虑把 replay 做成内部运营能力,例如批量补发通知、重跑某段时间的 AI 分析任务,或给新版本消费者做 shadow 验证。

我的落地顺序很克制:第一周只迁一条低风险异步链路,定义 subject 和 payload,创建一个 stream,接一个 pull consumer,补上 ack、重试和死信处理记录。第二周再加观测指标,包括积压数量、消费延迟、重投次数和失败原因。到这个程度,NATS JetStream 的价值就很清楚了。它把原来散落在脚本、日志和人工补跑里的消息生命周期,收拢成一条可以保存、消费、确认和回放的工程路径。

主要来源

NATS JetStream 概念文档: https://docs.nats.io/nats-concepts/jetstream

NATS Stream 文档: https://docs.nats.io/nats-concepts/jetstream/streams

NATS Consumer 文档: https://docs.nats.io/nats-concepts/jetstream/consumers

NATS JetStream model deep dive: https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive

NATS Server GitHub 仓库: https://github.com/nats-io/nats-server

NATS Server Releases: https://github.com/nats-io/nats-server/releases

评论