接着上篇文章,本篇来看下如何在scala中完成使用spark sql将不同日期的数据导入不同的es索引里面。
首下看下用到的依赖包有哪些:
elasticsearch-spark-20_2.11 5.3.2
elasticsearch 2.3.4
spark-sql_2.11 2.1.0
spark-hive_2.11 2.1.0
spark-core_2.11 2.1.0
hadoop-client 2.7.3
scala-library 2.11.8
下面看相关的代码,代码可直接在跑在win上的idea中,使用的是local模式,数据是模拟造的:
import org.apache.spark.sql.types.{DataTypes, StructField}
import org.apache.spark.sql.{Row, SparkSession}//导入Row对象
/**
* spark sql to es 本地测试例子
*/
object SparkGroupES {
def main(args: Array[String]): Unit = {
//构建spark session
val spark = SparkSession
.builder().master("local[1]")
.appName("Spark SQL basic example")
.config("es.nodes","192.168.10.125").config("es.port","9200")
.getOrCreate()
//导入es-spark的包
import org.elasticsearch.spark.sql._
import spark.implicits._
//使用Seq造数据,四列数据
val df = spark.sparkContext.parallelize(Seq(
(0,"p1",30.9,"2017-03-04"),
(0,"u",22.1,"2017-03-05"),
(1,"r",19.6,"2017-03-04"),
(2,"cat40",20.7,"2017-03-05"),
(3,"cat187",27.9,"2017-03-04"),
(4,"cat183",11.3,"2017-03-06"),
(5,"cat8",35.6,"2017-03-08"))
).toDF("id", "name", "price","dt")//转化df的四列数据s
//创建表明为pro
df.createTempView("pro")
import spark.sql //导入sql函数
//按照id分组,统计每组数量,统计每组里面最小的价格,然后收集每组里面的数据
val ds=sql("select dt, count(*) as c ,collect_list(struct(id,name, price)) as res from pro group by dt ")
//需要多次查询的数据,可以缓存起来
ds.cache()
//获取查询的结果,遍历获取结果集
ds.select("dt","c","res").collect().foreach(line=>{
val dt=line.getAs[String]("dt") //获取日期
val count=line.getAs[Long]("c")//获取数量
val value=line.getAs[Seq[Row]]("res")//获取每组内的数据集合,注意是一个Row实体
println("日期:"+dt+" 销售数量: "+count)
//创建一个schema针对struct结构
val schema = DataTypes
.createStructType( Array[StructField](
DataTypes.createStructField("id", DataTypes.IntegerType, false), //不允许为null
DataTypes.createStructField("name", DataTypes.StringType, true),
DataTypes.createStructField("price", DataTypes.DoubleType, true)
))
//将value转化成rdd
val rdd=spark.sparkContext.makeRDD(value)
//将rdd注册成DataFrame
val df =spark.createDataFrame(rdd,schema)
//保存每一个分组的数据到es索引里面
EsSparkSQL.saveToEs(df,"spark"+dt+"/spark",Map("es.mapping.id" -> "id"))
// value.foreach(row=>{//遍历组内数据集合,然后打印
// println(row.getAs[String]("name")+" "+row.getAs[Double]("price"))
// })
})
println("索引成功")
spark.stop()
}
}
分析下,代码执行过程:
(1)首先创建了一个SparkSession对象,注意这是新版本的写法,然后加入了es相关配置
(2)导入了隐式转化的es相关的包
(3)通过Seq+Tuple创建了一个DataFrame对象,并注册成一个表
(4)导入spark sql后,执行了一个sql分组查询
(5)获取每一组的数据
(6)处理组内的Struct结构
(7)将组内的Seq[Row]转换为rdd,最终转化为df
(8)执行导入es的方法,按天插入不同的索引里面
(9)结束
需要注意的是必须在执行collect方法后,才能在循环内使用sparkContext,否则会报错的,在服务端是不能使用sparkContext的,只有在Driver端才可以。
分享到:
相关推荐
这是一个基于Scala语言开发的Spark RDD、Spark SQL、Spark Streaming相关Demo,包含35个文件。主要文件类型包括29个Scala源文件、2个Markdown文档、1个Reduced文件、1个XML文件、1个Java源文件和1个TXT文件。该项目...
包中构建了Java以及Scala混合框架的maven打包框架以及关于spark core,spark sql 、spark streaming的一些典型案例或者算子使用。
通用load/write方法 手动指定选项 Spark SQL的DataFrame接口支持多种数据源的操作。... Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,...scala> val df = spark.read.load(hdfs://hadoop001:9000/nam
scala sdk-2.12 与spark版本冲突,spark-1.6最高支持scala sdk-2.10.x版本
本资源收集了scala与大数据spark的基础的学习笔记,有兴趣的同学可以下载学习
spark-recommender, 在 Scala 中使用 Apache Spark 框架,编写了可以扩展的推荐系统 火花推荐采用 Scala 框架编写的可以扩展推荐系统,使用了 Apache Spark 框架。实现的算法包括:k-最近邻居带聚类的k-近邻带簇树的...
Scala和Spark大数据分析函数式编程、数据流和机器学习
本资源提供了一套基于Scala的Apache Spark相关RDD、SQL、Streaming Demos的设计源码,包含35个文件,其中包括29个Scala源代码文件,2个Markdown文档,1个Reduced文件,1个XML配置文件,1个Java源代码文件,以及1个...
scala和spark的安装和配置,以及启动spark,分发节点。
基于Spark1.6,使用Spark SQL框架和sqlite数据库,把唐诗三百首,宋诗三百首和元明清诗精选导入数据库,可以按来源,体裁及作者,方便地查出诗句或试题中包含某个关键字的作品,还可以在选中作品后,进行五绝,七绝...
2:兼容java,在scala中可以直接调用java方法。 2:函数式编程,柯里化函数,匿名函数,高阶函数等。 3:代码行简单。 4:支持并发控制,Actor Model机制 5:目前比较流行的kafka,spark均由scala开发。
sparknotebook, 在ipython中,使用 Scala 运行 Apache Spark的例子 sparknotebook重要我正处于删除 IScala的过程中,因为它的开发出现停滞。 我正在用 jupyter 替换它。 ,jupyter-scala还没有为 Scala 2.10构建。 ...
采用ARIMA模型(自回归积分滑动平均模型)+三次指数平滑法(Holt-Winters),用scala语言实现的在spark平台运行的分布式时间序列预测算法
Scala and Spark for Big Data Analytics by Md. Rezaul Karim English | 25 July 2017 | ISBN: 1785280848 | ASIN: B072J4L8FQ | 898 Pages | AZW3 | 20.56 MB Harness the power of Scala to program Spark and ...
scala与spark文档合集,有好多本,包括快学Scala,scala与spark文档合集
Scala中集合的使用 大学生 1. List 列表的使用 2. Set 集合的使用 3.Map 映射的使用 4. 元组的使用
原始用的jetty做的http接口,最近有时间,研究了下spring boot + scala + spark做大数据计算
本项目基于Scala开发,包含148个文件,包括Scala源代码、CRC校验文件、TXT文本文件、以及多个...系统实现了基于Scala的Spark_Core、Spark_SQL和Spark_Streaming功能,界面友好,功能完善,适合用于大数据处理和分析。
包含jdk-8u211-linux-x64.tar、scala-2.11.4.tgz、spark-1.6.0-bin-hadoop2.6.tgz,下载地址为百度云
这是一个基于Scala的Apache Spark大数据处理设计,使用Scala、Java、Python、Shell、JavaScript、CSS、HTML、Ruby和C语言开发,包含14108个文件。主要文件类型包括3966个Scala文件、1559个Q文件、1004个TXT文件、961...