`

ES-Hadoop学习笔记-Storm交互

阅读更多

elasticsearch-Hadoop提供ElasticSearch与Apache Storm的集成支持。从ElasticSearch读取的数据是以Storm里Tuple的形式进行操作处理。

依赖版本信息:

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-starter</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.0</version>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>2.3.2</version>
</dependency>


Strom的extlib目录下jar包


 

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. import java.util.Map;  
  2.   
  3. import org.apache.storm.task.OutputCollector;  
  4. import org.apache.storm.task.TopologyContext;  
  5. import org.apache.storm.topology.OutputFieldsDeclarer;  
  6. import org.apache.storm.topology.base.BaseRichBolt;  
  7. import org.apache.storm.tuple.Fields;  
  8. import org.apache.storm.tuple.Tuple;  
  9. import org.apache.storm.tuple.Values;  
  10.   
  11. public class HandleBolt extends BaseRichBolt {  
  12.   
  13.     private static final long serialVersionUID = 1L;  
  14.   
  15.     private OutputCollector collector = null;  
  16.       
  17.     @SuppressWarnings("rawtypes")  
  18.     @Override  
  19.     public void prepare(Map stormConf, TopologyContext context,  
  20.             OutputCollector collector) {  
  21.         this.collector = collector;  
  22.     }  
  23.   
  24.     @Override  
  25.     public void execute(Tuple input) {  
  26.         String name = "NA";  
  27.         if (input.contains("name")) {  
  28.             name = input.getStringByField("name");  
  29.         }  
  30.         String phone = "NA";  
  31.         if (input.contains("phone")) {  
  32.             phone = input.getStringByField("phone");  
  33.         }  
  34.         String rcall = "NA";  
  35.         if (input.contains("rcall")) {  
  36.             rcall = input.getStringByField("rcall");  
  37.             rcall = null == rcall || "null".equals(rcall) ? "NA" : rcall;  
  38.         }  
  39.         String address = "NA";  
  40.         if (input.contains("address")) {  
  41.             address = input.getStringByField("address");  
  42.             address = null == address || "null".equals(address) ? "NA" : address;  
  43.         }  
  44.         String email = "NA";  
  45.         if (input.contains("email")) {  
  46.             email = input.getStringByField("email");  
  47.             email = null == email || "null".equals(email) ? "NA" : email;  
  48.         }  
  49.         String idCard = "NA";  
  50.         if (input.contains("idCard")) {  
  51.             idCard = input.getStringByField("idCard");  
  52.             idCard = null == idCard || "null".equals(idCard) ? "NA" : idCard;  
  53.         }  
  54.         this.collector.emit(new Values(name, phone, rcall, address, email, idCard));  
  55.         this.collector.ack(input);  
  56.     }  
  57.   
  58.     @Override  
  59.     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
  60.         declarer.declare(new Fields("name""phone""rcal""address""email""idCard"));  
  61.     }  
  62.   
  63. }  

 

[java] view plain copy
 
 在CODE上查看代码片派生到我的代码片
  1. import java.util.HashMap;  
  2. import java.util.Map;  
  3.   
  4. import org.apache.storm.Config;  
  5. import org.apache.storm.LocalCluster;  
  6. import org.apache.storm.StormSubmitter;  
  7. import org.apache.storm.hdfs.bolt.HdfsBolt;  
  8. import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;  
  9. import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;  
  10. import org.apache.storm.hdfs.bolt.format.FileNameFormat;  
  11. import org.apache.storm.hdfs.bolt.format.RecordFormat;  
  12. import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;  
  13. import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;  
  14. import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;  
  15. import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;  
  16. import org.apache.storm.hdfs.bolt.sync.SyncPolicy;  
  17. import org.apache.storm.starter.bolt.PrinterBolt;  
  18. import org.apache.storm.topology.TopologyBuilder;  
  19. import org.apache.storm.utils.Utils;  
  20.   
  21. public class ES2StormTopology {  
  22.   
  23.     private static final String TOPOLOGY_NAME = "es-storm-topology";  
  24.       
  25.     public static void main(String[] args) {  
  26.         if (args.length != 1) {  
  27.             System.exit(0);  
  28.         }  
  29.         boolean isCluster = Boolean.parseBoolean(args[0]);  
  30.           
  31.         TopologyBuilder builder = new TopologyBuilder();  
  32.         String target = "operator/telecom";  
  33.         String query = "?q=*";  
  34.         Map<Object, Object> configuration = new HashMap<Object, Object>();  
  35.         configuration.put("es.nodes""192.168.10.20:9200");  
  36.         configuration.put("es.read.field.include""name,phone,rcall,email,idCard,zipCode,address");  
  37.         configuration.put("es.storm.spout.fields""name,phone,rcall,email,idCard,zipCode,address");  
  38.         builder.setSpout("es-storm-spout"new ESSpout(target, query, configuration), 1);  
  39.           
  40.         builder.setBolt("storm-print-bolt"new PrinterBolt()).shuffleGrouping("es-storm-spout");  
  41.           
  42.         builder.setBolt("storm-handle-bolt"new HandleBolt()).shuffleGrouping("es-storm-spout");  
  43.           
  44.         RecordFormat recordFormat = new DelimitedRecordFormat().withFieldDelimiter(":");  
  45.         SyncPolicy syncPolicy = new CountSyncPolicy(10);  
  46.         FileRotationPolicy fileRotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);  
  47.         FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/")  
  48.                 .withPrefix("es_").withExtension(".log");  
  49.         HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://centos.host1:9000")  
  50.                 .withFileNameFormat(fileNameFormat).withRecordFormat(recordFormat)  
  51.                 .withRotationPolicy(fileRotationPolicy).withSyncPolicy(syncPolicy);  
  52.         builder.setBolt("storm-hdfs-bolt", hdfsBolt).globalGrouping("storm-handle-bolt");  
  53.           
  54.         Config config = new Config();  
  55.         config.setDebug(true);  
  56.         if (isCluster) {  
  57.             try {  
  58.                 config.setNumWorkers(3);  
  59.                 StormSubmitter.submitTopologyWithProgressBar(  
  60.                         TOPOLOGY_NAME, config, builder.createTopology());  
  61.             } catch (Exception e) {  
  62.                 e.printStackTrace();  
  63.             }  
  64.         } else {  
  65.             LocalCluster cluster = new LocalCluster();  
  66.             cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());  
  67.             Utils.sleep(100000);  
  68.             cluster.killTopology(TOPOLOGY_NAME);  
  69.             cluster.shutdown();  
  70.         }  
  71.           
  72.     }  
  73.       
  74. }  

 

注意:elasticsearch-hadoop里的EsSpout类用到的Storm版本过低,所以重写了一个ESSpout替换旧版本Storm的API。

 

$bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.ES2StormTopology false

 

 

 

[java] view plain copy
 
  1. import java.util.HashMap;  
  2. import java.util.Map;  
  3.   
  4. import org.apache.storm.Config;  
  5. import org.apache.storm.LocalCluster;  
  6. import org.apache.storm.StormSubmitter;  
  7. import org.apache.storm.starter.bolt.PrinterBolt;  
  8. import org.apache.storm.topology.TopologyBuilder;  
  9. import org.apache.storm.utils.Utils;  
  10. import org.platform.storm.elasticsearch.bolt.ESBolt;  
  11. import org.platform.storm.elasticsearch.spout.ESSpout;  
  12.   
  13. public class Storm2ESTopology {  
  14.   
  15.     private static final String TOPOLOGY_NAME = "storm-es-topology";  
  16.       
  17.     public static void main(String[] args) {  
  18.         if (args.length != 1) {  
  19.             System.exit(0);  
  20.         }  
  21.         boolean isCluster = Boolean.parseBoolean(args[0]);  
  22.           
  23.         TopologyBuilder builder = new TopologyBuilder();  
  24.           
  25.         String target = "operator/telecom";  
  26.         String query = "?q=*";  
  27.         Map<Object, Object> spoutConf = new HashMap<Object, Object>();  
  28.         spoutConf.put("es.nodes""192.168.10.20:9200");  
  29.         spoutConf.put("es.read.field.include""name,phone,rcall,email,idCard,zipCode,address");  
  30.         spoutConf.put("es.storm.spout.fields""name,phone,rcall,email,idCard,zipCode,address");  
  31.         builder.setSpout("es-storm-spout"new ESSpout(target, query, spoutConf), 1);  
  32.           
  33.         builder.setBolt("storm-print-bolt"new PrinterBolt()).shuffleGrouping("es-storm-spout");  
  34.           
  35.         Map<Object, Object> boltConf = new HashMap<Object, Object>();  
  36.         boltConf.put("es.nodes""192.168.10.20:9200");  
  37.         boltConf.put("es.index.auto.create""true");  
  38.         boltConf.put("es.ser.writer.bytes.class""org.platform.storm.elasticsearch.bolt.StormTupleBytesConverter");  
  39.         //boltConf.put("es.input.json", "true");  
  40.         builder.setBolt("storm-es-bolt"new ESBolt("data/telecom", boltConf))  
  41.             .globalGrouping("es-storm-spout");  
  42.           
  43.         Config config = new Config();  
  44.         config.setDebug(true);  
  45.         if (isCluster) {  
  46.             try {  
  47.                 config.setNumWorkers(3);  
  48.                 StormSubmitter.submitTopologyWithProgressBar(  
  49.                         TOPOLOGY_NAME, config, builder.createTopology());  
  50.             } catch (Exception e) {  
  51.                 e.printStackTrace();  
  52.             }  
  53.         } else {  
  54.             LocalCluster cluster = new LocalCluster();  
  55.             cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());  
  56.             Utils.sleep(100000);  
  57.             cluster.killTopology(TOPOLOGY_NAME);  
  58.             cluster.shutdown();  
  59.         }  
  60.           
  61.     }  
  62.       
  63. }  

 

 

注意:elasticsearch-hadoop里的EsBolt、StormTupleBytesConverter类用到的Storm版本过低,所以重写了一个ESBolt、StormTupleBytesConverter替换旧版本Storm的API。

 

$bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.Storm2ESTopology false

 

文献出自:http://blog.csdn.net/fighting_one_piece/article/details/52228641

分享到:
评论

相关推荐

    分布式系统.pptx

    分布式系统.pptx

    源代码-360通用ASP防护代码(防sql注入).zip

    源代码-360通用ASP防护代码(防sql注入).zip

    node-v8.1.0-darwin-x64.tar.gz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    2020年ti杯电赛省赛A题代码整理

    2020年ti杯电赛省赛a题的项目 分为 手机端(android),主显示端,姿态检测手环端,心率滤波读取端 mcu 采用的是esp32.结合了适配esp32的arduino以及rtos框架进行开发。 开发环境 及 语言 安卓为android studio java开发 esp32为platform io c/c++ 节点间通信方式 tcp直连,手机端为总服务端 手机端 android 原生开发 主显示端 屏幕ili9341 spi 触摸xpt2046 图形 adafruit gfx ad芯片 ads112c04 测温 lmt70 姿态检测端 9轴 bno055 心率检测 心电 ads1292

    node-v6.15.0-sunos-x64.tar.gz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    node-v9.3.0-x86.msi

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    基于网络爬虫技术的网络新闻分析

    基于网络爬虫技术的网络新闻分析主要用于网络数据爬取。本系统结构如下: (1)网络爬虫模块。 (2)中文分词模块。 (3)中3文相似度判定模块。 (4)数据结构化存储模块。 (5)数据可视化展示模块。

    毕业设计:python155基于贝叶斯网络的城市火灾预测(源码 + 数据库)

    毕业设计:python155基于贝叶斯网络的城市火灾预测(源码 + 数据库)

    项目初始化template

    项目初始化template

    游戏道具3D立体blender模型图标素材-3d Game Asset Icon Vol.1.zip

    游戏开发资源,游戏UI,游戏GUI,游戏图标,PSD格式,XD格式,PNG下载,源文件,可编辑下载,游戏购物充值界面,宝石,图标,PS格式,AI格式等,游戏APP

    node-v9.0.0-x86.msi

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    基于VL813+fe2.1s+VL160 10端口USB-HUB评估版设计硬件(原理图+PCB+封装库)文件.zip

    基于VL813+fe2.1s+VL160 10端口USB_HUB评估版设计硬件(原理图+PCB+封装库)文件,,硬件采用2层板设计,大小为45*78mm,包括ALTIUM设计的原理图PCB及器件原理图PCB封装库文件,可做为你的学习设计参考。 主要器件型号如下: FE2.1 INDUCTANCE Inductance LED LED MP8771 RES Resistor Res2 Resistor USB-typec-24p USB-typec-6p USB2.0 USB2.0x2 USB3.0 VL160 VL813 XTAL Crystal Oscillator XTAL-1

    linux 常用命令知识笔记详解.zip

    linux常用命令大全 Linux 是一种开源的操作系统,它的命令行界面是其最大的特点之 一。在 Linux 中,命令行是最常用的操作方式,因此熟练掌握 Linux 常用命令是非常重要的。本文将介绍一些常用的 Linux 命令。 1. ls 命令 ls 命令用于列出当前目录下的文件和目录。使用 ls 命令时,可以加 上一些参数来控制输出的格式。例如,使用“ls -l”命令可以列出 详细的文件信息,包括文件的权限、所有者、大小等。 2. cd 命令 cd 命令用于改变当前工作目录。使用 cd 命令时,可以输入目录的 绝对路径或相对路径。例如,输入“cd /home/user”可以进入 /home/user 目录,输入“cd ..”可以返回上一级目录。 3. mkdir 命令 mkdir 命令用于创建新的目录。使用 mkdir 命令时,需要指定要创 建的目录的名称。例如,输入“mkdir test”可以创建一个名为 test 的目录。 mkdir 命令用于创建新的目录。使用 mkdir 命令时,需要指定要创 建的目录的名称。例如,输入“mkdir

    来自网络至网格.pptx

    来自网络至网格.pptx

    pi-pi-net 是一个在linux环境下封装epoll的网络库,可以基于此库非常方便的实现Reactor网络模型.zip

    pi-pi-net 是一个在linux环境下封装epoll的网络库,可以基于此库非常方便的实现Reactor网络模型,或者web,rpc,websocket等网络框架的基础框架

    HTML5小游戏源码下载网页版游戏JS小游戏-驴子跳游戏(原作者绝对实战开发指南 + 游戏源码)珍藏版.zip

    HTML5小游戏源码下载,JS小游戏源码下载,坦克大战,驴子跳,连连看,俄罗斯方块,圈泡泡,塔防,太空战舰,愤怒的小鸟,植物大战僵尸,水果忍者,扫雷,超级玛丽,打地鼠,坦克大战,麻将等JS小游戏源码下载,游戏开发教程,网页游戏,本地直接打开就可以玩。

    node-v7.5.0-linux-armv7l.tar.gz

    Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。

    声网PC端SDK,用于音视频服务

    声网PC端SDK,用于音视频服务

    什么是python-对于我们来说学习python的意义是什么

    python

    基于Linux V4L2视频库、ASLA高级声音框架的远程视频及语音聊天.zip

    基于Linux V4L2视频库、ASLA高级声音框架的远程视频及语音聊天

Global site tag (gtag.js) - Google Analytics