一、前言
groupBy分组操作,根据指定属性进行分组,如果后面是aggregate()的话,先根据partitionBy分区,在每个partition上分组,分完组后,在每个分组上进行聚合。
二、实战
main:
public static void main(String[] args) throws Exception {
/**
* 多设置几个并行度,分组后如果分组不够,那么将有并行度空闲跑者
*/
@SuppressWarnings("unchecked")
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("a"), new Values("b"), new Values("a"),new Values("c"),
new Values("c"),new Values("c"),new Values("d"));
spout.setCycle(false);
TridentTopology tridentTopology = new TridentTopology();
tridentTopology
.newStream("spout", spout)
.parallelismHint(3)
.shuffle()
.groupBy(new Fields("sentence"))
.aggregate(new Fields("sentence"), new MyAgg(),
new Fields("Map"))
.parallelismHint(5)
.each(new Fields("sentence","Map"), new MyBolt());
Config config = new Config();
config.setDebug(false);
StormSubmitter.submitTopology("trident_groupby_aggregate_many", config,
tridentTopology.build());
}
MyAgg:
package com.storm.trident.groupby.先分组后聚合; import java.util.HashMap; import java.util.Map; import org.apache.storm.trident.operation.BaseAggregator; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.operation.TridentOperationContext; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.tuple.Values; public class MyAgg extends BaseAggregator<Map<String, Integer>> { private static final long serialVersionUID = 1L; /** * 属于哪个分区 */ private int partitionId; /** * 分区数量 */ private int numPartitions; @SuppressWarnings("rawtypes") @Override public void prepare(Map conf, TridentOperationContext context) { partitionId = context.getPartitionIndex(); numPartitions = context.numPartitions(); } @Override public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) { String word = tuple.getString(0); Integer value = val.get(word); if (value == null) { value = 0; } value++; // 把数据保存到一个map对象中 val.put(word, value); val.put(word + "属于哪个分区", partitionId); System.out.println("I am partition [" + partitionId + "] and I have kept a tweet by: " + numPartitions); } @Override public void complete(Map<String, Integer> val, TridentCollector collector) { collector.emit(new Values(val)); } @Override public Map<String, Integer> init(Object arg0, TridentCollector arg1) { return new HashMap<String, Integer>(); } }
MyBolt:
package com.storm.trident.groupby.先分组后聚合; import java.util.Map; import java.util.Map.Entry; import org.apache.storm.trident.operation.BaseFilter; import org.apache.storm.trident.tuple.TridentTuple; public class MyBolt extends BaseFilter { /** * */ private static final long serialVersionUID = 1L; @SuppressWarnings("unchecked") @Override public boolean isKeep(TridentTuple tuple) { System.out.println("打印出来的tuple:" + tuple); Map<String, Integer> value = ((Map<String,Integer>) tuple.getValue(1)); for (Entry<String, Integer> entry : value.entrySet()) { System.out.println("key:"+ entry.getKey()+",value:" + entry.getValue()); } return false; } }
三、测试
打包在storm集群里跑
查看log日志,主要日志如下
2016-12-22 18:36:11.293 STDIO [INFO] I am partition [3] and I have kept a tweet by: 5 2016-12-22 18:36:11.302 STDIO [INFO] I am partition [4] and I have kept a tweet by: 5 2016-12-22 18:36:11.304 STDIO [INFO] 打印出来的tuple:[b, {b属于哪个分区=4, b=1}] 2016-12-22 18:36:11.306 STDIO [INFO] key:b属于哪个分区,value:4 2016-12-22 18:36:11.317 STDIO [INFO] I am partition [3] and I have kept a tweet by: 5 2016-12-22 18:36:11.321 STDIO [INFO] key:b,value:1 2016-12-22 18:36:11.335 STDIO [INFO] 打印出来的tuple:[a, {a属于哪个分区=3, a=2}] 2016-12-22 18:36:11.341 STDIO [INFO] key:a属于哪个分区,value:3 2016-12-22 18:36:11.344 STDIO [INFO] key:a,value:2 2016-12-22 18:36:11.423 STDIO [INFO] I am partition [0] and I have kept a tweet by: 5 2016-12-22 18:36:11.424 STDIO [INFO] I am partition [0] and I have kept a tweet by: 5 2016-12-22 18:36:11.425 STDIO [INFO] I am partition [0] and I have kept a tweet by: 5 2016-12-22 18:36:11.433 STDIO [INFO] 打印出来的tuple:[c, {c=3, c属于哪个分区=0}] 2016-12-22 18:36:11.433 STDIO [INFO] key:c,value:3 2016-12-22 18:36:11.439 STDIO [INFO] key:c属于哪个分区,value:0 2016-12-22 18:36:11.923 STDIO [INFO] I am partition [1] and I have kept a tweet by: 5 2016-12-22 18:36:11.933 STDIO [INFO] 打印出来的tuple:[d, {d=1, d属于哪个分区=1}] 2016-12-22 18:36:11.939 STDIO [INFO] key:d,value:1 2016-12-22 18:36:11.942 STDIO [INFO] key:d属于哪个分区,value:1
相关推荐
Storm Trident实战之计算网站PV.rar
Storm Trident API 使用详解.docx
Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...
Storm视频教程通过含3个Storm完整项目,均为企业实际项目,其中一个是完全由Storm Trident开发。本课程每个技术均采用最新稳定版本,学完后可以从Kafka到Storm项目开发及HighCharts图表开发一个人搞定!涨工资?身价...
1、课程中完整开发3个Storm项目,均为企业实际项目,其中一个是完全由Storm Trident开发。 项目源码均可以直接运行,也可直接用于商用或企业。 2、Storm全面、系统、深入讲解 3、注重实践,对较抽象难懂的技术点如...
《Storm实战:构建大数据实时计算》是一本系统并且具有实践指导意义的Storm工具书和参考书,对Storm整个技术体系进行了全面的讲解,不仅包括对基本概念、特性的介绍,也涵盖了一些原理说明。 实战性很强,各章节...
storm_Trident例子
三叉戟教程实用的Storm Trident教程本教程以的的出色为基础。 流浪者的设置基于Taylor Goetz的。 Hazelcast状态代码基于wurstmeister的。 看看随附的。本教程的结构浏览Part * .java,了解Trident的基础知识使用...
三叉戟的例子一组用 Storm Trident 编写的应用程序。应用用法建造$ git clone git@github.com:mayconbordin/trident-examples.git$ cd trident-examples/$ mvn -P < profile> package 使用local配置文件以本地模式...
storm_trident_state storm trident自定义state的实现,实现了3种插入数据库的持久化方法,事务,不透明事务,不透明分区事务 下载下面,运行:mvn eclipse:eclipse即可在eclipse使用此项目。
3、掌握Storm Trident项目开发模式; 4、掌握Storm集成Kafka开发及项目实战; 5、掌握HighCharts各类图表开发和实时无刷新加载数据; 6、掌握Storm+Kafka+HBase的经典组合,完整呈现企业实践项目; 7、可以做到以...
三叉戟弹性搜索ElasticSearch 的 Storm Trident 集成层
三叉戟《风暴蓝图:分布式实时计算模式》一书的源码和翻译=============(已完成,待校对)(未开始)(已完成,待校对)(已完成,待校对)(未开始)(未开始)(进行中)(未开始)(未开始)(未开始)
卡夫卡风暴这是从Kafka中读取并在Elastic Search中建立索引的简单Apache Storm Trident拓扑。 ##运行此Storm拓扑所需的设置### 1)Zookeeper。 Download from ...
Storm流计算项目(文档中含有视频下载地址和解压密码),内容包含 storm、trident、kafka、hbase、cdh、hightcharts 等内容
兔子暴风雨一个将原始网络数据包流聚合到Storm数据处理平台中的小实验。 Trident用于数据包分析的连续计算和状态处理。 最后,有一个简单的仪表板可以实时观察分析。要求单节点或多节点Storm集群。 也可以在本地群集...
Storm分布式实时计算模式由Apache Storm 项目核心贡献者吉奥兹、奥尼尔亲笔撰 写,融合了作者丰富的Storm实战经验,通过大量示例,全面而系统地讲解使用Storm进行分布式实 时计算的核心概念及应用,并针对不同的应用...
该库在 Google Cloud Datastore ( ) 之上实现了 Trident 状态。 它支持非事务性、事务性和不透明状态类型。
01.Storm项目实战课程大纲 02.CDH5搭建之CM5安装部署 03.CDH5搭建和CM界面化集群管理 04.Hadoop、HBase、Zookeeper集群管理和角色分配 05.Kafka基础知识和集群搭建 06.Kafka基本操作和最优设置 07.Kafka Java API ...