`

spark-学习笔记--31 spark性能优化

 
阅读更多

spark性能优化

 

主要是对内存使用的优化 

 

诊断内存消耗:

 

      java  object header 增大了内存消耗 

 

      常见的collection 类 增大了内存消耗

 

 

--------------------------------------------

 

 

序列化: 默认使用Java的 序列化

               同时支持 kryo 序列化:

 

 

java  使用 kryo:

 

SparkConf conf = new SparkConf().setAppName(appName);

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

conf.registerKryoClasses(new Class[]{YOUR_SPARK_CLASS.class}); 

 

 

 

scala 使用 kryo:

 

val conf = new SparkConf.setMaster(...).setAppName(...)

conf.registerKryoClasses(Array(classOf[Counter]))

val sc = new SparkContext(conf)

 

 

 

 

--------------------------------------------

 

持久化 和  checkpoint

 

 

对于 需要重新计算的RDD   可以持久化  checkpoint

 

 

--------------------------------------------

 

序列化的持久化级别

 

MEMORY_ONLY_SER 、MEMORY_AND_DISK_SER 等

 

这样的话  将数据序列化之后 再持久化  可以大大减小对内存的消耗 

如果要写入磁盘的话  也会减小磁盘IO

但是多了一个 序列化反序列化处理过程 增加了cpu消耗

 

 

--------------------------------------------

 

jvm 垃圾回收

 

GC会消耗性能  

GC线程运行会导致 工作线程暂停

应尽量减少GC

 

有可能因为内存空间不足 ,task创建的对象过大,一旦发现40%的内存不够用了 就会触发Java虚拟机垃圾回收。

极端情况下垃圾回收可能被频繁触发。

 

 

 

 

 

spark1.5下:     

  spark.storage.memoryFraction  来设置这个比例

  

 GC的一个重要配置参数是应该用于缓存RDD的内存量。

 默认情况下,Spark使用60%的已配置执行程序内存(spark.executor.memory)来缓存RDD。

 这意味着40%的内存可用于任务执行期间创建的任何对象。

 

 如果您的任务变慢并且您发现JVM经常进行垃圾收集或内存不足,则降低此值将有助于减少内存消耗。

 要将此更改为50%,您可以调用 conf.set("spark.storage.memoryFraction", "0.5")

 

 

 

 

----------------------

 

 

 

 

 

spark 2.4.0 下: 

 

存储和 执行 共享有一块共享的内存M

当一方的内存不够用时可以驱逐对方占领M

但是压缩存储 不能是存储低于R

 

这种设计可以自动调整内存的使用。

spark.memory.fraction  : M/jvm堆    默认0.6 是存储  0.4 是执行

spark.memory.storagefraction : R/M  

 

 

----------------------

 

 

高级GC调整

 

为了进一步调整垃圾收集,我们首先需要了解JVM中有关内存管理的一些基本信息:

 

Java堆空间分为Young和Old两个区域。Young代表意味着持有短命的物体,而老一代则用于生命周期较长的物体。

 

Young代进一步分为三个区域[Eden,Survivor1,Survivor2]。

 

垃圾收集过程的简化描述:当Eden已满时,在Eden上运行次要GC,并将从Eden和Survivor1中存活的对象复制到Survivor2。幸存者地区被交换。

如果对象足够大或Survivor2已满,则将其移至Old。最后,当Old接近满时,将调用完整的GC。

 

 

如果确定Eden的大小E,则可以使用该选项设置Young代的大小-Xmn=4/3*E

 

 

--------------------------------------------

并行度:

 

 

 

2、设置config属性spark.default.parallelism以更改默认值。通常,我们建议群集中每个CPU核心有2-3个任务

 

spark.default.parallelism 设置为 5 的含义:每个RDD的数据都会被拆为5份

 

3、如果读取的数据在HDFS上,增加block数,默认情况下split与block是一对一的,而split又与RDD中的partition对应,所以增加了block数,也就提高了并行度。

 

4、RDD.repartition,给RDD重新设置partition的数量 [repartitions 或者 coalesce]

 

5、reduceByKey的算子指定partition的数量 

val rdd2 = rdd1.reduceByKey(_+_ ,10) val rdd3 = rdd2.map.filter.reduceByKey(_+_)

 

6、val rdd3 = rdd1.join(rdd2) rdd3里面partiiton的数量是由父RDD中最多的partition数量来决定,因此使用join算子的时候,增加父RDD中partition的数量。

 

7、spark.sql.shuffle.partitions //spark sql中shuffle过程中partitions的数量

 

 

 

 

--------------------------------------------

 

 

广播共享数据:

 

减少重复数据占用内存

 

val sc = new SparkContext(conf)

val list = List("hello xasxt")

val broadCast = sc.broadcast(list)

 

 

 

 

 

--------------------------------------------

 

 

reduceByKey 和  groupByKey 

 

reduceByKey会先进行本地聚合

groupByKey则不会 

 

 

reduceByKey比groupByKey效率高

 



 

 

 



 

 

--------------------------------------------

 

 

 

 

  • 大小: 9 KB
  • 大小: 5.3 KB
  • 大小: 84.2 KB
  • 大小: 76.2 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics