Spark 0.5 关键组件与基本流程

学不会 Spark,还学不会 Spark 0.5 吗?

本文分析 Spark 0.5 中的关键组件,以及 RDD 计算的基本流程。可在 JDK 1.8 和 Scala 2.10 上运行的代码在这里

关键组件概述

RDD

RDD 是一个抽象类,用于抽象表示一个不可变的、并行计算的元素集合,包含以下信息:

  • 一系列 partitions(splits
  • 用于计算每个 split 的函数 compute
  • 一系列对其他 RDD 的依赖 dependencies
  • (可选)分区器:Optional[Partitioner]
  • (可选)计算每个 split 的优先位置 preferredLocations:Seq[String]

RDD 中,用于表示一个分区的 splits 字段是一个 Split 类型的数组。每种 RDD 使用的 Split 子类不同,例如 ParallelCollection 使用的 Split 包含了该分区的实际元素;ShuffledRDD 中的 Split 仅记录分区号。RDD 有一个 compute 操作,接收需要计算的 Split 对象,返回经计算后该分区中所有数据的 iterator。

RDD 是所有 RDD 的抽象基类,其中定义了许多终结操作,例如 foreachtakecollectreducefoldaggregatecount

另外,还有一些中间操作,这些操作返回一个新的 RDD,这些新 RDD 都带有类似 prev 的字段,指向创建它们的 RDD,同时有各自不同的Split 类型和 compute 操作。例如:

  • MappedRDD 中的 splits 就是 parent RDD 的 splits,compute 方法先调用 prev.compute(split) 得到上个 RDD 该分区的结果(该操作是递归的),然后对每个结果施加 map 函数。
  • CartesianRDD 中的 Split 记录本 split 分别依赖两个 parent RDD 中的哪个 split,而 compute 方法先分别得到两个 parent split 的结果(同样是递归调用 compute),然后使用两重循环得到两个结果序列的组合,并返回 iterator。
  • ParallelCollectionSplit 保存了实际的数据;compute 直接返回该 split 数据的 iterator。

有一类特殊的 RDD,它们的类型是 KV 对,这类 RDD 支持更多的操作,在 PairRDDFunctions 中定义,例如 reduceByKeygroupByKeycombineByKeyjoincogrouplookup 等。

Dependency

Spark 0.5 中,已经出现了以下几种依赖的类型:

  • OneToOneDependency:上游 RDD 和下游 RDD 是一对一的关系。
  • RangeDependency:上游 RDD 的某个分区区间和下游 RDD 的某个分区区间成一对一依赖。比如,上游的第 4 到 6 个分区与下游第7 到 9 分区一对一依赖。
  • ShuffleDependency:下游 RDD 的每个分区依赖上游所有分区。

其中前两种依赖并称为 NarrowDependency

依赖什么时候使用?在使用构造函数创建某种类型的 RDD 时,会使用特定的依赖,依赖类中保存了上游 RDD(prev)的引用。初始 RDD 的 dependenciesNil。沿着 dependencies 字段依赖,就可以找到所有依赖路径。

Dependency 只是用来划分 stage 的,划分好后,stage 之间的依赖关系由 Stage 类中的 parents 记录。计算时,stage 内各个 RDD 由下游 compute 调用上游 iterator 连接,跨 stage 的 RDD 由 Stage.parents 连接,RDD.dependencies 不参与计算过程。

Scheduler

这是 Spark 调度器(scheduler)的公共基类,它定义了 runJob 方法,用于实际执行 RDD 中定义的计算。runJob 的输入是 RDD、定义计算的 func、需要执行 func 的分区 ID 列表,输出是包含每个分区结果的数组。所有终结操作(如 collecttake 方法)都会调用 runJob,区别是传入的操作 func 不同。

DAGScheduler

DAGScheduler 是非常重要的一种调度器,它是继承 Scheduler 的 trait,用于支持基于阶段(stage)的调度。这种 scheduler 将每个任务(job)划分为各个阶段(stage),并计算 DAG 依赖图,然后计算出用于完成任务的最小调度。DAGScheduler 重写了 runJob 方法,其中做两件事:

  • 通过遍历 RDD 的依赖,划分出一系列的 stage
  • 先提交最上游 stage 中的 task;监听 task 的完成事件,提交后续 task,或重新提交失败的 task

LocalSchedulerMesosScheduler 继承自 DAGScheduler,重写了 submitTasks 函数,即提交 task 的方法不同。

另外,DAGScheduler 还提供了 taskEnded 回调函数,需要子类重写 submitTasks 时,每次 task 结束调用 taskEnded。函数中向对应 job 的消息队列发送消息,告知 task 已经结束,以及对应的结束状态。

Stage

Stage 是沿着 RDD 依赖路径划分的阶段,是在 ShuffleDependency 切断的。每个 stage 中包含的信息包括 stage ID、该 stage 的最终 RDD(可能是 ShuffledRDD 或执行终结操作的 RDD)、shuffle dependency 和父 stage 列表,另外还记录了是否为 shuffle map stage、总 partition 数和已完成的 partition 数等状态信息。

runJob 的时候,如果是 DAGScheduler,就会进行划分 stage 的操作。划分时,首先为执行终结操作的 RDD 创建 stage;然后沿着 RDD 的 dependency tree 递归搜索,遇到 ShuffledDependency,则创建新的 stage。然后,如果没有父 stage(即最上游的 stage),则将该 stage 中的所有 task 通过 submitTasks 函数提交运行。

Task

Task 是最小的执行单位,用于计算 RDD 中的一个 split。Task 包含本身的 runID,记录了对应 stage ID、RDD 即分区号、偏好的计算位置等,并且有 run 方法。Task 分为 ShuffleMapTaskResultTask

ResultTask

ResultTask 计算的是终结 RDD 的一个分区,run 方法接收 runJob 提供的 func,返回的是 func 作用于该分区数据的结果。

ShuffleMapTask

ShuffleMapTaskrun 方法较为复杂,因为需要根据 shuffle dependency 中的 partitioner 将数据传到指定的下游 partition。

首先,对于当前 partition 内的所有 KV 对,先通过 K 计算出下游 bucket ID,并将计算结果放入本地对应的 bucket;然后,将每个 bucket 的内容写进对应的 shuffle 文件里,并返回执行本 task 节点 shuffle server 的 URI(用于 scheduler 拉取)

这里总结一下 shuffle 操作的要点:

  • ShuffledRDD 的 dependency 为 shuffle dependency,而 shuffle dependency 中保存的 parent RDD 为上一步的 RDD
  • 在创建 task 的时候,是每个 dependency 创建一个 task,也就是说,如果是 shuffle map task,其中包含的 RDD 也应是上一步的 RDD
  • 提交 ShuffleMapTask 之后,首先在分配到的 client machine 上执行 task.runShuffledRDD 的上游 RDD 分区结果计算好,保存到本地的 shuffle 文件中,然后返回本机 shuffle manager 的 URI,随完成事件传回 master
  • master 记录好本 shuffleID(来自 shuffle dependency)各个分区返回的 URI
  • 当所有的 shuffle map task 执行完后,master 提交下一阶段的 stage,该 stage 会从 ShuffledRDD 开始计算
  • ShuffledRDDcompute 方法中,会调用 shuffle fetcher 的 fetch 方法,从其他 client 拉取需要的 shuffle 结果,合并好之后,返回给下一层 RDD

RDD 的计算过程

本节中,将以该操作为例,分析(理想情况下的)执行过程,串起上节介绍的概念:

val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(4, 5, 6))

println(a.map(x => x * x).cartesian(b).reduceByKey(_ + _).collect().mkString(", "))

构造 RDD

首先是 sc.parallelize 操作,创建了一个 ParallelCollection RDD,它包含了一个 splits 字段,是将传入 Seq 切成 numSlices 份,每份作为一个 ParallelCollectionSplit。也就是说,ParallelCollectionsplit 字段包含了实际的数据。这个 RDD 的 dependenciesNil

然后,对 a 进行 map 操作,这边调用了定义在 RDD 基类上的方法,生成一个 MappedRDDMappedRDDdependencies 列表只有一个,是和上游 RDD 的 OneToOneDependencycompute 方法为取上游对应 split 的 iterator,然后在上面直接进行 map 操作。

接着执行 cartesian 操作。该操作生成 CartesianRDD,这个 RDD 的构造比较复杂,对于 rdd1.cartesian(rdd2)

  • 首先,splitsrdd1.splitsrdd2.splits 的每个组合生成一个 CartesianSplit
  • preferredLocationsrdd1rdd2 的并集
  • dependencies 有两个,都是 NarrowDependencyCartesianRDD 的分区 id 需要依赖 rdd1 中的 id / numSplitsInRdd2rdd2 中的 id % numSplitsInRdd2
  • compute 函数,给定 split,返回 rdd1rdd2 中对应 split 各个元素的组合

接着是 reduceByKey 操作。该操作实际上是 combineByKey(_, func, func),创建 ShuffledRDDShuffledRDD 需要提供 aggregator(封装了 combineByKey 传入的三个函数)和 partitioner(默认与上游 RDD 相同,如果没有就新创建一个 HashPartitioner)。ShuffledRDD 构造如下:

  • splits:仅记录 split 编号。
  • preferredLocations:Nil
  • dependenciesnew ShuffleDependencyShuffleDependency 需要包含 shuffle ID(由 context 递增生成)、parent RDD、aggregator 和 partitioner)
  • compute:调用 shuffle fetcher,每接收一个上游传来的 (key, combiner) 对,就进行 merge combiner 操作,并保存到本地;然后返回本地的所有 (key, combiner) 对的 iterator。

执行 RDD

调用 collect

然后,在 RDD 上调用 collect 操作。这是一个终结操作,调用了 sc.runJob 方法,传入的 func 为,将每个分区 iterator 转化成 Array。

sc.runJob 最终调用了 DAGScheduler.runJob 方法,传入参数:

  • rdd:终结 RDD,即调用 collect 的 ShuffledRDD
  • funcfunc
  • partitions:包含所有分区号的列表
  • allowLocalfalse。这是一个优化标记,如果 job 很短(如 first),那么将直接在 master 上执行

划分 stage 与提交初始 task

DAGScheudler 中定义了一些全局字段:

  • eventQueues:一个字典,用于保存每个 runId 的消息队列。
  • nextRunIdnextStageId:都是 AtomicInteger,用于生成下一个 runId(调用 runJob 时)和 stageId(创建新 stage 时)。
  • idToStageshuffleToMapStage:字典,用于记录 stageId 到 stage 的映射,以及 shuffleId 到 shuffle map stage 的映射。
  • cacheLocs:字典,给定 RDD ID,查询 cache 位置。

DAGScheduler.runJob 方法中,也定义了仅针对当前 run 的字段:

  • results:存储每个 outputId 的最终结果。
  • finished:记录 outputId 对应的 ResultTask 是否完成。
  • waiting:记录 parent 未完成、尚未开始的 stage。
  • running:记录正在运行的 stage。
  • failed:记录由于 fetch 失败,需要重新提交的 stage。
  • pendingTasks:一个字典,用于记录每个 stage 尚未完成的 tasks。

首先进行的是划分 stage 的操作。首先使用 newStage 函数,由终结 RDD 创建一个新的 stage(finalStage),其 shuffleDependency 字段为 None(因为是终结操作而不是 shuffle 操作)。然后从 finalStage 开始深度优先搜索,分割 stage。这是由 submitStagegetMissingParentStages 共同完成的。getMissingParentStage 用于找到本层 stage RDD 上方的所有 shuffle dependency,并创建 stage;submitStage 用于递归搜索上层 stage,然后使用 submitMissingTasks 函数提交最上层所有 stage 中的 task。

submitMissingTasks 中,做了三件事:

  • 为该 stage 的所有未完成的 partition 建立对应的 task。如果是 finalStage,就建立 ResultTask;否则,建立 ShuffleMapTask。两者是否完成分别由 finished 数组和 stage.outputLocs 词典记录。
  • 将创建的 task 放入 pendingTasks(stage) 中。
  • 调用 DAGScheduler.submitTasks 函数。

submitTasks 函数的定义是由 DAGScheduler 的子类提供的。在 LocalScheduler 中,操作仅仅是从线程池中获取一个线程,执行 task.run,然后调用 taskEnded。在 MesosScheduler 中,情况更为复杂,此处从略。

submitStage 执行完毕后,runJob 方法会开始一个监听循环,持续监听本个 job 的消息队列,直到所有分区都完成计算。监听到 event 后的操作:

  • 拿到 task 完成的 event,将该 task 从对应 stage 的 pendingTasks 中删除
  • 判断 event 的完成状态(此处只考虑成功)
  • 如果 event 中携带了累加器的更新信息,就更新对应的累加器
  • 如果是 result task:
    • 将 event 中携带的 result 放入 results(outputId)
    • finished 数组中标记 outputId 已完成
  • 如果是 shuffle map task:
    • 在对应 stage 的 outputLocs 字段中添加当前 partition 的结果
    • 如果本 stage 的 pendingTasks 为空(说明本 stage 全部完成):
      • 将本 stage 从 running 中删除
      • 向 master 端的 map output tracker 注册当前 shuffle 本机的所有 output
      • 检查 waiting 中有哪些 parent stage 已完成的 stage,将其移动到 running 中,并使用 submitMissingTasks 函数提交这些 stage

所有的分区计算完成后,从 eventQueues 中删除本次 runId 的队列,然后返回 results 数组。

留下评论

注意 评论系统在中国大陆加载不稳定。

回到顶部 ↑