1.flow
1.1 shuffle abstract
1.2 shuffle flow
1.3 sort flow in shuffle
1.4 data structure in mem
2.core code paths
//SortShuffleWriter
/**
 * Sorts all records of this map task and writes them into a single data file
 * plus an index file, then records the outcome in a MapStatus.
 *
 * Two paths: with map-side combine (e.g. reduceByKey) an aggregator is required
 * and handed to the sorter; without one (e.g. groupBy) records are only placed
 * into partitions — any per-key ordering (sortByKey) is done on the reduce side.
 * Either path may spill to disk once memory thresholds are exceeded.
 */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  // --- 1. Sort the incoming records (possibly spilling) ---
  val sortStartMs = System.currentTimeMillis()
  if (dep.mapSideCombine) {
    // Map-side combiner path; the DAGScheduler guarantees an aggregator exists
    // even when the user did not supply one explicitly.
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    sorter = new ExternalSorter[K, V, C](
      dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    logInfo("-aggr:" + dep.aggregator + ",key ord:" + dep.keyOrdering +",ser:" + dep.serializer +",part:"+dep.partitioner + ",dep " + dep)
  } else {
    // Neither an aggregator nor an ordering is passed: we do not care whether
    // keys end up sorted inside each partition here.
    sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)
  }
  // With no explicit ordering the sorter falls back to key hash codes per partition.
  sorter.insertAll(records)
  logInfo("*total cost of sorting(ms) " + (System.currentTimeMillis()-sortStartMs))

  // --- 2. Write the data file, then the index file ---
  // Opening the merged output file is too fast to measure accurately, so it is
  // deliberately excluded from shuffle write time (see SPARK-3570).
  val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
  // NOOP_REDUCE_ID: output always goes to disk, even when the data is tiny.
  val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
  val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
  shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
  logInfo("-output file:" + outputFile + ",blockid:" + blockId + ",part len " + partitionLengths.length + ",total " + partitionLengths.sum +",shuffle server id " + blockManager.shuffleServerId + ",shuffleId " + dep.shuffleId)

  // --- 3. Publish the result ---
  // Only the shuffle server id and per-partition lengths are published, never
  // the data itself (serialization happens later in Executor#launchTask(); the
  // status is consumed via MapOutputTracker#getServerStatuses()).
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}

/** Close this writer, passing along whether the map completed. */
override def stop(success: Boolean): Option[MapStatus] = {
  try {
    if (stopping) {
      None
    } else {
      stopping = true
      if (success) {
        Option(mapStatus)
      } else {
        // The map task failed, so its partial output is useless — delete it.
        shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
        None
      }
    }
  } finally {
    // Always release the sorter; it may hold its own intermediate spill files.
    if (sorter != null) {
      val cleanupStart = System.nanoTime()
      sorter.stop()
      context.taskMetrics.shuffleWriteMetrics.foreach(
        _.incShuffleWriteTime(System.nanoTime - cleanupStart))
      sorter = null
    }
  }
}
//ExternalSorter
/**
 * Writes every record added to this ExternalSorter into a single file in the
 * disk store, laid out with the same structure as a spilled file. Called by
 * SortShuffleWriter; when merge-sort was bypassed this degenerates into an
 * efficient concatenation of the per-partition files.
 *
 * @param blockId block ID to write to; the index file will be blockId.name + ".index"
 * @param context a TaskContext for the running task, used to update shuffle metrics
 * @param outputFile destination data file
 * @return length in bytes of each partition's region of the file
 *         (consumed by the map output tracker)
 */
def writePartitionedFile(
    blockId: BlockId,
    context: TaskContext,
    outputFile: File): Array[Long] = {
  // Byte length of each partition's range inside outputFile.
  val lengths = new Array[Long](numPartitions)

  if (bypassMergeSort && partitionWriters != null) {
    // Case 1: separate files were written per partition — just concatenate them.
    logInfo("-bypass:" + bypassMergeSort+",pwriters:" + partitionWriters.length)
    // Flush whatever is still in memory so that everything lives in files;
    // note this gives no ordering guarantee inside a partition.
    spillToPartitionFiles(if (aggregator.isDefined) map else buffer)
    partitionWriters.foreach(_.commitAndClose())
    val out = new FileOutputStream(outputFile, true) // append mode
    val concatStart = System.nanoTime
    util.Utils.tryWithSafeFinally {
      var partId = 0
      while (partId < numPartitions) {
        val in = new FileInputStream(partitionWriters(partId).fileSegment().file)
        util.Utils.tryWithSafeFinally {
          lengths(partId) = org.apache.spark.util.Utils.copyStream(in, out, false, transferToEnabled)
        } {
          in.close()
        }
        partId += 1
      }
    } {
      out.close()
      context.taskMetrics.shuffleWriteMetrics.foreach(
        _.incShuffleWriteTime(System.nanoTime - concatStart))
    }
  } else if (spills.isEmpty && partitionWriters == null) {
    // Case 2: nothing was ever spilled — all data is still in memory.
    logInfo("-no spills occured")
    val inMemory = if (aggregator.isDefined) map else buffer
    // The iterator returns records grouped (and ordered) by partition id,
    // same layout as spillToMergableFile() produces.
    val it = inMemory.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext) {
      // The disk writer appends, so one physical file accumulates a segment
      // per partition; inner loop drains one partition's run of records.
      val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
        context.taskMetrics.shuffleWriteMetrics.get)
      val partitionId = it.nextPartition()
      while (it.hasNext && it.nextPartition() == partitionId) {
        it.writeNext(writer)
      }
      writer.commitAndClose()
      lengths(partitionId) = writer.fileSegment().length
    }
  } else {
    // Case 3: spilled files plus remaining in-memory data — merge them
    // partition by partition (multi-way merge) and write everything directly.
    logInfo("-merge spilled file and in-mem data?,part it:" + this.partitionedIterator)
    for ((id, elements) <- this.partitionedIterator if elements.hasNext) {
      val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
        context.taskMetrics.shuffleWriteMetrics.get)
      for (elem <- elements) {
        writer.write(elem._1, elem._2)
      }
      writer.commitAndClose()
      lengths(id) = writer.fileSegment().length
    }
  }

  context.taskMetrics.incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics.incDiskBytesSpilled(diskBytesSpilled)
  // In bypass mode the per-partition writers tracked their own metrics; fold
  // them into the task's shuffle write metrics now.
  context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
    if (curWriteMetrics != null) {
      m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten)
      m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime)
      m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten)
    }
  }
  lengths
}
3.FAQ(TBD TODO)
Here are some questions I am not yet clear about, so any clues from you would be highly appreciated:
A. When and where does a ResultTask learn that it should fetch result data from a ShuffleMapTask? For example, consider a job with:
3 maps(m1,m2,m3) x 2 reduces(r1,r2)
t1:m1,m2,m3 are all running ;
t2: all maps keep running except m1, which has finished; r1 is then notified to set up and fetch the result of m1
t3: m2 finishes. The question is: when does r1 know to fetch the result of m2? My guess is that there should be a shared place where reduces can learn where the maps run and when they finish, but I have not managed to locate that in the sources.
B. In BlockManager, see the #Question# marker below:
/**
 * Fetches a block from a remote block manager, trying each known replica
 * location (in random order) until one returns non-null data.
 *
 * @param blockId id of the block to fetch (placed remotely e.g. by
 *                TaskRunner#run() in the 'indirect result' case)
 * @param asBlockResult when true, deserialize and wrap the bytes in a
 *                      BlockResult; otherwise return the raw buffer
 * @return Some(result) from the first replica that has the block, else None
 */
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  // Replica locations are resolved by the BlockManagerMasterEndpoint and
  // shuffled so load spreads across replicas.
  // #Question# why not prefer the block manager closest to this driver,
  // even in local mode, when running in a cluster?
  val candidateLocations = Random.shuffle(master.getLocations(blockId))
  var result: Option[Any] = None
  val locIter = candidateLocations.iterator
  // One successful fetch is enough; stop probing as soon as data arrives.
  while (result.isEmpty && locIter.hasNext) {
    val loc = locIter.next()
    logDebug(s"Getting remote block $blockId from $loc")
    val data = blockTransferService.fetchBlockSync(
      loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
    if (data != null) {
      result =
        if (asBlockResult) {
          Some(new BlockResult(
            dataDeserialize(blockId, data), DataReadMethod.Network, data.limit()))
        } else {
          Some(data)
        }
    } else {
      logDebug(s"The value of block $blockId is null")
    }
  }
  if (result.isEmpty) {
    logDebug(s"Block $blockId not found")
  }
  result
}
相关推荐
Spark-Core文档是本人经三年总结笔记汇总而来,对于自我学习Spark核心基础知识非常方便,资料中例举完善,内容丰富。具体目录如下: 目录 第一章 Spark简介与计算模型 3 1 What is Spark 3 2 Spark简介 3 3 Spark...
Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 Spark的缓存,变量,shuffle数据等清理及机制 Spark-submit关于参数及部署模式的部分解析 GroupByKey VS ReduceByKey OrderedRDDFunctions...
Making Nested Columns as First Citizens in Apache Spark SQL Simplifying Change Data Capture Using Delta Lakes Apache Arrow* Based Unified Data Exchange Apache Spark Serving-Unifying Batch, Streaming, ...
里面有 Spark 的部署方式,spark core、spark sql、spark streaming 等组件的各种面试题,包括但不限于宽窄依赖、缓存、数据广播、shuffle以及数据倾斜的查看和解决方式等等 对于大数据(离线、在线)面试的同学还是...
由于技术水平、实验条件、经验等限制,当前只讨论 Spark core standalone 版本中的核心功能,而不是全部功能。诚邀各位小伙伴们加入进来,丰富和完善文档。 好久没有写这么完整的文档了,上次写还是三年前在学 Ng ...
包括SparkCore、SparkSql、SparkStreaming、FlinkCore、ScalaLibrary、JavaSrc模块源码阅读。SparkCore包括部署Deploy模块、执行Executor模块、内存Memory模块、调度Scheduler模块、经典的Shuffle模块、存储Storage...
主题:Spark技术内幕 演讲人:张安站,百度网页搜索部系统架构高级工程师 演讲介绍:解析Apache Spark Core的核心实现,包括但不限于任务调度,Shuffle过程详解等。
SparkCore模块源码阅读,版本2.2.0。 包括部署Deploy模块、执行Executor模块、内存Memory模块、调度Scheduler模块、经典的Shuffle模块、存储Storage模块等等。 1,部署模块源码 2,执行器模块源码 原始码:与各种...
1 Spark 的五大组件 在 spark Core 中实现了 Spark 的一些基础的功 能, 例如进行内存的管理、 进行错误的恢复、 对任务 的调度、 与存储系统进行交互等功能。 它还包含了对 弹性分布式数据集的 API 定义。 Spark ...
由于技术水平、实验条件、经验等限制,当前只讨论 Spark core standalone 版本中的核心功能,而不是全部功能。诚邀各位小伙伴们加入进来,丰富和完善文档。关于学术方面的一些讨论可以参阅相关的论文以及 Matei 的...
首先,要搞清楚Spark的几个基本概念和原则,否则系统的性能调优无从谈起:每一台host上面可以并行N个worker,每一个worker下面可以并行M个executor,task们会被分配到executor上面去执行。Stage指的是一组并行运行的...
在SPARK SUMMIT 2017上,Qifan Pu Sameer Agarwal (Databricks) Reynold Xin (Databricks) Ion Stoica分享了题为《Boosting Spark Performance on Many-Core Machines》,就核心机器上的Spark性能,shuffle性能特点...
(1)executor内存(60%):执行内存,执行shuffle(join)的时候,shuffle会用这个内存区来存储数据,如果溢出写磁盘 (2)storage内存(20%):存储缓存,cache、presist、broadcast (3)other内存(20%):...