`

ITridentSpout、FirstN(取Top N)实现、 流合并和join

 
阅读更多
一、ITridentSpout
基于事务
static interface ITridentSpout.BatchCoordinator<X>
           
static interface ITridentSpout.Emitter<X>

接口类的实现和之前事务ITransactionalSpout 非常类似。

二、调用链用于执行多个聚合
topology.newDRPCStream("top", drpc).each(new Fields("args"), new Split(“ ”), new Fields("time")).parallelismHint(5).stateQuery(myStates,new Fields("time"),new QueryPacketDB(),new Fields("srcip", "byt", "pkt")).groupBy(new Fields("srcip")).chainedAgg().aggregate(new Fields("byt"), new Sum(), new Fields("yt")).aggregate(new Fields("pkt"), new Sum(), new Fields("kt")).chainEnd().applyAssembly(new FirstN(10, "yt", true));

如果想同事执行多个聚合,可以使用如下的调用链
mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

这个代码将会在每个分区上执行count和sum聚合。输出将包含【“count”,“sum”】字段。

三、投影(projection)
投影操作是对数据上进行列裁剪。
如果你有一个流有【“a”,“b”,“c”,“d”】四个字段,执行下面的代码:

mystream.project(new Fields("b","d"));

输出流将只有【“b”,“d”】两个字段。

四、重分区(repartition)操作

重分区操作是通过一个函数改变元组(tuple)在task之间的分布,   重分区(repatition)需要网络传输,目的是方便聚合或查询。如下是重分区函数:
1.      Shuffle:与hadoop一样,把同步的tuple放在一个分区
2.      Broadcast:每个元组重复的发送到所有的目标分区。这个在DRPC中很有用。 如果你想做在每个分区上做一个statequery。
3.      paritionBy:根据一系列分发字段(fields)做一个语义的分区。通过对这些字 段取hash值并对目标分区数取模获取目标分区。paritionBy保证相同的分发 字段(fields)分发到相同的目标分区。
4.      global:所有的tuple分发到相同的分区。
5.      batchGobal:本批次的所有tuple发送到相同的分区,不通批次可以在不通的分 区。
6.      patition:这个函数接受用户自定义的分区函数。用户自定义函数事项 backtype.storm.grouping.CustomStreamGrouping接口。

五、合并和关联
合并(merge)多个流成为一个流,可以如下:
topology.merge(stream1, stream2, stream3);
Trident合并的流字段会以第一个流的字段命名。

另一个合并流的方法是join。类似SQL的join都是对固定输入的。而流的输入是不固定的,所以不能按照sql的方法做join。
Trident中的join只会在spout发出的每个批次间进行。
如一个流包含字段【“key”,“val1”,“val2”】,
另一个流包含字段【“x”,“val1”】:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key","a","b","c"));

Stream1的“key”和stream2的“x”关联,Trident要求所有的字段要改名字。
1.      首先是join字段。例子中stream1中的“key”对应stream2中的“x”。
2.      接下来,会把非join字段依次列出来,排列顺序按照传给join的顺序。例子中“a”,“b”对应stream1中的“val1”和“wal2”,“c”对应stream2中的“val1”。

六、FirstN

取Top N

用法:
stream.applyAssembly(new FirstN(TOP_N, "sortField", true));

Trident适合做汇总型,不大适合做去重型
















分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics