一、前言
先有batch,因为trident内部是基于batch来实现的,然后有partition,分区后再分配并发度,然后才能进行并发处理。并发度的分配是利用parallelismHint来实现。
二、实战
main方法
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IOException { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("a"), new Values("b"), new Values("a"), new Values( "c")); //设置为true,数据源会源源不断发送 spout.setCycle(true); TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .shuffle() .partitionAggregate(new Fields("sentence"), new SumWord(), new Fields("sum")) /** * 设置3个并发度,可以理解为3个分区操作 */ .parallelismHint(3) .each(new Fields("sum"), new PrintFilter_partition()); Config config = new Config(); config.setNumWorkers(2); config.setNumAckers(1); config.setDebug(false); StormSubmitter.submitTopology("trident__partition_aggregate", config, topology.build()); }
SumWord:
package com.storm.trident.partitionAggregate.分区聚合; import java.util.HashMap; import java.util.Map; import org.apache.commons.collections.MapUtils; import org.apache.storm.trident.operation.BaseAggregator; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.operation.TridentOperationContext; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.tuple.Values; public class SumWord extends BaseAggregator<Map<String,Integer>> { /** * */ private static final long serialVersionUID = 1L; /** * 属于哪个batch */ private Object batchId; /** * 属于哪个分区 */ private int partitionId; /** * 分区数量 */ private int numPartitions; /** * 用来统计 */ private Map<String,Integer> state; @SuppressWarnings("rawtypes") @Override public void prepare(Map conf, TridentOperationContext context) { state = new HashMap<String,Integer>(); partitionId = context.getPartitionIndex(); numPartitions = context.numPartitions(); } @Override public Map<String, Integer> init(Object batchId, TridentCollector collector) { this.batchId = batchId; return state; } @Override public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) { System.out.println(tuple+";partitionId="+partitionId+";partitions="+numPartitions +",batchId:" + batchId); String word = tuple.getString(0); val.put(word, MapUtils.getInteger(val, word, 0)+1); System.out.println("sumWord:" + val); } @Override public void complete(Map<String, Integer> val, TridentCollector collector) { collector.emit(new Values(val)); } }
打印方法
import org.apache.storm.trident.operation.BaseFilter; import org.apache.storm.trident.tuple.TridentTuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PrintFilter_partition extends BaseFilter { private static final Logger LOGGER = LoggerFactory.getLogger(PrintFilter_partition.class); /** * */ private static final long serialVersionUID = 1L; @Override public boolean isKeep(TridentTuple tuple) { LOGGER.info("打印出来的tuple:" + tuple); return true; } }
测试效果:
2016-12-22 18:39:26.060 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257308:0 2016-12-22 18:39:26.062 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{}] 2016-12-22 18:39:26.066 STDIO [INFO] [b];partitionId=0;partitions=5,batchId:257308:0 2016-12-22 18:39:26.115 STDIO [INFO] sumWord:{a=1} 2016-12-22 18:39:26.116 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257308:0 2016-12-22 18:39:26.117 STDIO [INFO] sumWord:{a=2} 2016-12-22 18:39:26.120 STDIO [INFO] sumWord:{b=1} 2016-12-22 18:39:26.121 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=2}] 2016-12-22 18:39:26.121 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1}] 2016-12-22 18:39:26.196 STDIO [INFO] [c];partitionId=4;partitions=5,batchId:257309:0 2016-12-22 18:39:26.197 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{}] 2016-12-22 18:39:26.198 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1}] 2016-12-22 18:39:26.197 STDIO [INFO] sumWord:{c=1, a=2} 2016-12-22 18:39:26.205 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=2}] 2016-12-22 18:39:26.683 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257310:0 2016-12-22 18:39:26.684 STDIO [INFO] sumWord:{c=1, a=3} 2016-12-22 18:39:26.685 STDIO [INFO] [b];partitionId=0;partitions=5,batchId:257310:0 2016-12-22 18:39:26.687 STDIO [INFO] [a];partitionId=2;partitions=5,batchId:257310:0 2016-12-22 18:39:26.689 STDIO [INFO] sumWord:{a=1} 2016-12-22 18:39:26.691 STDIO [INFO] sumWord:{b=2} 2016-12-22 18:39:26.692 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2}] 2016-12-22 18:39:26.693 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=3}] 2016-12-22 18:39:26.690 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=1}] 2016-12-22 18:39:27.188 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=1}] 2016-12-22 18:39:27.190 STDIO [INFO] [c];partitionId=0;partitions=5,batchId:257311:0 2016-12-22 18:39:27.192 STDIO [INFO] sumWord:{b=2, c=1} 2016-12-22 18:39:27.199 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=3}] 2016-12-22 18:39:27.203 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}] 2016-12-22 18:39:27.673 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257312:0 2016-12-22 18:39:27.675 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}] 2016-12-22 18:39:27.674 STDIO [INFO] sumWord:{c=1, a=4} 2016-12-22 18:39:27.677 STDIO [INFO] [b];partitionId=2;partitions=5,batchId:257312:0 2016-12-22 18:39:27.678 STDIO [INFO] sumWord:{b=1, a=1} 2016-12-22 18:39:27.680 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257312:0 2016-12-22 18:39:27.680 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}] 2016-12-22 18:39:27.681 STDIO [INFO] sumWord:{c=1, a=5} 2016-12-22 18:39:27.683 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=5}] 2016-12-22 18:39:28.227 STDIO [INFO] [c];partitionId=4;partitions=5,batchId:257313:0 2016-12-22 18:39:28.232 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}] 2016-12-22 18:39:28.236 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}] 2016-12-22 18:39:28.253 STDIO [INFO] sumWord:{c=2, a=5} 2016-12-22 18:39:28.256 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=2, a=5}] 2016-12-22 18:39:28.741 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257314:0 2016-12-22 18:39:28.744 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}] 2016-12-22 18:39:28.743 STDIO [INFO] sumWord:{c=2, a=6} 2016-12-22 18:39:28.748 STDIO [INFO] [a];partitionId=0;partitions=5,batchId:257314:0 2016-12-22 18:39:28.749 STDIO [INFO] sumWord:{b=2, c=1, a=1} 2016-12-22 18:39:28.756 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1, a=1}] 2016-12-22 18:39:28.755 STDIO [INFO] [b];partitionId=4;partitions=5,batchId:257314:0 2016-12-22 18:39:28.763 STDIO [INFO] sumWord:{b=1, c=2, a=6} 2016-12-22 18:39:28.769 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}] 2016-12-22 18:39:29.218 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}] 2016-12-22 18:39:29.219 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}] 2016-12-22 18:39:29.221 STDIO [INFO] [c];partitionId=0;partitions=5,batchId:257315:0 2016-12-22 18:39:29.228 STDIO [INFO] sumWord:{b=2, c=2, a=1} 2016-12-22 18:39:29.229 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=2, a=1}] 2016-12-22 18:39:29.689 STDIO [INFO] [b];partitionId=2;partitions=5,batchId:257316:0 2016-12-22 18:39:29.693 STDIO [INFO] sumWord:{b=2, a=1} 2016-12-22 18:39:29.694 STDIO [INFO] [a];partitionId=0;partitions=5,batchId:257316:0 2016-12-22 18:39:29.697 STDIO [INFO] sumWord:{b=2, c=2, a=2} 2016-12-22 18:39:29.704 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}] 2016-12-22 18:39:29.723 STDIO [INFO] [a];partitionId=2;partitions=5,batchId:257316:0 2016-12-22 18:39:29.706 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=2, a=2}] 2016-12-22 18:39:29.740 STDIO [INFO] sumWord:{b=2, a=2} 2016-12-22 18:39:29.746 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, a=2}] 2016-12-22 18:39:30.197 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}] 2016-12-22 18:39:30.199 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, a=2}]
相关推荐
Storm Trident实战之计算网站PV.rar
Storm Trident API 使用详解.docx
Storm流计算从入门到精通之技术篇(高并发策略、批处理事务、Trident精解、运维监控、企业场景) Storm入门教程 之Storm原理和概念详解,出自Storm流计算从入门到精通之技术篇,Storm入门视频教程用到技术:Storm集群...
Storm视频教程通过含3个Storm完整项目,均为企业实际项目,其中一个是完全由Storm Trident开发。本课程每个技术均采用最新稳定版本,学完后可以从Kafka到Storm项目开发及HighCharts图表开发一个人搞定!涨工资?身价...
1、课程中完整开发3个Storm项目,均为企业实际项目,其中一个是完全由Storm Trident开发。 项目源码均可以直接运行,也可直接用于商用或企业。 2、Storm全面、系统、深入讲解 3、注重实践,对较抽象难懂的技术点如...
《Storm实战:构建大数据实时计算》是一本系统并且具有实践指导意义的Storm工具书和参考书,对Storm整个技术体系进行了全面的讲解,不仅包括对基本概念、特性的介绍,也涵盖了一些原理说明。 实战性很强,各章节...
storm_Trident例子
三叉戟教程实用的Storm Trident教程本教程以的的出色为基础。 流浪者的设置基于Taylor Goetz的。 Hazelcast状态代码基于wurstmeister的。 看看随附的。本教程的结构浏览Part * .java,了解Trident的基础知识使用...
三叉戟的例子一组用 Storm Trident 编写的应用程序。应用用法建造$ git clone git@github.com:mayconbordin/trident-examples.git$ cd trident-examples/$ mvn -P < profile> package 使用local配置文件以本地模式...
storm_trident_state storm trident自定义state的实现,实现了3种插入数据库的持久化方法,事务,不透明事务,不透明分区事务 下载下面,运行:mvn eclipse:eclipse即可在eclipse使用此项目。
3、掌握Storm Trident项目开发模式; 4、掌握Storm集成Kafka开发及项目实战; 5、掌握HighCharts各类图表开发和实时无刷新加载数据; 6、掌握Storm+Kafka+HBase的经典组合,完整呈现企业实践项目; 7、可以做到以...
三叉戟弹性搜索ElasticSearch 的 Storm Trident 集成层
三叉戟《风暴蓝图:分布式实时计算模式》一书的源码和翻译=============(已完成,待校对)(未开始)(已完成,待校对)(已完成,待校对)(未开始)(未开始)(进行中)(未开始)(未开始)(未开始)
卡夫卡风暴这是从Kafka中读取并在Elastic Search中建立索引的简单Apache Storm Trident拓扑。 ##运行此Storm拓扑所需的设置### 1)Zookeeper。 Download from ...
兔子暴风雨一个将原始网络数据包流聚合到Storm数据处理平台中的小实验。 Trident用于数据包分析的连续计算和状态处理。 最后,有一个简单的仪表板可以实时观察分析。要求单节点或多节点Storm集群。 也可以在本地群集...
Storm流计算项目(文档中含有视频下载地址和解压密码),内容包含 storm、trident、kafka、hbase、cdh、hightcharts 等内容
22.项目1-地区销售额-项目需求分析和分区Trident Spout开发 23.项目1-地区销售额-Trident代码开发一 24.项目1-地区销售额-Trident代码开发二 25.项目1-地区销售额-基于HBase存储的State运用 26.项目2-省份销售排行-...
Storm分布式实时计算模式由Apache Storm 项目核心贡献者吉奥兹、奥尼尔亲笔撰 写,融合了作者丰富的Storm实战经验,通过大量示例,全面而系统地讲解使用Storm进行分布式实 时计算的核心概念及应用,并针对不同的应用...
22.项目1-地区销售额-项目需求分析和分区Trident Spout开发 23.项目1-地区销售额-Trident代码开发一 24.项目1-地区销售额-Trident代码开发二 25.项目1-地区销售额-基于HBase存储的State运用 26.项目2-省份销售排行-...