`
农村外出务工男JAVA
  • 浏览: 104721 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

storm重定向实战

阅读更多

一、 重定向

       重定向定义了我们的tuple如何被route到下一个处理层,当然不同的层之间可能会有不同的并行度。storm提供了如下的重定向操作:
    shuffle:通过随机分配算法来均衡tuple到各个分区
    broadcast:每个tuple都被广播到所有的分区,这种方式在drcp时非常有用,比如在每个分区上做stateQuery
    partitionBy:根据指定的字段列表进行划分,具体做法是用指定字段列表的hash值对分区个数做取模运算,确保相同字段列表的数据被划分到同一个分区。
    global:所有的tuple都被发送到一个分区,这个分区用来处理整个Stream。
    batchGlobal:一个Batch中的所有tuple都被发送到同一个分区,不同的Batch会去往不同的分区。
    Partition:通过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping

二、实战

   Main:

	
pubzlic static void main(String[] args) throws AlreadyAliveException,
			InvalidTopologyException, AuthorizationException {
		FixedBatchSpout spout = new FixedBatchSpout(
				new Fields("actor", "text"), 2,
				new Values("dave", "dave text"), new Values("dave",
						"dave text2"), new Values("dave", "dave text3"),
				new Values("dave", "dave text4"), new Values(
						"tanjie is a very good man", "very very good man"));
		spout.setCycle(false);
		TridentTopology topology = new TridentTopology();
		topology.newStream("spout", spout)
				.parallelismHint(5)
				.partitionBy(new Fields("actor"))
//				.shuffle()
//			        .batchGlobal()
				.each(new Fields("actor", "text"),
						new PerActorTweetsFilter("dave")).parallelismHint(5)
				.each(new Fields("actor", "text"), new PrintFilter());
		Config config = new Config();
		config.setNumWorkers(2);
		config.setNumAckers(1);
		config.setDebug(false);
		StormSubmitter.submitTopology("trident_aggregate_partitionBy", config,
				topology.build());
	}

public class PerActorTweetsFilter extends BaseFilter {

	private static final long serialVersionUID = 1L;
	private int partitionIndex;
	private String actor;

	public PerActorTweetsFilter(String actor) {
		this.actor = actor;
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map conf, TridentOperationContext context) {
		this.partitionIndex = context.getPartitionIndex();
	}

	@Override
	public boolean isKeep(TridentTuple tuple) {
		boolean filter = tuple.getString(0).equals(actor);
		if (filter) {
			System.out.println("I am partition [" + partitionIndex
					+ "] and I have kept a tweet by: " + actor);
		}
		return filter;
	}
}

   测试:

 

  1、partitionBy:相同字段hash后到同一个分区,我这个地方根据actor进行分区,hash后肯定到了同一个分区,即使我指定了5个分区

2016-11-14 17:06:50.806 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:06:50.808 STDIO [INFO] first value: dave
2016-11-14 17:06:50.814 STDIO [INFO] seconde value: dave text
2016-11-14 17:06:50.819 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:06:50.826 STDIO [INFO] first value: dave
2016-11-14 17:06:50.832 STDIO [INFO] seconde value: dave text2
2016-11-14 17:06:50.992 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:06:50.993 STDIO [INFO] first value: dave
2016-11-14 17:06:50.998 STDIO [INFO] seconde value: dave text3
2016-11-14 17:06:51.001 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:06:51.004 STDIO [INFO] first value: dave
2016-11-14 17:06:51.007 STDIO [INFO] seconde value: dave text4

    2、改成shuffle,会随机分配到某个分区

2016-11-14 17:18:16.019 STDIO [INFO] I am partition [0] and I have kept a tweet by: dave
2016-11-14 17:18:16.027 STDIO [INFO] I am partition [4] and I have kept a tweet by: dave
2016-11-14 17:18:16.028 STDIO [INFO] first value: dave
2016-11-14 17:18:16.030 STDIO [INFO] seconde value: dave text
2016-11-14 17:18:16.037 STDIO [INFO] first value: dave
2016-11-14 17:18:16.037 STDIO [INFO] seconde value: dave text2
2016-11-14 17:18:16.101 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:18:16.102 STDIO [INFO] first value: dave
2016-11-14 17:18:16.102 STDIO [INFO] seconde value: dave text3
2016-11-14 17:18:16.103 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:18:16.105 STDIO [INFO] first value: dave
2016-11-14 17:18:16.107 STDIO [INFO] seconde value: dave text4

    3、改成batchGlobal

2016-11-14 17:23:30.333 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:23:30.341 STDIO [INFO] first value: dave
2016-11-14 17:23:30.342 STDIO [INFO] seconde value: dave text
2016-11-14 17:23:30.343 STDIO [INFO] I am partition [2] and I have kept a tweet by: dave
2016-11-14 17:23:30.343 STDIO [INFO] first value: dave
2016-11-14 17:23:30.344 STDIO [INFO] seconde value: dave text2
2016-11-14 17:24:15.683 STDIO [INFO] I am partition [3] and I have kept a tweet by: dave
2016-11-14 17:24:15.684 STDIO [INFO] first value: dave
2016-11-14 17:24:15.685 STDIO [INFO] seconde value: dave text3
2016-11-14 17:24:15.685 STDIO [INFO] I am partition [3] and I have kept a tweet by: dave
2016-11-14 17:24:15.686 STDIO [INFO] first value: dave
2016-11-14 17:24:15.686 STDIO [INFO] seconde value: dave text4

 

 

 

 

1
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics