跳转到内容

Module 9: 批处理与流处理

📖 深度参考手册 — 本模块属于理论参考,非主线必读。 主线学习路径见 README.md。 当你在项目实战中遇到相关问题时,回来查阅。

来源:DDIA Ch10 (Batch Processing), Ch11 (Stream Processing)

数据处理有两种基本范式:批处理和流处理。批处理像”考试后统一批改试卷”,流处理像”边考边改”。这个模块帮你理解两种范式的原理、适用场景,以及它们如何在真实系统中协作。


定义:批处理(Batch Processing)是对一组有界(bounded)数据集进行一次性处理——数据有明确的开始和结束,处理完就结束。流处理(Stream Processing)是对持续不断的无界(unbounded)数据流进行实时处理——数据源源不断地到来,处理永远不会”完成”。两者是数据处理的两种基本范式,各有优势和适用场景。

为什么重要:选择批处理还是流处理,直接决定了系统架构、数据延迟、资源使用模式和复杂度。批处理的优势是简单、高吞吐、容错容易(重跑一次就行);流处理的优势是低延迟、实时反馈。很多系统同时需要两者——历史数据用批处理分析,实时数据用流处理响应。

案例

Web Crawler — 批处理典型

每天凌晨2点启动批处理任务:
1. 从URL种子库中读取所有待爬URL列表(有界数据集)
2. 并行爬取所有URL,下载页面内容
3. 解析页面,提取新URL加入种子库
4. 更新搜索索引
5. 任务完成,等待下一次调度
特征: 有明确的开始和结束、高吞吐、延迟不敏感(隔天更新可以接受)

Gaming Leaderboard — 流处理典型

玩家不断产生游戏事件(无界数据流):
→ 玩家A得了50分
→ 玩家B得了30分
→ 玩家A又得了20分
→ ...(永不停止)
流处理系统实时消费这些事件:
→ 收到"玩家A得50分" → 排行榜实时更新 → 玩家看到自己排名变化
→ 收到"玩家B得30分" → 排行榜再次更新
特征: 无明确结束、低延迟(秒级甚至毫秒级)、持续运行
维度批处理流处理
数据边界有界(处理固定数据集)无界(持续数据流)
延迟分钟~小时毫秒~秒
吞吐量极高(优化批量操作)中等(逐条或小批量)
容错简单(重跑整个批次)复杂(需要检查点)
代表技术MapReduce, Spark BatchFlink, Kafka Streams

先想一想 🤔 Web Crawler 一定只能用批处理吗?如果需要”发现新闻后5分钟内就收录”,应该怎么改?

点击查看解析

可以用批处理+流处理混合的方式:

  • 批处理(基线):每天全量爬取所有已知URL,确保索引的完整性和一致性。
  • 流处理(增量):订阅RSS/Atom feed、Twitter firehose、Webhook等实时信号源。一旦发现新内容的信号,立即触发爬取和索引更新。
信号源 → Kafka → 流处理(即时爬取+索引) → 搜索可用
定时任务 → 批处理(全量爬取+重建索引) ────────┘

Google实际上就是这样做的——对于新闻类页面,发现到收录的延迟可以低至几分钟;对于普通页面,可能几天才重新爬取一次。批处理保证完整性,流处理保证时效性。


定义:MapReduce是一种将大规模数据处理分解为两个阶段的编程模型:Map阶段将输入数据转换为键值对(key-value pairs),Shuffle阶段按key将数据分组并传输到对应的处理节点,Reduce阶段对同一key的所有值进行聚合计算。这个模型的核心优势是天然可并行——Map和Reduce阶段的每个任务都可以独立运行在不同的机器上。

为什么重要:MapReduce是大数据处理的基石思想。虽然现在很多系统(如Spark)已经超越了MapReduce,但MapReduce的思维方式——“先分散处理,再汇总聚合”——是理解所有分布式数据处理框架的基础。

案例Search Engine 构建倒排索引是最经典的MapReduce应用

假设我们有3个网页需要建立倒排索引:

文档1: "cat sat on mat"
文档2: "cat ate cheese"
文档3: "dog sat on log"

Map阶段(每个文档独立处理,可并行):

Map(文档1) → [(cat, doc1), (sat, doc1), (on, doc1), (mat, doc1)]
Map(文档2) → [(cat, doc2), (ate, doc2), (cheese, doc2)]
Map(文档3) → [(dog, doc3), (sat, doc3), (on, doc3), (log, doc3)]

Shuffle阶段(按key分组,自动完成):

cat → [doc1, doc2]
sat → [doc1, doc3]
on → [doc1, doc3]
mat → [doc1]
ate → [doc2]
cheese → [doc2]
dog → [doc3]
log → [doc3]

Reduce阶段(每个key独立聚合,可并行):

Reduce(cat, [doc1, doc2]) → cat: {doc1, doc2} // "cat"出现在文档1和2
Reduce(sat, [doc1, doc3]) → sat: {doc1, doc3}
Reduce(on, [doc1, doc3]) → on: {doc1, doc3}
Reduce(mat, [doc1]) → mat: {doc1}
...

最终得到倒排索引:

cat → {doc1, doc2}
sat → {doc1, doc3}
on → {doc1, doc3}
mat → {doc1}
ate → {doc2}
cheese → {doc2}
dog → {doc3}
log → {doc3}

用户搜索”cat sat”时,查询 倒排索引[cat] ∩ 倒排索引[sat] = {doc1, doc2} ∩ {doc1, doc3} = {doc1}

在真实的搜索引擎中,这个MapReduce要复杂得多:

  • Map阶段还要做分词、去停用词、词干提取(“running”→“run”)
  • 倒排索引不仅记录文档ID,还记录词频、位置信息(用于短语搜索和相关性排序)
  • 多轮MapReduce:先建索引,再计算PageRank,再合并

先想一想 🤔 如果有100亿个网页,每个网页平均100个词,那么Map阶段会产生约1万亿个键值对。Shuffle阶段如何高效地按key分组?

点击查看解析

Shuffle是MapReduce中最耗资源的阶段,核心思路是哈希分区+排序

  1. 哈希分区:Map输出的每个键值对,根据 hash(key) % R(R是Reduce任务数)决定发送到哪个Reduce节点。同一个key一定去同一个Reduce节点。

  2. 本地排序:每个Map任务在发送前先按key排序,这样Reduce端可以用归并排序高效合并来自多个Map的数据。

  3. Combiner优化:如果Reduce操作是可结合的(如计数、求和),可以在Map端先做一次局部聚合(Combiner),减少网络传输量。比如一个文档中”the”出现了50次,Map端直接输出(the, 50)而不是50个(the, 1)

  4. 磁盘溢写:1万亿个键值对无法全放内存,Map的中间结果写到本地磁盘,Reduce通过HTTP拉取。这也是MapReduce性能瓶颈所在——大量磁盘IO和网络IO。

Spark之所以比MapReduce快很多,主要就是因为Spark尽可能把中间结果保持在内存中,避免了磁盘IO。


定义:DAG数据流是比MapReduce更灵活的数据处理模型。在MapReduce中,数据只能经过”Map→Shuffle→Reduce”这一固定模式,如果需要多步处理就要串联多个MapReduce作业(一个的输出是另一个的输入),中间结果必须写入磁盘。DAG模型允许将数据处理表达为一个有向无环图——节点是计算步骤,边是数据流——任意步骤可以有多个输入和多个输出,中间结果可以驻留内存,大幅提升效率。

为什么重要:现实中的数据处理往往不是简单的”Map→Reduce”,而是多步骤、多分支、有依赖关系的复杂管道。例如,先过滤、再关联两个数据集、然后聚合、最后排序。用MapReduce需要拆分成多个独立作业并手动管理中间数据;用DAG框架(如Spark、Flink、Tez)可以把整个管道表达为一个DAG,框架自动优化执行计划。

案例YouTube 推荐特征计算管道 是一个典型的多步骤DAG:

┌──────────────┐
│ 用户行为日志 │
└──────┬───────┘
┌──────▼───────┐
│ 数据清洗 │ (去重、过滤bot流量)
└──────┬───────┘
┌────────────┼────────────┐
│ │ │
┌──────▼───┐ ┌────▼─────┐ ┌───▼──────┐
│ 用户画像 │ │ 视频特征 │ │ 交互特征 │
│ 计算 │ │ 计算 │ │ 计算 │
│ (年龄、 │ │ (类别、 │ │ (观看完成 │
│ 兴趣标签)│ │ 时长、 │ │ 率、点赞 │
└──────┬───┘ │ 热度) │ │ 率) │
│ └────┬─────┘ └───┬──────┘
│ │ │
└────────────┼────────────┘
┌──────▼───────┐
│ 特征Join │ (用户画像 × 视频特征 × 交互特征)
└──────┬───────┘
┌────────────┼────────────┐
│ │
┌──────▼───────┐ ┌──────▼───────┐
│ 训练数据集 │ │ 实时特征存储 │
│ (写入HDFS) │ │ (写入Redis) │
└──────────────┘ └──────────────┘

这个管道的特点:

  • 多输入:特征Join步骤需要三个上游的输出
  • 多输出:最终结果既要存HDFS用于模型训练,又要存Redis用于实时推荐
  • 可并行:用户画像、视频特征、交互特征三个计算互相独立,可以并行执行
  • 无环:数据只向下流动,没有循环依赖

如果用MapReduce实现,需要拆成至少5个独立的MapReduce作业,手动管理依赖和中间数据。用Spark可以表达为一个统一的DAG,Spark引擎自动调度、优化、并行执行。

先想一想 🤔 在上面的YouTube特征计算DAG中,“用户画像计算”步骤需要3分钟,“视频特征计算”需要10分钟,“交互特征计算”需要5分钟。整个管道的总执行时间是多少?(假设特征Join需要2分钟,最终写入需要1分钟)

点击查看解析

DAG调度器会自动识别并行机会。三个特征计算步骤互相独立,可以并行执行:

数据清洗: [===] 假设1分钟
用户画像(并行): [===] 3分钟
视频特征(并行): [==========] 10分钟 ← 瓶颈
交互特征(并行): [=====] 5分钟
特征Join(等最慢的): [==] 2分钟
写入: [=] 1分钟

总时间 = 1 + max(3, 10, 5) + 2 + 1 = 14分钟

如果用串行的MapReduce:1 + 3 + 10 + 5 + 2 + 1 = 22分钟

DAG并行执行节省了 3+5-max(3,5) = 8分钟,相当于原来的36%。在更复杂的管道中,并行收益更大。

进一步优化:视频特征计算是瓶颈(10分钟)。可以对其内部进一步拆分(如按视频类别分片并行),或者增加计算资源(更多executor),降低瓶颈步骤的耗时。这就是关键路径优化


定义:在流处理中,每个事件有两种时间——事件时间(Event Time) 是事件实际发生的时间(由产生事件的设备/服务记录),处理时间(Processing Time) 是流处理引擎收到并处理该事件的时间。两者往往不同,因为事件从产生到被处理之间有传输延迟、缓冲、网络抖动等。水位线(Watermark) 是流处理引擎对”所有事件时间小于T的事件都已经到达”的一种估计,用于处理乱序到达的事件。

为什么重要:如果用处理时间来分析数据,结果会受到系统负载、网络延迟等因素的影响而失真。比如在系统过载期间,大量事件被积压,当积压消化后这些事件会在短时间内被处理——如果用处理时间统计”每分钟事件数”,会看到一个虚假的”尖峰”,而实际上这些事件是均匀发生的。使用事件时间才能反映真实世界的情况。

案例Gaming Leaderboard 中分数可能延迟到达

真实事件顺序(事件时间):
10:00:01 玩家A得50分
10:00:02 玩家B得30分
10:00:03 玩家A得20分
实际到达流处理引擎的顺序(处理时间):
10:00:02 收到"玩家B得30分" ← B的事件先到
10:00:05 收到"玩家A得50分" ← A的第一个事件延迟了4秒
10:00:06 收到"玩家A得20分" ← A的第二个事件延迟了3秒

为什么会乱序?可能因为:

  • 玩家A的手机网络不好,事件发送延迟
  • 玩家A的游戏事件经过了不同的中间节点(Kafka分区),路径更长
  • 服务端在不同时区的数据中心处理

如果用处理时间统计”10:00:00~10:00:03这3秒内的得分”

  • 只统计到玩家B的30分(A的事件在这个时间窗口还没到达)
  • 结果不准确

如果用事件时间 + 水位线

水位线逻辑:
- 设置允许的最大延迟 = 5秒
- 当水位线推进到 10:00:03 + 5秒 = 10:00:08 时
- 认为所有事件时间 ≤ 10:00:03 的事件都已到达
- 此时关闭 10:00:00~10:00:03 的窗口并输出结果
- 结果: A=70分, B=30分 ✓ 正确!

先想一想 🤔 水位线设置为”允许最大延迟5秒”。如果某个事件延迟了10秒才到达(超过了水位线),怎么办?

点击查看解析

这个事件到达时,它所属的窗口已经关闭并输出了结果——这就是迟到事件(Late Event)

常见的处理策略:

  1. 丢弃:忽略迟到事件。适用于精度要求不高的场景(如实时仪表盘的近似统计)。

  2. 更新已输出的结果:重新计算并发出更新。Flink支持”Allowed Lateness”机制,在窗口关闭后仍保留一段时间的状态,迟到事件可以触发结果更新。

    第一次输出 (水位线触发): A=70分, B=30分
    迟到事件到达: A又得了10分(事件时间10:00:02)
    第二次输出 (更新): A=80分, B=30分 (标记为UPDATE)
  3. 旁路输出:把迟到事件发送到一个单独的”迟到数据”流,由下游系统(如批处理)定期处理。

  4. 水位线自适应调整:根据实际观察到的延迟动态调整水位线。如果发现越来越多的事件延迟超过5秒,自动增大水位线到10秒。代价是结果输出延迟增加。

在Gaming Leaderboard场景中,方案2最合适——玩家能接受排行榜先显示一个值、然后稍后更新为更准确的值。


定义:窗口(Window)是将无限的数据流切分为有限块的机制,以便对每一块进行聚合计算(如计数、求和、取最大值)。三种主要窗口类型:滚动窗口(Tumbling Window) 是固定大小、互不重叠的窗口;滑动窗口(Sliding Window) 是固定大小、可重叠的窗口,每隔一个滑动步长产生一个新窗口;会话窗口(Session Window) 是由活动间隔(gap)定义的不固定大小窗口,一段时间没有新事件则窗口关闭。

为什么重要:无限的数据流无法直接做聚合计算(比如”总共有多少次点击”这个问题永远没有终值),必须通过窗口来界定”在什么范围内”做计算。不同的窗口类型适用于不同的业务需求,选对窗口类型直接影响结果的正确性和可用性。

案例URL Shortener “最近1小时点击量” 用滑动窗口。

假设当前时间是 14:30,我们要计算 short.url/abc 的”最近1小时点击量”。

如果用滚动窗口(1小时)

窗口1: [13:00, 14:00) → 统计这1小时的点击量
窗口2: [14:00, 15:00) → 统计这1小时的点击量
问题: 在14:30查询"最近1小时",得到的是 [14:00, 14:30) 的结果
(只有30分钟的数据),或者 [13:00, 14:00) 的结果(已过时的数据)

如果用滑动窗口(窗口大小1小时,滑动步长1分钟)

窗口1: [13:29, 14:29)
窗口2: [13:30, 14:30) ← 在14:30查询,正好是这个窗口
窗口3: [13:31, 14:31)
每分钟产生一个新窗口,每个窗口覆盖完整的1小时
在任何时刻查询,都能得到"最近1小时"的准确结果
时间线: 13:00 13:30 14:00 14:30 15:00
| | | | |
滚动: [──────窗口1──────][──────窗口2──────]
无重叠,边界固定
滑动: [──────窗口A──────]
[──────窗口B──────]
[──────窗口C──────]
... 每分钟一个新窗口,互相重叠

会话窗口的应用场景:Web Crawler中分析爬虫的”爬取会话”——

爬虫对 example.com 的请求:
09:00:01 请求 /page1
09:00:03 请求 /page2
09:00:05 请求 /page3
09:00:08 请求 /page4
(间隔30秒无请求)
09:00:40 请求 /page5
09:00:42 请求 /page6
如果 session gap = 15秒:
会话1: [09:00:01, 09:00:08] → 4个请求
会话2: [09:00:40, 09:00:42] → 2个请求
用于分析:每次爬取会话爬了多少页面、持续多长时间

先想一想 🤔 URL Shortener要实时显示”过去24小时每小时的点击量柱状图”(共24根柱子),应该用什么类型的窗口?如果还要显示”过去24小时总点击量”呢?

点击查看解析

“每小时点击量柱状图”用滚动窗口

滚动窗口,大小=1小时:
[00:00, 01:00) → 柱子1: 1200次
[01:00, 02:00) → 柱子2: 800次
[02:00, 03:00) → 柱子3: 350次
...
[23:00, 24:00) → 柱子24: 1500次

每根柱子代表一个完整的1小时,互不重叠——滚动窗口天然适合。

“过去24小时总点击量”用滑动窗口

滑动窗口,大小=24小时,滑动步长=1分钟(或实时更新):
随着时间推移,窗口不断滑动
14:30 查询 → [前一天14:30, 今天14:30) 的总和
14:31 查询 → [前一天14:31, 今天14:31) 的总和

或者更高效的做法:不直接用流处理的滑动窗口(24小时窗口状态太大),而是把滚动窗口的结果(每小时计数)存下来,查询时SUM最近24个小时的计数。这就是”预聚合+查询时汇总”的思路,比维护一个巨大的滑动窗口高效得多。


定义:流和表看似不同的概念,实际上可以互相转换——流是表的变更日志:对表的每次INSERT、UPDATE、DELETE操作都是流中的一个事件;表是流的累积结果:从头到尾重放流中的所有事件,就能重建出表的当前状态。这就是流表对偶性(Stream-Table Duality),也叫”表是流的物化视图”。

为什么重要:理解流表对偶性能帮你设计更灵活的数据架构。你可以选择只存储流(变更事件),在需要时实时计算出表;也可以存储表(当前状态),在需要时从表的变更日志生成流。很多现代数据架构(如Event Sourcing、CDC)都建立在这个概念之上。

案例News Feed = 关注关系表 + 帖子变更流的 Join

"关注关系"是一张表:
┌─────────┬───────────┐
│ user_id │ follows │
├─────────┼───────────┤
│ Alice │ Bob │
│ Alice │ Carol │
│ Dave │ Bob │
└─────────┴───────────┘
"发帖"是一条流:
事件1: {user: Bob, action: post, content: "Hello!", time: 10:00}
事件2: {user: Carol, action: post, content: "Hi!", time: 10:01}
事件3: {user: Bob, action: post, content: "Bye!", time: 10:02}

News Feed的生成 = 流(帖子) JOIN 表(关注关系):

收到 Bob 发帖 "Hello!"
→ 查询关注关系表:谁关注了Bob?→ Alice, Dave
→ 把帖子推送到 Alice 和 Dave 的信息流
收到 Carol 发帖 "Hi!"
→ 查询关注关系表:谁关注了Carol?→ Alice
→ 把帖子推送到 Alice 的信息流

但关注关系本身也会变化(也是一条流!)

事件4: {user: Dave, action: follow, target: Carol, time: 10:03}
→ 更新关注关系表:Dave现在也关注了Carol
→ 之后Carol的新帖子也会推送给Dave

所以完整的模型是:两条流(帖子流 + 关注变更流) JOIN,其中关注变更流被物化为关注关系表以便高效查询。这就是流表对偶性的实际应用。

先想一想 🤔 如果Alice取消关注了Bob(关注关系表发生变化),Alice的News Feed中已经存在的Bob的旧帖子应该怎么处理?

点击查看解析

这取决于产品设计和技术权衡:

方案1:不删除旧帖子(Twitter/X的做法)

  • 取消关注后,Alice的Feed中不再出现Bob的帖子
  • 但已经推送到Feed中的旧帖子保留不动
  • 原因:删除旧帖子需要遍历Alice的Feed找到所有Bob的帖子并逐一删除,代价很高
  • 如果Alice的Feed是一个”帖子ID列表”(fan-out-on-write),可能有几千条Bob的帖子需要清理

方案2:读取时过滤(更常见)

  • Feed中保留Bob的旧帖子ID,但在Alice读取Feed时,实时检查关注关系
  • 如果Alice已经取消关注Bob,则过滤掉Bob的帖子不显示
  • 优点:取消关注操作是O(1)的
  • 缺点:每次读取Feed都要做过滤检查,增加读取延迟

方案3:异步清理

  • 取消关注后,立即停止推送新帖子
  • 后台异步任务慢慢清理Feed中的旧帖子
  • 在清理完成前,用方案2的读取时过滤来保证正确性

这里体现了流表对偶性的一个挑战:当”表”发生变化时,之前”流”产生的物化结果可能需要回溯更新


定义:Event Sourcing(事件溯源)是一种不存储实体的当前状态,而是存储所有导致状态变化的事件的数据存储模式。实体的当前状态通过按顺序重放所有事件来计算得出。例如,银行账户不存储”余额=1000”,而是存储所有交易记录:“开户+0”、“存入+500”、“存入+800”、“取出-300”——余额可以通过重放这些事件计算出来:0+500+800-300=1000。

为什么重要:Event Sourcing提供了几个独特优势:

  1. 完整审计追踪:所有变化都有记录,可追溯任何时间点的状态
  2. 时间旅行:可以重建任意历史时刻的状态(“3月15日14:00时库存是多少?”)
  3. Bug修复后重算:如果发现某个计算逻辑有bug,修复后重放所有事件即可得到正确状态
  4. 解耦读写:写入只是追加事件(append-only),极其高效

案例Hotel Reservation 记录所有预订/取消事件

传统做法(只存当前状态):

rooms 表:
┌──────────┬─────────┬──────────┐
│ room_id │ status │ guest │
├──────────┼─────────┼──────────┤
│ 101 │ booked │ Alice │
└──────────┴─────────┴──────────┘

Event Sourcing做法(存储所有事件):

events 表:
┌──────┬──────────────────┬────────────────────────────────────┐
│ seq │ timestamp │ event │
├──────┼──────────────────┼────────────────────────────────────┤
│ 1 │ 2026-03-01 09:00 │ RoomListed{room:101, price:500} │
│ 2 │ 2026-03-10 14:00 │ RoomBooked{room:101, guest:Bob, │
│ │ │ checkin:3/20, checkout:3/22} │
│ 3 │ 2026-03-12 10:00 │ BookingCancelled{room:101, │
│ │ │ guest:Bob, reason:"plans changed"}│
│ 4 │ 2026-03-15 16:00 │ RoomBooked{room:101, guest:Alice, │
│ │ │ checkin:3/20, checkout:3/25} │
│ 5 │ 2026-03-18 09:00 │ PriceChanged{room:101, price:600} │
└──────┴──────────────────┴────────────────────────────────────┘

当前状态通过重放事件得出:

  • 事件1:101号房上架,500元/晚
  • 事件2:Bob预订了3/20-3/22
  • 事件3:Bob取消了 → 101号房恢复可用
  • 事件4:Alice预订了3/20-3/25
  • 事件5:价格改为600元/晚(不影响Alice已确认的预订价格)

→ 当前状态:101号房已被Alice预订,3/20-3/25,价格500元/晚

Event Sourcing的实际价值场景

  • 客户投诉”我明明没取消预订”→ 直接查事件日志,有完整记录
  • 发现定价逻辑有bug,多收了客户钱 → 修复逻辑后,重放事件,找出所有受影响的订单并退款
  • 分析”平均预订到取消的时间间隔”→ 直接从事件流统计

先想一想 🤔 Event Sourcing的事件日志会越来越大(永远追加不删除)。如果酒店运营了10年,每次查询房间状态都要重放10年的事件,性能怎么办?

点击查看解析

核心解决方案:快照(Snapshot)

不需要每次都从第1条事件重放。定期生成快照:
快照(2026-01-01): room 101 = {status: available, price: 450}
之后的事件:
事件10001: RoomBooked{...}
事件10002: PriceChanged{...}
...
查询当前状态 = 加载最新快照 + 重放快照之后的事件
如果快照每天生成一次,最多只需重放1天的事件(而不是10年)

其他优化:

  1. 物化视图:为高频查询维护一个实时更新的”当前状态表”(就是CQRS中的读模型,见8.8)。查询直接读物化视图,O(1)。

  2. 事件归档:很老的事件(如5年前)归档到冷存储(S3),只在需要审计或重算时才加载。

  3. 压缩(Compaction):对于同一实体的多次更新,只保留最新的快照。但这会丢失历史——需要权衡审计需求。

实际上,Event Sourcing系统通常同时维护事件日志(写入用)和物化视图(查询用),两者通过异步消费事件流保持同步。


定义:CQRS(Command Query Responsibility Segregation,命令查询职责分离)是将系统的写入操作(Command)和读取操作(Query)分离到不同的模型、甚至不同的数据存储中的架构模式。写模型针对写入优化(如追加日志、事件存储),读模型针对查询优化(如物化视图、搜索索引、缓存)。两个模型通过异步的事件/消息同步保持最终一致。

为什么重要:在很多系统中,读和写的模式差异巨大——写入可能是简单的”追加一条记录”,但读取可能需要复杂的聚合、关联、排序。如果用同一个数据模型同时服务读写,往往顾此失彼。CQRS允许读写各自用最优的数据结构和存储引擎,代价是引入了最终一致性(读模型可能短暂落后于写模型)。

案例News Feed 的写入(发帖)和读取(刷信息流)用不同数据结构

写模型(针对发帖优化):
┌──────────────────────────────────────┐
│ posts 表(追加写入,极快) │
│ ┌──────┬────────┬──────────┬───────┐│
│ │ id │ author │ content │ time ││
│ ├──────┼────────┼──────────┼───────┤│
│ │ 1001 │ Bob │ "Hello!" │ 10:00 ││
│ │ 1002 │ Carol │ "Hi!" │ 10:01 ││
│ └──────┴────────┴──────────┴───────┘│
│ 写入操作: INSERT INTO posts (...) │
│ → 简单追加,O(1) │
└──────────────────────────────────────┘
↓ 异步事件同步
↓ (Kafka/消息队列)
读模型(针对Feed查询优化):
┌──────────────────────────────────────┐
│ feed 表(每个用户一个预计算的帖子列表)│
│ ┌─────────┬──────────────────────┐ │
│ │ user_id │ feed_post_ids │ │
│ ├─────────┼──────────────────────┤ │
│ │ Alice │ [1002, 1001, ...] │ │
│ │ Dave │ [1001, ...] │ │
│ └─────────┴──────────────────────┘ │
│ 读取操作: GET feed[Alice] │
│ → 直接返回预计算好的列表,O(1) │
└──────────────────────────────────────┘

写的时候:Bob发帖 → 写入posts表(1次写入)→ 发事件到消息队列

异步扇出:消费者从队列取出事件 → 查询”谁关注了Bob” → 把帖子ID追加到Alice和Dave的feed列表

读的时候:Alice刷Feed → 直接读取feed[Alice] → 返回预排序的帖子ID列表 → 根据ID批量获取帖子内容

如果不用CQRS,读取Feed需要实时执行:

SELECT * FROM posts
WHERE author IN (SELECT follows FROM relationships WHERE user_id = 'Alice')
ORDER BY time DESC
LIMIT 20

这个JOIN查询在数据量大时非常慢(Alice关注了1000人,每人发了上万条帖子)。

CQRS把这个复杂查询的计算提前到写入时完成,读取时直接取结果。

先想一想 🤔 CQRS中,写模型和读模型通过异步事件同步。如果Bob发了一条帖子后立即查看Alice的Feed(此时异步扇出还没完成),Alice看不到Bob的新帖子。这个”最终一致性”问题如何处理?

点击查看解析

几种实用策略:

  1. Read-your-own-writes(读自己的写):Bob自己查看自己的Timeline时,除了读Feed列表,还从posts表中获取自己最新的帖子,合并后展示。确保发帖者能立即看到自己的帖子。

  2. 同步+异步混合扇出:对于发帖者的”密切关注”列表(如互相关注的好友),同步写入他们的Feed;对于普通关注者,异步扇出。这样最亲密的好友能立即看到,其他人有几秒延迟。

  3. 客户端乐观更新:Bob的客户端在发帖成功后,本地立即把帖子加到Alice的Feed展示中(如果Alice碰巧在看),不等服务端扇出完成。如果后续发现服务端扇出失败,再回滚。

  4. 接受延迟:对于大多数社交产品,帖子晚几秒出现在关注者Feed中是完全可以接受的。用户不会精确到秒去检查。Instagram和Twitter的帖子有时也有几秒的延迟才出现在关注者Timeline上。

核心思想:对于不同的读取场景,提供不同级别的一致性保证。自己的内容要强一致,别人的内容可以最终一致。


定义Lambda架构同时维护两条数据处理管道——批处理管道(处理全量历史数据,保证准确性)和流处理管道(处理实时数据,保证低延迟)。查询时合并两条管道的结果。Kappa架构只使用一条流处理管道——所有数据(包括历史数据)都通过流处理引擎处理。如果需要重新计算历史数据,就从消息队列的起始位置重新消费。

为什么重要

  • Lambda的优势:批处理和流处理可以用不同的技术各自优化;批处理结果可以作为”真值”校正流处理的近似结果。
  • Lambda的劣势:需要维护两套代码(批处理逻辑和流处理逻辑必须保持一致),运维复杂度翻倍。
  • Kappa的优势:只有一套代码,架构简单。
  • Kappa的劣势:流处理引擎需要能高效处理大量历史数据的回放,对引擎要求更高。

案例YouTube 实时观看统计 + 每日汇总报表

Lambda架构设计

观看事件流 (Kafka)
├──→ 流处理管道 (Flink)
│ │
│ ├─→ 实时仪表盘: "过去5分钟观看量: 12,345"
│ └─→ 实时排行: "当前热门视频 Top 10"
│ (近似值,可能有少量误差)
└──→ 批处理管道 (Spark, 每天凌晨运行)
├─→ 每日报表: "昨天总观看量: 8,234,567"
├─→ 创作者收入结算: 精确到每次观看的广告分成
└─→ 校正流处理的累计值
(批处理结果覆盖流处理的历史累计)

为什么YouTube需要Lambda而不是纯Kappa?

  1. 收入结算要求100%准确:流处理可能丢失少量事件(如极端故障情况),批处理对全量数据重新计算保证准确性。
  2. 实时仪表盘要求低延迟:不能等到每天批处理跑完才显示数据,需要流处理提供秒级更新。
  3. 两套结果互相校验:如果流处理和批处理的结果差异超过阈值,触发告警,可能是数据管道有bug。

Kappa架构设计(如果选择Kappa)

观看事件流 (Kafka, 保留7天原始数据)
└──→ 流处理管道 (Flink)
├─→ 实时仪表盘
├─→ 实时排行
└─→ 每日报表 (通过每天重放当天数据计算)
需要重算历史?
→ 启动新的 Flink 任务
→ 从 Kafka 的7天前位置开始重新消费
→ 重新计算所有结果
→ 用新结果替换旧结果

先想一想 🤔 一个创业公司正在构建视频平台(类似小型YouTube),每天100万次观看。应该选Lambda还是Kappa架构?3年后用户增长到每天10亿次观看时,需要切换架构吗?

点击查看解析

创业初期(100万次/天):选Kappa

理由:

  • 100万次/天的规模,一台好一点的机器就能处理。维护两套管道(Lambda)是过度设计。
  • 创业阶段产品变化快,业务逻辑频繁修改。一套代码(Kappa)比两套代码(Lambda)迭代速度快一倍。
  • 如果需要重算历史,Kafka保留30天数据,重放一遍几分钟就完成。
  • 精确度要求没那么高——还没有广告收入分成的场景。

增长到10亿次/天:考虑Lambda或混合

可能需要的原因:

  • 广告收入结算需要精确到每次观看,流处理的at-least-once语义可能导致少量重复计数,影响财务准确性。引入批处理作为校正。
  • Kafka保留30天的10亿次/天数据 = 300亿条事件,重放一次可能需要几小时到几天,不再是”几分钟就能重放完”。
  • 不同业务对延迟和精度的要求分化:实时仪表盘要秒级更新,月度报表要100%准确。

但不一定是完整的Lambda

  • 可以用Flink的exactly-once语义(配合Kafka事务)来提高流处理精度,减少对批处理校正的依赖。
  • 可以只对”财务相关”的指标跑批处理校正,其他指标仍然只用流处理。
  • 也就是”Lambda和Kappa的混合体”——按需引入批处理。

总结:从简单开始(Kappa),按需演进(部分Lambda)


练习1:YouTube 观看统计系统设计

Section titled “练习1:YouTube 观看统计系统设计”

为YouTube设计观看统计系统,要求:

  • 实时:视频页面显示”当前观看人数”(秒级更新)
  • 近实时:创作者后台显示”过去1小时观看趋势”(分钟级更新)
  • 离线:每日生成”频道数据报告”(次日可查看)

选择Lambda还是Kappa架构?画出数据流图,标注每个组件的技术选型。

点击查看解析

架构选择:Lambda(混合型)

三种不同延迟要求 → 需要不同的处理策略 → Lambda的批+流两条管道更合适。

数据流图

用户观看事件
┌─────────┐
│ Kafka │ (事件总线,保留7天)
└─────┬───┘
├──────────────────────────────────────────┐
│ │
▼ ▼
┌─────────────────┐ ┌────────────────┐
│ Flink 流处理 │ │ Spark 批处理 │
│ │ │ (每天凌晨运行) │
│ 任务1: │ │ │
│ 秒级聚合 │ │ 从Kafka/HDFS │
│ "当前观看人数" │ │ 读取全天数据 │
│ → Redis │ │ 精确统计 │
│ (TTL=30s) │ │ → HDFS/数据仓库│
│ │ │ │
│ 任务2: │ │ │
│ 分钟级窗口聚合 │ └────────┬───────┘
│ "过去1小时趋势" │ │
│ → TimescaleDB │ ▼
│ │ ┌────────────────┐
└─────────────────┘ │ 报告生成服务 │
│ 每日频道报告 │
│ → PostgreSQL │
└────────────────┘

各组件技术选型和理由

组件技术理由
事件总线Kafka高吞吐、持久化、支持多消费者
实时处理Flink精确一次语义、事件时间支持、低延迟
秒级数据存储Redis极低延迟读写、支持TTL自动过期
趋势数据存储TimescaleDB时序数据优化、SQL查询、自动压缩
批处理Spark高吞吐、成熟生态、SQL支持
报告数据存储PostgreSQL结构化数据、复杂查询、事务保证

流处理任务详解

任务1——“当前观看人数”:

Flink SQL:
SELECT video_id, COUNT(*) as viewers
FROM watch_events
WHERE event_type = 'heartbeat' -- 客户端每10秒发一次心跳
GROUP BY video_id,
TUMBLE(event_time, INTERVAL '10' SECOND)

结果写入Redis,key=viewers:{video_id},TTL=30秒。前端每5秒轮询一次。

任务2——“过去1小时趋势”:

Flink SQL:
SELECT video_id,
window_start,
COUNT(*) as views
FROM watch_events
WHERE event_type = 'view_start'
GROUP BY video_id,
TUMBLE(event_time, INTERVAL '1' MINUTE)

每分钟一个数据点,写入TimescaleDB。创作者后台查询最近60个数据点画折线图。


练习2:URL Shortener 点击统计 MapReduce

Section titled “练习2:URL Shortener 点击统计 MapReduce”

URL Shortener积累了10亿条点击日志,格式如下:

{short_code: "abc123", timestamp: "2026-03-15T10:00:00Z", ip: "1.2.3.4", ...}

用MapReduce思维描述:如何从这10亿条日志中统计每个短链接的点击量,并输出点击量Top 100?

点击查看解析

方案:两轮MapReduce

第一轮:统计每个short_code的点击量

Map阶段(1000个Map任务并行,每个处理100万条日志):

Map(日志记录) → (short_code, 1)
输入: {short_code: "abc123", timestamp: ..., ip: ...}
输出: ("abc123", 1)
输入: {short_code: "def456", timestamp: ..., ip: ...}
输出: ("def456", 1)
输入: {short_code: "abc123", timestamp: ..., ip: ...}
输出: ("abc123", 1)

Combiner优化(Map端局部聚合,大幅减少Shuffle数据量):

Combiner在每个Map任务内合并:
("abc123", 1) + ("abc123", 1) + ("abc123", 1) → ("abc123", 3)
("def456", 1) + ("def456", 1) → ("def456", 2)
原本每个Map任务输出100万个(key, 1)
Combiner后可能只输出几万个(key, count)
Shuffle数据量减少了约100倍!

Shuffle(按short_code分区):

所有 "abc123" 的计数发送到Reduce任务#42
所有 "def456" 的计数发送到Reduce任务#17
...

Reduce阶段(假设100个Reduce任务):

Reduce("abc123", [3, 5, 2, 8, ...]) → ("abc123", 总计=15234)
Reduce("def456", [2, 7, 1, ...]) → ("def456", 总计=8921)
...

第一轮输出:每个short_code的总点击量(可能有数百万个short_code)

第二轮:取Top 100

Map阶段(每个Map任务读取一部分第一轮结果):

每个Map任务内部维护一个大小为100的最小堆
遍历本任务的所有(short_code, count)对
堆满后,新的count > 堆顶 → 替换堆顶
每个Map任务输出本地Top 100

Reduce阶段(只有1个Reduce任务):

输入: 所有Map任务的本地Top 100(假设有100个Map任务 → 10000条)
在10000条中取全局Top 100
用最小堆,O(10000 × log100) ≈ 极快

最终输出:点击量最高的100个短链接及其点击量。

性能分析

10亿条日志,每条约100字节 → 总共约100GB
第一轮MapReduce:
Map: 100GB输入 → 并行读取 → 每个Map任务处理约100MB
Combiner: 100GB → 约1GB (100倍压缩)
Shuffle: 1GB网络传输
Reduce: 1GB → 几百MB (最终的short_code→count映射)
耗时: 约10-30分钟 (取决于集群大小)
第二轮MapReduce:
输入: 几百MB (第一轮输出)
处理: 几秒钟
输出: 100条记录
总耗时: 约10-30分钟,主要在第一轮

可以用Spark优化:一行代码搞定

clicks.groupBy("short_code").count().orderBy(desc("count")).limit(100)

Spark内部会自动做类似的两阶段聚合和局部Top-K优化。


本模块核心收获:数据处理有两种基本范式——批处理适合高吞吐、可接受延迟的场景,流处理适合低延迟、实时响应的场景。MapReduce的”分散-聚合”思想是基础,DAG模型是进化。流处理中的时间语义(事件时间vs处理时间)和窗口操作是核心概念。Event Sourcing和CQRS提供了强大的数据架构模式。Lambda和Kappa架构是批流融合的两种路线。大多数系统从简单开始,按需演进。