- 浏览: 53724 次
- 性别:
- 来自: 北京
文章分类
最新评论
一、ITridentSpout
基于事务
static interface ITridentSpout.BatchCoordinator<X>
static interface ITridentSpout.Emitter<X>
接口类的实现和之前事务ITransactionalSpout 非常类似。
二、调用链用于执行多个聚合
如果想同事执行多个聚合,可以使用如下的调用链
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
这个代码将会在每个分区上执行count和sum聚合。输出将包含【“count”,“sum”】字段。
三、投影(projection)
投影操作是对数据上进行列裁剪。
如果你有一个流有【“a”,“b”,“c”,“d”】四个字段,执行下面的代码:
mystream.project(new Fields("b","d"));
输出流将只有【“b”,“d”】两个字段。
四、重分区(repartition)操作
重分区操作是通过一个函数改变元组(tuple)在task之间的分布, 重分区(repatition)需要网络传输,目的是方便聚合或查询。如下是重分区函数:
1. Shuffle:与hadoop一样,把同步的tuple放在一个分区
2. Broadcast:每个元组重复的发送到所有的目标分区。这个在DRPC中很有用。 如果你想做在每个分区上做一个statequery。
3. paritionBy:根据一系列分发字段(fields)做一个语义的分区。通过对这些字 段取hash值并对目标分区数取模获取目标分区。paritionBy保证相同的分发 字段(fields)分发到相同的目标分区。
4. global:所有的tuple分发到相同的分区。
5. batchGobal:本批次的所有tuple发送到相同的分区,不通批次可以在不通的分 区。
6. patition:这个函数接受用户自定义的分区函数。用户自定义函数事项 backtype.storm.grouping.CustomStreamGrouping接口。
五、合并和关联
合并(merge)多个流成为一个流,可以如下:
topology.merge(stream1, stream2, stream3);
Trident合并的流字段会以第一个流的字段命名。
另一个合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方法做join。
Trident中的join只会在spout发出的每个批次间进行。
如一个流包含字段【“key”,“val1”,“val2”】,
另一个流包含字段【“x”,“val1”】:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));
Stream1的“key”和stream2的“x”关联,Trident要求所有的字段要改名字。
1. 首先是join字段。例子中stream1中的“key”对应stream2中的“x”。
2. 接下来,会把非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中的“val1”和“wal2”,“c”对应stream2中的“val1”。
六、FirstN
取Top N
用法:
stream.applyAssembly(new FirstN(TOP_N, "sortField", true));
Trident适合做汇总型,不大适合做去重型
基于事务
static interface ITridentSpout.BatchCoordinator<X>
static interface ITridentSpout.Emitter<X>
接口类的实现和之前事务ITransactionalSpout 非常类似。
二、调用链用于执行多个聚合
topology.newDRPCStream("top", drpc).each(new Fields("args"), new Split(“ ”), new Fields("time")).parallelismHint(5).stateQuery(myStates,new Fields("time"),new QueryPacketDB(),new Fields("srcip", "byt", "pkt")).groupBy(new Fields("srcip")).chainedAgg().aggregate(new Fields("byt"), new Sum(), new Fields("yt")).aggregate(new Fields("pkt"), new Sum(), new Fields("kt")).chainEnd().applyAssembly(new FirstN(10, "yt", true));
如果想同事执行多个聚合,可以使用如下的调用链
mystream.chainedAgg()
.partitionAggregate(new Count(), new Fields("count"))
.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
.chainEnd()
这个代码将会在每个分区上执行count和sum聚合。输出将包含【“count”,“sum”】字段。
三、投影(projection)
投影操作是对数据上进行列裁剪。
如果你有一个流有【“a”,“b”,“c”,“d”】四个字段,执行下面的代码:
mystream.project(new Fields("b","d"));
输出流将只有【“b”,“d”】两个字段。
四、重分区(repartition)操作
重分区操作是通过一个函数改变元组(tuple)在task之间的分布, 重分区(repatition)需要网络传输,目的是方便聚合或查询。如下是重分区函数:
1. Shuffle:与hadoop一样,把同步的tuple放在一个分区
2. Broadcast:每个元组重复的发送到所有的目标分区。这个在DRPC中很有用。 如果你想做在每个分区上做一个statequery。
3. paritionBy:根据一系列分发字段(fields)做一个语义的分区。通过对这些字 段取hash值并对目标分区数取模获取目标分区。paritionBy保证相同的分发 字段(fields)分发到相同的目标分区。
4. global:所有的tuple分发到相同的分区。
5. batchGobal:本批次的所有tuple发送到相同的分区,不通批次可以在不通的分 区。
6. patition:这个函数接受用户自定义的分区函数。用户自定义函数事项 backtype.storm.grouping.CustomStreamGrouping接口。
五、合并和关联
合并(merge)多个流成为一个流,可以如下:
topology.merge(stream1, stream2, stream3);
Trident合并的流字段会以第一个流的字段命名。
另一个合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方法做join。
Trident中的join只会在spout发出的每个批次间进行。
如一个流包含字段【“key”,“val1”,“val2”】,
另一个流包含字段【“x”,“val1”】:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));
Stream1的“key”和stream2的“x”关联,Trident要求所有的字段要改名字。
1. 首先是join字段。例子中stream1中的“key”对应stream2中的“x”。
2. 接下来,会把非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中的“val1”和“wal2”,“c”对应stream2中的“val1”。
六、FirstN
取Top N
用法:
stream.applyAssembly(new FirstN(TOP_N, "sortField", true));
Trident适合做汇总型,不大适合做去重型
发表评论
-
Trident实战之计算网站PV
2017-05-24 13:24 6161、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 703一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 474英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 385一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6481、spout public class MyOpaq ... -
分区事务IPartitionedTransactionalSpout实例
2017-05-21 11:02 5551.分区事务spout public class My ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4461、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 7881、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 577Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2060事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4141、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1083统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 866汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 647一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10391、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 669一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 552并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5011、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 373本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 638一、安装Storm wget ...
相关推荐
All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine视频合并All Video Joine...
比较实用的方法,已经用到项目里,很好用的一个方法
简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接简单的在MapReduce中实现两个表的join连接
本文实例讲述了mysql使用GROUP BY分组实现取前N条记录的方法。分享给大家供大家参考,具体如下: MySQL中GROUP BY分组取前N条记录实现 mysql分组,取记录 GROUP BY之后如何取每组的前两位下面我来讲述mysql中GROUP BY...
NULL 博文链接:https://forlan.iteye.com/blog/2245814
基于开源的flink,对其实时sql进行扩展;主要实现了流与维表的join
inner join、 left join 、right join、 outer join之间的区别
可以实现多行查询数据转换到一行上显示,多行的某列信息用“,”号隔开。
left join right join inner join 区别和联系
用C#实现对DataTable的JOIN,GROUP BY,FILTER,UNIONALL,DISTINCT
SQL语句left join/right join/inner join 的用法比较 SQL语句left join/right join/inner join 的用法比较
SQL中的left outer join,inner join,right outer join用法详解
Join on/inner join on/full join on/full outer join on/left join on/right join on/cross join on; 在使用jion时,on和where条件的区别;
sql查询中出现同样数据时需要合并行,或者合并列。
57、JoinTable的方式实现单向一对多1
数据库 我自己在 Java 中实现了 SortMergeJoin 和 HashJoin(来自 SQL 的著名 INNER JOIN)。 在更多信息。
LINQ to datable实现Left join right join full join VB2010源码
026.Python字符串_split()分割_join()合并_join()效率测试.mp4
hash join 原理和算法 1.Hash Join概述 2.Hash Join原理 3.Hash Join算法 4.Hash Join的成本
Pandas合并DataFrame_Merge,_Join,_Concat,_Append【Pandas入门教程6】