- 浏览: 2147514 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (682)
- 软件思想 (7)
- Lucene(修真篇) (17)
- Lucene(仙界篇) (20)
- Lucene(神界篇) (11)
- Solr (48)
- Hadoop (77)
- Spark (38)
- Hbase (26)
- Hive (19)
- Pig (25)
- ELK (64)
- Zookeeper (12)
- JAVA (119)
- Linux (59)
- 多线程 (8)
- Nutch (5)
- JAVA EE (21)
- Oracle (7)
- Python (32)
- Xml (5)
- Gson (1)
- Cygwin (1)
- JavaScript (4)
- MySQL (9)
- Lucene/Solr(转) (5)
- 缓存 (2)
- Github/Git (1)
- 开源爬虫 (1)
- Hadoop运维 (7)
- shell命令 (9)
- 生活感悟 (42)
- shell编程 (23)
- Scala (11)
- MongoDB (3)
- docker (2)
- Nodejs (3)
- Neo4j (5)
- storm (3)
- opencv (1)
最新评论
-
qindongliang1922:
粟谷_sugu 写道不太理解“分词字段存储docvalue是没 ...
浅谈Lucene中的DocValues -
粟谷_sugu:
不太理解“分词字段存储docvalue是没有意义的”,这句话, ...
浅谈Lucene中的DocValues -
yin_bp:
高性能elasticsearch ORM开发库使用文档http ...
为什么说Elasticsearch搜索是近实时的? -
hackWang:
请问博主,有用solr做电商的搜索项目?
Solr中Group和Facet的用法 -
章司nana:
遇到的问题同楼上 为什么会返回null
Lucene4.3开发之第八步之渡劫初期(八)
使用Spark构建索引非常简单,因为spark提供了更高级的抽象rdd分布式弹性数据集,相比以前的使用Hadoop的MapReduce来构建大规模索引,Spark具有更灵活的api操作,性能更高,语法更简洁等一系列优点。
先看下,整体的拓扑图:
然后,再来看下,使用scala写的spark程序:
package com.easy.build.index import java.util import org.apache.solr.client.solrj.beans.Field import org.apache.solr.client.solrj.impl.HttpSolrClient import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import scala.annotation.meta.field /** * Created by qindongliang on 2016/1/21. */ //注册model,时间类型可以为字符串,只要后台索引配置为Long即可,注解映射形式如下 case class Record( @(Field@field)("rowkey") rowkey:String, @(Field@field)("title") title:String, @(Field@field)("content") content:String, @(Field@field)("isdel") isdel:String, @(Field@field)("t1") t1:String, @(Field@field)("t2")t2:String, @(Field@field)("t3")t3:String, @(Field@field)("dtime") dtime:String ) /*** * Spark构建索引==>Solr */ object SparkIndex { //solr客户端 val client=new HttpSolrClient("http://192.168.1.188:8984/solr/monitor"); //批提交的条数 val batchCount=10000; def main2(args: Array[String]) { val d1=new Record("row1","title","content","1","01","57","58","3"); val d2=new Record("row2","title","content","1","01","57","58","45"); val d3=new Record("row3","title","content","1","01","57","58",null); client.addBean(d1); client.addBean(d2) client.addBean(d3) client.commit(); println("提交成功!") } /*** * 迭代分区数据(一个迭代器集合),然后进行处理 * @param lines 处理每个分区的数据 */ def indexPartition(lines:scala.Iterator[String] ): Unit ={ //初始化集合,分区迭代开始前,可以初始化一些内容,如数据库连接等 val datas = new util.ArrayList[Record]() //迭代处理每条数据,符合条件会提交数据 lines.foreach(line=>indexLineToModel(line,datas)) //操作分区结束后,可以关闭一些资源,或者做一些操作,最后一次提交数据 commitSolr(datas,true); } /*** * 提交索引数据到solr中 * * @param datas 索引数据 * @param isEnd 是否为最后一次提交 */ def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={ //仅仅最后一次提交和集合长度等于批处理的数量时才提交 if ((datas.size()>0&&isEnd)||datas.size()==batchCount) { client.addBeans(datas); client.commit(); //提交数据 datas.clear();//清空集合,便于重用 } } /*** * 得到分区的数据具体每一行,并映射 * 到Model,进行后续索引处理 * * @param line 每行具体数据 * @param datas 添加数据的集合,用于批量提交索引 */ def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={ //数组数据清洗转换 val fields=line.split("\1",-1).map(field =>etl_field(field)) //将清洗完后的数组映射成Tuple类型 val tuple=buildTuble(fields) //将Tuple转换成Bean类型 val recoder=Record.tupled(tuple) //将实体类添加至集合,方便批处理提交 datas.add(recoder); //提交索引到solr commitSolr(datas,false); } /*** * 将数组映射成Tuple集合,方便与Bean绑定 * @param array field集合数组 * @return tuple集合 */ def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={ array match { case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8) } } /*** * 对field进行加工处理 * 空值替换为null,这样索引里面就不会索引这个字段 * ,正常值就还是原样返回 * * @param field 用来走特定规则的数据 * @return 映射完的数据 */ def etl_field(field:String):String={ field match { case "" => null case _ => field } } /*** * 根据条件清空某一类索引数据 * @param query 删除的查询条件 */ def deleteSolrByQuery(query:String): Unit ={ client.deleteByQuery(query); client.commit() println("删除成功!") } def main(args: Array[String]) { //根据条件删除一些数据 deleteSolrByQuery("t1:03") //远程提交时,需要提交打包后的jar val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar"; //远程提交时,伪装成相关的hadoop用户,否则,可能没有权限访问hdfs系统 System.setProperty("user.name", "webmaster"); //初始化SparkConf val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index "); //上传运行时依赖的jar包 val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar" conf.setJars(seq) //初始化SparkContext上下文 val sc = new SparkContext(conf); //此目录下所有的数据,将会被构建索引,格式一定是约定好的 val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/"); //通过rdd构建索引 indexRDD(rdd); //关闭索引资源 client.close(); //关闭SparkContext上下文 sc.stop(); } /*** * 处理rdd数据,构建索引 * @param rdd */ def indexRDD(rdd:RDD[String]): Unit ={ //遍历分区,构建索引 rdd.foreachPartition(line=>indexPartition(line)); } }
ok,至此,我们的建索引程序就写完了,本例子中用的是远程提交模式,实际上它也可以支持spark on yarn (cluster 或者 client ) 模式,不过此时需要注意的是,不需要显式指定setMaster的值,而由提交任务时,通过--master来指定运行模式,另外,依赖的相关jar包,也需要通过--jars参数来提交到集群里面,否则的话,运行时会报异常,最后看下本例子里面的solr是单机模式的,所以使用spark建索引提速并没有达到最大值,真正能发挥最大威力的是,多台search集群正如我画的架构图里面,每台机器是一个shard,这就是solrcloud的模式,或者在elasticsearch里面的集群shard,这样以来,才能真正达到高效批量的索引构建
有什么问题 可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园
发表评论
-
Scala里面的排序函数的使用
2018-01-09 20:20 2609排序方法在实际的应用场景中非常常见,Scala里面有三种排序 ... -
在Scala里面如何使用元组
2018-01-08 22:05 866元组在Scala语言中是一 ... -
Spark如何读取一些大数据集到本地机器上
2018-01-04 21:07 1632最近在使用spark处理分 ... -
使用Spark SQL的临时表解决一个小问题
2017-12-28 18:27 2407最近在使用spark处理一个业务场景时,遇到一个小问题,我在 ... -
Spark任务两个小问题笔记
2017-12-26 19:52 1611今天在用spark处理数据 ... -
Spark中foreachPartition和mapPartitions的区别
2017-12-25 21:19 3283spark的运算操作有两种类型:分别是Transformat ... -
Spark Streaming优雅的关闭策略优化
2017-12-07 19:26 4100前面文章介绍了不少有关Spark Streaming的off ... -
kafka版本不一致导致的一个小问题(二)
2017-12-04 21:37 8502背景介绍: 我们公司的实时流项目现在用的spark stre ... -
谈谈如何优雅的关闭正在运行中的Spark Streaming的流程序
2017-11-30 19:20 2245前面的文章,已经简 ... -
如何管理Spark Streaming消费Kafka的偏移量(三)
2017-11-28 23:41 5162前面的文章已经介绍了在spark streaming集成kaf ... -
理解Spark的运行机制
2017-11-23 21:52 1211Spark生态系统目前已经非常成熟了,有很多类型的任务都可以使 ... -
如何管理Spark Streaming消费Kafka的偏移量(二)
2017-11-16 19:30 4706上篇文章,讨论了在spar ... -
如何管理Spark Streaming消费Kafka的偏移量(一)
2017-11-14 20:42 4030最近工作有点忙,所以更新文章频率低了点,在这里给大家说声抱 ... -
在scala中使用spark sql解决特定需求(2)
2017-07-21 16:00 2238接着上篇文章,本篇来 ... -
在scala中使用spark sql解决特定需求
2017-07-20 19:53 994spark sql一个强大之处就 ... -
Spark如何在一个SparkContext中提交多个任务
2017-07-04 19:09 6688在使用spark处理数据的时候,大多数都是提交一个job执行, ... -
如何使用scala+spark读写hbase?
2017-06-12 19:48 3408最近工作有点忙,所以文章更新频率低了点,希望大家可以谅解,好 ... -
使用ES-Hadoop插件结合spark向es插入数据
2017-05-05 17:19 5005上篇文章简单介绍了E ... -
spark sql 快速体验调试
2017-04-13 16:40 1019spark sql提供了更快的查询性能,如何能够更快的体验,开 ... -
spark on yarn 如何集成elasticsearch
2017-04-11 16:16 1510随着spark越来越流行,我们的很多组件都有可能和spark集 ...
相关推荐
项目名称:基于Spark的PSO并行计算 编程语言:scala 项目内容:将粒子群算法pso实现的了并行,并成功集成了bencmark的测试函数,可以利用该标准的测试函数,来验证算法的性能. 测试结果:在benchmark的20个测试函数当中有9...
针对串行遗传算法处理大规模问题能力有限的现状,提出了一种基于Spark平台的粗粒度并行遗传算法(SPGA)。该方法利用Spark框架并行实现了遗传算法的选择、交叉和变异操作,并对并行操作算子的性能进行了分析,优化了算法...
#资源达人分享计划#
对状态方程参数的确定问题,提出了基于Spark的变化搜索空间的并行遗传算法。把参数确定问题转化为函数最优化问题,可以使用遗传算法求解。通过将遗传算法与Spark相结合,加快算法的计算速度。在此基础上开发了基于Spark...
我在5分钟内使用100个节点构建了117M个64维向量的索引。 设置为; // version: 0.1.4 // spark.executor.instances = 100 // spark.executor.memory = 8g // spark.driver.memory = 8g val fraction = 0.00086 // ...
spark-annoy:在Apache Spark上构建Annoy索引 开发技术 - 其它.zip
为了实现大数据环境下非线性高维数据的...在Swissroll数据集和S-curve数据集上的实验结果表明,基于Spark的并行ISOMAP算法通过并行执行和计算过程的优化,极大地提高了算法的执行效率,能够适用于大规模数据集的降维处理.
基于springboot+vue+redis+mongodb+spark等大数据技术构建的图书推荐系统,课程设计项目,可用于毕设 后端 Spring Boot Redis MongoDB 前端 Vue Element-ui Axios 推荐服务 Spark Zookeeper Kafka Flume 任务调度 ...
从财经新闻网页数据开始,进行正文提取、中文分词、倒排索引构建、执行搜索和UI。 要求技术:MapReduce或Spark;执行搜索和UI采用Spark或Java 步骤: (1)新闻正文提取,采用正则表达式提取指定网站栏目新闻的标题...
spark 并行加载 greenplum 数据,为正确运行,需引入 spark 相关包和 greenplum 驱动。 <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <groupId>org.apache.spark ...
spark scada jdbc连接数据库读取数据的并发优化方法。
最终,我能够使用Spark处理大规模数据集,并通过并行化和分布式计算加速任务的执行。 其次,我开始了对Spring Boot的学习。Spring Boot是一种快速构建基于Spring框架的应用程序的方式。通过学习Spring Boot,我了解...
druid-spark-batch, 在批处理作业中,用于使用Spark的德鲁伊索引插件 druid-spark-batch用于批处理作业中使用Spark的德鲁伊索引插件这个存储库拥有一个德鲁伊扩展,用于将Spark作为运行批处理作业的引擎生成发布 ...
1.对文本文件形式的原始数据集进行预处理 2.把文本文件的数据集导入到数据仓库Hive中 3.对数据仓库Hive中的数据进行查询分析 4.使用Sqoop将数据从Hive导入MySQL ...内含三份报告和数据集,报告中有源码,是用spark做的
基于Spark的人脸检测并行处理系统 ,武志平,傅慧源,视频监控领域数据量显著增长的背景下,基于Spark搭建一套分布式视频监控系统变得十分必要。本文提出了基于Spark并行运行人脸检测算��
基于Spark的改进并行BP算法,刘永,方维,BP(Back Propagation)神经网络是一种以误差反向传播进行训练的多层前馈网络,是目前最受欢迎的神经网络模型之一。传统BP算法收敛速度慢��
最终,我能够使用Spark处理大规模数据集,并通过并行化和分布式计算加速任务的执行。 其次,我开始了对Spring Boot的学习。Spring Boot是一种快速构建基于Spring框架的应用程序的方式。通过学习Spring Boot,我了解...
本资源提供了一套基于Apache Spark的大...这些文件详细展示了如何使用Scala、Java、Python、Shell、JavaScript、CSS、HTML、Ruby和C等语言构建一个统一的大规模数据分析引擎,非常适合用于学习和参考大数据处理的开发。
使用Spark框架进行网站用户购物分析 目的 1、熟悉Linux系统、MySQL、Spark、HBase、Hive、Sqoop、R、Eclipse、IntelliJ Idea等系统和软件的安装和使用; 2、了解大数据处理的基本流程; 3、熟悉数据预处理方法; 4、...
ElasticSearch+Spark 构建高相关性搜索服务&千人千面推荐系统