1.简单关于气温topk小例子。
package jspark;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
/**
* @author liyu
*
*/
public class HT {
public static void main(String[] args) {
// TODO Auto-generated method stub
if (args.length < 1) {
System.err.println("Usage: HT <file>");
System.exit(1);
}
for (String string : args) {
System.out.println("============="+string);
}
SparkConf sparkConf = new SparkConf().setAppName("ht1");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile(args[0], 2);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {
return Arrays.asList(s);
}
});
System.out.println("----------------------------------------------");
JavaPairRDD<String, Integer> km = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
int airTemperature = 0;
String year = s.substring(15,19);
if(s.charAt(87) == '+') {
airTemperature = Integer.parseInt(s.substring(88, 92));
} else {
airTemperature = Integer.parseInt(s.substring(87, 92));
}
return new Tuple2<String, Integer>(year, airTemperature);
}
});
//km.saveAsTextFile(args[1]);
/*List<Tuple2<String, Integer>> output = km.collect();
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}*/
System.out.println("reduce--------------------");
/*JavaRDD<Tuple2<Integer, String>> jdd = km.map(new Function<Tuple2<String,Integer>, Tuple2<Integer,String>>() {
public Tuple2<Integer,String> call(Tuple2<String, Integer> tuple) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<Integer, String>(tuple._2, tuple._1);
}
});
jdd.sortBy(new Function<Tuple2<Integer,String>, Tuple2<Integer,String>>() {
@Override
public Tuple2<Integer,String> call(Tuple2<Integer, String> key) throws Exception {
// TODO Auto-generated method stub
return null;
}
} ,true, 3);*/
JavaPairRDD<String, Integer> jrd = km.groupByKey().mapValues(new Function<Iterable<Integer>, Integer>() {
@Override
public Integer call(Iterable<Integer> v) throws Exception {
// TODO Auto-generated method stub
Integer max = 0;
for(Integer x:v){
max = x>max?x:max;
}
return max;
}
});
jrd.sortByKey().saveAsTextFile(args[1]);
List<Tuple2<String, Integer>> output1 = jrd.sortByKey(true).collect();
for (Tuple2<?,?> tuple : output1) {
System.out.println(tuple._1() + ": " + tuple._2());
}
//采用reduceByKey
/*JavaPairRDD<String, Integer> counts = km.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer k1, Integer k2) throws Exception {
// TODO Auto-generated method stub
return k1+k2;
}
});
counts.saveAsTextFile(args[1]);
List<Tuple2<String, Integer>> output1 = counts.collect();
for (Tuple2<?,?> tuple : output1) {
System.out.println(tuple._1() + ": " + tuple._2());
}*/
ctx.stop();
}
}
下面是处理气温的数据。
./bin/spark-submit --master spark://192.168.1.26:7077 --class jspark.HT --name ht1 --executor-memory 400M --driver-memory 512M --jars /opt/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.3.0-hadoop2.4.0.jar /opt/a/jwc.jar "/opt/a/190*" "/opt/spark-1.3.0-bin-hadoop2.4/test/dddd14"
此例子是处理k=1最大值(最小值和最大值的处理方式一样,定义一个变量min = x>min?min:x即可。
2若要求top2或者topk,将返回值的值变成数组,里面的实现可以是各种排序
JavaPairRDD<String,
Integer[]> jrd = km.groupByKey().mapValues(new Function<Iterable<Integer>, Integer[]>() {
int arr[] = new int[2];
@Override
public Integer[] call(Iterable<Integer> v) throws Exception {
// TODO Auto-generated method stub
//冒泡排序
//快速排序
//最小堆处理
return null;
}
});
分享到:
相关推荐
SparkTI (Spark on TiDB)是TiDB基于Apache Spark的独立于原生系统的计算引擎。它将Spark和TiDB深度集成,在原有MySQL Workload之外借助Spark支持了更多样的用户场景和API。这个项目在SparkSQL和Catalyst引擎之外实现...
├─Spark-day01 │ 01-[了解]-Spark发展历史和特点介绍.mp4 │ 03-[掌握]-Spark环境搭建-Standalone集群模式.mp4 │ 06-[理解]-Spark环境搭建-On-Yarn-两种模式.mp4 │ 07-[掌握]-Spark环境搭建-On-Yarn-两种...
Spark 入门实战系列,适合初学者,文档包括十部分内容,质量很好,为了感谢文档作者,也为了帮助更多的人入门,传播作者的心血,特此友情转贴: 1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建....
《大数据Spark企业级实战》详细解析了企业级Spark开发所需的几乎所有技术内容,涵盖Spark的架构设计、Spark的集群搭建、Spark内核的解析、Spark SQL、MLLib、GraphX、Spark Streaming、Tachyon、SparkR、Spark多语言...
An Architecture for Fast and General Data Processing on Large Clusters.pdf Discretized Streams An ...Spark SQL Relational Data Processing in Spark.pdf spark.pdf 大型集群上的快速和通用数据处理架构.pdf
使用命令./bin/spark-shell启动spark 图2启动spark 2. Spark读取文件系统的数据 (1) 在spark-shell中读取Linux系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数; 图3 spark统计行数 (2) 在spark-...
1. 理解Spark编程思想; 2. 学会在Spark Shell中编写Scala程序; 3. 学会在Spark Shell中运行Scala程序。 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 三、实验内容 (一)Spark...
我们谈大数据性能调优,到底在谈什么,它的本质是什么,以及 Spark 在性能调优部份的要点,这两点让在进入性能调优之前都是一个至关重要的问题,它的本质限制了我们调优到底要达到一个什么样的目标或者说我们是从...
本资料是集合20篇知网被引最高的基于spark的大数据论文,包括大数据Spark技术研究_刘峰波、大数据下基于Spark的电商实时推荐系统的设计与实现_岑凯伦、基于Spark的Apriori算法的改进_牛海玲、基于Spark的大数据混合...
本课程论文阐述了spark和spark集成开发环境Intellij IDEA的安装与操作,也详细说明了基于Spark的电影推荐系统的开发流程。推荐引擎是最常用的机器学校应用,我们可以在各大购物网站上看到这方面的应用。基于Spark的...
1.Spark及其生态圈简介.pdf 2.Spark编译与部署(上)--基础环境搭建.pdf 2.Spark编译与部署(下)--Spark编译安装.pdf 2.Spark编译与部署(中)--Hadoop编译安装.pdf 3.Spark编程模型(上)--概念及SparkShell实战....
Apache Spark 2.4 comes packed with a lot of new functionalities and improvements, including the new barrier execution mode, flexible streaming sink, the native AVRO data source, PySpark’s eager ...
Apache Spark 2.0 for Beginners English | ISBN: 1785885006 | 2016 | Key Features This book offers an easy introduction to the Spark framework published on the latest version of Apache Spark 2 Perform ...
使用Spark框架进行网站用户购物分析 目的 1、熟悉Linux系统、MySQL、Spark、HBase、Hive、Sqoop、R、Eclipse、IntelliJ Idea等系统和软件的安装和使用; 2、了解大数据处理的基本流程; 3、熟悉数据预处理方法; 4、...
Spark Doris Connector(apache-doris-spark-connector-2.3_2.11-1.0.1-incubating-src.tar.gz) Spark Doris Connector Version:1.0.1 Spark Version:2.x Scala Version:2.11 Apache Doris是一个现代MPP分析...
Spark SQL的DataFrame接口支持多种数据源的操作。一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表。把DataFrame注册为临时表之后,就可以对该DataFrame执行SQL查询。 Spark SQL的默认数据源为Parquet...
深入理解Sp深入理解SPARK:核心思想与源码分析》结合大量图和示例,对Spark的架构、部署模式和工作模块的设计理念、实现源码与使用技巧进行了深入的剖析与解读。 《深入理解SPARK:核心思想与源码分析》一书对Spark...
在Ubuntu里安装spark,spark-2.1.0-bin-without-hadoop该版本直接下载到本地后解压即可使用。 Apache Spark 是一种用于大数据工作负载的分布式开源处理系统。它使用内存中缓存和优化的查询执行方式,可针对任何规模...
Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql),总结的很全面。 Spark零基础思维导图(内含spark-core ,spark-streaming,spark-sql)。 Spark零基础思维导图(内含spark-core ,spark-streaming,...
大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK...... 大数据笔记,包含Hadoop、Spark、Flink、Hive、Kafka、Flume、ZK.......