Module 9: 批处理与流处理
📖 深度参考手册 — 本模块属于理论参考,非主线必读。 主线学习路径见 README.md。 当你在项目实战中遇到相关问题时,回来查阅。
来源:DDIA Ch10 (Batch Processing), Ch11 (Stream Processing)
数据处理有两种基本范式:批处理和流处理。批处理像”考试后统一批改试卷”,流处理像”边考边改”。这个模块帮你理解两种范式的原理、适用场景,以及它们如何在真实系统中协作。
9.1 批处理 vs 流处理
Section titled “9.1 批处理 vs 流处理”定义:批处理(Batch Processing)是对一组有界(bounded)数据集进行一次性处理——数据有明确的开始和结束,处理完就结束。流处理(Stream Processing)是对持续不断的无界(unbounded)数据流进行实时处理——数据源源不断地到来,处理永远不会”完成”。两者是数据处理的两种基本范式,各有优势和适用场景。
为什么重要:选择批处理还是流处理,直接决定了系统架构、数据延迟、资源使用模式和复杂度。批处理的优势是简单、高吞吐、容错容易(重跑一次就行);流处理的优势是低延迟、实时反馈。很多系统同时需要两者——历史数据用批处理分析,实时数据用流处理响应。
案例:
Web Crawler — 批处理典型:
每天凌晨2点启动批处理任务: 1. 从URL种子库中读取所有待爬URL列表(有界数据集) 2. 并行爬取所有URL,下载页面内容 3. 解析页面,提取新URL加入种子库 4. 更新搜索索引 5. 任务完成,等待下一次调度
特征: 有明确的开始和结束、高吞吐、延迟不敏感(隔天更新可以接受)Gaming Leaderboard — 流处理典型:
玩家不断产生游戏事件(无界数据流): → 玩家A得了50分 → 玩家B得了30分 → 玩家A又得了20分 → ...(永不停止)
流处理系统实时消费这些事件: → 收到"玩家A得50分" → 排行榜实时更新 → 玩家看到自己排名变化 → 收到"玩家B得30分" → 排行榜再次更新
特征: 无明确结束、低延迟(秒级甚至毫秒级)、持续运行| 维度 | 批处理 | 流处理 |
|---|---|---|
| 数据边界 | 有界(处理固定数据集) | 无界(持续数据流) |
| 延迟 | 分钟~小时 | 毫秒~秒 |
| 吞吐量 | 极高(优化批量操作) | 中等(逐条或小批量) |
| 容错 | 简单(重跑整个批次) | 复杂(需要检查点) |
| 代表技术 | MapReduce, Spark Batch | Flink, Kafka Streams |
先想一想 🤔 Web Crawler 一定只能用批处理吗?如果需要”发现新闻后5分钟内就收录”,应该怎么改?
点击查看解析
可以用批处理+流处理混合的方式:
- 批处理(基线):每天全量爬取所有已知URL,确保索引的完整性和一致性。
- 流处理(增量):订阅RSS/Atom feed、Twitter firehose、Webhook等实时信号源。一旦发现新内容的信号,立即触发爬取和索引更新。
信号源 → Kafka → 流处理(即时爬取+索引) → 搜索可用↑定时任务 → 批处理(全量爬取+重建索引) ────────┘Google实际上就是这样做的——对于新闻类页面,发现到收录的延迟可以低至几分钟;对于普通页面,可能几天才重新爬取一次。批处理保证完整性,流处理保证时效性。
9.2 MapReduce 模型
Section titled “9.2 MapReduce 模型”定义:MapReduce是一种将大规模数据处理分解为两个阶段的编程模型:Map阶段将输入数据转换为键值对(key-value pairs),Shuffle阶段按key将数据分组并传输到对应的处理节点,Reduce阶段对同一key的所有值进行聚合计算。这个模型的核心优势是天然可并行——Map和Reduce阶段的每个任务都可以独立运行在不同的机器上。
为什么重要:MapReduce是大数据处理的基石思想。虽然现在很多系统(如Spark)已经超越了MapReduce,但MapReduce的思维方式——“先分散处理,再汇总聚合”——是理解所有分布式数据处理框架的基础。
案例:Search Engine 构建倒排索引是最经典的MapReduce应用。
假设我们有3个网页需要建立倒排索引:
文档1: "cat sat on mat"文档2: "cat ate cheese"文档3: "dog sat on log"Map阶段(每个文档独立处理,可并行):
Map(文档1) → [(cat, doc1), (sat, doc1), (on, doc1), (mat, doc1)]Map(文档2) → [(cat, doc2), (ate, doc2), (cheese, doc2)]Map(文档3) → [(dog, doc3), (sat, doc3), (on, doc3), (log, doc3)]Shuffle阶段(按key分组,自动完成):
cat → [doc1, doc2]sat → [doc1, doc3]on → [doc1, doc3]mat → [doc1]ate → [doc2]cheese → [doc2]dog → [doc3]log → [doc3]Reduce阶段(每个key独立聚合,可并行):
Reduce(cat, [doc1, doc2]) → cat: {doc1, doc2} // "cat"出现在文档1和2Reduce(sat, [doc1, doc3]) → sat: {doc1, doc3}Reduce(on, [doc1, doc3]) → on: {doc1, doc3}Reduce(mat, [doc1]) → mat: {doc1}...最终得到倒排索引:
cat → {doc1, doc2}sat → {doc1, doc3}on → {doc1, doc3}mat → {doc1}ate → {doc2}cheese → {doc2}dog → {doc3}log → {doc3}用户搜索”cat sat”时,查询 倒排索引[cat] ∩ 倒排索引[sat] = {doc1, doc2} ∩ {doc1, doc3} = {doc1}。
在真实的搜索引擎中,这个MapReduce要复杂得多:
- Map阶段还要做分词、去停用词、词干提取(“running”→“run”)
- 倒排索引不仅记录文档ID,还记录词频、位置信息(用于短语搜索和相关性排序)
- 多轮MapReduce:先建索引,再计算PageRank,再合并
先想一想 🤔 如果有100亿个网页,每个网页平均100个词,那么Map阶段会产生约1万亿个键值对。Shuffle阶段如何高效地按key分组?
点击查看解析
Shuffle是MapReduce中最耗资源的阶段,核心思路是哈希分区+排序:
哈希分区:Map输出的每个键值对,根据
hash(key) % R(R是Reduce任务数)决定发送到哪个Reduce节点。同一个key一定去同一个Reduce节点。本地排序:每个Map任务在发送前先按key排序,这样Reduce端可以用归并排序高效合并来自多个Map的数据。
Combiner优化:如果Reduce操作是可结合的(如计数、求和),可以在Map端先做一次局部聚合(Combiner),减少网络传输量。比如一个文档中”the”出现了50次,Map端直接输出
(the, 50)而不是50个(the, 1)。磁盘溢写:1万亿个键值对无法全放内存,Map的中间结果写到本地磁盘,Reduce通过HTTP拉取。这也是MapReduce性能瓶颈所在——大量磁盘IO和网络IO。
Spark之所以比MapReduce快很多,主要就是因为Spark尽可能把中间结果保持在内存中,避免了磁盘IO。
9.3 有向无环图 (DAG) 数据流
Section titled “9.3 有向无环图 (DAG) 数据流”定义:DAG数据流是比MapReduce更灵活的数据处理模型。在MapReduce中,数据只能经过”Map→Shuffle→Reduce”这一固定模式,如果需要多步处理就要串联多个MapReduce作业(一个的输出是另一个的输入),中间结果必须写入磁盘。DAG模型允许将数据处理表达为一个有向无环图——节点是计算步骤,边是数据流——任意步骤可以有多个输入和多个输出,中间结果可以驻留内存,大幅提升效率。
为什么重要:现实中的数据处理往往不是简单的”Map→Reduce”,而是多步骤、多分支、有依赖关系的复杂管道。例如,先过滤、再关联两个数据集、然后聚合、最后排序。用MapReduce需要拆分成多个独立作业并手动管理中间数据;用DAG框架(如Spark、Flink、Tez)可以把整个管道表达为一个DAG,框架自动优化执行计划。
案例:YouTube 推荐特征计算管道 是一个典型的多步骤DAG:
┌──────────────┐ │ 用户行为日志 │ └──────┬───────┘ │ ┌──────▼───────┐ │ 数据清洗 │ (去重、过滤bot流量) └──────┬───────┘ │ ┌────────────┼────────────┐ │ │ │ ┌──────▼───┐ ┌────▼─────┐ ┌───▼──────┐ │ 用户画像 │ │ 视频特征 │ │ 交互特征 │ │ 计算 │ │ 计算 │ │ 计算 │ │ (年龄、 │ │ (类别、 │ │ (观看完成 │ │ 兴趣标签)│ │ 时长、 │ │ 率、点赞 │ └──────┬───┘ │ 热度) │ │ 率) │ │ └────┬─────┘ └───┬──────┘ │ │ │ └────────────┼────────────┘ │ ┌──────▼───────┐ │ 特征Join │ (用户画像 × 视频特征 × 交互特征) └──────┬───────┘ │ ┌────────────┼────────────┐ │ │ ┌──────▼───────┐ ┌──────▼───────┐ │ 训练数据集 │ │ 实时特征存储 │ │ (写入HDFS) │ │ (写入Redis) │ └──────────────┘ └──────────────┘这个管道的特点:
- 多输入:特征Join步骤需要三个上游的输出
- 多输出:最终结果既要存HDFS用于模型训练,又要存Redis用于实时推荐
- 可并行:用户画像、视频特征、交互特征三个计算互相独立,可以并行执行
- 无环:数据只向下流动,没有循环依赖
如果用MapReduce实现,需要拆成至少5个独立的MapReduce作业,手动管理依赖和中间数据。用Spark可以表达为一个统一的DAG,Spark引擎自动调度、优化、并行执行。
先想一想 🤔 在上面的YouTube特征计算DAG中,“用户画像计算”步骤需要3分钟,“视频特征计算”需要10分钟,“交互特征计算”需要5分钟。整个管道的总执行时间是多少?(假设特征Join需要2分钟,最终写入需要1分钟)
点击查看解析
DAG调度器会自动识别并行机会。三个特征计算步骤互相独立,可以并行执行:
数据清洗: [===] 假设1分钟用户画像(并行): [===] 3分钟视频特征(并行): [==========] 10分钟 ← 瓶颈交互特征(并行): [=====] 5分钟特征Join(等最慢的): [==] 2分钟写入: [=] 1分钟总时间 = 1 + max(3, 10, 5) + 2 + 1 = 14分钟
如果用串行的MapReduce:1 + 3 + 10 + 5 + 2 + 1 = 22分钟
DAG并行执行节省了 3+5-max(3,5) = 8分钟,相当于原来的36%。在更复杂的管道中,并行收益更大。
进一步优化:视频特征计算是瓶颈(10分钟)。可以对其内部进一步拆分(如按视频类别分片并行),或者增加计算资源(更多executor),降低瓶颈步骤的耗时。这就是关键路径优化。
9.4 流处理中的时间语义
Section titled “9.4 流处理中的时间语义”定义:在流处理中,每个事件有两种时间——事件时间(Event Time) 是事件实际发生的时间(由产生事件的设备/服务记录),处理时间(Processing Time) 是流处理引擎收到并处理该事件的时间。两者往往不同,因为事件从产生到被处理之间有传输延迟、缓冲、网络抖动等。水位线(Watermark) 是流处理引擎对”所有事件时间小于T的事件都已经到达”的一种估计,用于处理乱序到达的事件。
为什么重要:如果用处理时间来分析数据,结果会受到系统负载、网络延迟等因素的影响而失真。比如在系统过载期间,大量事件被积压,当积压消化后这些事件会在短时间内被处理——如果用处理时间统计”每分钟事件数”,会看到一个虚假的”尖峰”,而实际上这些事件是均匀发生的。使用事件时间才能反映真实世界的情况。
案例:Gaming Leaderboard 中分数可能延迟到达:
真实事件顺序(事件时间): 10:00:01 玩家A得50分 10:00:02 玩家B得30分 10:00:03 玩家A得20分
实际到达流处理引擎的顺序(处理时间): 10:00:02 收到"玩家B得30分" ← B的事件先到 10:00:05 收到"玩家A得50分" ← A的第一个事件延迟了4秒 10:00:06 收到"玩家A得20分" ← A的第二个事件延迟了3秒为什么会乱序?可能因为:
- 玩家A的手机网络不好,事件发送延迟
- 玩家A的游戏事件经过了不同的中间节点(Kafka分区),路径更长
- 服务端在不同时区的数据中心处理
如果用处理时间统计”10:00:00~10:00:03这3秒内的得分”:
- 只统计到玩家B的30分(A的事件在这个时间窗口还没到达)
- 结果不准确
如果用事件时间 + 水位线:
水位线逻辑: - 设置允许的最大延迟 = 5秒 - 当水位线推进到 10:00:03 + 5秒 = 10:00:08 时 - 认为所有事件时间 ≤ 10:00:03 的事件都已到达 - 此时关闭 10:00:00~10:00:03 的窗口并输出结果 - 结果: A=70分, B=30分 ✓ 正确!先想一想 🤔 水位线设置为”允许最大延迟5秒”。如果某个事件延迟了10秒才到达(超过了水位线),怎么办?
点击查看解析
这个事件到达时,它所属的窗口已经关闭并输出了结果——这就是迟到事件(Late Event)。
常见的处理策略:
丢弃:忽略迟到事件。适用于精度要求不高的场景(如实时仪表盘的近似统计)。
更新已输出的结果:重新计算并发出更新。Flink支持”Allowed Lateness”机制,在窗口关闭后仍保留一段时间的状态,迟到事件可以触发结果更新。
第一次输出 (水位线触发): A=70分, B=30分迟到事件到达: A又得了10分(事件时间10:00:02)第二次输出 (更新): A=80分, B=30分 (标记为UPDATE)旁路输出:把迟到事件发送到一个单独的”迟到数据”流,由下游系统(如批处理)定期处理。
水位线自适应调整:根据实际观察到的延迟动态调整水位线。如果发现越来越多的事件延迟超过5秒,自动增大水位线到10秒。代价是结果输出延迟增加。
在Gaming Leaderboard场景中,方案2最合适——玩家能接受排行榜先显示一个值、然后稍后更新为更准确的值。
9.5 窗口操作
Section titled “9.5 窗口操作”定义:窗口(Window)是将无限的数据流切分为有限块的机制,以便对每一块进行聚合计算(如计数、求和、取最大值)。三种主要窗口类型:滚动窗口(Tumbling Window) 是固定大小、互不重叠的窗口;滑动窗口(Sliding Window) 是固定大小、可重叠的窗口,每隔一个滑动步长产生一个新窗口;会话窗口(Session Window) 是由活动间隔(gap)定义的不固定大小窗口,一段时间没有新事件则窗口关闭。
为什么重要:无限的数据流无法直接做聚合计算(比如”总共有多少次点击”这个问题永远没有终值),必须通过窗口来界定”在什么范围内”做计算。不同的窗口类型适用于不同的业务需求,选对窗口类型直接影响结果的正确性和可用性。
案例:URL Shortener “最近1小时点击量” 用滑动窗口。
假设当前时间是 14:30,我们要计算 short.url/abc 的”最近1小时点击量”。
如果用滚动窗口(1小时):
窗口1: [13:00, 14:00) → 统计这1小时的点击量窗口2: [14:00, 15:00) → 统计这1小时的点击量
问题: 在14:30查询"最近1小时",得到的是 [14:00, 14:30) 的结果 (只有30分钟的数据),或者 [13:00, 14:00) 的结果(已过时的数据)如果用滑动窗口(窗口大小1小时,滑动步长1分钟):
窗口1: [13:29, 14:29)窗口2: [13:30, 14:30) ← 在14:30查询,正好是这个窗口窗口3: [13:31, 14:31)
每分钟产生一个新窗口,每个窗口覆盖完整的1小时在任何时刻查询,都能得到"最近1小时"的准确结果时间线: 13:00 13:30 14:00 14:30 15:00 | | | | |滚动: [──────窗口1──────][──────窗口2──────] 无重叠,边界固定
滑动: [──────窗口A──────] [──────窗口B──────] [──────窗口C──────] ... 每分钟一个新窗口,互相重叠会话窗口的应用场景:Web Crawler中分析爬虫的”爬取会话”——
爬虫对 example.com 的请求: 09:00:01 请求 /page1 09:00:03 请求 /page2 09:00:05 请求 /page3 09:00:08 请求 /page4 (间隔30秒无请求) 09:00:40 请求 /page5 09:00:42 请求 /page6
如果 session gap = 15秒: 会话1: [09:00:01, 09:00:08] → 4个请求 会话2: [09:00:40, 09:00:42] → 2个请求
用于分析:每次爬取会话爬了多少页面、持续多长时间先想一想 🤔 URL Shortener要实时显示”过去24小时每小时的点击量柱状图”(共24根柱子),应该用什么类型的窗口?如果还要显示”过去24小时总点击量”呢?
点击查看解析
“每小时点击量柱状图”用滚动窗口:
滚动窗口,大小=1小时:[00:00, 01:00) → 柱子1: 1200次[01:00, 02:00) → 柱子2: 800次[02:00, 03:00) → 柱子3: 350次...[23:00, 24:00) → 柱子24: 1500次每根柱子代表一个完整的1小时,互不重叠——滚动窗口天然适合。
“过去24小时总点击量”用滑动窗口:
滑动窗口,大小=24小时,滑动步长=1分钟(或实时更新):随着时间推移,窗口不断滑动14:30 查询 → [前一天14:30, 今天14:30) 的总和14:31 查询 → [前一天14:31, 今天14:31) 的总和或者更高效的做法:不直接用流处理的滑动窗口(24小时窗口状态太大),而是把滚动窗口的结果(每小时计数)存下来,查询时SUM最近24个小时的计数。这就是”预聚合+查询时汇总”的思路,比维护一个巨大的滑动窗口高效得多。
9.6 流表对偶性
Section titled “9.6 流表对偶性”定义:流和表看似不同的概念,实际上可以互相转换——流是表的变更日志:对表的每次INSERT、UPDATE、DELETE操作都是流中的一个事件;表是流的累积结果:从头到尾重放流中的所有事件,就能重建出表的当前状态。这就是流表对偶性(Stream-Table Duality),也叫”表是流的物化视图”。
为什么重要:理解流表对偶性能帮你设计更灵活的数据架构。你可以选择只存储流(变更事件),在需要时实时计算出表;也可以存储表(当前状态),在需要时从表的变更日志生成流。很多现代数据架构(如Event Sourcing、CDC)都建立在这个概念之上。
案例:News Feed = 关注关系表 + 帖子变更流的 Join。
"关注关系"是一张表: ┌─────────┬───────────┐ │ user_id │ follows │ ├─────────┼───────────┤ │ Alice │ Bob │ │ Alice │ Carol │ │ Dave │ Bob │ └─────────┴───────────┘
"发帖"是一条流: 事件1: {user: Bob, action: post, content: "Hello!", time: 10:00} 事件2: {user: Carol, action: post, content: "Hi!", time: 10:01} 事件3: {user: Bob, action: post, content: "Bye!", time: 10:02}News Feed的生成 = 流(帖子) JOIN 表(关注关系):
收到 Bob 发帖 "Hello!" → 查询关注关系表:谁关注了Bob?→ Alice, Dave → 把帖子推送到 Alice 和 Dave 的信息流
收到 Carol 发帖 "Hi!" → 查询关注关系表:谁关注了Carol?→ Alice → 把帖子推送到 Alice 的信息流但关注关系本身也会变化(也是一条流!):
事件4: {user: Dave, action: follow, target: Carol, time: 10:03} → 更新关注关系表:Dave现在也关注了Carol → 之后Carol的新帖子也会推送给Dave所以完整的模型是:两条流(帖子流 + 关注变更流) JOIN,其中关注变更流被物化为关注关系表以便高效查询。这就是流表对偶性的实际应用。
先想一想 🤔 如果Alice取消关注了Bob(关注关系表发生变化),Alice的News Feed中已经存在的Bob的旧帖子应该怎么处理?
点击查看解析
这取决于产品设计和技术权衡:
方案1:不删除旧帖子(Twitter/X的做法)
- 取消关注后,Alice的Feed中不再出现Bob的新帖子
- 但已经推送到Feed中的旧帖子保留不动
- 原因:删除旧帖子需要遍历Alice的Feed找到所有Bob的帖子并逐一删除,代价很高
- 如果Alice的Feed是一个”帖子ID列表”(fan-out-on-write),可能有几千条Bob的帖子需要清理
方案2:读取时过滤(更常见)
- Feed中保留Bob的旧帖子ID,但在Alice读取Feed时,实时检查关注关系
- 如果Alice已经取消关注Bob,则过滤掉Bob的帖子不显示
- 优点:取消关注操作是O(1)的
- 缺点:每次读取Feed都要做过滤检查,增加读取延迟
方案3:异步清理
- 取消关注后,立即停止推送新帖子
- 后台异步任务慢慢清理Feed中的旧帖子
- 在清理完成前,用方案2的读取时过滤来保证正确性
这里体现了流表对偶性的一个挑战:当”表”发生变化时,之前”流”产生的物化结果可能需要回溯更新。
9.7 Event Sourcing
Section titled “9.7 Event Sourcing”定义:Event Sourcing(事件溯源)是一种不存储实体的当前状态,而是存储所有导致状态变化的事件的数据存储模式。实体的当前状态通过按顺序重放所有事件来计算得出。例如,银行账户不存储”余额=1000”,而是存储所有交易记录:“开户+0”、“存入+500”、“存入+800”、“取出-300”——余额可以通过重放这些事件计算出来:0+500+800-300=1000。
为什么重要:Event Sourcing提供了几个独特优势:
- 完整审计追踪:所有变化都有记录,可追溯任何时间点的状态
- 时间旅行:可以重建任意历史时刻的状态(“3月15日14:00时库存是多少?”)
- Bug修复后重算:如果发现某个计算逻辑有bug,修复后重放所有事件即可得到正确状态
- 解耦读写:写入只是追加事件(append-only),极其高效
案例:Hotel Reservation 记录所有预订/取消事件。
传统做法(只存当前状态):
rooms 表: ┌──────────┬─────────┬──────────┐ │ room_id │ status │ guest │ ├──────────┼─────────┼──────────┤ │ 101 │ booked │ Alice │ └──────────┴─────────┴──────────┘Event Sourcing做法(存储所有事件):
events 表: ┌──────┬──────────────────┬────────────────────────────────────┐ │ seq │ timestamp │ event │ ├──────┼──────────────────┼────────────────────────────────────┤ │ 1 │ 2026-03-01 09:00 │ RoomListed{room:101, price:500} │ │ 2 │ 2026-03-10 14:00 │ RoomBooked{room:101, guest:Bob, │ │ │ │ checkin:3/20, checkout:3/22} │ │ 3 │ 2026-03-12 10:00 │ BookingCancelled{room:101, │ │ │ │ guest:Bob, reason:"plans changed"}│ │ 4 │ 2026-03-15 16:00 │ RoomBooked{room:101, guest:Alice, │ │ │ │ checkin:3/20, checkout:3/25} │ │ 5 │ 2026-03-18 09:00 │ PriceChanged{room:101, price:600} │ └──────┴──────────────────┴────────────────────────────────────┘当前状态通过重放事件得出:
- 事件1:101号房上架,500元/晚
- 事件2:Bob预订了3/20-3/22
- 事件3:Bob取消了 → 101号房恢复可用
- 事件4:Alice预订了3/20-3/25
- 事件5:价格改为600元/晚(不影响Alice已确认的预订价格)
→ 当前状态:101号房已被Alice预订,3/20-3/25,价格500元/晚
Event Sourcing的实际价值场景:
- 客户投诉”我明明没取消预订”→ 直接查事件日志,有完整记录
- 发现定价逻辑有bug,多收了客户钱 → 修复逻辑后,重放事件,找出所有受影响的订单并退款
- 分析”平均预订到取消的时间间隔”→ 直接从事件流统计
先想一想 🤔 Event Sourcing的事件日志会越来越大(永远追加不删除)。如果酒店运营了10年,每次查询房间状态都要重放10年的事件,性能怎么办?
点击查看解析
核心解决方案:快照(Snapshot)。
不需要每次都从第1条事件重放。定期生成快照:快照(2026-01-01): room 101 = {status: available, price: 450}之后的事件:事件10001: RoomBooked{...}事件10002: PriceChanged{...}...查询当前状态 = 加载最新快照 + 重放快照之后的事件如果快照每天生成一次,最多只需重放1天的事件(而不是10年)其他优化:
物化视图:为高频查询维护一个实时更新的”当前状态表”(就是CQRS中的读模型,见8.8)。查询直接读物化视图,O(1)。
事件归档:很老的事件(如5年前)归档到冷存储(S3),只在需要审计或重算时才加载。
压缩(Compaction):对于同一实体的多次更新,只保留最新的快照。但这会丢失历史——需要权衡审计需求。
实际上,Event Sourcing系统通常同时维护事件日志(写入用)和物化视图(查询用),两者通过异步消费事件流保持同步。
9.8 CQRS
Section titled “9.8 CQRS”定义:CQRS(Command Query Responsibility Segregation,命令查询职责分离)是将系统的写入操作(Command)和读取操作(Query)分离到不同的模型、甚至不同的数据存储中的架构模式。写模型针对写入优化(如追加日志、事件存储),读模型针对查询优化(如物化视图、搜索索引、缓存)。两个模型通过异步的事件/消息同步保持最终一致。
为什么重要:在很多系统中,读和写的模式差异巨大——写入可能是简单的”追加一条记录”,但读取可能需要复杂的聚合、关联、排序。如果用同一个数据模型同时服务读写,往往顾此失彼。CQRS允许读写各自用最优的数据结构和存储引擎,代价是引入了最终一致性(读模型可能短暂落后于写模型)。
案例:News Feed 的写入(发帖)和读取(刷信息流)用不同数据结构。
写模型(针对发帖优化): ┌──────────────────────────────────────┐ │ posts 表(追加写入,极快) │ │ ┌──────┬────────┬──────────┬───────┐│ │ │ id │ author │ content │ time ││ │ ├──────┼────────┼──────────┼───────┤│ │ │ 1001 │ Bob │ "Hello!" │ 10:00 ││ │ │ 1002 │ Carol │ "Hi!" │ 10:01 ││ │ └──────┴────────┴──────────┴───────┘│ │ 写入操作: INSERT INTO posts (...) │ │ → 简单追加,O(1) │ └──────────────────────────────────────┘
↓ 异步事件同步 ↓ (Kafka/消息队列)
读模型(针对Feed查询优化): ┌──────────────────────────────────────┐ │ feed 表(每个用户一个预计算的帖子列表)│ │ ┌─────────┬──────────────────────┐ │ │ │ user_id │ feed_post_ids │ │ │ ├─────────┼──────────────────────┤ │ │ │ Alice │ [1002, 1001, ...] │ │ │ │ Dave │ [1001, ...] │ │ │ └─────────┴──────────────────────┘ │ │ 读取操作: GET feed[Alice] │ │ → 直接返回预计算好的列表,O(1) │ └──────────────────────────────────────┘写的时候:Bob发帖 → 写入posts表(1次写入)→ 发事件到消息队列
异步扇出:消费者从队列取出事件 → 查询”谁关注了Bob” → 把帖子ID追加到Alice和Dave的feed列表
读的时候:Alice刷Feed → 直接读取feed[Alice] → 返回预排序的帖子ID列表 → 根据ID批量获取帖子内容
如果不用CQRS,读取Feed需要实时执行:
SELECT * FROM postsWHERE author IN (SELECT follows FROM relationships WHERE user_id = 'Alice')ORDER BY time DESCLIMIT 20这个JOIN查询在数据量大时非常慢(Alice关注了1000人,每人发了上万条帖子)。
CQRS把这个复杂查询的计算提前到写入时完成,读取时直接取结果。
先想一想 🤔 CQRS中,写模型和读模型通过异步事件同步。如果Bob发了一条帖子后立即查看Alice的Feed(此时异步扇出还没完成),Alice看不到Bob的新帖子。这个”最终一致性”问题如何处理?
点击查看解析
几种实用策略:
Read-your-own-writes(读自己的写):Bob自己查看自己的Timeline时,除了读Feed列表,还从posts表中获取自己最新的帖子,合并后展示。确保发帖者能立即看到自己的帖子。
同步+异步混合扇出:对于发帖者的”密切关注”列表(如互相关注的好友),同步写入他们的Feed;对于普通关注者,异步扇出。这样最亲密的好友能立即看到,其他人有几秒延迟。
客户端乐观更新:Bob的客户端在发帖成功后,本地立即把帖子加到Alice的Feed展示中(如果Alice碰巧在看),不等服务端扇出完成。如果后续发现服务端扇出失败,再回滚。
接受延迟:对于大多数社交产品,帖子晚几秒出现在关注者Feed中是完全可以接受的。用户不会精确到秒去检查。Instagram和Twitter的帖子有时也有几秒的延迟才出现在关注者Timeline上。
核心思想:对于不同的读取场景,提供不同级别的一致性保证。自己的内容要强一致,别人的内容可以最终一致。
9.9 Lambda vs Kappa 架构
Section titled “9.9 Lambda vs Kappa 架构”定义:Lambda架构同时维护两条数据处理管道——批处理管道(处理全量历史数据,保证准确性)和流处理管道(处理实时数据,保证低延迟)。查询时合并两条管道的结果。Kappa架构只使用一条流处理管道——所有数据(包括历史数据)都通过流处理引擎处理。如果需要重新计算历史数据,就从消息队列的起始位置重新消费。
为什么重要:
- Lambda的优势:批处理和流处理可以用不同的技术各自优化;批处理结果可以作为”真值”校正流处理的近似结果。
- Lambda的劣势:需要维护两套代码(批处理逻辑和流处理逻辑必须保持一致),运维复杂度翻倍。
- Kappa的优势:只有一套代码,架构简单。
- Kappa的劣势:流处理引擎需要能高效处理大量历史数据的回放,对引擎要求更高。
案例:YouTube 实时观看统计 + 每日汇总报表。
Lambda架构设计:
观看事件流 (Kafka) │ ├──→ 流处理管道 (Flink) │ │ │ ├─→ 实时仪表盘: "过去5分钟观看量: 12,345" │ └─→ 实时排行: "当前热门视频 Top 10" │ (近似值,可能有少量误差) │ └──→ 批处理管道 (Spark, 每天凌晨运行) │ ├─→ 每日报表: "昨天总观看量: 8,234,567" ├─→ 创作者收入结算: 精确到每次观看的广告分成 └─→ 校正流处理的累计值 (批处理结果覆盖流处理的历史累计)为什么YouTube需要Lambda而不是纯Kappa?
- 收入结算要求100%准确:流处理可能丢失少量事件(如极端故障情况),批处理对全量数据重新计算保证准确性。
- 实时仪表盘要求低延迟:不能等到每天批处理跑完才显示数据,需要流处理提供秒级更新。
- 两套结果互相校验:如果流处理和批处理的结果差异超过阈值,触发告警,可能是数据管道有bug。
Kappa架构设计(如果选择Kappa):
观看事件流 (Kafka, 保留7天原始数据) │ └──→ 流处理管道 (Flink) │ ├─→ 实时仪表盘 ├─→ 实时排行 └─→ 每日报表 (通过每天重放当天数据计算)
需要重算历史? → 启动新的 Flink 任务 → 从 Kafka 的7天前位置开始重新消费 → 重新计算所有结果 → 用新结果替换旧结果先想一想 🤔 一个创业公司正在构建视频平台(类似小型YouTube),每天100万次观看。应该选Lambda还是Kappa架构?3年后用户增长到每天10亿次观看时,需要切换架构吗?
点击查看解析
创业初期(100万次/天):选Kappa
理由:
- 100万次/天的规模,一台好一点的机器就能处理。维护两套管道(Lambda)是过度设计。
- 创业阶段产品变化快,业务逻辑频繁修改。一套代码(Kappa)比两套代码(Lambda)迭代速度快一倍。
- 如果需要重算历史,Kafka保留30天数据,重放一遍几分钟就完成。
- 精确度要求没那么高——还没有广告收入分成的场景。
增长到10亿次/天:考虑Lambda或混合
可能需要的原因:
- 广告收入结算需要精确到每次观看,流处理的at-least-once语义可能导致少量重复计数,影响财务准确性。引入批处理作为校正。
- Kafka保留30天的10亿次/天数据 = 300亿条事件,重放一次可能需要几小时到几天,不再是”几分钟就能重放完”。
- 不同业务对延迟和精度的要求分化:实时仪表盘要秒级更新,月度报表要100%准确。
但不一定是完整的Lambda:
- 可以用Flink的exactly-once语义(配合Kafka事务)来提高流处理精度,减少对批处理校正的依赖。
- 可以只对”财务相关”的指标跑批处理校正,其他指标仍然只用流处理。
- 也就是”Lambda和Kappa的混合体”——按需引入批处理。
总结:从简单开始(Kappa),按需演进(部分Lambda)。
练习1:YouTube 观看统计系统设计
Section titled “练习1:YouTube 观看统计系统设计”为YouTube设计观看统计系统,要求:
- 实时:视频页面显示”当前观看人数”(秒级更新)
- 近实时:创作者后台显示”过去1小时观看趋势”(分钟级更新)
- 离线:每日生成”频道数据报告”(次日可查看)
选择Lambda还是Kappa架构?画出数据流图,标注每个组件的技术选型。
点击查看解析
架构选择:Lambda(混合型)
三种不同延迟要求 → 需要不同的处理策略 → Lambda的批+流两条管道更合适。
数据流图:
用户观看事件 │ ▼┌─────────┐│ Kafka │ (事件总线,保留7天)└─────┬───┘ │ ├──────────────────────────────────────────┐ │ │ ▼ ▼┌─────────────────┐ ┌────────────────┐│ Flink 流处理 │ │ Spark 批处理 ││ │ │ (每天凌晨运行) ││ 任务1: │ │ ││ 秒级聚合 │ │ 从Kafka/HDFS ││ "当前观看人数" │ │ 读取全天数据 ││ → Redis │ │ 精确统计 ││ (TTL=30s) │ │ → HDFS/数据仓库││ │ │ ││ 任务2: │ │ ││ 分钟级窗口聚合 │ └────────┬───────┘│ "过去1小时趋势" │ ││ → TimescaleDB │ ▼│ │ ┌────────────────┐└─────────────────┘ │ 报告生成服务 │ │ 每日频道报告 │ │ → PostgreSQL │ └────────────────┘各组件技术选型和理由:
| 组件 | 技术 | 理由 |
|---|---|---|
| 事件总线 | Kafka | 高吞吐、持久化、支持多消费者 |
| 实时处理 | Flink | 精确一次语义、事件时间支持、低延迟 |
| 秒级数据存储 | Redis | 极低延迟读写、支持TTL自动过期 |
| 趋势数据存储 | TimescaleDB | 时序数据优化、SQL查询、自动压缩 |
| 批处理 | Spark | 高吞吐、成熟生态、SQL支持 |
| 报告数据存储 | PostgreSQL | 结构化数据、复杂查询、事务保证 |
流处理任务详解:
任务1——“当前观看人数”:
Flink SQL:SELECT video_id, COUNT(*) as viewersFROM watch_eventsWHERE event_type = 'heartbeat' -- 客户端每10秒发一次心跳GROUP BY video_id, TUMBLE(event_time, INTERVAL '10' SECOND)结果写入Redis,key=viewers:{video_id},TTL=30秒。前端每5秒轮询一次。
任务2——“过去1小时趋势”:
Flink SQL:SELECT video_id, window_start, COUNT(*) as viewsFROM watch_eventsWHERE event_type = 'view_start'GROUP BY video_id, TUMBLE(event_time, INTERVAL '1' MINUTE)每分钟一个数据点,写入TimescaleDB。创作者后台查询最近60个数据点画折线图。
练习2:URL Shortener 点击统计 MapReduce
Section titled “练习2:URL Shortener 点击统计 MapReduce”URL Shortener积累了10亿条点击日志,格式如下:
{short_code: "abc123", timestamp: "2026-03-15T10:00:00Z", ip: "1.2.3.4", ...}用MapReduce思维描述:如何从这10亿条日志中统计每个短链接的点击量,并输出点击量Top 100?
点击查看解析
方案:两轮MapReduce
第一轮:统计每个short_code的点击量
Map阶段(1000个Map任务并行,每个处理100万条日志):
Map(日志记录) → (short_code, 1)
输入: {short_code: "abc123", timestamp: ..., ip: ...}输出: ("abc123", 1)
输入: {short_code: "def456", timestamp: ..., ip: ...}输出: ("def456", 1)
输入: {short_code: "abc123", timestamp: ..., ip: ...}输出: ("abc123", 1)Combiner优化(Map端局部聚合,大幅减少Shuffle数据量):
Combiner在每个Map任务内合并: ("abc123", 1) + ("abc123", 1) + ("abc123", 1) → ("abc123", 3) ("def456", 1) + ("def456", 1) → ("def456", 2)
原本每个Map任务输出100万个(key, 1)Combiner后可能只输出几万个(key, count)Shuffle数据量减少了约100倍!Shuffle(按short_code分区):
所有 "abc123" 的计数发送到Reduce任务#42所有 "def456" 的计数发送到Reduce任务#17...Reduce阶段(假设100个Reduce任务):
Reduce("abc123", [3, 5, 2, 8, ...]) → ("abc123", 总计=15234)Reduce("def456", [2, 7, 1, ...]) → ("def456", 总计=8921)...第一轮输出:每个short_code的总点击量(可能有数百万个short_code)
第二轮:取Top 100
Map阶段(每个Map任务读取一部分第一轮结果):
每个Map任务内部维护一个大小为100的最小堆遍历本任务的所有(short_code, count)对堆满后,新的count > 堆顶 → 替换堆顶
每个Map任务输出本地Top 100Reduce阶段(只有1个Reduce任务):
输入: 所有Map任务的本地Top 100(假设有100个Map任务 → 10000条)在10000条中取全局Top 100用最小堆,O(10000 × log100) ≈ 极快最终输出:点击量最高的100个短链接及其点击量。
性能分析:
10亿条日志,每条约100字节 → 总共约100GB
第一轮MapReduce: Map: 100GB输入 → 并行读取 → 每个Map任务处理约100MB Combiner: 100GB → 约1GB (100倍压缩) Shuffle: 1GB网络传输 Reduce: 1GB → 几百MB (最终的short_code→count映射) 耗时: 约10-30分钟 (取决于集群大小)
第二轮MapReduce: 输入: 几百MB (第一轮输出) 处理: 几秒钟 输出: 100条记录
总耗时: 约10-30分钟,主要在第一轮可以用Spark优化:一行代码搞定
clicks.groupBy("short_code").count().orderBy(desc("count")).limit(100)Spark内部会自动做类似的两阶段聚合和局部Top-K优化。
本模块核心收获:数据处理有两种基本范式——批处理适合高吞吐、可接受延迟的场景,流处理适合低延迟、实时响应的场景。MapReduce的”分散-聚合”思想是基础,DAG模型是进化。流处理中的时间语义(事件时间vs处理时间)和窗口操作是核心概念。Event Sourcing和CQRS提供了强大的数据架构模式。Lambda和Kappa架构是批流融合的两种路线。大多数系统从简单开始,按需演进。