`
hongs_yang
  • 浏览: 59549 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

RDD的依赖关系

阅读更多

RDD的依赖关系

 

Rdd之间的依赖关系通过rdd中的getDependencies来进行表示,

 

在提交job后,会通过在 DAGShuduler.submitStage-->getMissingParentStages

 

privatedef getMissingParentStages(stage: Stage): List[Stage] = {

 

valmissing = new HashSet[Stage]

 

valvisited = new HashSet[RDD[_]]

 

def visit(rdd: RDD[_]) {

 

if (!visited(rdd)) {

 

visited += rdd

 

if (getCacheLocs(rdd).contains(Nil)) {

 

for (dep <- rdd.dependencies) {

 

depmatch {

 

caseshufDep: ShuffleDependency[_,_] =>

 

valmapStage = getShuffleMapStage(shufDep, stage.jobId)

 

if (!mapStage.isAvailable) {

 

missing += mapStage

 

}

 

casenarrowDep: NarrowDependency[_] =>

 

visit(narrowDep.rdd)

 

}

 

}

 

}

 

}

 

}

 

visit(stage.rdd)

 

missing.toList

 

}

 

在以上代码中得到rdd的相关dependencies,每一个rdd生成时传入rdddependencies信息。

 

SparkContext.textFile,时生成的HadoopRDD时。此RDD的默认为dependencysNil.

 

Nil是一个空的列表。

 

class HadoopRDD[K, V](

 

sc: SparkContext,

 

broadcastedConf: Broadcast[SerializableWritable[Configuration]],

 

initLocalJobConfFuncOpt: Option[JobConf => Unit],

 

inputFormatClass: Class[_ <: InputFormat[K, V]],

 

keyClass: Class[K],

 

valueClass: Class[V],

 

minSplits: Int)

 

extends RDD[(K, V)](sc, Nil) with Logging {

 

 

 

Dependency分为ShuffleDependencyNarrowDependency

 

其中NarrowDependency又包含OneToOneDependency/RangeDependency

 

Dependency唯一的成员就是rdd, 即所依赖的rdd, parent rdd

 

abstractclass Dependency[T](valrdd: RDD[T]) extends Serializable

 

 

 

OneToOneDependency关系:

 

最简单的依赖关系, parentchild里面的partitions是一一对应的, 典型的操作就是map, filter

 

其实partitionId就是partitionRDD中的序号, 所以如果是一一对应,

 

那么parentchild中的partition的序号应该是一样的,如下是OneToOneDependency的定义

 

/**

 

* Represents a one-to-one dependency between partitions of the parent and child RDDs.

 

*/

 

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {

 

此类的Dependencyparent中的partitionIdchildRDD中的partitionId是一对一的关系。

 

也就是partition本身范围不会改变, 一个parition经过transform还是一个partition,

 

虽然内容发生了变化, 所以可以在local完成,此类场景通常像mapreduce中只有map的场景,

 

第一个RDD执行完成后的MAPparition直接运行第二个RDDMap,也就是local执行。

 

overridedef getParents(partitionId: Int) = List(partitionId)

 

}

 

 

 

RangeDependency关系:

 

此类应用虽然仍然是一一对应, 但是是parent RDD中的某个区间的partitions对应到child RDD中的某个区间的partitions
典型的操作是
union, 多个parent RDD合并到一个child RDD, 故每个parent RDD都对应到child RDD中的一个区间
需要注意的是
, 这里的union不会把多个partition合并成一个partition, 而是的简单的把多个RDD中的partitions放到一个RDD里面, partition不会发生变化,

 

 

 

rdd参数,parentRDD

 

inStart参数,parentRDDpartitionId计算的起点位置。

 

outStart参数,childRDD中计算parentRDDpartitionId的起点位置,

 

length参数,parentRDDpartition的个数。

 

class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)

 

extends NarrowDependency[T](rdd) {

 

 

 

overridedef getParents(partitionId: Int) = {

 

检查partitionId的合理性,此partitionIdchildRDDpartitionId中的范围需要合理。

 

if (partitionId >= outStart && partitionId < outStart + length) {

 

计算出ParentRDDpartitionId的值。

 

List(partitionId - outStart + inStart)

 

} else {

 

Nil

 

}

 

}

 

}

 

 

 

典型的应用场景union的场景把两个RDD合并到一个新的RDD中。

 

def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))

 

使用union, 第二个参数是, 两个RDDarray, 返回值就是把这两个RDD union后产生的新的RDD

 

 

 

ShuffleDependency关系:

 

此类依赖首先要求是Product2PairRDDFunctionsk,v的形式,这样才能做shuffle,和hadoop一样。

 

其次, 由于需要shuffle, 所以当然需要给出partitioner,默认是HashPartitioner 如何完成shuffle

 

然后, shuffle不象map可以在local进行, 往往需要网络传输或存储, 所以需要serializerClass

 

默认是JavaSerializer,一个类名,用于序列化网络传输或者以序列化形式缓存起来的各种对象。

 

默认情况下Java的序列化机制可以序列化任何实现了Serializable接口的对象,

 

但是速度是很慢的,

 

因此当你在意运行速度的时候我们建议你使用spark.KryoSerializer 并且配置 Kryo serialization

 

可以是任何spark.Serializer的子类。

 

 

 

最后, 每个shuffle需要分配一个全局的id, context.newShuffleId()的实现就是把全局id累加

 

 

 

class ShuffleDependency[K, V](

 

@transient rdd: RDD[_ <: Product2[K, V]],

 

valpartitioner: Partitioner,

 

valserializerClass: String = null)

 

extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {

 

 

 

valshuffleId: Int = rdd.context.newShuffleId()

 

}

 

 

 

 

 

生成RDD过程分析

 

生成rdd我们还是按wordcount中的例子来说明;

 

val file = sc.textFile("/hadoop-test.txt")

 

valcounts = file.flatMap(line => line.split(" "))

 

.map(word => (word, 1)).reduceByKey(_ + _)

 

counts.saveAsTextFile("/newtest.txt")

 

 

 

1.首先SparkContext.textFile通过调用hadoopFile生成HadoopRDD实例,

 

textFile-->hadoopFile-->HadoopRDD,此时RDDDependencyNil,一个空的列表。

 

此时的HadoopRDDRDD<K,V>,每执行next方法时返回一个Pair,也就是一个KV(通过compute函数)

 

2.textFile得到HadoopRDD后,调用map函数,

 

map中每执行一次得到一个KV(computegetNext,new NextIterator[(K, V)] )

 

取出value的值并toString,生成MappedRDD<String>。此RDD的上层RDD就是1中生成的RDD

 

同时此RDDDependencyOneToOneDependency

 

def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {

 

hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],

 

minSplits).map(pair => pair._2.toString)

 

}

 

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

 

以上代码中传入的this其实就是1中生成的HadoopRDD.

 

 

 

3.flatMap函数,把2中每一行输出通过一定的条件修改成0到多个新的item.生成FlatMappedRDD实例,

 

同时根据implicit隐式转换生成PairRDDFunctions。下面两处代码中的红色部分。

 

在生成FlatMappedRDD是,此时的上一层RDD就是2中生成的RDD

 

同时此RDDDependencyOneToOneDependency

 

class FlatMappedRDD[U: ClassTag, T: ClassTag](

 

prev: RDD[T],

 

f: T => TraversableOnce[U])

 

extends RDD[U](prev)

 

 

 

implicitdef rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =

 

new PairRDDFunctions(rdd)

 

 

 

4.map函数,由于3中生成的FlatMappedRDD生成出来的结果,通过implicit的隐式转换生成PairRDDFunctions

 

此时的map函数需要生成隐式转换传入的RDD<K,V>的一个RDD

 

因此map函数的执行需要生成一个MappedRDD<K,V> RDD,同时此RDDDependencyOneToOneDependency

 

以下代码的红色部分。 ---RDD[(K, V)]。。

 

valcounts = file.flatMap(line => line.split(" "))

 

.map(word => (word, 1)).reduceByKey(_ + _)

 

5.reduceByKey函数,此函数通过implicit的隐式转换中的函数来进行,主要是传入一个计算两个value的函数。

 

reduceByKey这类的shuffleRDD时,最终生成一个ShuffleRDD,

 

RDD生成的DependencyShuffleDependency

 

具体说明在下面的reduceByKey代码中,

 

首先在每一个map生成MapPartitionsRDD把各partitioner中的数据通过进行合并。合并通过Aggregator实例。

 

最后通过对合并后的MapPartitionsRDD,RDD相当于mapreduce中的combiner,生成ShuffleRDD.

 

def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {

 

reduceByKey(defaultPartitioner(self), func)

 

}

 

 

 

def combineByKey[C](createCombiner: V => C,//创建combiner,通过V的值创建C

 

mergeValue: (C, V) => C,//combiner已经创建C已经有一个值,把第二个的V叠加到C中,

 

mergeCombiners: (C, C) => C,//把两个C进行合并,其实就是两个value的合并。

 

partitioner: Partitioner,//Shuffle时需要的Partitioner

 

mapSideCombine: Boolean = true,//为了减小传输量, 很多combine可以在map端先做,

 

比如叠加, 可以先在一个partition中把所有相同的keyvalue叠加, shuffle

 

serializerClass: String = null): RDD[(K, C)] = {

 

if (getKeyClass().isArray) {

 

if (mapSideCombine) {

 

thrownew SparkException("Cannot use map-side combining with array keys.")

 

}

 

if (partitioner.isInstanceOf[HashPartitioner]) {

 

thrownew SparkException("Default partitioner cannot partition array keys.")

 

}

 

}

 

生成一个Aggregator实例。

 

valaggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)

 

如果RDD本身的partitioner与传入的partitioner相同,表示不需要进行shuffle

 

if (self.partitioner == Some(partitioner)) {

 

生成MapPartitionsRDD,直接在map端当前的partitioner下调用Aggregator.combineValuesByKey

 

把相同的keyvalue进行合并。

 

self.mapPartitionsWithContext((context, iter) => {

 

new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))

 

}, preservesPartitioning = true)

 

} elseif (mapSideCombine) {

 

生成MapPartitionsRDD,先在map端当前的partitioner下调用Aggregator.combineValuesByKey

 

把相同的keyvalue进行合并。

 

combineValuesByKey中检查如果key对应的C如果不存在,通过createCombiner创建C

 

否则key已经存在C时,通过mergeValue把新的V与上一次的C进行合并,

 

mergeValue其实就是传入的reduceByKey(_ + _) 括号中的函数,与reduce端函数相同。

 

valcombined = self.mapPartitionsWithContext((context, iter) => {

 

aggregator.combineValuesByKey(iter, context)

 

}, preservesPartitioning = true)

 

生成 ShuffledRDD,进行shuffle操作,因为此时会生成ShuffleDependency,重新生成一个新的stage.

 

valpartitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)

 

.setSerializer(serializerClass)

 

在上一步完成,也就是shuffle完成,重新在reduce端进行合并操作。通过Aggregator.combineCombinersByKey

 

spark这些地方的方法定义都是通过动态加载执行的函数的方式,所以可以做到map端执行完成后reduce再去执行后续的处理。

 

因为函数在map时只是进行了定义,reduce端才对函数进行执行。

 

partitioned.mapPartitionsWithContext((context, iter) => {

 

new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))

 

}, preservesPartitioning = true)

 

} else {

 

不执行map端的合并操作,直接shuffle,并在reduce中执行合并。

 

// Don't apply map-side combiner.

 

valvalues = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)

 

values.mapPartitionsWithContext((context, iter) => {

 

new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))

 

}, preservesPartitioning = true)

 

}

 

}

 

 

 

0
0
分享到:
评论

相关推荐

    Spark的灵魂:RDD和DataSet

    讲解RDD 依赖关系,包括窄依赖、宽依赖; 解析Spark 中DAG 逻辑视图;对RDD 内部的计算机制及计算过程进行深度解析;讲解Spark RDD 容错原理及其四大核心要点解析对Spark RDD 中Runtime 流程进行解析;通过一个...

    【SparkCore篇04】RDD函数传递和依赖关系1

    【其实就是对象在进程间传输需要序列化】valconf:SparkConf=new SparkConf()val rdd: RDD[String] = sc.pa

    Spark基础知识04——窄依赖、宽依赖、DAG、缓存

    RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。 窄依赖: 窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用 总结:窄依赖我们...

    fantj2016#java-reader#3. Spark-初识RDD1

    (1)一组分片(Partition),即数据集的基本组成单位 (2)一个计算每个分区的函数 (3)RDD之间的依赖关系 (4)一个Partitioner,即RD

    走进Spark Streaming

    离散流DStream DStream是Spark Streaming中的一个最基本的抽象,代表了一系列连续的数据流,本质上...DStream与DStream之间存在依赖关系,在一个固定时间点,两个存在依赖关系的DStream对应的RDD也存在依赖关系,每个

    大数据高频面试题.pdf

    RDD的特性(RDD的解释) 1.RDD可以看做是⼀些列partition所组成的 2.RDD之间的依赖关系 3.算⼦是作⽤在partition之上的 4.分区器是作⽤在kv形式的RDD上 5.partition提供的最佳计算位置,利于数据处理的本地化即计算...

    Spark学习笔记—Spark计算模型

    通过RDD之间的依赖关系形成了Spark的调度顺序。 1.RDD的几种创建方式: (1).从hadoop文件系统输入创建如HDFS,也可以是其他与hadoop兼容的持久化存储系统如Hive,Hbase创建。 (2).从父RDD转换得到新的RDD。 (3).调用...

    预测算法调研报告.doc

    但是Hadoop在实际应用过程中仍存在很多不足: 第一、一个MapReduce任务只有M ap和Reduce两个阶段,复杂的计算需要大量的Job共同完成,Job之间的依赖关系需要由 开发者自己管理,这增加了开发者的研发难度。...

    基于大数据平台数据分析技术选型调研.pdf

    RDD提供了⽐MapReduce 丰富的模型,可以快速在内存中对数据集进⾏多次迭代,来⽀持复杂的数据挖掘算法和图形计算算法 4. Spark 多个作业之间数据通信是基于内存,效率更⾼ 缺点: 1. Spark 是基于内存的,由于内存...

    C++高级参考手册 完全自学 内容详细 讲解通俗易懂

    1.1.2 继承:类型关系 1.1.3 多态性 1.1.4 操作概念:OOP程序像什么 1.2 为什么C++会成功 1.2.1 较好的C 1.2.2 采用渐进的学习方式 1.2.3 运行效率 1.2.4 系统更容易表达和理解 1.2.5 “库”使你事半功倍 ...

    C++编程思想 例子和叙述讲解C++编程

    1.1.2 继承:类型关系 1 1.1.3 多态性 2 1.1.4 操作概念:OOP程序像什么 3 1.2 为什么C++会成功 3 1.2.1 较好的C 3 1.2.2 采用渐进的学习方式 4 1.2.3 运行效率 4 1.2.4 系统更容易表达和理解 4 1.2.5 “库”使你...

    C++编程思想.pdf

    1.1.2 继承:类型关系 1 1.1.3 多态性 2 1.1.4 操作概念:OOP程序像什么 3 1.2 为什么C++会成功 3 1.2.1 较好的C 3 1.2.2 采用渐进的学习方式 4 1.2.3 运行效率 4 1.2.4 系统更容易表达和理解 4 1.2.5 “库”使你...

    C++编程思想

    1.1.2 继承:类型关系 1 1.1.3 多态性 2 1.1.4 操作概念:OOP程序像什么 3 1.2 为什么C++会成功 3 1.2.1 较好的C 3 1.2.2 采用渐进的学习方式 4 1.2.3 运行效率 4 1.2.4 系统更容易表达和理解 4 1.2.5 “库”使你...

    C++编程思想 PDF

    1.1.2 继承:类型关系 1 1.1.3 多态性 2 1.1.4 操作概念:OOP程序像什么 3 1.2 为什么C++会成功 3 1.2.1 较好的C 3 1.2.2 采用渐进的学习方式 4 1.2.3 运行效率 4 1.2.4 系统更容易表达和理解 4 1.2.5 “库...

    C++编程思想.rar

    1.1.2 继承:类型关系 1 1.1.3 多态性 2 1.1.4 操作概念:OOP程序像什么 3 1.2 为什么C++会成功 3 1.2.1 较好的C 3 1.2.2 采用渐进的学习方式 4 1.2.3 运行效率 4 1.2.4 系统更容易表达和理解 4 1.2.5 “库”使你...

    c++ 编程思想 (高清pdf)

    1.1.2 继承:类型关系 1 1.1.3 多态性 2 1.1.4 操作概念:OOP程序像什么 3 1.2 为什么C++会成功 3 1.2.1 较好的C 3 1.2.2 采用渐进的学习方式 4 1.2.3 运行效率 4 1.2.4 系统更容易表达和理解 4 1.2.5 “库”使你...

    C++编程思想(程序员必读)

    1.1.2 继承:类型关系 1 1.1.3 多态性 2 1.1.4 操作概念:OOP程序像什么 3 1.2 为什么C++会成功 3 1.2.1 较好的C 3 1.2.2 采用渐进的学习方式 4 1.2.3 运行效率 4 1.2.4 系统更容易表达和理解 4 1.2.5 “库”使你...

Global site tag (gtag.js) - Google Analytics