自定义数据流组
你可以通过实现backtype.storm.grouping.CustormStreamGrouping接口创建自定义数据流组,让你自己决定哪些bolt接收哪些元组。
让我们修改单词计数器示例,使首字母相同的单词由同一个bolt接收。
public class ModuleGrouping mplents CustormStreamGrouping, Serializable{ int numTasks = 0; @Override public List<Integer> chooseTasks(List<Object> values) { List<Integer> boltIds = new ArrayList<Integer>(); if(values.size()>0){ String str = values.get(0).toString(); if(str.isEmpty()){ boltIds.add(0); }else{ boltIds.add(str.charAt(0) % numTasks); } } return boltIds; } @Override public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) { numTasks = targetTasks.size(); } }
这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。
按下述方式word-normalizer修改即可使用这个自定义数据流组。
builder.setBolt("word-normalizer", new WordNormalizer()) .customGrouping("word-reader", new ModuleGrouping());
直接数据流组
这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit。
public void execute(Tuple input) { ... for(String word : words){ if(!word.isEmpty()){ ... collector.emitDirect(getWordCountIndex(word),new Values(word)); } } //对元组做出应答 collector.ack(input); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if(word.isEmpty()){ return 0; }else{ return word.charAt(0) % numCounterTasks; } }
在prepare方法中计算任务数
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.numCounterTasks = context.getComponentTasks("word-counter"); }
在拓扑定义中指定数据流将被直接分组:
builder.setBolt("word-counter", new WordCounter(),2) .directGrouping("word-normalizer");
相关推荐
5、掌握HighCharts各类图表开发和实时无刷新加载数据; 6、掌握Storm+Kafka+HBase的经典组合,完整呈现企业实践项目; 7、可以做到以一己之力完成从后台开发(Storm、Kafka、Hbase开发)到前台HighCharts图表开发、...
直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边...
Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。 Storm的主工程师Nathan Marz表示...
Storm为分布式实时计算提供了一组... Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。
Streamparse使您可以通过Apache Storm针对实时数据流运行Python代码。 使用streamparse,您可以在Python中创建Storm螺栓和喷口,而无需编写任何Java代码。 它还提供了方便的CLI实用程序,用于管理Storm集群和项目。 ...
Storm是一个分布式是实时计算系统,它设计了一...这一组计算组件可以按照DAG图的方式编排起来(通过选择StreamGroupings来控制数据流分发流向),从而组合成一个计算逻辑更加负责的对象,那就是Topology。一个Topology
简化了流数据的可靠处理,像Hadoop一样实现实时批处理。Storm很简单,可用于任意编程语言。ApacheStorm采用Clojure开发。Storm有很多应用场景,包括实时数据分析、联机学习、持续计算、分布式RPC、ETL等。Storm速度...
Storm流计算能预计算固定的维度、粒度,但业务千变万化,突发事件很多,如何对任意维度的组合进行筛选、钻取、统计? 硬盘坏了,机器宕机,怎样做到数据可靠不丢失? 小型机太贵,我们买不起,怎么办?
Storm流计算能预计算固定的维度、粒度,但业务千变万化,突发事件很多,如何对任意维度的组合进行筛选、钻取、统计? 硬盘坏了,机器宕机,怎样做到数据可靠不丢失? 小型机太贵,我们买不起,怎么办? YDB特性 1...
Storm流计算能预计算固定的维度、粒度,但业务千变万化,突发事件很多,如何对任意维度的组合进行筛选、钻取、统计? 硬盘坏了,机器宕机,怎样做到数据可靠不丢失? 小型机太贵,我们买不起,怎么办? YDB特性 1...
提高执行效率 大数据处理平台技术架构 数据采集层 数据处理层 … 批量采集 网络爬虫 流采集 分布式文 件系统 关系 数据库 NoSQL 数据库 数据存储层 机器学习 数据挖掘 搜索引擎 批量处理引擎 流处理引擎 图处理引擎 ...
Storm流计算能预计算固定的维度、粒度,但业务千变万化,突发事件很多,如何对任意维度的组合进行筛选、钻取、统计? 硬盘坏了,机器宕机,怎样做到数据可靠不丢失? 小型机太贵,我们买不起,怎么办? YDB特性 1. ...
⼤数据处理框架 ⼤数据处理框架 说起⼤数据处理啊,⼀切都起源于Google公司的经典论⽂。在当时(2000年左右),由于⽹页... Apache Storm是⼀种侧重于低延迟的流处理框架,它可以处理海量的接⼊数据,以近实时⽅式处理
Pig数据流处 理,数据清洗转换。Mahout数据挖掘的算法库,实现常⽤数据挖掘算法(分类、聚类、回归等),调⽤接⼝,传⼊参数,减少⼯作量,针 对海量数据进⾏数据挖掘分析。Ambari⾃动化的安装部署配置管理Hadoop...
下⽂将介绍这些框架: · 仅批处理框架: Apache Hadoop · 仅流处理框架: Apache Storm Apache Samza · 混合框架: Apache Spark Apache Flink ⼤数据处理框架是什么? ⼤数据处理框架是什么? 处理框架和处理...
除了能够以批处理模式分析大型数据集之外,现代数据驱动型组织还...在Lambda架构中,一个“慢”大数据处理框架(如Hadoop堆栈)与一个“快速”的流处理框架(如Apache Storm)组合在一起。由快速框架处理的数据或者与
根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的...
也有一 些用户会在导入时使用来自Twitter的Storm来对数据进行流式计算,来满足部分业务的 实时计算需求。 导入与预处理过程的特点和挑战主要是导入的数据量大,每秒钟的导入量经常会达到百 兆,甚至千兆级别。 (3)...