1. Where does ShuffleMapTask's shuffle data get recorded in the MapOutputTracker?
ShuffleMapTask's runTask method is responsible for writing the data to the shuffle map files. When the task completes successfully, the DAGScheduler is notified, and the recording into the MapOutputTracker happens inside DAGScheduler's handleTaskCompletion method:
event.reason match {
  case Success =>
    listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
      event.reason, event.taskInfo, event.taskMetrics))
    stage.pendingTasks -= task
    task match {
      case rt: ResultTask[_, _] =>
        stage.resultOfJob match {
          case Some(job) =>
            if (!job.finished(rt.outputId)) {
              updateAccumulators(event)
              job.finished(rt.outputId) = true
              job.numFinished += 1
              // If the whole job has finished, remove it
              if (job.numFinished == job.numPartitions) {
                markStageAsFinished(stage)
                cleanupStateForJobAndIndependentStages(job)
                listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
              }
              // taskSucceeded runs some user code that might throw an exception. Make sure
              // we are resilient against that.
              try {
                job.listener.taskSucceeded(rt.outputId, event.result)
              } catch {
                case e: Exception =>
                  // TODO: Perhaps we want to mark the stage as failed?
                  job.listener.jobFailed(new SparkDriverExecutionException(e))
              }
            }
          case None =>
            logInfo("Ignoring result from " + rt + " because its job has finished")
        }
      case smt: ShuffleMapTask =>
        updateAccumulators(event)
        // Get the MapStatus object out of the completion event
        val status = event.result.asInstanceOf[MapStatus]
        // The ID of the executor that produced this map output
        val execId = status.location.executorId
        logDebug("ShuffleMapTask finished on " + execId)
        if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
          logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
        } else {
          stage.addOutputLoc(smt.partitionId, status)
        }
        if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
          markStageAsFinished(stage)
          logInfo("looking for newly runnable stages")
          logInfo("running: " + runningStages)
          logInfo("waiting: " + waitingStages)
          logInfo("failed: " + failedStages)
          if (stage.shuffleDep.isDefined) {
            // We supply true to increment the epoch number here in case this is a
            // recomputation of the map outputs. In that case, some nodes may have cached
            // locations with holes (from when we detected the error) and will need the
            // epoch incremented to refetch them.
            // TODO: Only increment the epoch number if this is not the first time
            // we registered these map outputs.
            // This is where the map outputs are registered with the mapOutputTracker
            mapOutputTracker.registerMapOutputs(
              stage.shuffleDep.get.shuffleId,
              stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
              changeEpoch = true)
          }
          clearCacheLocs()
          if (stage.outputLocs.exists(_ == Nil)) {
            // Some tasks had failed; let's resubmit this stage
            // TODO: Lower-level scheduler should also deal with this
            logInfo("Resubmitting " + stage + " (" + stage.name +
              ") because some of its tasks had failed: " +
              stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
            submitStage(stage)
          } else {
            val newlyRunnable = new ArrayBuffer[Stage]
            for (stage <- waitingStages) {
              logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
            }
            for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
              newlyRunnable += stage
            }
            waitingStages --= newlyRunnable
            runningStages ++= newlyRunnable
            for {
              stage <- newlyRunnable.sortBy(_.id)
              jobId <- activeJobForStage(stage)
            } {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
              submitMissingTasks(stage, jobId)
            }
          }
        }
    }
  // ... handling for other completion reasons elided
}
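To make the bookkeeping above concrete, here is a minimal, self-contained sketch of the register/lookup pattern behind the MapOutputTracker. All names here (SimpleTracker, MapOutput, locationsFor) are hypothetical, invented for illustration; the real MapStatus additionally compresses the per-reducer sizes, which this sketch omits.

// Minimal sketch of the register/lookup pattern behind MapOutputTracker.
// SimpleTracker, MapOutput and locationsFor are made-up names, not Spark API.
import scala.collection.mutable

// What each finished ShuffleMapTask contributes: where its output lives and
// how many bytes each reduce partition will have to fetch from it.
case class MapOutput(executorId: String, bytesPerReducer: Array[Long])

class SimpleTracker {
  // shuffleId -> one slot per map partition (None until that task finishes)
  private val outputs = mutable.Map.empty[Int, Array[Option[MapOutput]]]

  def registerShuffle(shuffleId: Int, numMaps: Int): Unit =
    outputs(shuffleId) = Array.fill[Option[MapOutput]](numMaps)(None)

  // Mirrors stage.addOutputLoc + mapOutputTracker.registerMapOutputs above:
  // the driver records, per map partition, where the output ended up.
  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapOutput): Unit =
    outputs(shuffleId)(mapId) = Some(status)

  // What a reducer later asks: for my reduce partition, which executors hold
  // data for me, and how many bytes should I expect from each?
  def locationsFor(shuffleId: Int, reduceId: Int): Seq[(String, Long)] =
    outputs(shuffleId).flatten.map(m => (m.executorId, m.bytesPerReducer(reduceId))).toSeq
}

object TrackerDemo extends App {
  val tracker = new SimpleTracker
  tracker.registerShuffle(shuffleId = 0, numMaps = 2)
  tracker.registerMapOutput(0, 0, MapOutput("exec-1", Array(10L, 20L)))
  tracker.registerMapOutput(0, 1, MapOutput("exec-2", Array(5L, 0L)))
  println(tracker.locationsFor(0, reduceId = 0)) // e.g. Seq((exec-1,10), (exec-2,5))
}

The point to take away: registration is driver-side and per stage (registerMapOutputs hands over the whole array at once, bumping the epoch), while lookups happen per reduce partition when reduce tasks start fetching.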
2. When ShuffleMapTask writes shuffle map data (by calling SortShuffleWriter.write), it writes to memory first and spills to disk when memory runs short:
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
  if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    sorter = new ExternalSorter[K, V, C](
      dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    sorter.insertAll(records) // buffers in memory and may spill to disk
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    sorter = new ExternalSorter[K, V, V](
      None, Some(dep.partitioner), None, dep.serializer)
    sorter.insertAll(records)
  }
  val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
  val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
  // Write the (possibly spilled and re-merged) data out to the disk file
  val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
  shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
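writePartitionedFile lays out all reduce partitions of this map task back to back in one data file, so the index file only needs the running byte offsets derived from partitionLengths. Below is a small sketch of that offset arithmetic; the exact on-disk index format is an assumption, and byteRange is a made-up helper, not the ShuffleBlockManager API.

// Sketch: deriving an index over the single shuffle data file from
// partitionLengths (the Array[Long] returned by writePartitionedFile).
object IndexSketch extends App {
  // Bytes written for each reduce partition, in partition order.
  val partitionLengths = Array[Long](100, 0, 250, 40)

  // Cumulative offsets: partition i occupies [offsets(i), offsets(i + 1))
  // in the data file. An empty partition has a zero-length range.
  val offsets: Array[Long] = partitionLengths.scanLeft(0L)(_ + _)

  def byteRange(reduceId: Int): (Long, Long) =
    (offsets(reduceId), offsets(reduceId + 1))

  println(byteRange(2)) // (100,350): partition 2 starts after partitions 0 and 1
}

One data file plus one index per map task avoids the one-file-per-reducer explosion of hash-based shuffle, and the same partitionLengths array feeds the MapStatus built on the last line of write.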
3. When ResultTask reads the data of a ShuffledRDD (by calling HashShuffleReader), it reads into memory first and spills to disk when memory runs short:
override def read(): Iterator[Product2[K, C]] = {
  val ser = Serializer.getSerializer(dep.serializer)
  // Fetch the shuffle data and expose it as an Iterator
  val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    // Combine the data fetched from the map side; combining may spill to disk
    if (dep.mapSideCombine) {
      new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
    } else {
      new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    // Convert the Product2s to pairs since this is what downstream RDDs currently expect
    iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
  }

  // Sort the output if there is a sort ordering defined; sorting may spill to disk
  dep.keyOrdering match {
    case Some(keyOrd: Ordering[K]) =>
      // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
      // the ExternalSorter won't spill to disk.
      val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
      sorter.insertAll(aggregatedIter)
      context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
      context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
      sorter.iterator
    case None =>
      aggregatedIter
  }
}
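The branch on dep.mapSideCombine decides between combineCombinersByKey (the map side already produced partial combiners, so only combiner-with-combiner merging remains) and combineValuesByKey (raw values arrive and combiners are built here). Below is a minimal in-memory sketch of the two paths; the names and signatures are simplified stand-ins for Spark's Aggregator, and unlike the real thing this version never spills to disk.

// Sketch of the two combine paths the reader chooses between.
// Hash-map based and all in memory; the real Aggregator can spill to disk.
object CombineSketch extends App {
  import scala.collection.mutable

  // combineValuesByKey: the map side did NOT pre-combine, so raw values
  // are folded into combiners here on the reduce side.
  def combineValues[K, V, C](iter: Iterator[(K, V)],
                             createCombiner: V => C,
                             mergeValue: (C, V) => C): Map[K, C] = {
    val combiners = mutable.Map.empty[K, C]
    for ((k, v) <- iter) {
      combiners(k) = combiners.get(k) match {
        case Some(c) => mergeValue(c, v)
        case None    => createCombiner(v)
      }
    }
    combiners.toMap
  }

  // combineCombinersByKey: the map side already produced partial combiners,
  // so only combiner-with-combiner merging remains.
  def combineCombiners[K, C](iter: Iterator[(K, C)],
                             mergeCombiners: (C, C) => C): Map[K, C] = {
    val combiners = mutable.Map.empty[K, C]
    for ((k, c) <- iter) {
      combiners(k) = combiners.get(k).map(mergeCombiners(_, c)).getOrElse(c)
    }
    combiners.toMap
  }

  // e.g. the reduce side of a word count: values and combiners are both Ints
  val fetched = Iterator("a" -> 1, "b" -> 1, "a" -> 1)
  println(combineValues[String, Int, Int](fetched, identity, _ + _)) // e.g. Map(a -> 2, b -> 1)
}

For a reduceByKey(_ + _) over Ints the three aggregation functions collapse to identity plus addition, which is what the demo at the bottom exercises.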
4. Task locality handling
a. The DriverActor receives