spark中有几个算子比较重要,开发中不是很常用,但很多算子的底层都是依靠这几个算子实现的,比如CombineByKey,像reduceByKey底层是combineByKey实现的。
首先介绍combineByKey
这个算子 主要需要三个参数,第一个是对每个分区中每个key的第一个值 进行初始化,也就是每个分区内,有多少个key就会执行多少次这个初始化
object CombineByKeyTest01 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf
conf.setMaster("local").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(("a","2"),("a","3"),("a","4"),("b","3"),("b","4")),2)
rdd1.foreachPartition(f =>{
while(f.hasNext){
print(f.next())
}
})
val rdd2=rdd1.combineByKey(x=>{println("fff=="+x);(x.toInt*2).toString()},(x1:String,x2:String)=>(x1.toInt+x2.toInt).toString(),(s1:String,s2:String)=>(s1.toInt+s2.toInt).toString())
rdd2.foreachPartition(f=>{
while(f.hasNext){
print(f.next())
}
})
sc.stop()
}
第一个分区为(a,2)(a,3),第二个分区为(a,4)(b,3)(b,4)
对于第一个分区 ,首先执行第一步,即(a,4),然后聚合,得出(a,7)
对于第二个分区,首先执行第一步,(a,8),(b,6),(b,4)然后聚合得出(a,8),(b,10)
最后,对所有分区进行聚合,调用第三个参数对于的函数,得出(a,15),(b,10)
接下来介绍Aggregate
这个算子主要是三个参数,第一个参数是初始值,第二个参数进行分区内聚合,第三个参数是对每个分区最后的结果进行聚合
object AggregateTest01 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf
conf.setMaster("local").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List("1","2","3","4","5","6","7"),2)
rdd1.foreachPartition(f =>{
while(f.hasNext){
print(f.next())
}
})
//aggregate操作
val result=rdd1.aggregate("3")(seqOphyj,combOphyj)
println(result)
sc.stop()
}
//每个分区中 每个元素 乘以2 然后相加 进行分区内聚合操作
def seqOphyj(s1:String,s2:String):String={
val ss1=(2*s1.toInt+2*s2.toInt).toString()
ss1
}
//每个分区的结果 与初始值 进行相加操作
def combOphyj(s1:String,s2:String):String={
val ss1=(s1.toInt+s2.toInt).toString()
ss1
}
}
第一个分区为1,2,3 第二个分区为4,5,6,7
对于第一个分区,首先进行第二个函数 即3*2+1*2=8 8*2+2*2=20 20*2+3*2=46
对于第二个分区,首先进行第二个函数, 3*2+4*2=14 14*2+5*2=38 38*2+6*2=88 88*2+7*2=190
然后进行调用第三个函数 3+46+190=239
接下来是AggregateByKey
这个算子基本和Aggregate类似,是对相同key的value进行聚合,但还是有区别的,区别在下面说。
object AggregateByKeyTest01 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf
conf.setMaster("local").setAppName("test")
val sc=new SparkContext(conf)
val rdd1=sc.parallelize(List(("1","2"),("1","3"),("1","4"),("2","3"),("2","4")),2)
rdd1.foreachPartition(f =>{
while(f.hasNext){
print(f.next())
}
})
val rdd2=rdd1.aggregateByKey("2")(seqOphyj, combOphyj)
rdd2.foreachPartition(f =>{
while(f.hasNext){
print(f.next())
}
})
sc.stop()
}
//每个分区中 每个元素 乘以2 然后相加 进行分区内聚合操作
def seqOphyj(s1:String,s2:String):String={
val ss1=(2*s1.toInt+2*s2.toInt).toString()
println("seq="+s1+"&&"+s2+"====="+ss1)
ss1
}
//每个分区的结果 与初始值 进行相加操作
def combOphyj(s1:String,s2:String):String={
val ss1=(s1.toInt+s2.toInt).toString()
println("com="+s1+"&&"+s2+"====="+ss1)
ss1
}
}
第一个分区为(1,2)(1,3),第二个分区为(1,4)(2,3)(2,4)
首先计算第一个分区。对于key1,value为(2*2+2*2)*2+3*2=22
再计算第二个分区,对于key1,value为2*2+4*2=12,对于key2,value为(2*2+3*2)*2+4*2=28
计算第三个方法,为(1,34)(2,28),它与Aggregate的不同点在于第三个函数执行的时候。默认值不参与运算,而aggregate是参与的。
分享到:
相关推荐
1、25个经典Spark算子的JAVA实现。2、含有详细的注释。3、全部通过junit测试。
Spark RDD 算子说明,分别讲述了Transformation和Action这两类的算子。
spark transformation & action 算子速查表,大数据实时和离线数据处理方向,希望对大家学习和工作有所帮助。
基本算子
深度解析Spark五大大算子深度解析个人原创 内容详实 业界最全`spark`内置了非常多有用的算子,通过对这些算子的组合就可以完成业务需要的功能。 `spark`的编程归根结底就是对`spark`算子的使用,因此非常有必要熟练...
结合代码详细描述RDD算子的执行流程,并配上执行流程图
Spark T ransformation和Action算子速查表,某吧最近大力宣传的pdf学习速查表,
关于spark较为简单的算子讲义 和相对的用法,基于scala语言
主要给大家介绍了关于java-spark中各种常用算子的写法的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
Spark对于大数据行业的实时处理数据来说,有着举足轻重的位置,特此学习整理了RDD 算子的各个含义,希望各位读者能够喜欢。谢谢
spark各算子实例,eclipse版maven项目。支持原创,有何问题联系作者
25个经典Spark算子的JAVA实现(已通过Junit测试)
Spark_Transformation和Action算子.md
本文档简明扼要,通俗易懂的列举了SPARK所有的算子的使用方法
spark常用算子
import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.... * 第一个参数:boolean类型,表示产生的样本是否可以重复:false不重复,也就是不放回的取;t
Spark常用算子总结,map、mapPartitions、mapPartitionsWithIndex,flatMap,filter ......
Spark算子是Spark处理大规模数据的核心接口之一,它提供了丰富的操作方式和函数,支持多种转换和操作,帮助用户更高效地处理海量数据,并实现各种数据分析、挖掘和应用场景
适合新手练习,大数据的核心内容Spark Core算子操作。
第1章 Spark简介 1.1 Spark是什么 1.2 Spark生态系统BDAS 1.3 Spark架构 ...3.3 Spark算子分类及功能 33.3.1 Value型Transformation算子 3.3.2 Key-Value型Transformation算子 3.3.3 Actions算子 3.4 本章小结