elasticsearch-Hadoop提供ElasticSearch与Apache Storm的集成支持。从ElasticSearch读取的数据是以Storm里Tuple的形式进行操作处理。
依赖版本信息:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-starter</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>2.3.2</version>
</dependency>
Strom的extlib目录下jar包
- import java.util.Map;
- import org.apache.storm.task.OutputCollector;
- import org.apache.storm.task.TopologyContext;
- import org.apache.storm.topology.OutputFieldsDeclarer;
- import org.apache.storm.topology.base.BaseRichBolt;
- import org.apache.storm.tuple.Fields;
- import org.apache.storm.tuple.Tuple;
- import org.apache.storm.tuple.Values;
- public class HandleBolt extends BaseRichBolt {
- private static final long serialVersionUID = 1L;
- private OutputCollector collector = null;
- @SuppressWarnings("rawtypes")
- @Override
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- this.collector = collector;
- }
- @Override
- public void execute(Tuple input) {
- String name = "NA";
- if (input.contains("name")) {
- name = input.getStringByField("name");
- }
- String phone = "NA";
- if (input.contains("phone")) {
- phone = input.getStringByField("phone");
- }
- String rcall = "NA";
- if (input.contains("rcall")) {
- rcall = input.getStringByField("rcall");
- rcall = null == rcall || "null".equals(rcall) ? "NA" : rcall;
- }
- String address = "NA";
- if (input.contains("address")) {
- address = input.getStringByField("address");
- address = null == address || "null".equals(address) ? "NA" : address;
- }
- String email = "NA";
- if (input.contains("email")) {
- email = input.getStringByField("email");
- email = null == email || "null".equals(email) ? "NA" : email;
- }
- String idCard = "NA";
- if (input.contains("idCard")) {
- idCard = input.getStringByField("idCard");
- idCard = null == idCard || "null".equals(idCard) ? "NA" : idCard;
- }
- this.collector.emit(new Values(name, phone, rcall, address, email, idCard));
- this.collector.ack(input);
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("name", "phone", "rcal", "address", "email", "idCard"));
- }
- }
- import java.util.HashMap;
- import java.util.Map;
- import org.apache.storm.Config;
- import org.apache.storm.LocalCluster;
- import org.apache.storm.StormSubmitter;
- import org.apache.storm.hdfs.bolt.HdfsBolt;
- import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
- import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
- import org.apache.storm.hdfs.bolt.format.FileNameFormat;
- import org.apache.storm.hdfs.bolt.format.RecordFormat;
- import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
- import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
- import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
- import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
- import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
- import org.apache.storm.starter.bolt.PrinterBolt;
- import org.apache.storm.topology.TopologyBuilder;
- import org.apache.storm.utils.Utils;
- public class ES2StormTopology {
- private static final String TOPOLOGY_NAME = "es-storm-topology";
- public static void main(String[] args) {
- if (args.length != 1) {
- System.exit(0);
- }
- boolean isCluster = Boolean.parseBoolean(args[0]);
- TopologyBuilder builder = new TopologyBuilder();
- String target = "operator/telecom";
- String query = "?q=*";
- Map<Object, Object> configuration = new HashMap<Object, Object>();
- configuration.put("es.nodes", "192.168.10.20:9200");
- configuration.put("es.read.field.include", "name,phone,rcall,email,idCard,zipCode,address");
- configuration.put("es.storm.spout.fields", "name,phone,rcall,email,idCard,zipCode,address");
- builder.setSpout("es-storm-spout", new ESSpout(target, query, configuration), 1);
- builder.setBolt("storm-print-bolt", new PrinterBolt()).shuffleGrouping("es-storm-spout");
- builder.setBolt("storm-handle-bolt", new HandleBolt()).shuffleGrouping("es-storm-spout");
- RecordFormat recordFormat = new DelimitedRecordFormat().withFieldDelimiter(":");
- SyncPolicy syncPolicy = new CountSyncPolicy(10);
- FileRotationPolicy fileRotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
- FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/storm/")
- .withPrefix("es_").withExtension(".log");
- HdfsBolt hdfsBolt = new HdfsBolt().withFsUrl("hdfs://centos.host1:9000")
- .withFileNameFormat(fileNameFormat).withRecordFormat(recordFormat)
- .withRotationPolicy(fileRotationPolicy).withSyncPolicy(syncPolicy);
- builder.setBolt("storm-hdfs-bolt", hdfsBolt).globalGrouping("storm-handle-bolt");
- Config config = new Config();
- config.setDebug(true);
- if (isCluster) {
- try {
- config.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(
- TOPOLOGY_NAME, config, builder.createTopology());
- } catch (Exception e) {
- e.printStackTrace();
- }
- } else {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
- Utils.sleep(100000);
- cluster.killTopology(TOPOLOGY_NAME);
- cluster.shutdown();
- }
- }
- }
注意:elasticsearch-hadoop里的EsSpout类用到的Storm版本过低,所以重写了一个ESSpout替换旧版本Storm的API。
$bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.ES2StormTopology false
- import java.util.HashMap;
- import java.util.Map;
- import org.apache.storm.Config;
- import org.apache.storm.LocalCluster;
- import org.apache.storm.StormSubmitter;
- import org.apache.storm.starter.bolt.PrinterBolt;
- import org.apache.storm.topology.TopologyBuilder;
- import org.apache.storm.utils.Utils;
- import org.platform.storm.elasticsearch.bolt.ESBolt;
- import org.platform.storm.elasticsearch.spout.ESSpout;
- public class Storm2ESTopology {
- private static final String TOPOLOGY_NAME = "storm-es-topology";
- public static void main(String[] args) {
- if (args.length != 1) {
- System.exit(0);
- }
- boolean isCluster = Boolean.parseBoolean(args[0]);
- TopologyBuilder builder = new TopologyBuilder();
- String target = "operator/telecom";
- String query = "?q=*";
- Map<Object, Object> spoutConf = new HashMap<Object, Object>();
- spoutConf.put("es.nodes", "192.168.10.20:9200");
- spoutConf.put("es.read.field.include", "name,phone,rcall,email,idCard,zipCode,address");
- spoutConf.put("es.storm.spout.fields", "name,phone,rcall,email,idCard,zipCode,address");
- builder.setSpout("es-storm-spout", new ESSpout(target, query, spoutConf), 1);
- builder.setBolt("storm-print-bolt", new PrinterBolt()).shuffleGrouping("es-storm-spout");
- Map<Object, Object> boltConf = new HashMap<Object, Object>();
- boltConf.put("es.nodes", "192.168.10.20:9200");
- boltConf.put("es.index.auto.create", "true");
- boltConf.put("es.ser.writer.bytes.class", "org.platform.storm.elasticsearch.bolt.StormTupleBytesConverter");
- //boltConf.put("es.input.json", "true");
- builder.setBolt("storm-es-bolt", new ESBolt("data/telecom", boltConf))
- .globalGrouping("es-storm-spout");
- Config config = new Config();
- config.setDebug(true);
- if (isCluster) {
- try {
- config.setNumWorkers(3);
- StormSubmitter.submitTopologyWithProgressBar(
- TOPOLOGY_NAME, config, builder.createTopology());
- } catch (Exception e) {
- e.printStackTrace();
- }
- } else {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
- Utils.sleep(100000);
- cluster.killTopology(TOPOLOGY_NAME);
- cluster.shutdown();
- }
- }
- }
注意:elasticsearch-hadoop里的EsBolt、StormTupleBytesConverter类用到的Storm版本过低,所以重写了一个ESBolt、StormTupleBytesConverter替换旧版本Storm的API。
$bin/storm jar /home/hadoop/Documents/esstorm-0.0.1-SNAPSHOT.jar org.platform.storm.elasticsearch.Storm2ESTopology false
文献出自:http://blog.csdn.net/fighting_one_piece/article/details/52228641
相关推荐
分布式系统.pptx
源代码-360通用ASP防护代码(防sql注入).zip
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
2020年ti杯电赛省赛a题的项目 分为 手机端(android),主显示端,姿态检测手环端,心率滤波读取端 mcu 采用的是esp32.结合了适配esp32的arduino以及rtos框架进行开发。 开发环境 及 语言 安卓为android studio java开发 esp32为platform io c/c++ 节点间通信方式 tcp直连,手机端为总服务端 手机端 android 原生开发 主显示端 屏幕ili9341 spi 触摸xpt2046 图形 adafruit gfx ad芯片 ads112c04 测温 lmt70 姿态检测端 9轴 bno055 心率检测 心电 ads1292
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
基于网络爬虫技术的网络新闻分析主要用于网络数据爬取。本系统结构如下: (1)网络爬虫模块。 (2)中文分词模块。 (3)中3文相似度判定模块。 (4)数据结构化存储模块。 (5)数据可视化展示模块。
毕业设计:python155基于贝叶斯网络的城市火灾预测(源码 + 数据库)
项目初始化template
游戏开发资源,游戏UI,游戏GUI,游戏图标,PSD格式,XD格式,PNG下载,源文件,可编辑下载,游戏购物充值界面,宝石,图标,PS格式,AI格式等,游戏APP
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
基于VL813+fe2.1s+VL160 10端口USB_HUB评估版设计硬件(原理图+PCB+封装库)文件,,硬件采用2层板设计,大小为45*78mm,包括ALTIUM设计的原理图PCB及器件原理图PCB封装库文件,可做为你的学习设计参考。 主要器件型号如下: FE2.1 INDUCTANCE Inductance LED LED MP8771 RES Resistor Res2 Resistor USB-typec-24p USB-typec-6p USB2.0 USB2.0x2 USB3.0 VL160 VL813 XTAL Crystal Oscillator XTAL-1
linux常用命令大全 Linux 是一种开源的操作系统,它的命令行界面是其最大的特点之 一。在 Linux 中,命令行是最常用的操作方式,因此熟练掌握 Linux 常用命令是非常重要的。本文将介绍一些常用的 Linux 命令。 1. ls 命令 ls 命令用于列出当前目录下的文件和目录。使用 ls 命令时,可以加 上一些参数来控制输出的格式。例如,使用“ls -l”命令可以列出 详细的文件信息,包括文件的权限、所有者、大小等。 2. cd 命令 cd 命令用于改变当前工作目录。使用 cd 命令时,可以输入目录的 绝对路径或相对路径。例如,输入“cd /home/user”可以进入 /home/user 目录,输入“cd ..”可以返回上一级目录。 3. mkdir 命令 mkdir 命令用于创建新的目录。使用 mkdir 命令时,需要指定要创 建的目录的名称。例如,输入“mkdir test”可以创建一个名为 test 的目录。 mkdir 命令用于创建新的目录。使用 mkdir 命令时,需要指定要创 建的目录的名称。例如,输入“mkdir
来自网络至网格.pptx
pi-pi-net 是一个在linux环境下封装epoll的网络库,可以基于此库非常方便的实现Reactor网络模型,或者web,rpc,websocket等网络框架的基础框架
HTML5小游戏源码下载,JS小游戏源码下载,坦克大战,驴子跳,连连看,俄罗斯方块,圈泡泡,塔防,太空战舰,愤怒的小鸟,植物大战僵尸,水果忍者,扫雷,超级玛丽,打地鼠,坦克大战,麻将等JS小游戏源码下载,游戏开发教程,网页游戏,本地直接打开就可以玩。
Node.js,简称Node,是一个开源且跨平台的JavaScript运行时环境,它允许在浏览器外运行JavaScript代码。Node.js于2009年由Ryan Dahl创立,旨在创建高性能的Web服务器和网络应用程序。它基于Google Chrome的V8 JavaScript引擎,可以在Windows、Linux、Unix、Mac OS X等操作系统上运行。 Node.js的特点之一是事件驱动和非阻塞I/O模型,这使得它非常适合处理大量并发连接,从而在构建实时应用程序如在线游戏、聊天应用以及实时通讯服务时表现卓越。此外,Node.js使用了模块化的架构,通过npm(Node package manager,Node包管理器),社区成员可以共享和复用代码,极大地促进了Node.js生态系统的发展和扩张。 Node.js不仅用于服务器端开发。随着技术的发展,它也被用于构建工具链、开发桌面应用程序、物联网设备等。Node.js能够处理文件系统、操作数据库、处理网络请求等,因此,开发者可以用JavaScript编写全栈应用程序,这一点大大提高了开发效率和便捷性。 在实践中,许多大型企业和组织已经采用Node.js作为其Web应用程序的开发平台,如Netflix、PayPal和Walmart等。它们利用Node.js提高了应用性能,简化了开发流程,并且能更快地响应市场需求。
声网PC端SDK,用于音视频服务
python
基于Linux V4L2视频库、ASLA高级声音框架的远程视频及语音聊天