
storm-wordCount

Word Count


I. Topology Structure

1. Data source

2. Sentence splitting

3. Word counting

4. Result reporting

5. Topology wiring

II. Code Implementation

1. Data source
package com.study.storm.test.wordcount;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Spout: the data source.
 * Produces tuples and emits them to one or more bolts for processing.
 */
public class SentenceSpout extends BaseRichSpout {

	private static final long serialVersionUID = -5569170406079406193L;

	private SpoutOutputCollector collector;

	// Source sentences
	private static String[] sentences = {
		"i am a boy ","i like eat","i do not like ","what"
	};

	// Array index into sentences, also used as the msgId when emitting
	private int index = 0;

	/**
	 * From ISpout.
	 * Called in a loop to emit tuples.
	 * When there is nothing to emit, sleep briefly to avoid busy-spinning the CPU.
	 */
	@Override
	public void nextTuple() {
		// Each sentence is sent only once
		if(index >= sentences.length){
			// Nothing left to emit: sleep instead of busy-waiting
			Utils.sleep(100);
			return;
		}
		// Emit the tuple with the array index as its msgId
		// (omitting the msgId leaves it null, i.e., the tuple is untracked)
		collector.emit(new Values(sentences[index]), index);
		// To emit in an endless loop instead, wrap the index:
		// index = (index + 1) % sentences.length;
		index++;
	}

	/**
	 * Be careful with static mutable fields (thread safety).
	 * Because SpoutOutputCollector is thread-safe, the collector field here
	 * could safely be declared static.
	 */

	/**
	 * From the ISpout interface.
	 * Called once when the component is initialized.
	 * @param conf configuration for this spout
	 * @param context task/topology context
	 * @param collector thread-safe collector used to emit tuples
	 */
	@SuppressWarnings("rawtypes")
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	/**
	 * From IComponent.
	 * Declares the fields this spout emits.
	 */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("sentence"));
	}

	@Override
	public void ack(Object msgId) {
		// The tuple tree for this msgId was fully processed
		System.out.println("ack : " + msgId);
	}

	@Override
	public void fail(Object msgId) {
		System.out.println("fail : " + msgId);
		// Processing failed: re-emit the sentence for this msgId
		this.collector.emit(new Values(sentences[(Integer) msgId]), msgId);
	}
}
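Because the spout passes the array index as the msgId, Storm can route ack/fail callbacks back to it per sentence. Acking is enabled by default, but the related knobs are worth knowing. A minimal standalone sketch of the relevant Config calls (the class name and values are illustrative assumptions, not from the original):

import backtype.storm.Config;

public class ReliabilityConfigDemo {
	public static void main(String[] args) {
		Config config = new Config();
		// Ack/fail callbacks are delivered by acker tasks; the count can
		// be set explicitly (0 disables tuple tracking entirely)
		config.setNumAckers(1);
		// Cap the number of un-acked tuples in flight per spout task so
		// replays cannot pile up without bound
		config.setMaxSpoutPending(1000);
		System.out.println(config);
	}
}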




2. Sentence splitting
package com.study.storm.test.wordcount;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Splits each sentence into words.
 */
public class SentenceBolt extends BaseRichBolt {

	private static final long serialVersionUID = -5420313164197072550L;

	private OutputCollector collector;
	/**
	 * From IBolt.
	 * Called once when the component is initialized.
	 * @param stormConf this bolt's configuration
	 * @param context topology context
	 * @param collector used to emit tuples downstream
	 */
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		try {
			// Read the value by field name; "sentence" was declared by the spout
			String sentence = input.getStringByField("sentence");
			String[] words = sentence.split(" ");
			for(String word : words){
				// Anchoring: ties each word tuple to the input sentence, so a
				// downstream failure can be traced back and replayed from the spout
				this.collector.emit(input, new Values(word));
			}
			// Acknowledge that the input tuple was processed successfully
			collector.ack(input);
		} catch (Exception e) {
			collector.fail(input);
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

}
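The emit above passes the input tuple as its first argument, which anchors each word to its parent sentence. A hedged, self-contained sketch contrasting the two emit overloads (this demo class is illustrative only, not part of the original topology):

package com.study.storm.test.wordcount;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/** Illustration only: contrasts anchored and unanchored emits. */
public class AnchoringDemoBolt extends BaseRichBolt {

	private static final long serialVersionUID = 1L;

	private OutputCollector collector;

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple input) {
		String word = input.getStringByField("word");
		// Anchored: the new tuple joins input's tuple tree; if it fails
		// downstream, the spout's fail(msgId) fires and can replay it
		collector.emit(input, new Values(word));
		// Unanchored: the new tuple is not tracked, so downstream
		// failures are invisible to the spout
		collector.emit(new Values(word));
		collector.ack(input);
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}
}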



3. Word counting
package com.study.storm.test.wordcount;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Counts occurrences of each word.
 */
public class WordCountBolt extends BaseRichBolt {

	private static final long serialVersionUID = -4811405807833269368L;

	private OutputCollector collector;

	/**
	 * ConcurrentHashMap is thread-safe; since each bolt task runs in a
	 * single executor thread, this is defensive rather than required.
	 */
	private Map<String,Integer> countMap = new ConcurrentHashMap<String,Integer>();
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector ;
	}

	@Override
	public void execute(Tuple input) {
		try {
			String word = input.getStringByField("word");
			countMap.put(word, countMap.containsKey(word) ? countMap.get(word)+1 : 1);
			// Emit fields in the declared order: the word first, then its count
			this.collector.emit(input, new Values(word, countMap.get(word)));

			collector.ack(input);
		} catch (Exception e) {
			collector.fail(input);
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Declare the output fields: the word and its running count
		declarer.declare(new Fields("word","count"));
	}

}
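Since one executor thread drives a bolt task, the containsKey/get/put sequence above is safe, and on Java 8+ it can be collapsed into a single map call. A minimal standalone sketch (class name and sample data are illustrative, not from the original):

import java.util.HashMap;
import java.util.Map;

public class MergeCountDemo {
	public static void main(String[] args) {
		// Per-task state: a plain HashMap would also suffice inside a bolt
		Map<String, Integer> countMap = new HashMap<String, Integer>();
		for (String word : new String[] { "i", "like", "i" }) {
			// merge() collapses the containsKey/get/put sequence into one call
			countMap.merge(word, 1, Integer::sum);
		}
		System.out.println(countMap); // e.g. {i=2, like=1} (order unspecified)
	}
}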



4. Result reporting
package com.study.storm.test.wordcount;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

public class ResultBolt extends BaseRichBolt {

	private static final long serialVersionUID = 7436620687730623420L;

	private Map<String,Integer> map = new ConcurrentHashMap<String,Integer>();

	private OutputCollector collector;
	
	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector ;
	}

	@Override
	public void execute(Tuple input) {
		try {
			// Read the word by field name
			String word = input.getStringByField("word");
			// getStringByField would throw a ClassCastException here;
			// use getValueByField with a cast (or the typed getter shown below)
			Integer count = (Integer) input.getValueByField("count");

			map.put(word, count);

			System.out.println(word + " : " + count);

			collector.ack(input);

		} catch (Exception e) {
			collector.fail(input);
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// Terminal bolt: declares no output fields
	}

	/**
	 * Called when the bolt shuts down.
	 * Note: this is only guaranteed to run in local mode (e.g. when the
	 * topology is killed); on a cluster it may never be invoked.
	 */
	@Override
	public void cleanup() {
		System.out.println("Final counts:");
		for(String key : map.keySet()){
			System.out.println(key + " : " + map.get(key));
		}
	}
}
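As the comment in execute notes, getStringByField cannot read the count. Besides casting the result of getValueByField, Tuple also exposes typed getters, so the same read could be written as this drop-in fragment for execute (assuming the backtype.storm Tuple API):

// Typed getter: returns the Integer directly, no explicit cast needed
Integer count = input.getIntegerByField("count");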



5. Topology wiring
package com.study.storm.test.wordcount;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {

	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException{

		// Instantiate the spout and bolts
		SentenceSpout sentenceSpout = new SentenceSpout();
		SentenceBolt sentenceBolt = new SentenceBolt();
		WordCountBolt wordCountBolt = new WordCountBolt();
		ResultBolt resultBolt = new ResultBolt();
		
		// Topology
		TopologyBuilder builder = new TopologyBuilder();
		/**
		 * Under parallelism, think about how tuples are distributed.
		 * Parallelism levels:
		 * node : a physical server in the Storm cluster
		 * worker : a JVM process running on a node
		 * executor : a thread running inside a worker
		 * task : an instance of a spout/bolt executed by an executor
		 */
		builder.setSpout("sentenceSpout", sentenceSpout);
		// Shuffle grouping: a tuple may go to any next-stage task; for
		// sentence splitting the target task does not affect correctness
		builder.setBolt("sentenceBolt", sentenceBolt).shuffleGrouping("sentenceSpout");
		// Fields grouping on "word" (hash % taskNum): tuples carrying the same
		// word always reach the same task, so no counts are lost or split.
		// With shuffle grouping and parallelism >= 2, the word "a" could first
		// hit task 1 (a : 1) and then task 2 (a : 1 again), corrupting totals.
		builder.setBolt("wordCountBolt", wordCountBolt).fieldsGrouping("sentenceBolt", new Fields("word"));
		// Global grouping: every tuple goes to one task no matter how many
		// upstream tasks exist; parallelism hints on this bolt are ignored
		builder.setBolt("resultBolt", resultBolt).globalGrouping("wordCountBolt");
		
		// Build the topology
		StormTopology stormTopology = builder.createTopology();

		Config config = new Config();
		// To run on a real cluster:
//		StormSubmitter.submitTopology("wordCountTopology", config, stormTopology);

		LocalCluster local = new LocalCluster();
		local.submitTopology("wordCountTopology", config, stormTopology);
		// Give the topology time to run; increase when debugging with breakpoints
		Thread.sleep(10*1000);
		local.killTopology("wordCountTopology");
		local.shutdown();
	}
}
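The comment block in main distinguishes nodes, workers, executors, and tasks; parallelism itself is set while wiring the topology. A hedged sketch of the same wiring with explicit parallelism hints (the class name and numbers are illustrative assumptions, not from the original):

package com.study.storm.test.wordcount;

import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class ParallelWiringSketch {
	public static void main(String[] args) {
		TopologyBuilder builder = new TopologyBuilder();
		// One spout executor is plenty for a fixed four-sentence source
		builder.setSpout("sentenceSpout", new SentenceSpout(), 1);
		// Two executors split sentences; shuffle grouping load-balances them
		builder.setBolt("sentenceBolt", new SentenceBolt(), 2)
				.shuffleGrouping("sentenceSpout");
		// Two executors backed by four tasks; fieldsGrouping guarantees a
		// given word always lands on the same task, keeping counts exact
		builder.setBolt("wordCountBolt", new WordCountBolt(), 2).setNumTasks(4)
				.fieldsGrouping("sentenceBolt", new Fields("word"));
		// globalGrouping funnels everything into a single task, so a
		// parallelism hint here would have no effect
		builder.setBolt("resultBolt", new ResultBolt())
				.globalGrouping("wordCountBolt");

		Config config = new Config();
		// Number of worker processes (JVMs) for the whole topology
		config.setNumWorkers(2);
	}
}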



