并行编程模型
从并行计算基础与 Amdahl/Gustafson 定律,到 MapReduce 编程模型(Map/Shuffle/Reduce)、执行流程、编程模式、YARN 调度与向 Spark 的演进。
🎯学习目标
- 理解并行计算的必要性与三大流派(数据并行、任务并行、流水线);
- 掌握 Amdahl 定律与 Gustafson 定律的公式、含义与适用场景;
- 掌握 MapReduce 编程模型 Map→Shuffle→Reduce 与 Combiner 的适用条件;
- 理解 MapReduce 容错(无状态/中间落盘)与推测执行;
- 掌握 WordCount、倒排索引、TopN、Join 等编程模式与 YARN 调度;理解 Spark 取代 MR 的原因。
1并行计算基础
为什么需要并行计算?数据量爆炸式增长(2000 年 6.2EB → 2025 年 180ZB),单机处理 1TB 数据(100MB/s)需约 2.8 小时;而用 100 台机器,每台只处理 10GB,几小时的任务可降到约 10 分钟。分而治之、移动计算而非移动数据是核心理念。
并行计算的三大流派
数据并行
相同操作逻辑作用于不同数据子集。代表 MapReduce/SIMD。
任务并行
不同操作逻辑并行执行。多任务分工。
流水线
源源不断的流式数据,多阶段串联处理。
并行编程模型分类
| 模型 | 代表技术 | 特点 |
|---|---|---|
| 共享内存 | multiprocessing / OpenMP | 多线程/进程共享同一内存空间,单机多核 |
| 消息传递 | MPI / mpi4py | 显式 Send/Recv,可跨机器,集群 |
| GPU 并行 | CUDA | 大规模数据并行 |
| 维度 | 单机串行 | multiprocessing | mpi4py |
|---|---|---|---|
| 通信 | — | 共享内存 | 显式 Send/Recv |
| 跨机器 | ❌ | ❌ | ✅ |
| 适合场景 | 调试 | 单机多核 | 集群 |
2Amdahl 定律与 Gustafson 定律 ⭐(核心考点)
Amdahl 定律(固定问题规模)
def amdahl_speedup(f, n):
return 1.0 / (f + (1 - f) / n)
amdahl_speedup(0.1, 4) # 4核: 3.08倍
amdahl_speedup(0.1, 16) # 16核: 6.40倍
amdahl_speedup(0.1, 256) # 256核:9.66倍
# f=0.1 时理论极限 = 1/0.1 = 10倍
Gustafson 定律(固定执行时间,扩大问题规模)
Amdahl 假设问题规模固定;但现实大数据场景中,机器多了我们要处理更多数据。Gustafson 主张:假设执行时间固定,随处理器增多可处理更大规模问题,加速比呈近线性增长。
3MapReduce 编程模型
历史:2004 年 Google 在 OSDI 发表《MapReduce: Simplified Data Processing on Large Clusters》,将分布式痛点(网络通信、容错、调度)全部封装到底层。核心理念计算向数据移动。
| 函数 | 输入 | 输出 | 动作 |
|---|---|---|---|
| Map | 键值对 <K1, V1> | 零或多个中间对 list(<K2, V2>) | 过滤、提取、转换 |
| Reduce | 中间键和值集合 <K2, list(V2)> | 最终结果 list(<K3, V3>) | 求和、求最值、拼接 |
Shuffle 洗牌
Shuffle 介于 Map 和 Reduce 之间,唯一目的:保证所有 Key 相同的中间数据都被送到同一个 Reduce 任务。过程:Map 端溢写(Spill)→ 网络拉取(Fetch)→ 归并排序(Merge)。
Combiner(Map 端迷你 Reduce)
4执行流程与容错
整体执行生命周期
| 角色 | 分工 |
|---|---|
| Client | 用户提交作业的起点 |
| Master(JobTracker/AppMaster) | 大管家,拆分任务、调度、监控 |
| Worker(TaskTracker/NodeManager) | 打工人,真正执行 Map/Reduce 代码 |
数据五步走:InputSplit(1TB 按 128MB 切块)→ Map 阶段(每分片一个 Map Task)→ Shuffle(数据大迁徙)→ Reduce(聚合)→ Output(写入 HDFS)。
容错机制
大规模集群中硬件故障是常态。MR 容错好的原因:① 无状态计算(Shared-Nothing):Map 和 Reduce 互不干扰,失败重跑即可;② 中间结果落盘:Map 结果写本地磁盘,而非直接流向 Reduce。
推测执行(Speculative Execution)
⚠ 注意:涉及写数据库等有副作用的任务不能开启推测执行(会重复写入)。
5MapReduce 编程模式
① 计数与汇总(WordCount)⭐
Map 端拆分打标签:句子拆成单词,每词附 `1`;Reduce 端累加求和。
def map_function(document):
for word in document.split():
emit(word, 1) # 输出 (Key, Value)
def reduce_function(word, counts_list):
emit(word, sum(counts_list)) # [1,1,1...] 求和
② 倒排索引(Inverted Index)
搜索引擎基石:将"文档包含哪些词"反转为"这个词出现在哪些文档"。Map 以单词为 Key、文档 ID 为 Value;Reduce 收集同一单词的所有文档 ID 去重排序成拉链(Posting List)。
③ 排序与 Top N
核心思想局部最优→全局最优:不把全量数据丢给一个 Reduce(会撑爆内存),而是 Map 端只保留局部 TopN,大幅削减 Shuffle 传输;Reduce 汇总所有局部冠军选全局 TopN。
# Map:每节点只保留局部 TopN(用最小堆)
def topn_map(shard, N):
heap = []
for item, score in shard:
heapq.heappush(heap, (score, item))
if len(heap) > N: heapq.heappop(heap)
return heap
# Reduce:从所有局部 TopN 中选全局 TopN
def topn_reduce(candidates, N):
return heapq.nlargest(N, candidates)
④ 表关联(Join)
| 类型 | 机制 | 特点 |
|---|---|---|
| Reduce-Side Join | Map 给两表打 tag、以关联键为 Key 输出,Shuffle 按 Key 分组,Reduce 按 tag 拼接 | 通用但笨重,所有数据都过 Shuffle,消耗性能 |
| Map-Side Join | "一大一小"时,将小表通过分布式缓存广播到所有 Map 节点内存,Map 处理大表时直接查内存合并 | 轻巧,完全跳过 Shuffle 和 Reduce |
6Hadoop 生态与 YARN
Hadoop 三驾马车:① HDFS(分布式文件系统)② MapReduce(分布式计算)③ YARN(分布式资源调度)。上层有 Hive、HBase、ZooKeeper。
YARN 核心架构
| 组件 | 职责 |
|---|---|
| ResourceManager (RM) | 集群全局资源 CEO,掌握所有节点 CPU/内存 |
| NodeManager (NM) | 每台机器的工头,本机资源汇报与容器管理 |
| ApplicationMaster (AM) | 每个作业专属项目经理,向 RM 申请资源、督促执行 |
| Container | 资源的最小抽象单位(如 2 核 CPU+4GB 内存) |
YARN 三种调度策略
FIFO
先进先出,谁先提交谁占全部资源。缺点:大作业饿死后面的小作业。
Capacity
容量调度,多租户队列(生产60%/测试40%),支持资源借用与归还。
Fair
公平调度,动态平分资源(1人占100%,2人各50%)。
7MapReduce 的局限与向 Spark 演进
MapReduce 致命伤:对磁盘高度依赖——每次作业输出必须落地 HDFS 磁盘。痛点场景:机器学习迭代计算(如 K-Means 迭代 100 次,每轮都重复读写磁盘极慢)。
| 维度 | MapReduce | Apache Spark |
|---|---|---|
| 中间数据 | 写入磁盘(慢) | 保留内存(极快) |
| 编程范式 | 仅 Map/Reduce | map, filter, groupBy, join... |
| 执行速度 | 基础基准 | 迭代场景快 100 倍+ |
| 应用场景 | 离线批处理 | 批处理、SQL、流计算、图计算 |
蒙特卡洛求 π 的 MPI 示例(消息传递并行):
from mpi4py import MPI
import random
comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()
N_LOCAL = 10_000_000 // size
hits = sum(1 for _ in range(N_LOCAL)
if random.random()**2 + random.random()**2 <= 1.0)
total = comm.reduce(hits, op=MPI.SUM, root=0)
if rank == 0:
print(f"π ≈ {4.0 * total / 10_000_000:.6f}")
⭐重点例题
② 要 S=5:5 = 1/(0.2 + 0.8/N) → 0.2+0.8/N = 0.2 → 0.8/N = 0,无法达到!因为 f=0.2 的理论上限 = 1/f = 5 倍,需 N→∞ 才趋近 5,有限处理器永远达不到。
③ 要达 5 倍加速,串行比例 f 必须 ≤ 1/5 = 0.2(即上限 1/f≥5)。
🎯自测(点击展开)
并行计算的三大流派是什么?
Amdahl 定律的加速比上限是多少?
Amdahl 与 Gustafson 各适用什么场景?
Shuffle 的唯一目的是什么?
Combiner 适用和不适用哪些操作?
MapReduce 为什么比 Spark 慢?
📝强化题库
选择题点选即时判分;填空题输入后"检查"或"显示答案"。