一、前言
Trident内部是基于batch来实现的:数据流先被切分成一个个batch,每个batch再被划分为若干partition;分区之后再为各分区分配并发度,这样才能进行并发处理。并发度的分配通过parallelismHint来实现。
二、实战
main方法
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IOException { FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3, new Values("a"), new Values("b"), new Values("a"), new Values( "c")); //设置为true,数据源会源源不断发送 spout.setCycle(true); TridentTopology topology = new TridentTopology(); topology.newStream("spout", spout) .shuffle() .partitionAggregate(new Fields("sentence"), new SumWord(), new Fields("sum")) /** * 设置3个并发度,可以理解为3个分区操作 */ .parallelismHint(3) .each(new Fields("sum"), new PrintFilter_partition()); Config config = new Config(); config.setNumWorkers(2); config.setNumAckers(1); config.setDebug(false); StormSubmitter.submitTopology("trident__partition_aggregate", config, topology.build()); }
SumWord:
package com.storm.trident.partitionAggregate.分区聚合; import java.util.HashMap; import java.util.Map; import org.apache.commons.collections.MapUtils; import org.apache.storm.trident.operation.BaseAggregator; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.operation.TridentOperationContext; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.tuple.Values; public class SumWord extends BaseAggregator<Map<String,Integer>> { /** * */ private static final long serialVersionUID = 1L; /** * 属于哪个batch */ private Object batchId; /** * 属于哪个分区 */ private int partitionId; /** * 分区数量 */ private int numPartitions; /** * 用来统计 */ private Map<String,Integer> state; @SuppressWarnings("rawtypes") @Override public void prepare(Map conf, TridentOperationContext context) { state = new HashMap<String,Integer>(); partitionId = context.getPartitionIndex(); numPartitions = context.numPartitions(); } @Override public Map<String, Integer> init(Object batchId, TridentCollector collector) { this.batchId = batchId; return state; } @Override public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) { System.out.println(tuple+";partitionId="+partitionId+";partitions="+numPartitions +",batchId:" + batchId); String word = tuple.getString(0); val.put(word, MapUtils.getInteger(val, word, 0)+1); System.out.println("sumWord:" + val); } @Override public void complete(Map<String, Integer> val, TridentCollector collector) { collector.emit(new Values(val)); } }
打印方法
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Pass-through filter that logs every tuple at INFO level and keeps it.
 */
public class PrintFilter_partition extends BaseFilter {

    private static final Logger LOGGER = LoggerFactory.getLogger(PrintFilter_partition.class);

    private static final long serialVersionUID = 1L;

    /**
     * Logs the tuple and lets it through.
     *
     * @param tuple the tuple being filtered
     * @return always {@code true}, so no tuple is dropped
     */
    @Override
    public boolean isKeep(TridentTuple tuple) {
        // Parameterized form: the message is only built when INFO is enabled;
        // the rendered text is the same as the concatenated version.
        LOGGER.info("打印出来的tuple:{}", tuple);
        return true;
    }
}
测试效果:
2016-12-22 18:39:26.060 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257308:0 2016-12-22 18:39:26.062 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{}] 2016-12-22 18:39:26.066 STDIO [INFO] [b];partitionId=0;partitions=5,batchId:257308:0 2016-12-22 18:39:26.115 STDIO [INFO] sumWord:{a=1} 2016-12-22 18:39:26.116 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257308:0 2016-12-22 18:39:26.117 STDIO [INFO] sumWord:{a=2} 2016-12-22 18:39:26.120 STDIO [INFO] sumWord:{b=1} 2016-12-22 18:39:26.121 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=2}] 2016-12-22 18:39:26.121 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1}] 2016-12-22 18:39:26.196 STDIO [INFO] [c];partitionId=4;partitions=5,batchId:257309:0 2016-12-22 18:39:26.197 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{}] 2016-12-22 18:39:26.198 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1}] 2016-12-22 18:39:26.197 STDIO [INFO] sumWord:{c=1, a=2} 2016-12-22 18:39:26.205 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=2}] 2016-12-22 18:39:26.683 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257310:0 2016-12-22 18:39:26.684 STDIO [INFO] sumWord:{c=1, a=3} 2016-12-22 18:39:26.685 STDIO [INFO] [b];partitionId=0;partitions=5,batchId:257310:0 2016-12-22 18:39:26.687 STDIO [INFO] [a];partitionId=2;partitions=5,batchId:257310:0 2016-12-22 18:39:26.689 STDIO [INFO] sumWord:{a=1} 2016-12-22 18:39:26.691 STDIO [INFO] sumWord:{b=2} 2016-12-22 18:39:26.692 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2}] 2016-12-22 18:39:26.693 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=3}] 2016-12-22 18:39:26.690 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=1}] 2016-12-22 18:39:27.188 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{a=1}] 2016-12-22 18:39:27.190 STDIO [INFO] [c];partitionId=0;partitions=5,batchId:257311:0 2016-12-22 18:39:27.192 STDIO [INFO] sumWord:{b=2, c=1} 2016-12-22 18:39:27.199 
c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=3}] 2016-12-22 18:39:27.203 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}] 2016-12-22 18:39:27.673 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257312:0 2016-12-22 18:39:27.675 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}] 2016-12-22 18:39:27.674 STDIO [INFO] sumWord:{c=1, a=4} 2016-12-22 18:39:27.677 STDIO [INFO] [b];partitionId=2;partitions=5,batchId:257312:0 2016-12-22 18:39:27.678 STDIO [INFO] sumWord:{b=1, a=1} 2016-12-22 18:39:27.680 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257312:0 2016-12-22 18:39:27.680 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}] 2016-12-22 18:39:27.681 STDIO [INFO] sumWord:{c=1, a=5} 2016-12-22 18:39:27.683 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=1, a=5}] 2016-12-22 18:39:28.227 STDIO [INFO] [c];partitionId=4;partitions=5,batchId:257313:0 2016-12-22 18:39:28.232 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}] 2016-12-22 18:39:28.236 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1}] 2016-12-22 18:39:28.253 STDIO [INFO] sumWord:{c=2, a=5} 2016-12-22 18:39:28.256 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{c=2, a=5}] 2016-12-22 18:39:28.741 STDIO [INFO] [a];partitionId=4;partitions=5,batchId:257314:0 2016-12-22 18:39:28.744 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}] 2016-12-22 18:39:28.743 STDIO [INFO] sumWord:{c=2, a=6} 2016-12-22 18:39:28.748 STDIO [INFO] [a];partitionId=0;partitions=5,batchId:257314:0 2016-12-22 18:39:28.749 STDIO [INFO] sumWord:{b=2, c=1, a=1} 2016-12-22 18:39:28.756 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=1, a=1}] 2016-12-22 18:39:28.755 STDIO [INFO] [b];partitionId=4;partitions=5,batchId:257314:0 2016-12-22 18:39:28.763 STDIO [INFO] sumWord:{b=1, c=2, a=6} 2016-12-22 18:39:28.769 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}] 2016-12-22 18:39:29.218 
c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}] 2016-12-22 18:39:29.219 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, a=1}] 2016-12-22 18:39:29.221 STDIO [INFO] [c];partitionId=0;partitions=5,batchId:257315:0 2016-12-22 18:39:29.228 STDIO [INFO] sumWord:{b=2, c=2, a=1} 2016-12-22 18:39:29.229 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=2, a=1}] 2016-12-22 18:39:29.689 STDIO [INFO] [b];partitionId=2;partitions=5,batchId:257316:0 2016-12-22 18:39:29.693 STDIO [INFO] sumWord:{b=2, a=1} 2016-12-22 18:39:29.694 STDIO [INFO] [a];partitionId=0;partitions=5,batchId:257316:0 2016-12-22 18:39:29.697 STDIO [INFO] sumWord:{b=2, c=2, a=2} 2016-12-22 18:39:29.704 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}] 2016-12-22 18:39:29.723 STDIO [INFO] [a];partitionId=2;partitions=5,batchId:257316:0 2016-12-22 18:39:29.706 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, c=2, a=2}] 2016-12-22 18:39:29.740 STDIO [INFO] sumWord:{b=2, a=2} 2016-12-22 18:39:29.746 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, a=2}] 2016-12-22 18:39:30.197 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=1, c=2, a=6}] 2016-12-22 18:39:30.199 c.t.s.t.p.分.PrintFilter_partition [INFO] 打印出来的tuple:[{b=2, a=2}]
相关推荐
《Storm Trident API 使用详解》 Storm Trident API 是 Apache Storm 框架中用于构建实时大数据处理应用程序的关键组件。它的核心概念是"Stream",一种无界的数据序列,它被分割成一系列批次(Batch),以便在...
**Storm Trident:分布式流处理框架详解** Storm Trident是Twitter开源的、基于Apache Storm的一个高级抽象,它提供了一种更强大且高效的方式来处理实时数据流。Trident的核心理念是将数据流划分为一系列的小批量...
通过Trident,可以实现更复杂的数据处理逻辑,如分区Trident Spout的开发,以优化数据处理性能。 项目2和3则分别针对省份销售排行和非跳出UV(用户活跃度)进行了实时分析,同样利用了HighCharts进行数据可视化,并...
9. **Trident**:Trident是Storm的一个高级抽象,提供了更强大的状态管理和容错功能,适合复杂的实时处理任务。 以上只是课程大纲的主要内容,实际学习中会涉及到更多细节和实践操作,旨在培养能够熟练运用大数据...
嵌入式系统开发_STM32微控制器_ESP8266WiFi模块_心率传感器_加速度计_OLED显示屏_蓝牙40_低功耗设计_实时操作系统_智能手表_多功能健康监测_运动数据记录_
驾校自动化_网页自动化爬虫技术_Python27多线程HTTP请求模拟_龙泉驾校2014版约车系统自动预约助手_通过模拟登录和循环请求实现自动约车功能_支持失败自动递增车号重试_
Linux系统编程_操作系统内核_系统调用_进程线程_信号处理_文件IO_进程间通信_多线程同步_网络编程_UNIX环境编程_中文翻译勘误_错误修正_代码示例优化_技术文档校对_开
wanjunshe_Python-Tensorflow_12888_1745868924470
scratch少儿编程逻辑思维游戏源码-铅笔画.zip
即时通讯应用开发_基于LeanCloud云服务与Android原生开发_集成QQ第三方登录与即时聊天功能的社交应用_实现用户注册登录创建聊天室发送文字消息展示用户信息头像昵称并提供
scratch少儿编程逻辑思维游戏源码-伞兵大乱斗(云变量).zip
scratch少儿编程逻辑思维游戏源码-楼层酷跑.zip
scratch少儿编程逻辑思维游戏源码-零下之寒颤.zip
scratch少儿编程逻辑思维游戏源码-密室逃生.zip
少儿编程scratch项目源代码文件案例素材-爪猫足球.zip
命令行完成git本地仓库创建、将代码提交到暂存区、查看暂存区信息、将代码提交到本地仓库、将本地仓库关联到远程仓库、推送到远程仓库全过程的截图
少儿编程scratch项目源代码文件案例素材-纸.zip
scratch少儿编程逻辑思维游戏源码-日本冒险.zip
scratch少儿编程逻辑思维游戏源码-狼人杀跑酷.zip
scratch少儿编程逻辑思维游戏源码-史莱姆杀手.zip