`
liyonghui160com
  • 浏览: 763390 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

storm数据流组

阅读更多

自定义数据流组

你可以通过实现backtype.storm.grouping.CustormStreamGrouping接口创建自定义数据流组,让你自己决定哪些bolt接收哪些元组。

让我们修改单词计数器示例,使首字母相同的单词由同一个bolt接收。

    public class ModuleGrouping mplents CustormStreamGrouping, Serializable{
        int numTasks = 0;

        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }

        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            numTasks = targetTasks.size();
        }
    }

 

这是一个CustomStreamGrouping的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。

 

按下述方式word-normalizer修改即可使用这个自定义数据流组。

    builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());

 

 

 

 

 

 

直接数据流组

这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit。

   

public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        //对元组做出应答
        collector.ack(input);
    }

    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }

 

在prepare方法中计算任务数

    public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }

 

在拓扑定义中指定数据流将被直接分组:

    builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");

 

 

 

 

分享到:
评论
2 楼 liyonghui160com 2014-09-22  
AKka 写道
谢谢分享!最近在学Storm使用,一直不明白这个流组,看了你的贴子现在有点明白了。


能帮到你就好。
1 楼 AKka 2014-09-19  
谢谢分享!最近在学Storm使用,一直不明白这个流组,看了你的贴子现在有点明白了。

相关推荐

    基于Storm流计算天猫双十一作战室项目实战.docx

    5、掌握HighCharts各类图表开发和实时无刷新加载数据; 6、掌握Storm+Kafka+HBase的经典组合,完整呈现企业实践项目; 7、可以做到以一己之力完成从后台开发(Storm、Kafka、Hbase开发)到前台HighCharts图表开发、...

    大数据架构:flume-ng+Kafka+Storm+HDFS实时系统组合

    直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边...

    漫谈大数据第四期-storm

    Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。 Storm的主工程师Nathan Marz表示...

    apache-storm-0.9.6.tar.gz

    Storm为分布式实时计算提供了一组... Storm也可被用于“连续计算”(continuous computation),对数据流做连续查询,在计算时就将结果以流的形式输出给用户。它还可被用于“分布式RPC”,以并行的方式运行昂贵的运算。

    streamparse:在Apache Storm拓扑中运行Python。 Pythonic API,CLI工具和拓扑DSL

    Streamparse使您可以通过Apache Storm针对实时数据流运行Python代码。 使用streamparse,您可以在Python中创建Storm螺栓和喷口,而无需编写任何Java代码。 它还提供了方便的CLI实用程序,用于管理Storm集群和项目。 ...

    Storm实时计算:流操作入门编程实践

    Storm是一个分布式是实时计算系统,它设计了一...这一组计算组件可以按照DAG图的方式编排起来(通过选择StreamGroupings来控制数据流分发流向),从而组合成一个计算逻辑更加负责的对象,那就是Topology。一个Topology

    Storm运行原理探索

    简化了流数据的可靠处理,像Hadoop一样实现实时批处理。Storm很简单,可用于任意编程语言。ApacheStorm采用Clojure开发。Storm有很多应用场景,包括实时数据分析、联机学习、持续计算、分布式RPC、ETL等。Storm速度...

    延云大数据PPD

    Storm流计算能预计算固定的维度、粒度,但业务千变万化,突发事件很多,如何对任意维度的组合进行筛选、钻取、统计? 硬盘坏了,机器宕机,怎样做到数据可靠不丢失? 小型机太贵,我们买不起,怎么办?

    延云YDB安装与使用说明书

    Storm流计算能预计算固定的维度、粒度,但业务千变万化,突发事件很多,如何对任意维度的组合进行筛选、钻取、统计? 硬盘坏了,机器宕机,怎样做到数据可靠不丢失? 小型机太贵,我们买不起,怎么办? YDB特性 1...

    延云YDB-运行程序v1.0.1

    Storm流计算能预计算固定的维度、粒度,但业务千变万化,突发事件很多,如何对任意维度的组合进行筛选、钻取、统计? 硬盘坏了,机器宕机,怎样做到数据可靠不丢失? 小型机太贵,我们买不起,怎么办? YDB特性 1...

    第七章-《大数据导论》大数据处理平台.pdf

    提高执行效率 大数据处理平台技术架构 数据采集层 数据处理层 … 批量采集 网络爬虫 流采集 分布式文 件系统 关系 数据库 NoSQL 数据库 数据存储层 机器学习 数据挖掘 搜索引擎 批量处理引擎 流处理引擎 图处理引擎 ...

    延云YDB安装与使用说明书v0.21.

    Storm流计算能预计算固定的维度、粒度,但业务千变万化,突发事件很多,如何对任意维度的组合进行筛选、钻取、统计? 硬盘坏了,机器宕机,怎样做到数据可靠不丢失? 小型机太贵,我们买不起,怎么办? YDB特性 1. ...

    大数据处理框架.pdf

    ⼤数据处理框架 ⼤数据处理框架 说起⼤数据处理啊,⼀切都起源于Google公司的经典论⽂。在当时(2000年左右),由于⽹页... Apache Storm是⼀种侧重于低延迟的流处理框架,它可以处理海量的接⼊数据,以近实时⽅式处理

    大数据技术概述.pdf

    Pig数据流处 理,数据清洗转换。Mahout数据挖掘的算法库,实现常⽤数据挖掘算法(分类、聚类、回归等),调⽤接⼝,传⼊参数,减少⼯作量,针 对海量数据进⾏数据挖掘分析。Ambari⾃动化的安装部署配置管理Hadoop...

    五种大数据架构简介.pdf

    下⽂将介绍这些框架: · 仅批处理框架: Apache Hadoop · 仅流处理框架: Apache Storm Apache Samza · 混合框架: Apache Spark Apache Flink ⼤数据处理框架是什么? ⼤数据处理框架是什么? 处理框架和处理...

    使用SMACK堆栈进行快速数据分析

    除了能够以批处理模式分析大型数据集之外,现代数据驱动型组织还...在Lambda架构中,一个“慢”大数据处理框架(如Hadoop堆栈)与一个“快速”的流处理框架(如Apache Storm)组合在一起。由快速框架处理的数据或者与

    kafka客户端

    根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的...

    大数据行业分析报告.doc

    也有一 些用户会在导入时使用来自Twitter的Storm来对数据进行流式计算,来满足部分业务的 实时计算需求。 导入与预处理过程的特点和挑战主要是导入的数据量大,每秒钟的导入量经常会达到百 兆,甚至千兆级别。 (3)...

Global site tag (gtag.js) - Google Analytics