coGroup
package spark.examples.rddapi import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ object CoGroupTest_05 { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05") val sc = new SparkContext(conf); val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (11, "F"), (81, "Y"), (77, "Z"), (31, "X")), 3) val z2 = sc.parallelize(List((4, "ABC"), (6, "B2"), (7, "Z2"), (7, "Z3"), (91, "E"), (11, "FF"), (88, "N"), (77, "S"), (36, "M")), 4) //隐式函数,定义于PairRDDFunctions //结果由两个(至多四个)RDD的Key组成,(Key,(ValuesOfRDD1Seq, ValuesOfRDD2Seq, ValuesOfRDD3Seq)) //cogroup [W]( other : RDD [(K, W)]): RDD [(K, (Seq [V], Seq [W]))] //cogroup [W1 , W2 ]( other1 : RDD [(K, W1)], other2 : RDD [(K, W2)]): RDD [(K , (Seq[V], Seq[W1], Seq[W2 ]))] val r = z1.cogroup(z2) r.collect.foreach(println) /*Result:, (4,(CompactBuffer(),CompactBuffer(ABC))) (36,(CompactBuffer(),CompactBuffer(M))) (88,(CompactBuffer(),CompactBuffer(N))) (81,(CompactBuffer(Y),CompactBuffer())) (77,(CompactBuffer(Z),CompactBuffer(S))) (9,(CompactBuffer(E),CompactBuffer())) (6,(CompactBuffer(B1),CompactBuffer(B2))) (11,(CompactBuffer(F),CompactBuffer(FF))) (3,(CompactBuffer(A),CompactBuffer())) (7,(CompactBuffer(Z1),CompactBuffer(Z2, Z3))) (91,(CompactBuffer(),CompactBuffer(E))) (31,(CompactBuffer(X),CompactBuffer())) */ } }
groupBy
package spark.examples.rddapi import org.apache.spark.{Partitioner, SparkContext, SparkConf} object GroupByTest_06 { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05") val sc = new SparkContext(conf); val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3) /** * Return an RDD of grouped items. Each group consists of a key and a sequence of elements * mapping to that key. The ordering of elements within each group is not guaranteed, and * may even differ each time the resulting RDD is evaluated. * * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. */ // def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = groupBy[K](f, defaultPartitioner(this)) //根据指定的函数进行分组,分组得到的集合的元素类型是(K,V),K是分组函数的返回值,V是组内元素列表 val r = z1.groupBy(x => if (x._1 % 2 == 0) "even" else "odd") r.collect().foreach(println) //结果: /* (even,CompactBuffer((6,B1))) (odd,CompactBuffer((3,A), (7,Z1), (9,E), (7,F), (9,Y), (77,Z), (31,X))) */ //Partitioner是HashPartitioner val r2 = z1.groupBy(_._1 % 2) r2.collect().foreach(println) //结果: /* (0,CompactBuffer((6,B1))) (1,CompactBuffer((3,A), (7,Z1), (9,E), (7,F), (9,Y), (77,Z), (31,X))) */ class MyPartitioner extends Partitioner { override def numPartitions = 3 def getPartition(key: Any): Int = { key match { case null => 0 case key: Int => key % numPartitions case _ => key.hashCode % numPartitions } } override def equals(other: Any): Boolean = { other match { case h: MyPartitioner => true case _ => false } } } println("=======================GroupBy with Partitioner====================") //分组的同时进行分区;分区的key是分组函数的计算结果? val r3 = z1.groupBy((x:(Int, String)) => x._1, new MyPartitioner()) r3.collect().foreach(println) /* //6,3,9一个分区,7,31一个分区,77一个分区 (6,CompactBuffer((6,B1))) (3,CompactBuffer((3,A))) (9,CompactBuffer((9,E), (9,Y))) (7,CompactBuffer((7,Z1), (7,F))) (31,CompactBuffer((31,X))) (77,CompactBuffer((77,Z))) */ } }
collect
package spark.examples.rddapi import org.apache.spark.{SparkContext, SparkConf} object CollectTest_07 { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05") val sc = new SparkContext(conf); val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3) /** * Return an array that contains all of the elements in this RDD. */ //这是一个行动算子 z1.collect().foreach(println) /** * Return an RDD that contains all matching values by applying `f`. */ // def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = { // filter(f.isDefinedAt).map(f) // } // val f = { // case x: (Int, String) => x // } // val z2 = z1.collect(f) // println(z2) } }
RDD有个toArray方法,已经不推荐使用了,推荐使用collect方法
相关推荐
内容根据spark rdd.scala和ParRDDFunctions.scala源码中rdd顺序整理,包含rdd功能解释。对熟悉spark rdd很有用
本文详细的描述了spark rdd的api 这些api 应该够我们日常生产使用了
包括spara rdd api,dataframe action操作、查询操作、join操作,dataframe rdd dataset 相互转换以及spark sql。
在Java中,函数需要作为实现了Spark的org.apache,spark.api.java.function包中的任一函数接口的对象传递。 函数名 实现的方法 用途 Function, R> R call(T) 接收一个输入值并返回一个输出值,用于类似map() 和filter...
本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。 如何创建RDD? RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。 举例:从普通数组创建RDD,里面包含了1到9
spark API RDD pdf版的..........对初学者应该有所帮助
【资源说明】 1、该资源包括项目的全部源码,下载可以直接使用! 2、本项目适合作为计算机、数学、电子信息...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip
Spark RDD 练习作业(选择部分数据(可以是自拟,可以是采集的,也可以是现有的),进行多角度数据统计及分析,并进行数据整合及展示(尽量多的运用 Spark RDD API)).zip Spark RDD 练习作业(选择部分数据(可以...
【资源说明】 1、该资源内项目代码都是经过测试运行成功,功能正常的情况下才上传的,请放心下载使用。...电影评分数据汇总,(使用spark2.4+scala完成, 分析采用spark RDD的API. 数据集采用标准电影评分数据).zip
Clustering - RDD-based API(聚类 - 基于RDD的API) Dimensionality Reduction - RDD-based API(降维) Feature Extraction and Transformation - RDD-based API(特征的提取和转换) Frequent Pattern Mining...
• Master Spark Core RDD API programming techniques • Extend, accelerate, and optimize Spark routines with advanced API platform constructs, including shared variables, RDD storage, and partitioning ...
2、适用人群:主要针对计算机相关专业(如计科、信息安全、数据科学与大数据技术、人工智能、通信、物联网、数学、电子信息等)的同学或企业员工下载使用,具有较高的学习借鉴价值。 3、不仅适合小白学习实战练习,也...
Databrciks工程师,Spark Committer,Spark SQL...Spark DataFrame vs.RDD,有些类似于动态语言和静态语言的区别,在很多场景下,DataFrame优势比较明显。1.3版中,Spark进一步完善了外部数据源API,并可智能进行优化。
6、大量全网唯一的知识点:基于排序的wordcount,Spark二次排序,Spark分组取topn,DataFrame与RDD的两种转换方式,Spark SQL的内置函数、开窗函数、UDF、UDAF,Spark Streaming的Kafka Direct API、...
第2章 Spark2.2技术及原理... 14 2.1 Spark 2.2综述... 14 2.1.1 连续应用程序... 14 2.1.2 新的API 15 2.2 Spark 2.2 Core. 16 2.2.1 第二代Tungsten引擎... 16 2.2.2 SparkSession. 16 2.2.3 累加器API 17 ...
Apache Spark 2.x for Java Developers by Sourav Gulati English | 26 July 2017 | ISBN: 1787126498 | ASIN: B01LY3N7ZO | 350 Pages | AZW3 | 4.48 MB Key Features Perform big data processing with Spark—...
第4章 Spark RDD与编程API实战 第5章 Spark运行模式深入解析 第6章 Spark内核解析 第7章 GraphX大规模图计算与图挖掘实战 第8章 Spark SQL原理与实战 第9章 Machine Learning on Spark 第10章 Tachyon文件系统 第11...
Spark 的交互式脚本是一种学习 API 的简单途径,也是分析数据集交互的有力工具。Spark 包含多种运行模式,可使用单机模式,也可以使用分布式模式。为简单起见,本节采用单机模式运行 Spark。 无论采用哪种模式,只要...
玩火花rdd Apache Spark RDD示例示例,用于学习Spark RDD和DataSet API。