A minimal Storm word-count topology: SentenceSpout emits sentences, SplitSentenceBolt splits them into words, WordCountBolt keeps a running count per word, and ReportBolt prints the totals. Imports are omitted for brevity; the classes use the Storm core API (org.apache.storm.* in Storm 1.x+, backtype.storm.* in older releases): Config, LocalCluster, spout.SpoutOutputCollector, task.OutputCollector, task.TopologyContext, topology.OutputFieldsDeclarer, topology.TopologyBuilder, topology.base.BaseRichSpout, topology.base.BaseRichBolt, tuple.Fields, tuple.Tuple, tuple.Values, plus java.util.*.

public class SentenceSpout extends BaseRichSpout{
private static final long serialVersionUID = 1L;
/**
* This output collector exposes the API for emitting tuples from an {@link org.apache.storm.topology.IRichSpout}.
* The main difference between this output collector and {@link OutputCollector}
* for {@link org.apache.storm.topology.IRichBolt} is that spouts can tag messages with ids so that they can be
* acked or failed later on. This is the Spout portion of Storm's API to
* guarantee that each message is fully processed at least once.
*/
private SpoutOutputCollector collector;
//private OutputCollector collector;
// Sample sentences used as test data.
private String[] sentences={
"my dog has fleas",
"i like cold beverages",
"the dog ate my homework",
"don't have a cow man",
"i don't think i like fleas"};
private int index=0;
/**
* private Map<String, StreamInfo> _fields = new HashMap<>();
* public void declareStream(String streamId, boolean direct, Fields fields) {
* if(_fields.containsKey(streamId)) {
* throw new IllegalArgumentException("Fields for " + streamId + " already set");
* }
* _fields.put(streamId, new StreamInfo(fields.toList(), direct));
* }
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentences"));
}
/**
 * open() is defined in the ISpout interface and is called once when the spout is initialized.
 * It receives three parameters:
 * a map containing the Storm configuration,
 * a TopologyContext object describing the components in the topology,
 * and a SpoutOutputCollector that provides the methods for emitting tuples.
 */
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector=collector;
}
/**
 * nextTuple() is the core of every spout implementation: Storm calls it repeatedly
 * so the spout can emit tuples to the output collector.
 */
public void nextTuple() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.collector.emit(new Values(sentences[index]));
//System.out.println("===============");
index++;
if(index>=sentences.length){
index=0;
}
}
}
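The Javadoc on the collector field above mentions that spouts can tag tuples with message IDs so they can be acked or failed later, but this spout emits unanchored tuples, so Storm's at-least-once guarantee never actually kicks in. Below is a minimal sketch of what a reliable variant could look like; the msgId counter and pending map are illustrative assumptions, not part of the original post:

// Hypothetical reliable variant of the spout, sketched for illustration.
private int msgId = 0;                                  // assumption: simple incrementing message id
private Map<Integer, String> pending = new HashMap<>(); // assumption: tracks in-flight sentences

public void nextTuple() {
    String sentence = sentences[index];
    pending.put(msgId, sentence);
    // The second argument is the message id; Storm will call ack()/fail() with it later.
    this.collector.emit(new Values(sentence), msgId);
    msgId++;
    index = (index + 1) % sentences.length;
}

@Override
public void ack(Object id) {
    pending.remove(id);                // tuple tree fully processed, forget the sentence
}

@Override
public void fail(Object id) {
    String sentence = pending.get(id); // replay the failed sentence with the same id
    this.collector.emit(new Values(sentence), id);
}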
public class SplitSentenceBolt extends BaseRichBolt{
private static final long serialVersionUID = 1L;
private OutputCollector collector;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
}
public void execute(Tuple input) {
String sentence=input.getStringByField("sentences");
String[] words=sentence.split(" ");
for(String word :words){
this.collector.emit(new Values(word));
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("words"));
}
}
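For the same reason, the bolts in this demo emit unanchored tuples and never ack their input, so even a reliable spout would not see failures. A hedged sketch of an anchored execute() for SplitSentenceBolt (same fields and field names as above):

// Anchored variant of execute(), sketched for illustration.
public void execute(Tuple input) {
    String sentence = input.getStringByField("sentences");
    for (String word : sentence.split(" ")) {
        // Passing the input tuple as the first argument anchors the new tuple to it,
        // so a downstream failure propagates back to the spout's fail().
        this.collector.emit(input, new Values(word));
    }
    // Acknowledge the input once all child tuples have been emitted.
    this.collector.ack(input);
}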
public class WordCountBolt extends BaseRichBolt{
private static final long serialVersionUID = 1L;
private OutputCollector collector;
private HashMap<String,Long> counts=null;
/**
 * As a rule of thumb, assign primitives and serializable objects in the constructor,
 * and instantiate non-serializable objects in prepare(), after the bolt has been
 * deserialized on its worker.
 */
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector=collector;
this.counts=new HashMap<String,Long>();
}
public void execute(Tuple input) {
String word=input.getStringByField("words");
Long count=this.counts.get(word);
if(count==null){
count=0L;
}
count++;
// Put the updated count back; the entry for the same word is overwritten,
// so the map always holds the latest total for each word.
this.counts.put(word,count);
this.collector.emit(new Values(word,count));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}
public class ReportBolt extends BaseRichBolt{
private static final long serialVersionUID = 1L;
private HashMap<String,Long> counts=null;
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.counts=new HashMap<String,Long>();
}
public void execute(Tuple input) {
String word=input.getStringByField("word");
Long count=input.getLongByField("count");
this.counts.put(word, count);
System.out.println("--------FINAL COUNTS--------");
List<String> keys=new ArrayList<String>();
keys.addAll(this.counts.keySet());
Collections.sort(keys);
for(String key:keys){
System.out.println(key+":"+this.counts.get(key));
}
System.out.println("----------------------------");
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// this bolt does not emit anything
}
}
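As written, ReportBolt prints the running totals every time a tuple arrives. If you only want a single summary when the topology is torn down (for example when the LocalCluster shuts down), one variant moves the printing into cleanup(), which Storm calls on shutdown in local mode. This is a sketch, not the original post's code:

// Sketch: report once, when the bolt is shut down in local mode.
public void execute(Tuple input) {
    this.counts.put(input.getStringByField("word"), input.getLongByField("count"));
}

@Override
public void cleanup() {
    System.out.println("--------FINAL COUNTS--------");
    List<String> keys = new ArrayList<String>(this.counts.keySet());
    Collections.sort(keys);
    for (String key : keys) {
        System.out.println(key + ":" + this.counts.get(key));
    }
    System.out.println("----------------------------");
}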
public class WordCountTopology{
private static final String SENTENCE_SPOUT_ID="sentence-spout";
private static final String SPLIT_BOLT_ID="split-bolt";
private static final String COUNT_BOLT_ID="count-bolt";
private static final String REPORT_BOLT_ID="report-bolt";
private static final String TOPOLOGY_NAME="word-count-topology";
public static void main(String[] args) throws InterruptedException {
SentenceSpout spout=new SentenceSpout();
SplitSentenceBolt splitbolt=new SplitSentenceBolt();
WordCountBolt countbolt=new WordCountBolt();
ReportBolt reportbolt=new ReportBolt();
TopologyBuilder builder=new TopologyBuilder();
// Set the spout's parallelism hint to 2 executors; each task runs in its own executor thread.
builder.setSpout(SENTENCE_SPOUT_ID,spout,2);
// Set the bolt's parallelism to 2 executors with 4 tasks, so each executor runs 2 tasks.
builder.setBolt(SPLIT_BOLT_ID,splitbolt,2).setNumTasks(4).shuffleGrouping(SENTENCE_SPOUT_ID);
// Sometimes tuples carrying particular values must be routed to a particular bolt instance.
// fieldsGrouping guarantees that all tuples with the same "words" field value are routed
// to the same WordCountBolt instance.
builder.setBolt(COUNT_BOLT_ID,countbolt,2).fieldsGrouping(SPLIT_BOLT_ID,new Fields("words"));
builder.setBolt(REPORT_BOLT_ID,reportbolt).globalGrouping(COUNT_BOLT_ID);
/*Map conf=new HashMap();
conf.put(Config.TOPOLOGY_WORKERS,4);
conf.put(Config.TOPOLOGY_DEBUG,true);*/
Config conf = new Config();
//conf.setDebug(true);
LocalCluster cluster=new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME,conf,builder.createTopology());
// Thread.sleep(1000);
// cluster.shutdown();
}
}
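The main() above runs the topology on an in-process LocalCluster. To submit it to a real cluster, the usual pattern replaces LocalCluster with StormSubmitter and sets the worker count on the Config, which is roughly what the commented-out TOPOLOGY_WORKERS lines hint at. A hedged sketch, assuming a Storm 1.x+ classpath and packaging the topology as a jar run via "storm jar":

// Sketch of cluster submission; exception handling kept deliberately simple.
Config conf = new Config();
conf.setNumWorkers(4);      // equivalent to setting Config.TOPOLOGY_WORKERS to 4
conf.setDebug(false);
try {
    StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}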