Spark 0.5 关键组件与基本流程
本文分析 Spark 0.5 中的关键组件,以及 RDD 计算的基本流程。可在 JDK 1.8 和 Scala 2.10 上运行的代码在这里。
关键组件概述
RDD
RDD 是一个抽象类,用于抽象表示一个不可变的、并行计算的元素集合,包含以下信息:
- 一系列 partitions(
splits
) - 用于计算每个 split 的函数
compute
- 一系列对其他 RDD 的依赖
dependencies
- (可选)分区器:
Optional[Partitioner]
- (可选)计算每个 split 的优先位置 preferredLocations:
Seq[String]
RDD 中,用于表示一个分区的 splits 字段是一个 Split
类型的数组。每种 RDD 使用的 Split
子类不同,例如 ParallelCollection
使用的 Split
包含了该分区的实际元素;ShuffledRDD
中的 Split
仅记录分区号。RDD 有一个 compute
操作,接收需要计算的 Split
对象,返回经计算后该分区中所有数据的 iterator。
RDD
是所有 RDD 的抽象基类,其中定义了许多终结操作,例如 foreach
、take
、collect
、reduce
、fold
、aggregate
、count
。
另外,还有一些中间操作,这些操作返回一个新的 RDD,这些新 RDD 都带有类似 prev
的字段,指向创建它们的 RDD,同时有各自不同的Split
类型和 compute
操作。例如:
MappedRDD
中的 splits 就是 parent RDD 的 splits,compute
方法先调用prev.compute(split)
得到上个 RDD 该分区的结果(该操作是递归的),然后对每个结果施加 map 函数。CartesianRDD
中的Split
记录本 split 分别依赖两个 parent RDD 中的哪个 split,而compute
方法先分别得到两个 parent split 的结果(同样是递归调用compute
),然后使用两重循环得到两个结果序列的组合,并返回 iterator。ParallelCollection
中Split
保存了实际的数据;compute 直接返回该 split 数据的 iterator。
有一类特殊的 RDD,它们的类型是 KV 对,这类 RDD 支持更多的操作,在 PairRDDFunctions
中定义,例如 reduceByKey
、groupByKey
、combineByKey
、join
、cogroup
、lookup
等。
Dependency
Spark 0.5 中,已经出现了以下几种依赖的类型:
OneToOneDependency
:上游 RDD 和下游 RDD 是一对一的关系。RangeDependency
:上游 RDD 的某个分区区间和下游 RDD 的某个分区区间成一对一依赖。比如,上游的第 4 到 6 个分区与下游第7 到 9 分区一对一依赖。ShuffleDependency
:下游 RDD 的每个分区依赖上游所有分区。
其中前两种依赖并称为 NarrowDependency
。
依赖什么时候使用?在使用构造函数创建某种类型的 RDD 时,会使用特定的依赖,依赖类中保存了上游 RDD(prev
)的引用。初始 RDD 的 dependencies
为 Nil
。沿着 dependencies
字段依赖,就可以找到所有依赖路径。
Dependency 只是用来划分 stage 的,划分好后,stage 之间的依赖关系由 Stage
类中的 parents
记录。计算时,stage 内各个 RDD 由下游 compute
调用上游 iterator
连接,跨 stage 的 RDD 由 Stage.parents
连接,RDD.dependencies
不参与计算过程。
Scheduler
这是 Spark 调度器(scheduler)的公共基类,它定义了 runJob
方法,用于实际执行 RDD 中定义的计算。runJob
的输入是 RDD、定义计算的 func
、需要执行 func
的分区 ID 列表,输出是包含每个分区结果的数组。所有终结操作(如 collect
、take
方法)都会调用 runJob
,区别是传入的操作 func
不同。
DAGScheduler
DAGScheduler
是非常重要的一种调度器,它是继承 Scheduler
的 trait,用于支持基于阶段(stage)的调度。这种 scheduler 将每个任务(job)划分为各个阶段(stage),并计算 DAG 依赖图,然后计算出用于完成任务的最小调度。DAGScheduler
重写了 runJob
方法,其中做两件事:
- 通过遍历 RDD 的依赖,划分出一系列的 stage
- 先提交最上游 stage 中的 task;监听 task 的完成事件,提交后续 task,或重新提交失败的 task
LocalScheduler
和 MesosScheduler
继承自 DAGScheduler
,重写了 submitTasks
函数,即提交 task 的方法不同。
另外,DAGScheduler
还提供了 taskEnded
回调函数,需要子类重写 submitTasks
时,每次 task 结束调用 taskEnded
。函数中向对应 job 的消息队列发送消息,告知 task 已经结束,以及对应的结束状态。
Stage
Stage 是沿着 RDD 依赖路径划分的阶段,是在 ShuffleDependency
切断的。每个 stage 中包含的信息包括 stage ID、该 stage 的最终 RDD(可能是 ShuffledRDD
或执行终结操作的 RDD)、shuffle dependency 和父 stage 列表,另外还记录了是否为 shuffle map stage、总 partition 数和已完成的 partition 数等状态信息。
在 runJob
的时候,如果是 DAGScheduler
,就会进行划分 stage 的操作。划分时,首先为执行终结操作的 RDD 创建 stage;然后沿着 RDD 的 dependency tree 递归搜索,遇到 ShuffledDependency
,则创建新的 stage。然后,如果没有父 stage(即最上游的 stage),则将该 stage 中的所有 task 通过 submitTasks
函数提交运行。
Task
Task 是最小的执行单位,用于计算 RDD 中的一个 split。Task 包含本身的 runID,记录了对应 stage ID、RDD 即分区号、偏好的计算位置等,并且有 run 方法。Task 分为 ShuffleMapTask
和 ResultTask
。
ResultTask
ResultTask
计算的是终结 RDD 的一个分区,run
方法接收 runJob
提供的 func
,返回的是 func 作用于该分区数据的结果。
ShuffleMapTask
ShuffleMapTask
的 run
方法较为复杂,因为需要根据 shuffle dependency 中的 partitioner 将数据传到指定的下游 partition。
首先,对于当前 partition 内的所有 KV 对,先通过 K 计算出下游 bucket ID,并将计算结果放入本地对应的 bucket;然后,将每个 bucket 的内容写进对应的 shuffle 文件里,并返回执行本 task 节点 shuffle server 的 URI(用于 scheduler 拉取)
这里总结一下 shuffle 操作的要点:
ShuffledRDD
的 dependency 为 shuffle dependency,而 shuffle dependency 中保存的 parent RDD 为上一步的 RDD- 在创建 task 的时候,是每个 dependency 创建一个 task,也就是说,如果是 shuffle map task,其中包含的 RDD 也应是上一步的 RDD
- 提交
ShuffleMapTask
之后,首先在分配到的 client machine 上执行task.run
,ShuffledRDD
的上游 RDD 分区结果计算好,保存到本地的 shuffle 文件中,然后返回本机 shuffle manager 的 URI,随完成事件传回 master - master 记录好本 shuffleID(来自 shuffle dependency)各个分区返回的 URI
- 当所有的 shuffle map task 执行完后,master 提交下一阶段的 stage,该 stage 会从
ShuffledRDD
开始计算 - 在
ShuffledRDD
的compute
方法中,会调用 shuffle fetcher 的fetch
方法,从其他 client 拉取需要的 shuffle 结果,合并好之后,返回给下一层 RDD
RDD 的计算过程
本节中,将以该操作为例,分析(理想情况下的)执行过程,串起上节介绍的概念:
val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(4, 5, 6))
println(a.map(x => x * x).cartesian(b).reduceByKey(_ + _).collect().mkString(", "))
构造 RDD
首先是 sc.parallelize
操作,创建了一个 ParallelCollection
RDD,它包含了一个 splits
字段,是将传入 Seq 切成 numSlices
份,每份作为一个 ParallelCollectionSplit
。也就是说,ParallelCollection
的 split
字段包含了实际的数据。这个 RDD 的 dependencies
为 Nil
。
然后,对 a 进行 map
操作,这边调用了定义在 RDD 基类上的方法,生成一个 MappedRDD
。MappedRDD
的 dependencies
列表只有一个,是和上游 RDD 的 OneToOneDependency
,compute
方法为取上游对应 split 的 iterator
,然后在上面直接进行 map
操作。
接着执行 cartesian
操作。该操作生成 CartesianRDD
,这个 RDD 的构造比较复杂,对于 rdd1.cartesian(rdd2)
:
- 首先,
splits
为rdd1.splits
和rdd2.splits
的每个组合生成一个CartesianSplit
preferredLocations
为rdd1
和rdd2
的并集dependencies
有两个,都是NarrowDependency
。CartesianRDD
的分区id
需要依赖rdd1
中的id / numSplitsInRdd2
和rdd2
中的id % numSplitsInRdd2
compute
函数,给定 split,返回rdd1
和rdd2
中对应split
各个元素的组合
接着是 reduceByKey
操作。该操作实际上是 combineByKey(_, func, func)
,创建 ShuffledRDD
。ShuffledRDD
需要提供 aggregator(封装了 combineByKey
传入的三个函数)和 partitioner(默认与上游 RDD 相同,如果没有就新创建一个 HashPartitioner
)。ShuffledRDD 构造如下:
splits
:仅记录 split 编号。preferredLocations
:Nildependencies
:new ShuffleDependency
。ShuffleDependency
需要包含 shuffle ID(由 context 递增生成)、parent RDD、aggregator 和 partitioner)compute
:调用 shuffle fetcher,每接收一个上游传来的 (key, combiner) 对,就进行 merge combiner 操作,并保存到本地;然后返回本地的所有 (key, combiner) 对的 iterator。
执行 RDD
调用 collect
然后,在 RDD 上调用 collect
操作。这是一个终结操作,调用了 sc.runJob
方法,传入的 func 为,将每个分区 iterator 转化成 Array。
sc.runJob
最终调用了 DAGScheduler.runJob
方法,传入参数:
rdd
:终结 RDD,即调用 collect 的 ShuffledRDDfunc
:func
partitions
:包含所有分区号的列表allowLocal
:false
。这是一个优化标记,如果 job 很短(如first
),那么将直接在 master 上执行
划分 stage 与提交初始 task
DAGScheudler
中定义了一些全局字段:
eventQueues
:一个字典,用于保存每个runId
的消息队列。nextRunId
、nextStageId
:都是AtomicInteger
,用于生成下一个runId
(调用runJob
时)和stageId
(创建新 stage 时)。idToStage
、shuffleToMapStage
:字典,用于记录stageId
到 stage 的映射,以及shuffleId
到 shuffle map stage 的映射。cacheLocs
:字典,给定 RDD ID,查询 cache 位置。
DAGScheduler.runJob
方法中,也定义了仅针对当前 run 的字段:
results
:存储每个outputId
的最终结果。finished
:记录outputId
对应的ResultTask
是否完成。waiting
:记录 parent 未完成、尚未开始的 stage。running
:记录正在运行的 stage。failed
:记录由于 fetch 失败,需要重新提交的 stage。pendingTasks
:一个字典,用于记录每个 stage 尚未完成的 tasks。
首先进行的是划分 stage 的操作。首先使用 newStage
函数,由终结 RDD 创建一个新的 stage(finalStage
),其 shuffleDependency
字段为 None
(因为是终结操作而不是 shuffle 操作)。然后从 finalStage
开始深度优先搜索,分割 stage。这是由 submitStage
和 getMissingParentStages
共同完成的。getMissingParentStage
用于找到本层 stage RDD 上方的所有 shuffle dependency,并创建 stage;submitStage
用于递归搜索上层 stage,然后使用 submitMissingTasks
函数提交最上层所有 stage 中的 task。
submitMissingTasks
中,做了三件事:
- 为该 stage 的所有未完成的 partition 建立对应的 task。如果是
finalStage
,就建立ResultTask
;否则,建立ShuffleMapTask
。两者是否完成分别由finished
数组和stage.outputLocs
词典记录。 - 将创建的 task 放入
pendingTasks(stage)
中。 - 调用
DAGScheduler.submitTasks
函数。
submitTasks
函数的定义是由 DAGScheduler
的子类提供的。在 LocalScheduler
中,操作仅仅是从线程池中获取一个线程,执行 task.run
,然后调用 taskEnded
。在 MesosScheduler
中,情况更为复杂,此处从略。
当 submitStage
执行完毕后,runJob
方法会开始一个监听循环,持续监听本个 job 的消息队列,直到所有分区都完成计算。监听到 event 后的操作:
- 拿到 task 完成的 event,将该 task 从对应 stage 的
pendingTasks
中删除 - 判断 event 的完成状态(此处只考虑成功)
- 如果 event 中携带了累加器的更新信息,就更新对应的累加器
- 如果是 result task:
- 将 event 中携带的 result 放入
results(outputId)
中 - 在
finished
数组中标记outputId
已完成
- 将 event 中携带的 result 放入
- 如果是 shuffle map task:
- 在对应 stage 的
outputLocs
字段中添加当前 partition 的结果 - 如果本 stage 的
pendingTasks
为空(说明本 stage 全部完成):- 将本 stage 从
running
中删除 - 向 master 端的 map output tracker 注册当前 shuffle 本机的所有 output
- 检查
waiting
中有哪些 parent stage 已完成的 stage,将其移动到running
中,并使用submitMissingTasks
函数提交这些 stage
- 将本 stage 从
- 在对应 stage 的
所有的分区计算完成后,从 eventQueues
中删除本次 runId 的队列,然后返回 results
数组。
留下评论
注意 评论系统在中国大陆加载不稳定。