- 浏览: 198627 次
- 性别:
- 来自: 哈尔滨
文章分类
- 全部博客 (267)
- java.lang (8)
- 问题汇总 (21)
- 异常记录 (20)
- 功能实现 (19)
- 面试总结 (25)
- 技巧总结 (8)
- 常用代码 (4)
- 编程习惯 (3)
- 编码规则 (3)
- java.util (10)
- java.io (1)
- JavaWeb (9)
- MySQL (16)
- SVN (3)
- MyBatis (11)
- Velocity (7)
- 其他知识 (10)
- 人生哲理 (1)
- 人生故事 (1)
- 自我感悟 (1)
- shiro (3)
- 基础知识 (0)
- 问题总结 (1)
- Spring 标签 (1)
- Spring (3)
- 点滴生活 (1)
- DOS (1)
- CAS (4)
- Linux (9)
- Storm (6)
- Shell (1)
- regex (1)
- Collection (4)
- poi (1)
- 经典语句 (1)
- NIO (5)
- concurrent (14)
- RPC (1)
- zookeeper (3)
- 待整理 (2)
- Hadoop (9)
- RabbitMq (2)
- flume (1)
- hive (7)
- hbase (4)
- kafka (1)
- scala (1)
- GC (0)
- java.util.concurrent.atomic (1)
- java.lang.ref (6)
- JVM (2)
- algorithm (1)
- conception (1)
- java key word (1)
- sun.misc (1)
最新评论
单词统计
一、拓扑结构
1.数据来源
2.单词拆分
3.单词计数
4.统计结果
5.拓扑构建
二、代码实现
1.单词来源
package com.study.storm.test.wordcount; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; /** * Spout : 数据源 * 数据的生产者,将数据发送到一个或多个Bolt上进行数据处理 */ public class SentenceSpout extends BaseRichSpout { /** * */ private static final long serialVersionUID = -5569170406079406193L; private SpoutOutputCollector collector = null ; // 数据 private static String [] sentences = { "i am a boy ","i like eat","i do not like ","what" }; // 数组的下标,数据发送的 msgId private int index = 0 ; /** * 来自 ISpout * 不停的发送数据 * 无数据发送,可以睡眠一段时间,避免CPU频繁调用 */ @Override public void nextTuple() { // 只发送一次 if(index >= sentences.length){ return ; } // 发送的数据内容,数据的msgId 编号(不传,默认为 null ) collector.emit(new Values(sentences[index]),index); // 循环发送,避免数组越界 // index = index > sentences.length ? 0 : index++ ; index ++ ; } /** * 慎用静态成员变量,线程安全问题 * 因为 SpoutOutputCollector 是线程安全的,所以此处的 全局的 collector 可以设置为 static */ /** * 来自于 ISpout 接口 * 组件初始化时调用 * @param arg0 配置信息 * @param arg1 任务信息 * @param arg2 发射数据用的组件,线程安全 */ @SuppressWarnings("rawtypes") @Override public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) { this.collector = arg2 ; } /** * 来自IComponent * 声明输出用的字段 */ @Override public void declareOutputFields(OutputFieldsDeclarer arg0) { arg0.declare(new Fields("sentence")); } @Override public void ack(Object msgId) { // 发送成功应答 System.out.println("ack : "+msgId); } @Override public void fail(Object msgId) { System.out.println("fail : "+msgId); // 发送失败:重新发送 this.collector.emit(new Values(sentences[(Integer)msgId]),msgId); } }
2.单词拆分
package com.study.storm.test.wordcount; import java.util.Map; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 单词拆分 */ public class SentenceBolt extends BaseRichBolt { /** * */ private static final long serialVersionUID = -5420313164197072550L; private OutputCollector collector ; /** * 继承自 IBolt * 初始化的方法,在组件初始化时调用 * @param stormConf 当前Bolt的配置信息对象 * @param context 当前环境信息对象 * @param collector 对外输出 tuple 用的对象 */ @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple input) { try { // 通过 field 获取数据,sentence 为 spout 中定义的 String sentences = input.getStringByField("sentence"); String [] words = sentences.split(" "); for(String word : words){ // 锚定:绑定 bolt 与上一级,数据发送状况传递,如果出现问题,方便查找上一级数据的来源 this.collector.emit(input,new Values(word)); } // 确认发送成功 collector.ack(input); } catch (Exception e) { collector.fail(input); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
3.单词计数
package com.study.storm.test.wordcount; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; /** * 单词数量统计 */ public class WordCountBolt extends BaseRichBolt { /** * */ private static final long serialVersionUID = -4811405807833269368L; private OutputCollector collector = null ; /** * 线程安全 */ private Map<String,Integer> countMap = new ConcurrentHashMap<String,Integer>(); @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector ; } @Override public void execute(Tuple input) { try { String word = input.getStringByField("word"); countMap.put(word, countMap.containsKey(word) ? countMap.get(word)+1 : 1); // 按照 declarer 的顺序发送数据,先单词,再单词数量 this.collector.emit(input,new Values(word,countMap.get(word))); collector.ack(input); } catch (Exception e) { collector.fail(input); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 数据的发送数据 declarer.declare(new Fields("word","count")); } }
4.统计结果
package com.study.storm.test.wordcount; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; public class ResultBolt extends BaseRichBolt { /** * */ private static final long serialVersionUID = 7436620687730623420L; private Map<String,Integer> map = new ConcurrentHashMap<String,Integer>(); private OutputCollector collector = null ; @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector ; } @Override public void execute(Tuple input) { try { // 通过field 获取数据 String word = input.getStringByField("word"); // 不可通过 getStringByField 获取,会报转换异常 Integer count = (Integer) input.getValueByField("count"); map.put(word, count); System.out.println(word +" : " + count); collector.ack(input); } catch (Exception e) { collector.fail(input); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } /** * 程序正常运行结束后运行 * */ @Override public void cleanup() { System.out.println("统计结果:"); for(String key : map.keySet()){ System.out.println(key + " : " + map.get(key)); } } }
5.拓扑构建
package com.study.storm.test.wordcount; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.generated.AlreadyAliveException; import backtype.storm.generated.InvalidTopologyException; import backtype.storm.generated.StormTopology; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; public class WordCountTopology { public static void main(String args[]) throws AlreadyAliveException, InvalidTopologyException, InterruptedException{ // 实例化 Spout Bolt SentenceSpout sentenceSpout = new SentenceSpout(); SentenceBolt sentenceBolt = new SentenceBolt(); WordCountBolt wordCountBolt = new WordCountBolt(); ResultBolt resultBolt = new ResultBolt(); // Topology TopologyBuilder builder = new TopologyBuilder(); /** * 并发情况下考虑如何分发数据 * 一、并发级别 * node : 对应于 storm 集群中的服务器 * worker : 线程级别 * executor : 线程级别 * task : */ builder.setSpout("sentenceSpout", sentenceSpout); // 随机分发:即数据可以发送到任意的下一级的处理机器上,而不会对统计结果产生影响 builder.setBolt("sentenceBolt", sentenceBolt).shuffleGrouping("sentenceSpout"); // 按照 field = word 分发,相同的 word 传递的数据发送到同一台机器上,避免统计遗漏 // hash % taskNum // 若此处为随机分发且并行级别大于等于2,单词 a 发送 1机器,统计 a : 1 ,第二次单词发送到 2 机器,a 的数量就会失真 builder.setBolt("wordCountBolt", wordCountBolt).fieldsGrouping("sentenceBolt", new Fields("word")); // 若上一级并行数量多,则无论哪一个处理完毕都发送到同一台处理机器上,此种方式,对并发数量设置无效 builder.setBolt("resultBolt", resultBolt).globalGrouping("wordCountBolt"); // 生产拓扑 StormTopology stormTopology = builder.createTopology(); Config config = new Config(); // 集群运行 // StormSubmitter.submitTopology("wordCountTopology", config, stormTopology); LocalCluster local = new LocalCluster(); local.submitTopology("wordCountTopology", config, stormTopology); // 断点调试,调整时间 Thread.sleep(10*1000); local.killTopology("wordCountTopology"); local.shutdown(); } }
相关推荐
storm-wordcount例子 storm-wordcount例子 storm-wordcount例子 storm-wordcount例子
Storm-Java项目 ,storm_wordcount,英语单词统计
Storm的WordCount实例。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
storm之WordCount示例Java代码
风暴原型 这是一个简单的项目,可... mvn原型:generate -DarchetypeGroupId = org.apache.storm -DarchetypeArtifactId = storm-archetype -DarchetypeVersion = 1.0-SNAPSHOT -DgroupId =(your-group)-DartifactI
该项目已 MOVED 它现在是 Apache 项目的一部分,您可以在找到它
Storm本地模式【亲测可用】基于Java版本的Storm WordCount
主分支: 风暴-cli Apache Storm 的 cli 字数示例 set topology.name=test_topology; set storm.jar=./jstorm-example-0.9.0.jar;...REGISTER count=BOLT(1, "storm.starter.WordCountTopology$WordCount").FIE
本项目主要实现的功能是:统计单词的个数 jdk1.8 jstorm2.2.1 执行步骤: 1. 本地正确安装maven 2. 本地正确安装zookeeper,并启动 3. Idea导入项目源码,以...4. 可分别运行random或wordcount下topology下的main类
学生在线记录管理(STORM)系统是MySQL的数据库接口,用于保存大型课程的记录。 它维护和计算标记,并维护讲座,教程和实验室的部分/小组作业。
test_Storm_wordCount
wordcount 项目说明 WordCount, 一个Storm入门实例。 实现了如下的流程: 抓取ChinaDaily的网页内容作为数据源;对数据进行分词处理,按词频排序并打印排序结果。 相关信息 作者:robin 博客地址:
Flink入门及实战最新内容分享,包含Flink基本原理及应用场景、Flink vs storm vs sparkStreaming、Flink入门案例-wordCount、Flink集群安装部署standalone+yarn、Flink-HA高可用、Flink scala shell代码调试
04-wordcount的编写和提交集群运行.avi 05-mr程序的本地运行模式.avi 06-job提交的逻辑及YARN框架的技术机制.avi 07-MR程序的几种提交运行模式.avi 08-YARN的通用性意义.avi 09-yarn的job提交流程.avi 第四...
storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行,storm学习入门的例子,100%可运行.
linux实验环境,storm搭建完毕后的开发。eclipse开发环境,大数据界hello world——wordcount详解,bolt、分组机制、storm DRPC详解
实验结果表明,在包含有8个工作节点的WordCount基准测试中,TSAW-Storm的系统延迟和节点间数据流大小相比Storm默认调度算法分别降低了30.0%和32.9%,且各工作节点的CPU负载标准差仅为Storm默认调度算法的25.8% ;...
stormdemostorm hello world, 参考自 storm blueprint chapter1最新说明请见:本示例使用storm运行经典的wordcount程序,拓扑如下:sentence-spout—>split-bolt—>count-bolt—>report-bolt分别完成句子的产生、...
15.flink-编写java版wordcount程序.mp4 16.flink集群运行模式-从IDEA中导出jar包.mp4 17.检查flink集群运行状况-传递jar到centos.mp4 18.启动centos的nc服务器地址-端口指定9999.mp4 19.flink程序运行-执行结果查看-...
用 JavaScript 编写 Storm 拓扑。 基本示例 import WordCount from './wordcount' ; import SplitSentence from './splitsentence' ; import RandomSentence from './randomsentence' ; import Cyclone , { ...