strom程序开发需要几个组件
1 Topology 即程序的主要入口 main,配置文件要通过主函数加载后放到conf中,然后bolt才能拿到,./conf可以到jar包中得到配置文件。
2 Sport 程序的数据来源
3 bolt程序处理节点 ,一个程序可能n个bolt节点 。
一 Topology
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; public class MyTopology1 { public static void main(String[] args) { try{ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomWordSpout(), 1).setNumTasks(8); builder.setBolt("longBolt", new longBolt(),1).shuffleGrouping("spout"); //初始化配置文件,保存配置文件到全局config ConfigFactory.init("./conf/config-kafka.xml"); Config config = new Config(); config.put("KafkaConfig", KafkaConfig.getProducerPro() ) ; config.put("redis.url", ConfigFactory.getString("redis.url") ) ; config.setDebug(true); LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("MyTopology1", config, builder.createTopology()); if(args!=null && args.length > 0) { config.setNumWorkers(2); StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } else { config.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("MyTopology1", config, builder.createTopology()); Thread.sleep(1000); //cluster.shutdown(); } }catch (Exception e) { e.printStackTrace(); } } }
二、 Spout 数据来源
import java.util.Map; import java.util.Random; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; public class RandomWordSpout extends BaseRichSpout{ /** * */ private static final long serialVersionUID = 1L; private SpoutOutputCollector collector; //模拟一些数据 String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"}; //不断地往下一个组件发送tuple消息 //这里面是该spout组件的核心逻辑 @Override public void nextTuple() { //可以从kafka消息队列中拿到数据,简便起见,我们从words数组中随机挑选一个商品名发送出去 Random random = new Random(); int index = random.nextInt(words.length); //通过随机数拿到一个商品名 String godName = words[index]; //将商品名封装成tuple,发送消息给下一个组件 collector.emit(new Values(godName)); //每发送一个消息,休眠500ms, Utils.sleep(10000); } //初始化方法,在spout组件实例化时调用一次 @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } //声明本spout组件发送出去的tuple中的数据的字段名 @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("orignname")); } }
三、bolt业务处理代码
import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; public class longBolt2 extends BaseBasicBolt { private static final long serialVersionUID = -4561045897494106937L; @Override public void prepare(Map stormConf, TopologyContext context) { } public void execute(Tuple tuple, BasicOutputCollector collector) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
启动一个storm程序:
storm jar storm-kafka-0.0.2_wzt.jar com.jusfoun.testStrom.MyTopology1 MyTopology1
杀死之前的程序:
storm kill MyTopology1
相关推荐
strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的jar包strom的...
GPU数据库PG_strom的安装及使用,包括postgresql的安装, PG_strom的安装。
strom zookeeper kafka 部署文档 原理解析
Strom项目依赖所需jar
Strom webService测试工具,类似于soapUI,个人更喜欢这个
Strom的基础概念,包括核心概念释义,如拓扑等;一些常用API方法和参数详解;大方面的工作流程;
strom介绍,包括出现背景,应用场景,环境搭建,基本架构。
strom学习笔记
非常好用的webservice接口测试小工具,开发时中必备,解压即可。
里面是对storm运行的一个项目,放到eclipse里面就可以跑了,对于初学者非常有用,是根据这个https://www.cnblogs.com/freeweb/p/5242631.html来的
NULL 博文链接:https://contentprovider.iteye.com/blog/1041946
pg-strom, PG Strom开发知识库 pgpg strom是PostgreSQL数据库的定制扫描提供程序模块。 它是用于使用GPU设备进行accelarate顺序扫描,hash-基于表的Join 和聚合函数。 它的基本概念是CPU和GPU应该集中在它们具有优势...
Strom安装手册.pdf
NULL 博文链接:https://gaojingsong.iteye.com/blog/2289928
基于Strom的日志实时流量分析主动防御系统的设计与实现+全部资料+详细文档(毕业设计).zip基于Strom的日志实时流量分析主动防御系统的设计与实现+全部资料+详细文档(毕业设计).zip基于Strom的日志实时流量分析...
这个是strom 1.0.2 的jar 包,版本比较老了,但是还是
NULL 博文链接:https://username2.iteye.com/blog/2272073