1 使用Eclipse创建maven工程
2 pom.xml内增加storm依赖:
<dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>0.9.3</version> </dependency>
3 自定义一个 spout 一个bolt, 在spout内不断产生i 并发送到bolt内, 在bolt内实现累加并打印效果
此案例主要是结合storm简介 中关于storm术语(车头 轨道 车次 车厢 车厢内人数tuple)
对应代码写法的一个参考而已
4 代码如下:
package changping.houzhihoujue.storm; import java.util.Map; import java.util.concurrent.TimeUnit; import ch.qos.logback.core.util.TimeUtil; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 本地运行: * 实现累加 * @author zm * */ public class MyLocalStormTopology { /** * 组装火车 轨道 并让火车在轨道上行驶 * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { // 祖品列车 TopologyBuilder topologyBuilder = new TopologyBuilder(); topologyBuilder.setSpout("1", new MySpout2()); // 定义1号车厢 topologyBuilder.setBolt("2", new MyBolt1()).shuffleGrouping("1");// 定义2号车厢 并和1号车厢连接起来 // 造出轨道 LocalCluster localCluster = new LocalCluster();// 造出轨道 在本地运行 Config config = new Config(); // 轨道上运行列车, 三个参数分别为:定义的列车名,列车服务人员,轨道上跑的列车本身 localCluster.submitTopology(MyLocalStormTopology.class.getSimpleName(), config, topologyBuilder.createTopology()); TimeUnit.SECONDS.sleep(99999);// 设置列车运行时间 localCluster.shutdown();// 跑完后就停止下来, 否则storm是永不停止 } } //创建火车头 class MySpout2 extends BaseRichSpout { private Map conf; private TopologyContext context; private SpoutOutputCollector collector; // 此方法首先被调用 打开storm系统外的数据源 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.conf = conf; this.context = context; this.collector = collector; } private int i = 0; // 认为是NameNode的heartbeat,永无休息的死循环的调用 并是线程安全的操作, 这里每一次调用此方法 将i++发送到bolt public void nextTuple() { System.err.println(i); // 将数据(i++)放在弹壳(Values)中,并发送给bolt this.collector.emit(new Values(i++)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } //声明输出的字段的名称为 v1 只有在输出给别人时才会重写此方法 public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("v1")); } } // 创建车厢 class MyBolt1 extends BaseRichBolt{ private Map stormConf; private TopologyContext context; private OutputCollector collector; // 准备下要对接收storm spout发送来的数据 public void prepare(Map stormConf, TopologyContext context,OutputCollector collector) { this.stormConf = stormConf; this.context = context; this.collector = collector; } private int sum = 0; // 死循环,用于接收bolt送来的数据 这里storm每调用一次此方法 则获取发送来的tuple数据 public void execute(Tuple input) { int i = input.getIntegerByField("v1"); sum += i; System.err.println(sum); } // 只有向外发送数据时 此方法才会被调用 否则 不要实现此方法 public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
相关推荐
Apache Storm(apache-storm-2.3.0.tar.gz) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言...
STORM-User-guide-V3.2
Apache Storm(apache-storm-2.3.0-src.tar.gz 源码) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与...
Apache Storm(apache-storm-2.3.0-src.zip 源码) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何...
ZY_Storm-0.0.1-jar-0401-1
Chapter 1 Introducing Storm Chapter 2 Core Storm concepts Chapter 3 Topology design Chapter 4 Creating robust topologies Chapter 5 Moving from local to remote topologies Chapter 6 Tuning in Storm ...
ZY_Storm-0.0.1-0314-1.jar
Apache Storm(apache-storm-2.3.0.zip) 是一个免费的开源分布式实时计算系统。Apache Storm 可以轻松可靠地处理无限制的数据流,实时处理就像 Hadoop 进行批处理一样。Apache Storm 很简单,可以与任何编程语言...
启动测试环境fig up 启动kafka外壳start-kafka-shell.sh <Docker> <Zookeeper> 在外壳中,创建一个主题$KAFKA_HOME/bin/kafka-topics.sh --create --topic storm-sentence --partitions 2 --zookeeper $ZK --...
帮助不能下载apache-storm-2.4.0.tar.gz的同学下载storm安装包
storm-wordcount例子 storm-wordcount例子 storm-wordcount例子 storm-wordcount例子
storm-core-1.0.3-sources.jar 源码文件,1.0.3版本
Java webservice接口测试工具 Storm_r1.1-Adarna 方便使用
storm+kafka jar包 ,curator-client-2.8.0、curator-framework-2.8.0、curator-recipes-2.8.0、guava-18.0、kafka_2.9.2-0.8.2.2、metrics-core-2.2.0、scala-library-2.10.4、storm-kafka-0.9.2-incubating、...
apache-storm-1.2.1 2,安装包,帮助不能下载的同学apache-storm-1.2.1 2,安装包,帮助不能下载的同学apache-storm-1.2.1 2,安装包,帮助不能下载的同学apache-storm-1.2.1 2,安装包,帮助不能下载的同学apache-storm...
Storm - Distributed and fault-tolerant realtime computation By Nathan Marz
01、Storm的集群搭建 03-storm部署-2.avi