`

spark总体概况

阅读更多

1. spark vs hadoop

这里写图片描述

PS:Databricks团队特别说明,为了和Hadoop对比,这次用于排序的Spark集群没有使用它们的内存缓存机制,他们也是用硬盘存储的中间结果! 
http://tieba.yunxunmi.com/mtieba-hadoop-kz-58b9e430a78747f7fb1ea9f9e6374597.html 
但是我们要明白,spark的目标是与hadoop共存的,就算很多地方比hadoop优秀,但spark绝对不是替代hadoop的,目前 spark的standalone模式还是有很多局限性,而在中国 像董西成这样hadoop 以及YARN的布道者,导致中国很多人对YARN的理解和使用超过mesos,而mesos是一套资源管理框架,在外国用的比较多,比如twitter

2. spark 整体框架

部署图: 
部署图

从部署图中可以看到 
整个集群分为 Master 节点和 Worker 节点,相当于 Hadoop 的 Master 和 Slave 节点。 
Master 节点上常驻 Master 守护进程,负责管理全部的 Worker 节点。 
Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通信并管理 executors。 
Driver 官方解释是 “The process running the main() function of the application and creating the SparkContext”。 核心就算创建SparkContext 
Application 就是用户自己写的 Spark 程序(driver program),比如 WordCount.scala。 driver program 可在 Master 上运行也可在 Worker 上运行

目前Hadoop1.x(JobTracker and TaskTracker),JobTracker是单点的,Hadoop2.x(ResourceManager,NodeManager,ApplicationManager),ResourceManager也是单点
那 spark master也是单点么? 
NO,可以支持多master 
在SPARK_HOME/conf/spark_env.sh配置如下信息: 
- ZOOKEEPER实现HA: 
spark.deploy.recoveryMode=ZOOKEEPER 
spark.deploy.zookeeper.url=zk_server_1:2181,zk_server_2:2181 
spark.deploy.zookeeper.dir=/dir 
or 
export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=ZOOKEEPER ” 
export SPARK_DAEMON_JAVA_OPTS=”${SPARK_DAEMON_JAVA_OPTS} -Dspark.deploy.zookeeper.url=zk_server1:2181,zk_server_2:2181” 
- FILESYSTEM实现HA: 

export SPARK_DAEMON_JAVA_OPTS=”-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/nfs/spark/recovery”

但是要注意,当我们配置完多master后,启动在提交任务或者启动spark-shell时,需要增加MASTER=spark://master001:7077,master002:7077

最简单的wordcount: 
这里写图片描述

spark 任务分析: 
这里写图片描述

具体到Your Program: 
这里写图片描述

  1. what is RDD? 
    这里写图片描述

    这里写图片描述

  2. what is transformation? 
    map,filter, flatMap, mapPartitions, mapPartitionsWithIndex, sample, pipe, union, intersection,distinct, groupByKey, reduceByKey, sortByKey, join, cogroup, cartesian, coalesce,repartition 这些都是transformation,属于lazy execution,主要做数据转换
  3. what is action? 
    reduce, collect, count, take,first, takeSample, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, countByKey,foreach这些操作会触发spark真正任务执行

这里写图片描述

 

这里写图片描述

这里写图片描述

下面简单分析下 job 的生成和提交代码:

  1. rdd.action() 会调用 DAGScheduler.runJob(rdd, processPartition, resultHandler) 来生成 job。
  2. runJob() 会首先通过rdd.getPartitions()来得到 finalRDD 中应该存在的 partition 的个数和类型:Array[Partition]。然后根据 partition 个数 new 出来将来要持有 result 的数组 Array[Result](partitions.size)
  3. 最后调用 DAGScheduler 的runJob(rdd, cleanedFunc, partitions, allowLocal, resultHandler)来提交 job。cleanedFunc 是 processParittion 经过闭包清理后的结果,这样可以被序列化后传递给不同节点的 task。
  4. DAGScheduler 的 runJob 继续调用submitJob(rdd, func, partitions, allowLocal, resultHandler) 来提交 job。
  5. submitJob() 首先得到一个 jobId,然后再次包装 func,向 DAGSchedulerEventProcessActor 发送 JobSubmitted 信息,该 actor 收到信息后进一步调用dagScheduler.handleJobSubmitted()来处理提交的 job。之所以这么麻烦,是为了符合事件驱动模型。
  6. handleJobSubmmitted() 首先调用 finalStage = newStage() 来划分 stage,然后submitStage(finalStage)。由于 finalStage 可能有 parent stages,实际先提交 parent stages,等到他们执行完,finalStage 需要再次提交执行。再次提交由 handleJobSubmmitted() 最后的 submitWaitingStages() 负责。

分析一下 newStage() 如何划分 stage:

  1. 该方法在 new Stage() 的时候会调用 finalRDD 的 getParentStages()。
  2. getParentStages() 从 finalRDD 出发,反向 visit 逻辑执行图,遇到 NarrowDependency 就将依赖的 RDD 加入到 stage,遇到 ShuffleDependency 切开 stage,并递归到 ShuffleDepedency 依赖的 stage。
  3. 一个 ShuffleMapStage(不是最后形成 result 的 stage)形成后,会将该 stage 最后一个 RDD 注册到MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size),这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置。

分析一下 submitStage(stage) 如何提交 stage 和 task:

  1. 先确定该 stage 的 missingParentStages,使用getMissingParentStages(stage)。如果 parentStages 都可能已经执行过了,那么就为空了。
  2. 如果 missingParentStages 不为空,那么先递归提交 missing 的 parent stages,并将自己加入到 waitingStages 里面,等到 parent stages 执行结束后,会触发提交 waitingStages 里面的 stage。
  3. 如果 missingParentStages 为空,说明该 stage 可以立即执行,那么就调用submitMissingTasks(stage, jobId)来生成和提交具体的 task。如果 stage 是 ShuffleMapStage,那么 new 出来与该 stage 最后一个 RDD 的 partition 数相同的 ShuffleMapTasks。如果 stage 是 ResultStage,那么 new 出来与 stage 最后一个 RDD 的 partition 个数相同的 ResultTasks。一个 stage 里面的 task 组成一个 TaskSet,最后调用taskScheduler.submitTasks(taskSet)来提交一整个 taskSet。
  4. 这个 taskScheduler 类型是 TaskSchedulerImpl,在 submitTasks() 里面,每一个 taskSet 被包装成 manager: TaskSetMananger,然后交给schedulableBuilder.addTaskSetManager(manager)。schedulableBuilder 可以是 FIFOSchedulableBuilder 或者 FairSchedulableBuilder 调度器。submitTasks() 最后一步是通知backend.reviveOffers()去执行 task,backend 的类型是 SchedulerBackend。如果在集群上运行,那么这个 backend 类型是 SparkDeploySchedulerBackend。
  5. SparkDeploySchedulerBackend 是 CoarseGrainedSchedulerBackend 的子类,backend.reviveOffers()其实是向 DriverActor 发送 ReviveOffers 信息。SparkDeploySchedulerBackend 在 start() 的时候,会启动 DriverActor。DriverActor 收到 ReviveOffers 消息后,会调用launchTasks(scheduler.resourceOffers(Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId))))) 来 launch tasks。scheduler 就是 TaskSchedulerImpl。scheduler.resourceOffers()从 FIFO 或者 Fair 调度器那里获得排序后的 TaskSetManager,并经过TaskSchedulerImpl.resourceOffer(),考虑 locality 等因素来确定 task 的全部信息 TaskDescription。调度细节这里暂不讨论。
  6. DriverActor 中的 launchTasks() 将每个 task 序列化,如果序列化大小不超过 Akka 的 akkaFrameSize,那么直接将 task 送到 executor 那里执行executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))。 

3. shuffle

在 Spark 中,没有这样功能明确的阶段。Spark将用户定义的计算过程转化为一个被称作Job逻辑执行图的有向无环图(DAG),图中的顶点代表RDD,边代表RDD之间的依赖关系。再将这个逻辑执行图转化为物理执行图,具体方法是:从逻辑图后往前推算,遇到 ShuffleDependency 就断开,最后根据断开的次数n,将其化分为(n+1)个stage。每个 stage 里面 task 的数目由该 stage 最后一个 RDD 中的 partition 个数决定。因此,Spark的Job的shuffle数是不固定的。 
在Spark早期的版本中,Spark使用的是hash-based的shuffle,通常使用 HashMap 来对 shuffle 来的数据进行聚合,不会对数据进行提前排序。而Hadoop MapReduce 一直使用的就是 sort-based shuffle,进入 combine和 reduce的数据都会先经过排序(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。不过在Spark1.1已经支持sorted-basedshuffle,在这一点上做到了扬长避短。这次排序比赛中所使用的是Spark 1.2,采用的就是sorted-based shuffle。 
此外,Databricks还创建了一个外部shuffle服务,该服务和Spark执行器(executor)本身是分离的。这个服务使得即使是Spark 执行器在因GC导致的暂停时仍然可以正常进行shuffle。

Shuffle write

由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。 
shuffle write 的任务很简单,那么实现也很简单:将 shuffle write 的处理逻辑加入到 ShuffleMapStage(ShuffleMapTask 所在的 stage) 的最后,该 stage 的 final RDD 每输出一个 record 就将其 partition 并持久化。图示如下: 
这里写图片描述
上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,可以同时运行两个 task。每个 task 的执行结果(该 stage 的 finalRDD 中某个 partition 包含的 records)被逐一写到本地磁盘上。每个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket,其大小为spark.shuffle.file.buffer.kb ,默认是 32KB(Spark 1.1 版本以前是 100KB)。 
其实 bucket 是一个广义的概念,代表 ShuffleMapTask 输出结果经过 partition 后要存放的地方,这里为了细化数据存放位置和数据名称,仅仅用 bucket 表示缓冲区。 
ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算得到 finalRDD 中对应 partition 的 records。每得到一个 record 就将其送到对应的 bucket 里,具体是哪个 bucket 由partitioner.partition(record.getKey()))决定。每个 bucket 里面的数据会不断被写到本地磁盘上,形成一个 ShuffleBlockFile,或者简称 FileSegment。之后的 reducer 会去 fetch 属于自己的 FileSegment,进入 shuffle read 阶段。

这样的实现很简单,但有几个问题: 
产生的 FileSegment 过多。每个 ShuffleMapTask 产生 R(reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。 
缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 M * R 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores * R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了cores * R * 32 KB。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB。 
目前来看,第二个问题还没有好的方法解决,因为写磁盘终究是要开缓冲区的,缓冲区太小会影响 IO 速度。但第一个问题有一些方法去解决,下面介绍已经在 Spark 里面实现的 FileConsolidation 方法。先上图: 
这里写图片描述

可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i’,每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。这样,每个 worker 持有的文件数降为 cores * R。FileConsolidation 功能可以通过spark.shuffle.consolidateFiles=true来开启。

 

Shuffle read

先看一张包含 ShuffleDependency 的物理执行图,来自 reduceByKey: 
这里写图片描述

  • 在什么时候 fetch?当 parent stage 的所有 ShuffleMapTasks 结束后再 fetch。理论上讲,一个 ShuffleMapTask 结束后就可以 fetch,但是为了迎合 stage 的概念(即一个 stage 如果其 parent stages 没有执行完,自己是不能被提交执行的),还是选择全部 ShuffleMapTasks 执行完再去 fetch。因为 fetch 来的 FileSegments 要先在内存做缓冲,所以一次 fetch 的 FileSegments 总大小不能太大。Spark 规定这个缓冲界限不能超过 spark.reducer.maxMbInFlight,这里用 softBuffer 表示,默认大小为 48MB。一个 softBuffer 里面一般包含多个 FileSegment,但如果某个 FileSegment 特别大的话,这一个就可以填满甚至超过 softBuffer 的界限。
  • 边 fetch 边处理还是一次性 fetch 完再处理?边 fetch 边处理。本质上,MapReduce shuffle 阶段就是边 fetch 边使用 combine() 进行处理,只是 combine() 处理的是部分数据。MapReduce 为了让进入 reduce() 的 records 有序,必须等到全部数据都 shuffle-sort 后再开始 reduce()。因为 Spark 不要求 shuffle 后的数据全局有序,因此没必要等到全部数据 shuffle 完成后再处理。那么如何实现边 shuffle 边处理,而且流入的 records 是无序的?答案是使用可以 aggregate 的数据结构,比如 HashMap。每 shuffle 得到(从缓冲的 FileSegment 中 deserialize 出来)一个 <Key, Value> record,直接将其放进 HashMap 里面。如果该 HashMap 已经存在相应的 Key,那么直接进行 aggregate 也就是 func(hashMap.get(Key), Value),比如上面 WordCount 例子中的 func 就是 hashMap.get(Key) + Value,并将 func 的结果重新 put(key) 到 HashMap 中去。这个 func 功能上相当于 reduce(),但实际处理数据的方式与 MapReduce reduce() 有差别

  • fetch 来的数据存放到哪里?刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。这里我们主要讨论处理后的数据,可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果spark.shuffle.spill = false就只用内存。内存使用的是AppendOnlyMap ,类似 Java 的HashMap,内存+磁盘使用的是ExternalAppendOnlyMap,如果内存空间不足时,ExternalAppendOnlyMap可以将 <K, V> records 进行 sort 后 spill 到磁盘上,等到需要它们的时候再进行归并,后面会详解。使用“内存+磁盘”的一个主要问题就是如何在两者之间取得平衡?在 Hadoop MapReduce 中,默认将 reducer 的 70% 的内存空间用于存放 shuffle 来的数据,等到这个空间利用率达到 66% 的时候就开始 merge-combine()-spill。在 Spark 中,也适用同样的策略,一旦 ExternalAppendOnlyMap 达到一个阈值就开始 spill,具体细节下面会讨论。
  • 怎么获得要 fetch 的数据的存放位置?在上一章讨论物理执行图中的 stage 划分的时候,我们强调 “一个 ShuffleMapStage 形成后,会将该 stage 最后一个 final RDD 注册到MapOutputTrackerMaster.registerShuffle(shuffleId, rdd.partitions.size),这一步很重要,因为 shuffle 过程需要 MapOutputTrackerMaster 来指示 ShuffleMapTask 输出数据的位置”。因此,reducer 在 shuffle 的时候是要去 driver 里面的 MapOutputTrackerMaster 询问 ShuffleMapTask 输出的数据位置的。每个 ShuffleMapTask 完成时会将 FileSegment 的存储位置信息汇报给 MapOutputTrackerMaster。

 

4.akka

消息队列系统 
在spark中 作为消息系统为master,worker,driver等通信 
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)

// Master to Worker 
case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage 
这里写图片描述
http://hongbinzuo.github.io/2014/12/16/Akka-Tutorial-with-Code-Conncurrency-and-Fault-Tolerance/

5.tachyon

分布式文件系统,介于内存和磁盘之间的存储介质 
http://tachyon-project.org/index.html

架构图: 
这里写图片描述

6. netty

目前在spark中主要作为spark shuffle处理后 从各个解决拉取shuffle数据

7.安装和启动

. 配置SPARK_HOME/conf/spark-env.sh文件 
这里写图片描述
. 配置SPARK_HOME/conf/slaves文件 
这里写图片描述 
. 启动spark&验证 
这里写图片描述 
这里写图片描述

尊重原创,未经允许不得转载:http://blog.csdn.net/stark_summer/article/details/45917603

6
0
分享到:
评论
3 楼 Stark_Summer 2015-05-28  
qindongliang1922 写道
写的不错,赞一个!

昨晚 睡前看了下,发现有些地方不满意,又改了下,谢谢关注~
2 楼 mangguo 2015-05-28  
1 楼 qindongliang1922 2015-05-27  
写的不错,赞一个!

相关推荐

    Spark总体架构和运行流程

    Spark 总体架构 Spark 运行架构如图 1 所示,包括集群资源管理器(Cluster Manager)、多个运行作业任务的工作结点(Worker Node)、每个应用的任务控制结点(Driver)和每个工作结点上负责具体任务的执行进程...

    Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面.zip

    Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...

    大数据Spark企业级实战

    《大数据Spark企业级实战》详细解析了企业级Spark开发所需的几乎所有技术内容,涵盖Spark的架构设计、Spark的集群搭建、Spark内核的解析、Spark SQL、MLLib、GraphX、Spark Streaming、Tachyon、SparkR、Spark多语言...

    Spark 入门实战系列

    Spark 入门实战系列,适合初学者,文档包括十部分内容,质量很好,为了感谢文档作者,也为了帮助更多的人入门,传播作者的心血,特此友情转贴: 1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建....

    Spark和TiDB (Spark on TiDB)

    SparkTI (Spark on TiDB)是TiDB基于Apache Spark的独立于原生系统的计算引擎。它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现...

    基于spark的大数据论文资料

    本资料是集合20篇知网被引最高的基于spark的大数据论文,包括大数据Spark技术研究_刘峰波、大数据下基于Spark的电商实时推荐系统的设计与实现_岑凯伦、基于Spark的Apriori算法的改进_牛海玲、基于Spark的大数据混合...

    spark3.0入门到精通

    ├─Spark-day01 │ 01-[了解]-Spark发展历史和特点介绍.mp4 │ 03-[掌握]-Spark环境搭建-Standalone集群模式.mp4 │ 06-[理解]-Spark环境搭建-On-Yarn-两种模式.mp4 │ 07-[掌握]-Spark环境搭建-On-Yarn-两种...

    spark Linux 版本安装包

    spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包spark Linux 版本安装包...

    spark笔记整理文档

    spark笔记整理文档spark笔记整理文档spark笔记整理文档

    Spark实战高手之路 - Spark亚太研究院.part4.rar

    Spark实战高手之路 【Spark亚太研究院系列丛书】《Spark机器学习库(v1.2.0)》-王宇舟 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(1) 【Spark亚太研究院系列丛书】Spark实战高手之路-第1章(2) ...

    实验七:Spark初级编程实践

    使用命令./bin/spark-shell启动spark 图2启动spark 2. Spark读取文件系统的数据 (1) 在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数; 图3 spark统计行数 (2) 在spark-...

    Spark经典论文合集

    An Architecture for Fast and General Data Processing on Large Clusters.pdf Discretized Streams An ...Spark SQL Relational Data Processing in Spark.pdf spark.pdf 大型集群上的快速和通用数据处理架构.pdf

    Spark全面精讲(基于Spark2版本+含Spark调优+超多案例)【不是王家林版本】

    0基础spark,基于spark2,内容完整全面,学完精通spark

    spark相关jar包

    spark相关jar包

    8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf

    1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战....

    Apache Spark 2.4 and beyond

    Apache Spark 2.4 comes packed with a lot of new functionalities and improvements, including the new barrier execution mode, flexible streaming sink, the native AVRO data source, PySpark’s eager ...

    Apache Spark 2 for Beginners [2016]

    Apache Spark 2.0 for Beginners English | ISBN: 1785885006 | 2016 | Key Features This book offers an easy introduction to the Spark framework published on the latest version of Apache Spark 2 Perform ...

    Spark从入门到精通

    本课程主要讲解的内容包括:Scala编程、Hadoop与Spark集群搭建、Spark核心编程、Spark内核源码深度剖析、Spark性能调优、Spark SQL、Spark Streaming。 本课程的最大特色包括: 1、代码驱动讲解Spark的各个技术点...

    8.SparkMLlib(下)--SparkMLlib实战.pdf

    1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战....

Global site tag (gtag.js) - Google Analytics