最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好了,言归正传,下面进入今天的主题:
如何使用scala+spark读写Hbase
软件版本如下:
scala2.11.8
spark2.1.0
hbase1.2.0
公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。
接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时操作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的操作api,势必速度回慢上许多。
关于批量操作Hbase,一般我们都会用MapReduce来操作,这样可以大大加快处理效率,原来也写过MR操作Hbase,过程比较繁琐,最近一直在用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。
整个流程如下:
(1)全量读取hbase表的数据
(2)做一系列的ETL
(3)把全量数据再写回hbase
核心代码如下:
//获取conf
val conf=HBaseConfiguration.create()
//设置读取的表
conf.set(TableInputFormat.INPUT_TABLE,tableName)
//设置写入的表
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
//创建sparkConf
val sparkConf=new SparkConf()
//设置spark的任务名
sparkConf.setAppName("read and write for hbase ")
//创建spark上下文
val sc=new SparkContext(sparkConf)
//为job指定输出格式和输出表名
val newAPIJobConfiguration1 = Job.getInstance(conf)
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)
newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
//全量读取hbase表
val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
,classOf[ImmutableBytesWritable]
,classOf[Result]
)
//过滤空数据,然后对每一个记录做更新,并转换成写入的格式
val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)
//转换后的结果,再次做过滤
val save_rdd=final_rdd.filter(checkNull)
//最终在写回hbase表
save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
sc.stop()
从上面的代码可以看出来,使用spark+scala操作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:
第一个:checkNotEmptyKs
作用:过滤掉空列簇的数据
def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={
val r=f._2
val rowkey=Bytes.toString(r.getRow)
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScala
if(map.isEmpty) false else true
}
第二个:forDatas
作用:读取每一条数据,做update后,在转化成写入操作
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={
val r=f._2 //获取Result
val put:Put=new Put(r.getRow) //声明put
val ks=Bytes.toBytes("ks") //读取指定列簇
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
map.foreach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化
val kid= Bytes.toString(kv._1)//知识点id
var value=Bytes.toString(kv._2)//知识点的value值
value="修改后的value"
put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put对象
}
)
if(put.isEmpty) null else (new ImmutableBytesWritable(),put)
}
第三个:checkNull
作用:过滤最终结果里面的null数据
def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={
if(f==null) false else true
}
上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。
除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark操作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:
https://github.com/nerdammer/spark-hbase-connector
https://github.com/hortonworks-spark/shc
有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
技术债不能欠,健康债更不能欠, 求道之路,与君同行。
分享到:
相关推荐
大数据 hadoop spark hbase ambari全套视频教程(购买的付费视频)
徐老师大数据培训Hadoop+HBase+ZooKeeper+Spark+Kafka+Scala+Ambari
原始用的jetty做的http接口,最近有时间,研究了下spring boot + scala + spark做大数据计算
最近看了hbase的源码根据源码写了一些scala调动hbase表的API,话不多说直接上代码!Hadoop的版本是2.7.3,scala版本是2.1.1,hbase的版本是1.1.2 如果版本不同可以修改pom的依赖项,但要注意版本冲突。 并且在scala...
word文档在压缩文件里,另有java+hadoop+scala+spark资料。(word文档内容比较粗糙,但能看懂)
win10下搭建Hadoop(jdk+mysql+hadoop+scala+hive+spark),包括jdk的安装、mysql安装和配置,hadoop安装和配置,scala安装和配置,hive安装和配置,spark安装和配置。
基于java+scala+Python+spark实现的图书推荐系统的设计与实现+详细文档+全部资料(高分毕业设计).zip基于java+scala+Python+spark实现的图书推荐系统的设计与实现+详细文档+全部资料(高分毕业设计).zip ...
apache-phoenix-4.13.0-HBase-1.3-bin.tar.gz hadoop-2.7.4.tar.gz hbase-1.3.1-bin.tar.gz jdk-8u144-linux-x64.tar.gz kafka_2.12-1.0.0.tgz scala-2.12.4.tar.gz scala-2.12.4.tgz spark-2.2.0-bin-hadoop2.7.tgz...
spark,scala,spark系列文章测试案例,scala语言学习,spark案例,单词统计,RDD转DataFrame,地址:https://blog.csdn.net/2301_79691134/article/details/134174759
自动化部署,spark分布式集群,全过程只需要输入密码即可,部署spark是分分钟的事。参考文章:http://blog.csdn.net/wangqi880/article/details/52875524
快学Scala 中文版带目录+大数据Spark企业级实战版,只要2分
这是Linux系统的spark和Scala,都是从官网下载下来的,可以配合Java8正常使用。应为有的同学去官网看不明白英语介绍,或者文件太大下不下来,所以我上传到这里。
SpringBoot + SpringData Jpa + Scala + Mysql(java+Scala混编)
搭建eclipse+scala+maven.docx
基于SpringBoot+Mybatis+Vue+Python+Scrapy+Spark+Scala协同过滤算法的新闻推荐系统的设计与实现+详细文档+全部资料(高分项目)数据爬虫使用Python+Scrapy框架,大数据推荐功能使用Spark+Scala实现协同过滤算法,...
DBSCAN集群算法的Scala + Spark实现 编译软件 下载和环境设置 首先在本地克隆存储库 git clone https://github.com/AlecioP/DBSCAN-distributed 然后移至本地存储库 cd DBSCAN-distributed 为了构建可以在EMR集群...
基于Spark+Scala+MongoDB的大数据实战,商品推荐系统设计与实现.zip 1、该资源内项目代码经过严格调试,下载即用确保可以运行! 2、该资源适合计算机相关专业(如计科、人工智能、大数据、数学、电子信息等)正在做...
生成的数据主要是模拟某学习网站学习视频课程的访问量(其中*以“ / class”开头的表示实战课程,然后通过流水线Flume + Kafka + SparkStreaming进行实时日志的收集,HBase来存储数据)*注意事项(使用的软件工具及...
拉斯维加斯:Scala + Spark缺少的MatPlotLib
这是一个简单实用的scala集成mybatis数据库查询代码,简单明了