原文 http://www.cnblogs.com/hseagle/p/3979744.html
概要
Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析。
通过一个小的实验来直观的感受一下sort-based shuffle算法会产生哪些中间文件,具体实验步骤如下所述。
步骤1: 修改conf/spark-default.conf, 加入如下内容
spark.shuffle.manager SORT
步骤2: 运行spark-shell
SPARK_LOCAL_IP=127.0.0.1 $SPARK_HOME/bin/spark-shell
步骤3: 执行wordcount
sc.textFile("README.md").flatMap(l => l.split(" ")).map(w=>(w,1)).reduceByKey(_ + _).collect
步骤4: 查看生成的中间文件
find /tmp/spark-local* -type f
文件查找结果如下所示
可以看到生成了两人种后缀的文件,分别为data和index类型,这两者的用途在后续分析中会详细讲述。
如果我们做一下对比实验,将shuffle模式改为Hash,再来观看生成的文件,就会找到区别。将原先配置文件中的 SORT 改为 HASH ,重新启动spark-shell,执行相同的wordcount之后,在tmp目录下找到的文件列表如下。
两者生成的文件数量差异非常大,具体数值计算如下
在HASH模式下,每一次shuffle会生成M*R的数量的文件,如上述wordcount例子中,整个job有一次shuffle过程,由于输入文件默认分片为2,故M个数为2,而spark.default.parallelism配置的值为4,故R为4,所以总共生成1*2*4=8个文件。shuffle_0_1_2解读为shuffle+shuffle_id+map_id+reduce_id,故0_1_2表示由第0次shuffle中的第1个maptask生成的文件,该文件内容会被第2个reduce task消费
在SORT模式下,一个Map Task只生成一个文件,而不管生成的文件要被多少的Reduce消费,故文件个数是M的数量,由于wordcount中的默认分片为2,故只生成两个data文件
多次shuffle
刚才的示例中只有一次shuffle过程,我们可以通过小小的改动来达到两次shuffle,代码如下
上述代码将reduceByKey的结果通过map进行反转,即将原来的(w, count)转换为(count,w),然后根据出现次数进行归类。 groupByKey会再次导致数据shuffle过程。
在HASH模式下产生的文件如下所示
/tmp/spark-local-20140919094531-1cb6/12/shuffle_0_3_3
/tmp/spark-local-20140919094531-1cb6/0c/shuffle_0_0_0
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_2_3
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_3_2
/tmp/spark-local-20140919094531-1cb6/11/shuffle_1_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_2_2
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_3_1
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_0_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_0_3
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_3_0
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_2_1
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_0_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_1_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_0_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_1_0
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_1_0_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_2_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_1_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_0_2
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_0_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_1_0
引入一次新的shuffle,产生了大量的中间文件
如果是使用SORT,效果如何呢?只会增加M个文件,由于在新的shuffle过程中,map task数目为4,所以总共的文件是2+4=6。
值得指出的是shuffle_0和shuffle_1的执行次序问题,数字越大越先执行,由于spark job提交的时候是从后往前倒推的,故0是最后将执行,而前面的先执行。
Sort-based Shuffle的设计思想
sort-based shuffle的总体指导思想是一个map task最终只生成一个shuffle文件,那么后续的reduce task是如何从这一个shuffle文件中得到自己的partition呢,这个时候就需要引入一个新的文件类型即index文件。
其具体实现步骤如下:
Map Task在读取自己输入的partition之后,将计算结果写入到ExternalSorter
ExternalSorter会使用一个map来存储新的计算结果,新的计算结果根据partiton分类,如果是有combine操作,则需要将新的值与原有的值进行合并
如果ExternalSorter中的map占用的内存已经超越了使用的阀值,则将map中的内容spill到磁盘中,每一次spill产生一个不同的文件
当输入Partition中的所有数据都已经处理完毕之后,这时有可能一部分计算结果在内存中,另一部分计算结果在spill的一到多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里
最后将每一个partition的在data文件中的起始位置和结束位置写入到index文件
相应的源文件
几个重要的函数
SortShuffleWriter.write
ExternalSorter.insertAll
writePartitionedFile将内存中的数据和spill文件中内容一起合并到一个文件当中
而数据读取过程中则需要使用IndexShuffleBlockManager来获取Partiton的具体位置
概要
Spark 1.1中对spark core的一个重大改进就是引入了sort-based shuffle处理机制,本文就该处理机制的实现进行初步的分析。
通过一个小的实验来直观的感受一下sort-based shuffle算法会产生哪些中间文件,具体实验步骤如下所述。
步骤1: 修改conf/spark-default.conf, 加入如下内容
spark.shuffle.manager SORT
步骤2: 运行spark-shell
SPARK_LOCAL_IP=127.0.0.1 $SPARK_HOME/bin/spark-shell
步骤3: 执行wordcount
sc.textFile("README.md").flatMap(l => l.split(" ")).map(w=>(w,1)).reduceByKey(_ + _).collect
步骤4: 查看生成的中间文件
find /tmp/spark-local* -type f
文件查找结果如下所示
/tmp/spark-local-20140919091822-aa66/0f/shuffle_0_1_0.index /tmp/spark-local-20140919091822-aa66/30/shuffle_0_0_0.index /tmp/spark-local-20140919091822-aa66/0c/shuffle_0_0_0.data /tmp/spark-local-20140919091822-aa66/15/shuffle_0_1_0.data
可以看到生成了两人种后缀的文件,分别为data和index类型,这两者的用途在后续分析中会详细讲述。
如果我们做一下对比实验,将shuffle模式改为Hash,再来观看生成的文件,就会找到区别。将原先配置文件中的 SORT 改为 HASH ,重新启动spark-shell,执行相同的wordcount之后,在tmp目录下找到的文件列表如下。
/tmp/spark-local-20140919092949-14cc/10/shuffle_0_1_3 /tmp/spark-local-20140919092949-14cc/0f/shuffle_0_1_2 /tmp/spark-local-20140919092949-14cc/0f/shuffle_0_0_3 /tmp/spark-local-20140919092949-14cc/0c/shuffle_0_0_0 /tmp/spark-local-20140919092949-14cc/0d/shuffle_0_1_0 /tmp/spark-local-20140919092949-14cc/0d/shuffle_0_0_1 /tmp/spark-local-20140919092949-14cc/0e/shuffle_0_1_1 /tmp/spark-local-20140919092949-14cc/0e/shuffle_0_0_2
两者生成的文件数量差异非常大,具体数值计算如下
在HASH模式下,每一次shuffle会生成M*R的数量的文件,如上述wordcount例子中,整个job有一次shuffle过程,由于输入文件默认分片为2,故M个数为2,而spark.default.parallelism配置的值为4,故R为4,所以总共生成1*2*4=8个文件。shuffle_0_1_2解读为shuffle+shuffle_id+map_id+reduce_id,故0_1_2表示由第0次shuffle中的第1个maptask生成的文件,该文件内容会被第2个reduce task消费
在SORT模式下,一个Map Task只生成一个文件,而不管生成的文件要被多少的Reduce消费,故文件个数是M的数量,由于wordcount中的默认分片为2,故只生成两个data文件
多次shuffle
刚才的示例中只有一次shuffle过程,我们可以通过小小的改动来达到两次shuffle,代码如下
sc.textFile("README.md").flatMap(l => l.split(" ")).map(w => (w,1)).reduceByKey(_ + _).map(p=>(p._2,p._1)).groupByKey.collect
上述代码将reduceByKey的结果通过map进行反转,即将原来的(w, count)转换为(count,w),然后根据出现次数进行归类。 groupByKey会再次导致数据shuffle过程。
在HASH模式下产生的文件如下所示
/tmp/spark-local-20140919094531-1cb6/12/shuffle_0_3_3
/tmp/spark-local-20140919094531-1cb6/0c/shuffle_0_0_0
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_2_3
/tmp/spark-local-20140919094531-1cb6/11/shuffle_0_3_2
/tmp/spark-local-20140919094531-1cb6/11/shuffle_1_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_2_2
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_1_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_0_3_1
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_0_3
/tmp/spark-local-20140919094531-1cb6/10/shuffle_1_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_0_3
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_3_0
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_2_1
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_0_1_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_0_2
/tmp/spark-local-20140919094531-1cb6/0f/shuffle_1_1_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_0_1
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_0_1_0
/tmp/spark-local-20140919094531-1cb6/0d/shuffle_1_0_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_2_0
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_1_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_0_0_2
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_0_1
/tmp/spark-local-20140919094531-1cb6/0e/shuffle_1_1_0
引入一次新的shuffle,产生了大量的中间文件
如果是使用SORT,效果如何呢?只会增加M个文件,由于在新的shuffle过程中,map task数目为4,所以总共的文件是2+4=6。
/tmp/spark-local-20140919094731-034a/29/shuffle_0_3_0.data /tmp/spark-local-20140919094731-034a/30/shuffle_0_0_0.index /tmp/spark-local-20140919094731-034a/15/shuffle_0_1_0.data /tmp/spark-local-20140919094731-034a/36/shuffle_0_2_0.data /tmp/spark-local-20140919094731-034a/0c/shuffle_0_0_0.data /tmp/spark-local-20140919094731-034a/32/shuffle_0_2_0.index /tmp/spark-local-20140919094731-034a/32/shuffle_1_1_0.index /tmp/spark-local-20140919094731-034a/0f/shuffle_0_1_0.index /tmp/spark-local-20140919094731-034a/0f/shuffle_1_0_0.index /tmp/spark-local-20140919094731-034a/0a/shuffle_1_1_0.data /tmp/spark-local-20140919094731-034a/2b/shuffle_1_0_0.data /tmp/spark-local-20140919094731-034a/0d/shuffle_0_3_0.index
值得指出的是shuffle_0和shuffle_1的执行次序问题,数字越大越先执行,由于spark job提交的时候是从后往前倒推的,故0是最后将执行,而前面的先执行。
Sort-based Shuffle的设计思想
sort-based shuffle的总体指导思想是一个map task最终只生成一个shuffle文件,那么后续的reduce task是如何从这一个shuffle文件中得到自己的partition呢,这个时候就需要引入一个新的文件类型即index文件。
其具体实现步骤如下:
Map Task在读取自己输入的partition之后,将计算结果写入到ExternalSorter
ExternalSorter会使用一个map来存储新的计算结果,新的计算结果根据partiton分类,如果是有combine操作,则需要将新的值与原有的值进行合并
如果ExternalSorter中的map占用的内存已经超越了使用的阀值,则将map中的内容spill到磁盘中,每一次spill产生一个不同的文件
当输入Partition中的所有数据都已经处理完毕之后,这时有可能一部分计算结果在内存中,另一部分计算结果在spill的一到多个文件之中,这时通过merge操作将内存和spill文件中的内容合并整到一个文件里
最后将每一个partition的在data文件中的起始位置和结束位置写入到index文件
相应的源文件
SortShuffleManager.scala SortShuffleWriter.scala ExternalSorter.scala IndexShuffleBlockManager.scala
几个重要的函数
SortShuffleWriter.write
override def write(records: Iterator[_ >: Product2[K, V]]): Unit = { if (dep.mapSideCombine) { if (!dep.aggregator.isDefined) { throw new IllegalStateException("Aggregator is empty for map-side combine") } sorter = new ExternalSorter[K, V, C]( dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) sorter.insertAll(records) } else { // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't // care whether the keys get sorted in each partition; that will be done on the reduce side // if the operation being run is sortByKey. sorter = new ExternalSorter[K, V, V]( None, Some(dep.partitioner), None, dep.serializer) sorter.insertAll(records) } val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId) val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId) val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths) mapStatus = new MapStatus(blockManager.blockManagerId, partitionLengths.map(MapOutputTracker.compressSize)) }
ExternalSorter.insertAll
def insertAll(records: Iterator[_ { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) } while (records.hasNext) { elementsRead += 1 kv = records.next() map.changeValue((getPartition(kv._1), kv._1), update) maybeSpill(usingMap = true) } } else { // Stick values into our buffer while (records.hasNext) { elementsRead += 1 val kv = records.next() buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C]) maybeSpill(usingMap = false) } } }
writePartitionedFile将内存中的数据和spill文件中内容一起合并到一个文件当中
def writePartitionedFile( blockId: BlockId, context: TaskContext, outputFile: File): Array[Long] = { // Track location of each range in the output file val lengths = new Array[Long](numPartitions) if (bypassMergeSort && partitionWriters != null) { // We decided to write separate files for each partition, so just concatenate them. To keep // this simple we spill out the current in-memory collection so that everything is in files. spillToPartitionFiles(if (aggregator.isDefined) map else buffer) partitionWriters.foreach(_.commitAndClose()) var out: FileOutputStream = null var in: FileInputStream = null try { out = new FileOutputStream(outputFile) for (i <- 0 until numPartitions) { in = new FileInputStream(partitionWriters(i).fileSegment().file) val size = org.apache.spark.util.Utils.copyStream(in, out, false) in.close() in = null lengths(i) = size } } finally { if (out != null) { out.close() } if (in != null) { in.close() } } } else { // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by // partition and just write everything directly. for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter( blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) for (elem
而数据读取过程中则需要使用IndexShuffleBlockManager来获取Partiton的具体位置
override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { // The block is actually going to be a range of a single map output file for this map, so // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) val in = new DataInputStream(new FileInputStream(indexFile)) try { in.skip(blockId.reduceId * 8) val offset = in.readLong() val nextOffset = in.readLong() new FileSegmentManagedBuffer( getDataFile(blockId.shuffleId, blockId.mapId), offset, nextOffset - offset) } finally { in.close() } }
发表评论
-
翻译一下spark sql and dataframes
2016-03-23 15:20 1575概述 spark sql 是一个结构化执行的数据模块,它并不像 ... -
spark 中GC的调优
2016-03-14 11:02 1306注:本文转自:http://www.csdn.net/arti ... -
spark Tungsten-将硬件性能彻底压榨
2016-03-08 11:06 984Tungsten项目将是Spark自诞生以来内核级别的最大改动 ... -
关于Spark的Broadcast解析
2016-02-20 08:37 4490本文重点关注 数据块切分方法以及P2P下载数据方法 Broad ... -
spark的几个重要概念
2015-12-04 14:09 0本节主要记录以下几个概念 一:RDD的五大特点 二:RDD 窄 ... -
spark部署安装调试
2015-12-02 11:28 708本节记录spark下载-->编译-->安装--&g ... -
spark基本概念
2015-11-12 10:45 745记录一下课堂笔记: ... -
hadoop计算能力调度器配置
2015-10-29 10:39 971问题出现 hadoop默认调度器是FIFO,其原理就是先按照作 ... -
HBase在各大应用中的优化和改进
2015-10-28 14:59 642Facebook之前曾经透露过Facebook的hbase架构 ... -
一篇很好的解决系统问题过程描述文章
2015-09-23 08:40 462在网上看到的一篇解决h ... -
通过GeoHash核心原理来分析hbase rowkey设计
2015-09-08 15:49 3479注:本文是结合hbase ... -
从OpenTsdb来分析rowkey设计
2015-09-06 16:04 4907讨论此问题前,先理解 ... -
HBase中asynchbase的使用方式
2015-08-25 10:32 8116Hbase的原生java 客户端是完全同步的,当你使用原生AP ... -
Mapreduce优化的点滴
2015-07-16 15:18 796注:转载 1. 使用自定义Writable 自带的Text ... -
hadoop 如何自定义类型
2015-07-15 09:37 1212记录一下hadoop 数据类型章节的笔记,以便后期使用,本文是 ... -
napreduce shuffle 过程记录
2015-07-10 11:23 728在我看来 hadoop的核心是mapre ... -
ZooKeeper伪分布式集群安装及使用
2015-02-13 08:29 8841. zookeeper介绍 ZooKeeper是一个为分 ... -
hadoop-mahout 核心算法总结
2015-02-07 10:08 1507其实大家都知道hadoop为我们提供了一个大的框架,真正的 ... -
推荐引擎内部原理--mahout
2015-01-22 11:11 543转载自:https://www.ibm.com/devel ... -
hadoop 动态添加删除节点
2015-01-20 13:39 639转自:http://www.cnblogs.com/rill ...
相关推荐
通过merge-sort算法的实现,掌握外存算法所基于的I/O模型与内存算法基于的RAM模型的区别;理解不同的磁盘访问优化方法是如何提高数据访问性能的。
deep_sort_yolov3-毕业设计_毕设课题_行人轨迹追踪_python_AI.rardeep_sort_yolov3-毕业设计_毕设课题_行人轨迹追踪_python_AI.rardeep_sort_yolov3-毕业设计_毕设课题_行人轨迹追踪_python_AI.rardeep_sort_yolov3-...
前端开源库-import-sort-parser-babylon导入排序分析器巴比伦,一个基于javascript分析器巴比伦的导入排序分析器。
sort-deepsort-yolov3-ROS-master.zip
前端开源库-eslint-plugin-sort-imports-es6-autofixeslint-plugin-sort-imports-es6-autofix,一个排序导入规则,可以正确区分es6导入类型。
21.[开源][安卓][拖拽]drag-sort-listview-master DragSortListView(DSLV)是Android ListView的一个扩展,支持拖拽排序和左右滑动删除功能。重写了TouchInterceptor(TI)类来提供更加优美的拖拽动画效果。 DSLV...
在对传统的Sort-Merge-Join算法进一步研究的基础上,提出了一种改进的闪存数据库Sort-Merge-Join算法。该算法只对小关系进行外...通过理论分析和与传统Sort-Merge-Join算法在闪存上的比较实验,证明了该算法的优越性。
DeepSORT-YOLOv5猫狗检测和跟踪,含有yolov5猫狗检测权重,能可视化目标运动轨迹
drag sort listview android
deepsort-yolov3-车辆行人-跟踪结果
drag-sort-listview是一个支持拖拽排序和左右滑动删除功能的自定义ListView.rar,太多无法一一验证是否可用,程序如果跑不起来需要自调,部分代码功能进行参考学习。
opencv拉流yolov5推理-deepsort追踪-rtmp ffmpeg推流 人车流统计计数
ShellSort-ShellSort
eslint-plugin-sort-destruct-keys 要求对对象分解键进行排序 安装 您首先需要安装 : $ npm i eslint --save-dev 接下来,安装eslint-plugin-sort-destructure-keys : $ npm install eslint-plugin-sort-...
该项目旨在利用先进的YOLOv8目标检测算法和DeepSORT多目标跟踪算法,实现智能车辆对道路目标的准确检测与跟踪,为智能交通、自动驾驶等领域提供技术支持。 该项目以YOLOv8算法为核心,该算法以其出色的实时性和准确...
deepsort-v1.2
sort sort_使用C++实现的排序算法之BucketSort
npm install --save-dev @trivago/prettier-plugin-sort-imports 或者,使用纱线 yarn add --dev @trivago/prettier-plugin-sort-imports 用法 在更漂亮的配置文件中添加订单。 module.exports = { "printWidth":...
$ npm install eslint-plugin-sort-keys-shorthand --save-dev 配置 将“ sort-keys-shorthand”添加到插件部分。 { " plugins " : [ " sort-keys-shorthand " ] } 停用原始的sort-keys规则。 { " rules " : { ...
为此,提出基于延期着色的sort-first绘制集群架构,该系统可以实时绘制具有复杂光照效果的多边形场景,并可利用延期着色的负载可预测特性,实现绘制节点间的负载平衡。同时,提出了基于Equalizer改造现有图形绘制...