Spark 大数据处理
从 Spark 内存计算与 DAG 引擎,到运行架构、RDD 五大特性、Transformation/Action 算子、惰性求值、血缘容错,再到 Spark SQL 与 Structured Streaming。
🎯学习目标
- 理解 Spark 内存计算相比 MapReduce 磁盘计算的优势与适用场景;
- 掌握 DAG 执行引擎、Stage 划分与宽窄依赖的关系;
- 掌握 Spark 运行架构(Driver/Executor/Cluster Manager/SparkSession);
- 掌握 RDD 五大特性、Transformation/Action 算子与惰性求值;
- 理解血缘(Lineage)容错与缓存策略,了解 Spark SQL(Catalyst)与 Structured Streaming。
1Spark 简介与内存计算
Hadoop MapReduce 推动了离线大数据处理,但其执行模型偏"批处理+磁盘中转"。对日志清洗这类单次扫描够用,但对机器学习、交互式查询、图计算等迭代优化任务,中间结果反复落盘非常慢。
| 维度 | MapReduce | Spark |
|---|---|---|
| 中间结果 | 通常落本地磁盘/HDFS | 尽量保留内存,必要时再落盘 |
| 执行模型 | 固定 Map→Shuffle→Reduce | DAG + 多算子流水 |
| 迭代算法 | 每轮重复读写磁盘 | 可复用缓存数据,迭代高效 |
| 交互式查询 | 延迟较高 | 响应更快 |
| 典型任务 | 大规模离线批处理 | 批处理、SQL、机器学习、流处理 |
Spark 适用场景
| 场景 | 是否适合 | 原因 |
|---|---|---|
| 海量离线 ETL | ✅ 适合 | 并行处理强、生态成熟 |
| 交互式 SQL 分析 | ✅ 适合 | DataFrame + Catalyst 性能好 |
| 迭代机器学习 | ✅ 适合 | 缓存与迭代计算优势明显 |
| 毫秒级事务处理 | ❌ 不适合 | Spark 不是 OLTP 数据库 |
| 超小规模数据脚本 | 未必需要 | 本地 Python/Pandas 可能更轻量 |
2DAG 执行引擎 ⭐(核心考点)
Spark 把一连串 Transformation 看成一个有向无环图(DAG):节点是数据集/计算结果,边表示依赖关系。执行流程:先有逻辑 DAG,再拆成物理执行 Stage。
宽依赖 vs 窄依赖
窄依赖
Narrow宽依赖
Wide3Spark 运行架构
Spark 运行架构包括:集群资源管理器(Cluster Manager)、工作节点(Worker Node)、任务控制节点(Driver)、负责执行的进程(Executor)。
| 组件 | 职责 |
|---|---|
| Driver | 应用控制中心:运行 main、创建 SparkSession、解析算子;Action 出现时转换成 Job/Stage/Task 并申请资源;维护元信息 |
| Executor | 运行在工作节点上的进程,接收 Driver 下发的 Task 执行,管理本地 Cache 与 Shuffle 数据;长期驻留减少进程启停开销 |
| Cluster Manager | 负责集群资源管理与节点生命周期,与计算引擎解耦 |
Cluster Manager 类型与 SparkSession
| 类型 | 适用场景 |
|---|---|
| Standalone | 教学、小型集群、快速实验(Spark 自带) |
| YARN | 已有 HDFS/Hadoop 基础设施的企业 |
| Mesos | 多框架共享集群 |
| Kubernetes | 现代云环境与容器平台(云原生) |
SparkSession 统一了早期分裂的 SparkContext/SQLContext/HiveContext 等入口,整合 RDD、DataFrame、SQL 能力,通过 SparkSession.builder 配置 appName、master 等参数,让 API 更一致。
4RDD 五大特性
RDD(Resilient Distributed Dataset,弹性分布式数据集)是 Spark 最基础的数据抽象。用户只需将逻辑表达为一系列转换处理,不必担心底层分布式特性。不同 RDD 间的转换形成依赖关系,可管道化、避免中间数据存储。
| 特性 | 含义 | 作用 |
|---|---|---|
| Partitions(分区) | 分区集合 | 决定并行度与任务划分 |
| Dependencies(依赖) | 与父 RDD 的依赖 | 决定 Stage 与容错恢复 |
| Compute Function(计算函数) | 每个分区如何计算 | 定义转换逻辑 |
| Partitioner(分区器) | 键值数据如何分区 | 优化聚合与连接 |
| Preferred Locations(首选位置) | 分区的首选计算位置 | 提高数据本地性 |
5算子与惰性执行 ⭐(核心考点)
| 类别 | 代表操作 | 特点 |
|---|---|---|
| Transformation | map、filter、flatMap、reduceByKey、groupByKey、distinct | 返回新 RDD,通常惰性执行 |
| Action | collect、count、take、first、saveAsTextFile | 触发真正计算 |
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDDWordCount").master("local[*]").getOrCreate()
sc = spark.sparkContext
lines = sc.textFile("data/words.txt")
words = lines.flatMap(lambda line: line.split()) # Transformation(惰性)
pairs = words.map(lambda w: (w.lower(), 1)) # Transformation(惰性)
counts = pairs.reduceByKey(lambda a, b: a + b) # Transformation(宽依赖)
print(counts.collect()) # Action → 触发执行
spark.stop()
6血缘容错与缓存策略
血缘追踪(Lineage)
缓存策略:Cache 和 Persist
| 方法 | 说明 |
|---|---|
| cache() | ≈ persist(MEMORY_ONLY),只存内存 |
| persist() | 支持更多存储级别:MEMORY_ONLY / MEMORY_AND_DISK / DISK_ONLY |
数据复用越多,缓存收益越明显。内存放不下时选 MEMORY_AND_DISK,避免重算又防止内存溢出。
7Spark SQL 与 Structured Streaming
DataFrame 与 Spark SQL
RDD 很灵活但系统不知道字段名、类型与列结构,难以高级优化。DataFrame = 带 Schema 的分布式表 = 数据 + 结构信息。Spark SQL 让用户用 SQL 描述结构化查询,降低分析门槛。
df = spark.createDataFrame(data, ["name", "age", "major"])
df.createOrReplaceTempView("students") # 注册临时视图
result = spark.sql("""
SELECT major, COUNT(*) AS cnt, AVG(age) AS avg_age
FROM students GROUP BY major ORDER BY cnt DESC""")
result.show()
Catalyst 优化器与 Tungsten
Catalyst 是 Spark SQL 的查询优化器,工作流程:① 逻辑计划创建 → ② 逻辑计划优化(谓词下推 Push down filters、合并操作)→ ③ 物理计划生成 → ④ 代码生成。Tungsten 负责内存管理(堆外内存)与代码生成,让执行层更高效。物理计划中 Exchange 表示发生 Shuffle。
Spark Streaming
日志、点击流、传感器数据持续不断产生,业务希望边产生边分析。
- 微批(micro-batch)模型:把连续数据流按时间片切成一批批小任务,每批 ≈ RDD,复用 Spark 批处理引擎;
- 窗口操作:对一个时间窗口内的 micro-batches 做聚合,参数有窗口大小与滑动间隔(如每 10 秒统计最近 1 分钟错误日志);
- Structured Streaming:用类似批处理的 API 处理无界数据流;
- Checkpoint:把状态和元数据持久化到可靠存储,用于任务重启恢复状态、故障恢复、窗口统计与有状态计算。
lines = spark.readStream.format("socket").option("host","localhost").option("port",9999).load()
words = lines.select(explode(split(lines.value, " ")).alias("word"))
counts = words.groupBy("word").count()
query = (counts.writeStream.outputMode("complete").format("console")
.option("checkpointLocation", "chk/wc").start())
query.awaitTermination()
⭐重点例题
影响:Spark 以宽依赖(Shuffle 边界)划分 Stage。窄依赖在同一 Stage 内流水执行;遇到宽依赖就切出新 Stage。
reduceByKey 会在 Map 端先做局部聚合(类似 Combiner),把相同 Key 的值先合并,再 Shuffle 到 Reduce 端做全局聚合,大幅减少网络传输量;而 groupByKey 把每个 Key 的所有原始值都 Shuffle 到 Reduce 端,网络开销大、易内存溢出。因此大规模求和/计数优先用 reduceByKey。
🎯自测(点击展开)
Spark 相比 MapReduce 在迭代任务上为何更快?
Spark 按什么划分 Stage?
窄依赖和宽依赖的区别?各举一例。
RDD 的五大特性是什么?
Transformation 和 Action 的区别?惰性执行指什么?
RDD 分区丢失后靠什么恢复?
📝强化题库
选择题点选即时判分;填空题输入后"检查"或"显示答案"。