`
bit1129
  • 浏览: 1053138 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Spark八十四】Spark零碎知识点记录

 
阅读更多

1. ShuffleMapTask的shuffle数据在什么地方记录到MapOutputTracker中的

ShuffleMapTask的runTask方法负责写数据到shuffle map文件中。当任务执行完成成功,DAGScheduler会收到通知,在DAGScheduler的handleTaskCompletion方法中完成记录到MapOutputTracker中

 

    event.reason match {
      case Success =>
        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
          event.reason, event.taskInfo, event.taskMetrics))
        stage.pendingTasks -= task
        task match {
          case rt: ResultTask[_, _] =>
            stage.resultOfJob match {
              case Some(job) =>
                if (!job.finished(rt.outputId)) {
                  updateAccumulators(event)
                  job.finished(rt.outputId) = true
                  job.numFinished += 1
                  // If the whole job has finished, remove it
                  if (job.numFinished == job.numPartitions) {
                    markStageAsFinished(stage)
                    cleanupStateForJobAndIndependentStages(job)
                    listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                  }

                  // taskSucceeded runs some user code that might throw an exception. Make sure
                  // we are resilient against that.
                  try {
                    job.listener.taskSucceeded(rt.outputId, event.result)
                  } catch {
                    case e: Exception =>
                      // TODO: Perhaps we want to mark the stage as failed?
                      job.listener.jobFailed(new SparkDriverExecutionException(e))
                  }
                }
              case None =>
                logInfo("Ignoring result from " + rt + " because its job has finished")
            }

          case smt: ShuffleMapTask =>
            updateAccumulators(event)
            ///从通知事件中获得MapStatus对西那个
            val status = event.result.asInstanceOf[MapStatus]
            ////ExecutorId
            val execId = status.location.executorId
            logDebug("ShuffleMapTask finished on " + execId)
            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
              logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
            } else {
              stage.addOutputLoc(smt.partitionId, status)
            }
            if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
              markStageAsFinished(stage)
              logInfo("looking for newly runnable stages")
              logInfo("running: " + runningStages)
              logInfo("waiting: " + waitingStages)
              logInfo("failed: " + failedStages)
              if (stage.shuffleDep.isDefined) {
                // We supply true to increment the epoch number here in case this is a
                // recomputation of the map outputs. In that case, some nodes may have cached
                // locations with holes (from when we detected the error) and will need the
                // epoch incremented to refetch them.
                // TODO: Only increment the epoch number if this is not the first time
                //       we registered these map outputs.
               ///在此处将MapOutput注册到mapOutputTracker中
               mapOutputTracker.registerMapOutputs(
                  stage.shuffleDep.get.shuffleId,
                  stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
                  changeEpoch = true)
              }
              clearCacheLocs()
              if (stage.outputLocs.exists(_ == Nil)) {
                // Some tasks had failed; let's resubmit this stage
                // TODO: Lower-level scheduler should also deal with this
                logInfo("Resubmitting " + stage + " (" + stage.name +
                  ") because some of its tasks had failed: " +
                  stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
                submitStage(stage)
              } else {
                val newlyRunnable = new ArrayBuffer[Stage]
                for (stage <- waitingStages) {
                  logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
                }
                for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
                  newlyRunnable += stage
                }
                waitingStages --= newlyRunnable
                runningStages ++= newlyRunnable
                for {
                  stage <- newlyRunnable.sortBy(_.id)
                  jobId <- activeJobForStage(stage)
                } {
                  logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
                  submitMissingTasks(stage, jobId)
                }
              }
            }
          }

 

2. ShuffleMapTask在写shuffle map数据时(调用SortShuffleWriter.write方法),首先写内存,当内存不够使用时,将spill到磁盘;

 

 override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    if (dep.mapSideCombine) {
      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
      sorter = new ExternalSorter[K, V, C](
        dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
      sorter.insertAll(records) ///Spill到磁盘
    } else {
      // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
      // care whether the keys get sorted in each partition; that will be done on the reduce side
      // if the operation being run is sortByKey.
      sorter = new ExternalSorter[K, V, V](
        None, Some(dep.partitioner), None, dep.serializer)
      sorter.insertAll(records)
    }

    val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
    val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
    val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) ///写到磁盘文件中
    shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

    mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
  }

 

 

3. ResultTask在都去ShuffledRDD中的数据时(通过调用HashShufflerReader),首先读取到内存,当内存不够使用时,将spill到磁盘

 

 

 

override def read(): Iterator[Product2[K, C]] = {
    val ser = Serializer.getSerializer(dep.serializer)
    ///将shuffle数据转换成可遍历的Iterator对象
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

    val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    ///从Mapper端读取数据前,做Combine
    ///combine时,可能会spill到磁盘
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context)) 
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else {
      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

      // Convert the Product2s to pairs since this is what downstream RDDs currently expect
      iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
    }

    // Sort the output if there is a sort ordering defined.
    ///对output排序,可能spill到磁盘
    dep.keyOrdering match {
      case Some(keyOrd: Ordering[K]) =>
        // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
        // the ExternalSorter won't spill to disk.
        val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
        sorter.insertAll(aggregatedIter)
        context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
        context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
        sorter.iterator
      case None =>
        aggregatedIter
    }
  }
 

 

4. 任务本地性处理

a.DriverActor收到

 

 

 

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    Spark知识点汇总

    该文件是小弟在学习spark期间,将spark的所学内容汇总到一起的一个思维导图,内容包括了spark的实战代码和其他技术集成。小弟学术不精,在技术上难免会有疏忽,若有什么地方写错,望海涵并积极指出。

    Spark汇总知识点

    spark相关的知识点整理出来的xmind,仅供参考~ 学习在于总结,希望能够帮助大家更快速的熟悉、了解或者复习这些内容。

    spark知识点个人总结

    spark知识点个人总结

    Spark知识体系-高频知识点汇总及面试常见问题总结

    Spark知识体系-高频知识点汇总及面试常见问题总结

    spark基础知识整理

    spark基础知识思维导图整理,包括SparkCore和SparkSQL

    Spark相关知识点精选

    Spark相关知识点的精选内容,适合初学者参考学习

    Spark-Core学习知识笔记整理

    Spark-Core文档是本人经三年总结笔记汇总而来,对于自我学习Spark核心基础知识非常方便,资料中例举完善,内容丰富。具体目录如下: 目录 第一章 Spark简介与计算模型 3 1 What is Spark 3 2 Spark简介 3 3 Spark...

    大数据Spark企业级实战

    《大数据Spark企业级实战》详细解析了企业级Spark开发所需的几乎所有技术内容,涵盖Spark的架构设计、Spark的集群搭建、Spark内核的解析、Spark SQL、MLLib、GraphX、Spark Streaming、Tachyon、SparkR、Spark多语言...

    Spark从入门到精通

    6、大量全网唯一的知识点:基于排序的wordcount,Spark二次排序,Spark分组取topn,DataFrame与RDD的两种转换方式,Spark SQL的内置函数、开窗函数、UDF、UDAF,Spark Streaming的Kafka Direct API、...

    Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面.zip

    Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...

    Spark 入门实战系列

    Spark 入门实战系列,适合初学者,文档包括十部分内容,质量很好,为了感谢文档作者,也为了帮助更多的人入门,传播作者的心血,特此友情转贴: 1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建....

    Hadoop_Spark相关面试问题总结 - Hadoop知识库.pdf

    Hadoop_Spark相关面试问题总结 - Hadoop知识库.pdf Hadoop_Spark相关面试问题总结 - Hadoop知识库.pdf Hadoop_Spark相关面试问题总结 - Hadoop知识库.pdf

    Spark基本知识调查

    整理一下Spark的基本知识,介绍一下什么是Spark,大概能干什么。

    Spark和TiDB (Spark on TiDB)

    SparkTI (Spark on TiDB)是TiDB基于Apache Spark的独立于原生系统的计算引擎。它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现...

    基于spark的大数据论文资料

    本资料是集合20篇知网被引最高的基于spark的大数据论文,包括大数据Spark技术研究_刘峰波、大数据下基于Spark的电商实时推荐系统的设计与实现_岑凯伦、基于Spark的Apriori算法的改进_牛海玲、基于Spark的大数据混合...

    基于Spark的电影推荐系统

    基于Spark的电影推荐系统是使用Spark MLlib的ALS推荐算法,对会员电影评分数据和观看记录的数据构建协同过滤式的推荐引擎,对历史数据进行训练创建模型进行针对用户推荐电影和针对电影推荐用户的推荐功能,由此来...

    spark3.0入门到精通

    ├─Spark-day01 │ 01-[了解]-Spark发展历史和特点介绍.mp4 │ 03-[掌握]-Spark环境搭建-Standalone集群模式.mp4 │ 06-[理解]-Spark环境搭建-On-Yarn-两种模式.mp4 │ 07-[掌握]-Spark环境搭建-On-Yarn-两种...

    实验七:Spark初级编程实践

    使用命令./bin/spark-shell启动spark 图2启动spark 2. Spark读取文件系统的数据 (1) 在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数; 图3 spark统计行数 (2) 在spark-...

    超详细Spark思维导图

    超详细Spark思维导图 方便记忆Spark知识点 详细记录了 Spark主要框架Core RDD SQL Streaming 以及 调优 调度的主干知识

    8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf

    1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战....

Global site tag (gtag.js) - Google Analytics