`
qindongliang1922
  • 浏览: 2147168 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:116320
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:124587
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:58453
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:70348
社区版块
存档分类
最新评论

Spark中foreachPartition和mapPartitions的区别

阅读更多


spark的运算操作有两种类型:分别是Transformation和Action,区别如下:




Transformation:代表的是转化操作就是我们的计算流程,返回是RDD[T],可以是一个链式的转化,并且是延迟触发的。

Action:代表是一个具体的行为,返回的值非RDD类型,可以一个object,或者是一个数值,也可以为Unit代表无返回值,并且action会立即触发job的执行。




Transformation的官方文档方法集合如下:

````
map
filter
flatMap
mapPartitions
mapPartitionsWithIndex
sample
union
intersection
distinct
groupByKey
reduceByKey
aggregateByKey
sortByKey
join
cogroup
cartesian
pipe
coalesce
repartition
repartitionAndSortWithinPartitions

````



Action的官方文档方法集合如下:
````
reduce
collect
count
first
take
takeSample
takeOrdered
saveAsTextFile
saveAsSequenceFile
saveAsObjectFile
countByKey
foreach

````


结合日常开发比如常用的count,collect,saveAsTextFile他们都是属于action类型,结果值要么是空,要么是一个数值,或者是object对象。其他的如map,filter返回值都是RDD类型的,所以简单的区分两个不同之处,就可以用返回值是不是RDD[T]类型来辨别。


接着回到正题,我们说下foreachPartition和mapPartitions的分别,细心的朋友可能会发现foreachPartition并没有出现在上面的方法列表中,原因可能是官方文档并只是列举了常用的处理方法,不过这并不影响我们的使用,首先我们按照上面的区分原则来看下foreachPartition应该属于那种操作,官网文档的这个方法api如下:
````
public void foreachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f)

Applies a function f to each partition of this RDD.

Parameters:
f - (undocumented)
````


从上面的返回值是空可以看出foreachPartition应该属于action运算操作,而mapPartitions是在Transformation中,所以是转化操作,此外在应用场景上区别是mapPartitions可以获取返回值,继续在返回RDD上做其他的操作,而foreachPartition因为没有返回值并且是action操作,所以使用它一般都是在程序末尾比如说要落地数据到存储系统中如mysql,es,或者hbase中,可以用它。


当然在Transformation中也可以落地数据,但是它必须依赖action操作来触发它,因为Transformation操作是延迟执行的,如果没有任何action方法来触发,那么Transformation操作是不会被执行的,这一点需要注意。



一个foreachPartition例子:

````scala
    val sparkConf=new SparkConf()
     val sc=new SparkContext(sparkConf)
      sparkConf.setAppName("spark demo example ")
    val rdd=sc.parallelize(Seq(1,2,3,4,5),3)
    
    rdd.foreachPartition(partiton=>{
      // partiton.size 不能执行这个方法,否则下面的foreach方法里面会没有数据,
      //因为iterator只能被执行一次
      partiton.foreach(line=>{
        //save(line)  落地数据
      })

    })
    
    sc.stop()
````


一个mapPartitions例子:
````
     val sparkConf=new SparkConf()
     val sc=new SparkContext(sparkConf)
      sparkConf.setAppName("spark demo example ")
    val rdd=sc.parallelize(Seq(1,2,3,4,5),3)

    rdd.mapPartitions(partiton=>{
      //只能用map,不能用foreach,因为foreach没有返回值
      partiton.map(line=>{
        //save line
      }
      )
    })

    rdd.count()//需要action,来触发执行
    sc.stop()
````



最后,需要注意一点,如果操作是iterator类型,我们是不能在循环外打印这个iterator的size,一旦执行size方法,相当于iterato就会被执行,所以后续的foreach你会发现是空值的,切记iterator迭代器只能被执行一次。




参考文档:

http://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/rdd/RDD.html

https://spark.apache.org/docs/2.1.0/rdd-programming-guide.html

有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。
0
0
分享到:
评论

相关推荐

    Spark官方中文文档

    与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。 尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 ...

    Spark中文分词+文本分类.rar

    Scala中文分词+SparkML逻辑回归 实现 中文文本分类

    Flink和Spark比较

    详细介绍了大数据库框架spark和flink的区别

    Spark和TiDB (Spark on TiDB)

    它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现了一套扩展的,为TiDB定制的SQL前端(Parser,Planner和优化器):它了解TiDB...

    Spark-2.3.1源码解读

    Spark-2.3.1源码解读。 Spark Core源码阅读 Spark Context 阅读要点 ...Dstream join 操作和 RDD join 操作的区别 PIDController源码赏析及 back pressure 实现思路 Streaming Context重点摘要 checkpoint 必知必会

    实验七:Spark初级编程实践

    1、实验环境: 设备名称 LAPTOP-9KJS8HO6 处理器 Intel(R) Core(TM) i5-10300H CPU @ 2.50GHz 2.50 GHz 机带 RAM 16.0 GB (15.8 GB 可用) ...(2) 在spark-shell中读取HDFS系统文件“/user/hadoop/test.txt”

    Apache Spark 中的列式存储和向量化优化.pdf

    开源大数据存储和优化 Apache Spark 中的列式存储和向量化优化.开源大数据存储和优化 Apache Spark 中的列式存储和向量化优化.开源大数据存储和优化 Apache Spark 中的列式存储和向量化优化.

    Spark 2.0.2 Spark 2.2 中文文档 本资源为网页,不是PDF

    Apache Spark 2.0.2 中文文档 Spark 概述 编程指南 快速入门 Spark 编程指南 概述 Spark 依赖 Spark 的初始化 Shell 的使用 弹性分布式数据集(RDDS) 并行集合 外部数据集 RDD 操作 RDD 持久化 共享...

    Spark全面精讲(基于Spark2版本+含Spark调优+超多案例)【不是王家林版本】

    0基础spark,基于spark2,内容完整全面,学完精通spark

    sparkStreaming实战学习资料

    Spark中的(弹性分布式数据集)简称RDD: Spark中的Transformation操作之Value数据类型的算子: Spark中的Transformation操作之Key-Value数据类型的算子: Spark中的Action操作: Transformation-&gt;map算子: ...

    大数据Spark企业级实战

    《大数据Spark企业级实战》详细解析了企业级...并且结合Spark源码细致的解析了Spark内核和四大子框架,最后在附录中提供了的Spark的开发语言Scala快速入门实战内容,学习完此书即可胜任绝大多数的企业级Spark开发需要。

    Learning spark 中文原文翻译3-8章

    Learning spark 中文原文翻译3-8章 Learning spark 中文原文翻译3-8章 Learning spark 中文原文翻译3-8章

    Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面.zip

    Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...

    Hadoop原理与技术Spark操作实验

    2. 学会在Spark Shell中编写Scala程序; 3. 学会在Spark Shell中运行Scala程序。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)Spark基础知识 (1)输入start-...

    Spark 入门实战系列

    2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战.pdf 3.Spark编程模型(下)--IDEA搭建及实战.pdf 4.Spark运行架构.pdf 5.Hive(上)--Hive介绍及部署.pdf 5.Hive(下)-...

    spark-3.1.2.tgz & spark-3.1.2-bin-hadoop2.7.tgz.rar

    spark-3.1.2.tgz版本 & spark-3.1.2-bin-hadoop2.7.tgz版本

    IK分词器通过spark加载词典,并在spark中使用

    IK分词器通过spark加载词典,并在spark中使用

    Spark 2.0.2 中文文档

    Spark 2.0.2 中文文档 Spark 2.0.2 中文文档 Spark 2.0.2 中文文档

    [中文]Spark快速数据处理

    Spark是一个开源的通用并行分布式计算...如何创建和保存RDD(弹性分布式数据集),如何用Spark分布式处理数据,如何设置Shark,将Hive查询集成到你的Spark作业中来,如何测试Spark作业,以及如何提升Spark任务的性能。

    spark3.0入门到精通

    │ 01-[了解]-Spark发展历史和特点介绍.mp4 │ 03-[掌握]-Spark环境搭建-Standalone集群模式.mp4 │ 06-[理解]-Spark环境搭建-On-Yarn-两种模式.mp4 │ 07-[掌握]-Spark环境搭建-On-Yarn-两种模式演示.mp4 │ ...

Global site tag (gtag.js) - Google Analytics