Understanding Storm's Stream Concept by Example

This all started while reading FlowmixBuilder, part of flowmix, a CEP engine built on Storm.
Every Bolt there is wired up with a whole pile of groupings,
and declareStream declares just as many stream-ids.
For a beginner who has only ever written a WordCountTopology this was completely baffling:
I had never seen Storm used this way, and I admit my first reaction was to reject it.
With every Bolt subscribing through so many groupings, what on earth does the topology graph even look like?

public TopologyBuilder create() {
  TopologyBuilder builder = new TopologyBuilder();
  builder.setSpout(EVENT, (IRichSpout) eventsComponent, eventLoaderParallelism == -1 ? parallelismHint : eventLoaderParallelism);
  builder.setSpout(FLOW_LOADER_STREAM, (IRichSpout) flowLoaderSpout, 1);
  builder.setSpout("tick", new TickSpout(1000), 1);
  builder.setBolt(INITIALIZER, new FlowInitializerBolt(), parallelismHint)  // kicks off a flow determining where to start
            .localOrShuffleGrouping(EVENT)
            .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM);

  declarebolt(builder, FILTER, new FilterBolt(), parallelismHint, true);
  declarebolt(builder, SELECT, new SelectorBolt(), parallelismHint, true);
  declarebolt(builder, PARTITION, new PartitionBolt(), parallelismHint, true);
  declarebolt(builder, SWITCH, new SwitchBolt(), parallelismHint, true);
  declarebolt(builder, AGGREGATE, new AggregatorBolt(), parallelismHint, true);
  declarebolt(builder, JOIN, new JoinBolt(), parallelismHint, true);
  declarebolt(builder, EACH, new EachBolt(), parallelismHint, true);
  declarebolt(builder, SORT, new SortBolt(), parallelismHint, true);
  declarebolt(builder, SPLIT, new SplitBolt(), parallelismHint, true);
  declarebolt(builder, OUTPUT, outputBolt, parallelismHint, false);

  return builder;
}
private static void declarebolt(TopologyBuilder builder, String boltName, IRichBolt bolt, int parallelism, boolean control) {
    BoltDeclarer declarer = builder.setBolt(boltName, bolt, parallelism)
        .allGrouping(FLOW_LOADER_STREAM, FLOW_LOADER_STREAM)
        .allGrouping("tick", "tick")
        .localOrShuffleGrouping(INITIALIZER, boltName)
        .localOrShuffleGrouping(FILTER, boltName)
        .fieldsGrouping(PARTITION, boltName, new Fields(FLOW_ID, PARTITION))    // guaranteed partitions will always group the same flow for flows that have joins with default partitions.
        .localOrShuffleGrouping(AGGREGATE, boltName)
        .localOrShuffleGrouping(SELECT, boltName)
        .localOrShuffleGrouping(EACH, boltName)
        .localOrShuffleGrouping(SORT, boltName)
        .localOrShuffleGrouping(SWITCH, boltName)
        .localOrShuffleGrouping(SPLIT, boltName)
        .localOrShuffleGrouping(JOIN, boltName);
  }
public static void declareOutputStreams(OutputFieldsDeclarer declarer, Fields fields) {
    declarer.declareStream(PARTITION, fields);
    declarer.declareStream(FILTER, fields);
    declarer.declareStream(SELECT, fields);
    declarer.declareStream(AGGREGATE, fields);
    declarer.declareStream(SWITCH, fields);
    declarer.declareStream(SORT, fields);
    declarer.declareStream(JOIN, fields);
    declarer.declareStream(SPLIT, fields);
    declarer.declareStream(EACH, fields);
    declarer.declareStream(OUTPUT, fields);
}

Let's start by reviewing the classic WordCountTopology.

WordCountTopology Default Stream

// Imports for the pre-1.0 (backtype.storm) API used throughout this post.
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class WordCountTopologySimple {

    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector collector;
        Random rand;
        String[] sentences = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            String sentence = sentences[rand.nextInt(sentences.length)];
            System.out.println("\n" + sentence);
            this.collector.emit(new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
        public void ack(Object id) {}
        public void fail(Object id) {}
    }

    public static class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
            this.collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static class WordCountBolt extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class PrinterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String first = tuple.getString(0);
            int second = tuple.getInteger(1);
            System.out.println(first + "," + second);
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer ofd) {}
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count");

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}

SingleStream

By default, the stream-id used when a Spout emits to downstream Bolts, and when a Bolt emits to downstream Bolts or subscribes to an upstream Spout/Bolt, is always "default".
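
As a quick illustration (a minimal sketch, not taken from any of the code below): omitting the stream-id is the same as naming the built-in default stream. Utils here is backtype.storm.utils.Utils, whose DEFAULT_STREAM_ID constant is simply the string "default".

// The two lines in each pair below are equivalent.
collector.emit(new Values(sentence));
collector.emit(Utils.DEFAULT_STREAM_ID, new Values(sentence));

declarer.declare(new Fields("sentence"));
declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields("sentence"));

builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout");
builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", Utils.DEFAULT_STREAM_ID);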

You can give a Spout/Bolt a custom stream-id when it emits, but you must then specify the same stream-id when declaring its output fields.

Code notes: specify one stream-id when emitting, specify the same stream-id when declaring the stream, and when the topology wires up a Bolt, the grouping names not only the upstream component-id but also that component's stream-id.

class RandomSentenceSpout {
    public void nextTuple() {
        Utils.sleep(1000);
        String sentence = sentences[rand.nextInt(sentences.length)];
        System.out.println("\n" + sentence);
        this.collector.emit("split-stream", new Values(sentence));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit("count-stream", new Values(word));
        }
        this.collector.ack(tuple);
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("count-stream", new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        count++;
        counts.put(word, count);
        collector.emit("print-stream", new Values(word, count));
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("print-stream", new Fields("word", "count"));
    }
}
class Topology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");        
    }
}

Using a custom stream-id mainly comes down to two steps: specify the stream-id in the emit() call, and declare that same stream-id with declareStream() in declareOutputFields(); the topology's groupings then reference that stream-id as well.

The figure below shows in detail how the components of the topology coordinate with one another.

MultiStream

A Spout/Bolt can emit to multiple stream-ids; likewise, every stream-id used while emitting must be declared when declaring the output fields.
Not every tuple necessarily uses all of the streams: in the example below one tuple is emitted to stream1 and stream3,
another is emitted to stream2 and stream3, and stream1 and stream2 are mutually exclusive, so a tuple can never go to both.
Even so, declareStream has to declare all of the stream-ids.

public void execute(Tuple input) {
    String word = input.getString(0);
    // words that sort before "j" go to stream1; words after "j" go to stream2
    if(word.compareTo("j") < 0){
        collector.emit("stream1", new Values(word));
    }else if(word.compareTo("j") > 0){
        collector.emit("stream2", new Values(word));
    }
    // every word also goes to stream3
    collector.emit("stream3", new Values(word));
}
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declareStream("stream1", new Fields("word"));
    outputFieldsDeclarer.declareStream("stream2", new Fields("word"));
    outputFieldsDeclarer.declareStream("stream3", new Fields("word"));
}
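
The snippet above only shows the emitting side. As a minimal wiring sketch (the three consumer bolt names below are made up for illustration), the topology can subscribe different bolts to different streams of the same upstream component:

// "splitter" is the bolt above; the three consumer bolts are hypothetical.
builder.setBolt("before-j", new BeforeJBolt(), 1).shuffleGrouping("splitter", "stream1");
builder.setBolt("after-j", new AfterJBolt(), 1).shuffleGrouping("splitter", "stream2");
builder.setBolt("all-words", new AllWordsBolt(), 1).shuffleGrouping("splitter", "stream3");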

Programmers love flowcharts, so here goes: the first diagram at the top left is the flowchart, the one at the top right is the corresponding Storm Topology, and the two diagrams below trace how two tuples flow through the streams in Storm.

Following the example above, each Spout/Bolt emit method of the WordCountTopology now specifies a single output stream-id,
while declareOutputFields declares several output stream-ids.

Although every Spout/Bolt now declares multiple output stream-ids, emit still sends to only one of them,
so this is essentially the same as the SingleStream case above, and the Topology runs without any changes.

Code notes: specify one stream-id when emitting, declare several stream-ids when declaring streams, and the topology still names the upstream component's stream-id along with its component-id in each grouping.
emit is unchanged, and the topology is unchanged.

class RandomSentenceSpout {
    public void nextTuple() {
        this.collector.emit("split-stream", new Values(sentence));              //⬅            
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("sentence"));         //⬅
        declarer.declareStream("count-stream", new Fields("sentence"));
        declarer.declareStream("print-stream", new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        for (String word : words) {
            this.collector.emit("count-stream", new Values(word));              //⬅
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("word"));
        declarer.declareStream("count-stream", new Fields("word"));             //⬅  
        declarer.declareStream("print-stream", new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        collector.emit("print-stream", new Values(word, count));                //⬅
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("split-stream", new Fields("word", "count"));
        declarer.declareStream("count-stream", new Fields("word", "count")); 
        declarer.declareStream("print-stream", new Fields("word", "count"));    //⬅
    }
}
class Topology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2).shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2).fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1).shuffleGrouping("count", "print-stream");        
    }
}

So why declare multiple output streams in each Spout/Bolt at all? Look closely at this code: the stream-ids are identical across components and only the Fields differ.
If we pull the repeated declarer.declareStream calls of every Spout/Bolt out into a single method:

public static void declareStream(OutputFieldsDeclarer declarer, 
        Fields fields){
    declarer.declareStream("split-stream", fields);
    declarer.declareStream("count-stream", fields);
    declarer.declareStream("print-stream", fields);
}

Then each Spout/Bolt's declareOutputFields simply calls this declareStream method to declare all the stream-ids in one go, passing in only its own Fields.

Code notes: when declaring multiple streams, every component uses the same set of stream-ids, just with different Fields.
emit is unchanged, and the topology is unchanged.

class RandomSentenceSpout {
    public void nextTuple() {
        this.collector.emit("split-stream", new Values(sentence));              //⬅            
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declareStream(declarer, new Fields("sentence"));
    }
}
class SplitSentenceBolt {
    public void execute(Tuple tuple) {
        for (String word : words) {
            this.collector.emit("count-stream", new Values(word));              //⬅
        }
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         declareStream(declarer, new Fields("word"));
    }
}
class WordCountBolt {
    public void execute(Tuple tuple) {
        collector.emit("print-stream", new Values(word, count));                //⬅
    }
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declareStream(declarer, new Fields("word", "count"));
    }
}
class Topology {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        builder.setBolt("split", new SplitSentenceBolt(), 2)
            .shuffleGrouping("spout", "split-stream");
        builder.setBolt("count", new WordCountBolt(), 2)
            .fieldsGrouping("split", "count-stream", new Fields("word"));
        builder.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");        
    }
}

The benefit: if all the stream-ids are known up front, you only need to define declareStream once and let every bolt call this shared method.
In fact this approach is very useful for building topologies dynamically, as the sketch below suggests.
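
Here is a minimal sketch of that idea (illustrative stream names, not the flowmix constants): keep the stream-ids in one list and loop over it, so adding or reordering components never means editing every declareOutputFields.

// Assumed, illustrative stream-ids kept in one place.
public static final String[] STREAM_IDS = {"split-stream", "count-stream", "print-stream"};

public static void declareStream(OutputFieldsDeclarer declarer, Fields fields) {
    for (String streamId : STREAM_IDS) {
        declarer.declareStream(streamId, fields);
    }
}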

MultiGroup

By wrapping all the stream-ids into a single method, emit still targets only one stream-id.
Each component now emits to exactly one stream-id while declaring the same full set of stream-ids;
in other words, although a Spout/Bolt declares several stream-ids, any given tuple picks only one of them.

Can the same trick be applied to the groupings? The goal is to extract the setBolt logic into a shared method as well.
The version below is definitely wrong; for a start it cannot be extracted at all, because every Bolt uses a different grouping strategy.

Wrong as it is, notice that we still did not give multiple groupings to the first and last components. Why?
1. A Spout has no groupings of its own: it is the source, and the component named in a grouping is the one the current component's data comes from.
2. We leave the last Bolt alone for now; there is a pitfall there…

public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 1);

    builder.setBolt("split", new SplitSentenceBolt(), 2)
            .shuffleGrouping("spout", "split-stream")                      //⬅
            .shuffleGrouping("split", "split-stream")
            .shuffleGrouping("count", "split-stream")
    ;
    builder.setBolt("count", new WordCountBolt(), 2)
            .fieldsGrouping("spout", "count-stream", new Fields("word"))
            .fieldsGrouping("split", "count-stream", new Fields("word"))   //⬅
            .fieldsGrouping("count", "count-stream", new Fields("word"))
    ;
    builder.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");
}

Besides, the topology cannot even be built. For example, when WordCountBolt names component "spout" as an input,
that component in the topology is RandomSentenceSpout, whose output field is "sentence"; it has no "word" field at all.
The error below confirms this: Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"}
The count component (WordCountBolt) subscribes to the count-stream of the spout component (RandomSentenceSpout), but the spout component has no "word" field.

6972 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:
    Component: [count] subscribes from stream: [count-stream] of component [spout] with non-existent fields: #{"word"})>
7002 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

The correct way to use multiple stream-ids:

public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 1);

    builder.setBolt("split", new SplitSentenceBolt(), 2)
            .shuffleGrouping("spout", "split-stream")                      //⬅
            .fieldsGrouping("split", "split-stream", new Fields("word"))
            .shuffleGrouping("count", "split-stream")
    ;
    builder.setBolt("count", new WordCountBolt(), 2)
            .shuffleGrouping("spout", "count-stream")
            .fieldsGrouping("split", "count-stream", new Fields("word"))   //⬅
            .shuffleGrouping("count", "count-stream")
    ;
    builder.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");
}

Now every Bolt's groupings are the same, with the same component-ids; only the trailing stream-id differs.
Great, setBolt can now be extracted just like declareStream was:

public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 1);

    setBolt(builder, new SplitSentenceBolt(), "split");
    setBolt(builder, new WordCountBolt(), "count");
    builder.setBolt("print", new PrinterBolt(), 1)
        .shuffleGrouping("count", "print-stream");
}
public static void setBolt(TopologyBuilder builder,IRichBolt bolt,String name){
    builder.setBolt(name, bolt, 2)
            .shuffleGrouping("spout", name + "-stream")
            .fieldsGrouping("split", name + "-stream", new Fields("word"))
            .shuffleGrouping("count", name + "-stream")
    ;
}

Every Bolt now sets several groupings, and the first parameter of a grouping names the component the data comes from.
SplitSentenceBolt and WordCountBolt each define three groupings,
so does that mean [split] has the data sources [spout], [split] and [count],
and likewise [count] has the sources [spout], [split] and [count]? That would be nothing like the real Topology structure.
Indeed, the diagram below shows several more edges than the original WordCountTopology (components even point at themselves, which looks absurd).

However, although every Bolt now has several input sources, a source component does not necessarily send anything on the subscribed stream-id.
[split] nominally has three sources, [spout], [split] and [count], but of those three only [spout] ever emits to stream-id "split-stream",
so even though three sources are wired up, the other two never deliver anything.

Likewise [count] also has the three sources [spout], [split] and [count], but only [split] ever emits to stream-id "count-stream".

So in the end the effective topology is still the original [spout]->[split]->[count]->[print]; the extra edges and the self-loops never carry any tuples.
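
To convince yourself of this, here is a small sketch (not part of the original code): log inside any bolt's execute where each tuple actually comes from; with the wiring above, only the edges of the original chain ever show up.

@Override
public void execute(Tuple tuple) {
    // Illustrative only: print which upstream component and stream this tuple really arrived on.
    System.out.println("received from [" + tuple.getSourceComponent()
            + "] via stream [" + tuple.getSourceStreamId() + "]");
    // ... original processing ...
}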

The last Bolt

Can the final PrinterBolt be added to every Bolt's groupings as well?

builder.setBolt("split", new SplitSentenceBolt(), 2)
        .shuffleGrouping("spout", "split-stream")                      //⬅
        .fieldsGrouping("split", "split-stream", new Fields("word"))
        .shuffleGrouping("count", "split-stream")
        .shuffleGrouping("print", "split-stream")
;
builder.setBolt("count", new WordCountBolt(), 2)
        .shuffleGrouping("spout", "count-stream")
        .fieldsGrouping("split", "count-stream", new Fields("word"))   //⬅
        .shuffleGrouping("count", "count-stream")
        .shuffleGrouping("print", "count-stream")
;
builder.setBolt("print", new PrinterBolt(), 1)
        .shuffleGrouping("spout", "print-stream")
        .fieldsGrouping("split", "print-stream", new Fields("word"))
        .shuffleGrouping("count", "print-stream")                      //⬅
        .shuffleGrouping("print", "print-stream")
;

The topology diagram now looks like this; the dashed edges do not really exist (because the source components never emit to those streams).

Oops… the error says that component [count] subscribes to a non-existent [count-stream] of component [print]:

9510 [main] WARN  backtype.storm.daemon.nimbus - Topology submission exception. (topology name='word-count') #
<InvalidTopologyException InvalidTopologyException(msg:Component: 
    [count] subscribes from non-existent stream: [count-stream] of component [print])>
9552 [main] ERROR org.apache.storm.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
backtype.storm.generated.InvalidTopologyException: null

Next, let's tweak the print-related groupings in the different Bolts; it only works once every print subscription is commented out:

  1. Nothing commented out: [count] subscribes from non-existent stream: [count-stream] of component [print]
  2. Comment out ①: [split] subscribes from non-existent stream: [split-stream] of component [print]
  3. Comment out ① and ②: [print] subscribes from non-existent stream: [print-stream] of component [print]
  4. Comment out ①, ② and ③: SUCCESS!
builder.setBolt("split", new SplitSentenceBolt(), 2)
        .shuffleGrouping("spout", "split-stream")                      //⬅
        .fieldsGrouping("split", "split-stream", new Fields("word"))
        .shuffleGrouping("count", "split-stream")
        //.shuffleGrouping("print", "split-stream")  //②
;
builder.setBolt("count", new WordCountBolt(), 2)
        .shuffleGrouping("spout", "count-stream")
        .fieldsGrouping("split", "count-stream", new Fields("word"))   //⬅
        .shuffleGrouping("count", "count-stream")
        //.shuffleGrouping("print", "count-stream")  //①
;
builder.setBolt("print", new PrinterBolt(), 1)
        .shuffleGrouping("spout", "print-stream")
        .fieldsGrouping("split", "print-stream", new Fields("word"))
        .shuffleGrouping("count", "print-stream")                      //⬅
        //.shuffleGrouping("print", "print-stream")  //③
;

What happened? Why is a non-existent stream a problem here, when in the earlier example split and count never sent anything to split-stream either and no error was raised?
The difference is what gets declared: topology validation only looks at declared output streams. split and count do declare split-stream (through the shared declareStream helper) even though they never emit to it, whereas our PrinterBolt only prints and then does nothing else; it emits no tuples and, more importantly, its declareOutputFields declares no streams at all, so the edges leaving PrinterBolt in the diagram below simply do not exist.

The fix is simple: give PrinterBolt an emit with a stream-id, and declare that output stream in declareOutputFields as well. It is the declaration that satisfies the topology validator; the emit just keeps the bolt consistent with what it declares.
Once PrinterBolt declares output streams, the error disappears; in other words, make sure every Bolt declares (and ideally sends) something downstream.

The final, complete code:

public class WordCountTopologyStream3 {

    public static class RandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector collector;
        Random rand;
        String[] sentences = null;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            String sentence = sentences[rand.nextInt(sentences.length)];
            System.out.println("\n" + sentence);
            this.collector.emit("split-stream", new Values(sentence));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("sentence"));
        }
        public void ack(Object id) {}
        public void fail(Object id) {}
    }

    public static class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for (String word : words) {
                this.collector.emit("count-stream", new Values(word));
            }
            this.collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("word"));
        }
    }

    public static class WordCountBolt extends BaseRichBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        private OutputCollector collector;
        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit("print-stream", new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("word", "count"));
        }
    }

    public static class PrinterBolt extends BaseRichBolt {
        private OutputCollector collector;
        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        @Override
        public void execute(Tuple tuple) {
            String first = tuple.getString(0);
            int second = tuple.getInteger(1);
            System.out.println(first + "," + second);
            collector.emit("whatever-stream", new Values(first + ":" + second));  //⬅
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declareStream(declarer, new Fields("word:count"));  //⬅
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 1);

        setBolt(builder, new SplitSentenceBolt(), "split");
        setBolt(builder, new WordCountBolt(), "count");
        setBolt(builder, new PrinterBolt(), "print");

        Config conf = new Config();
        conf.setDebug(false);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }

    public static void declareStream(OutputFieldsDeclarer declarer, Fields fields){
        declarer.declareStream("split-stream", fields);
        declarer.declareStream("count-stream", fields);
        declarer.declareStream("print-stream", fields);
        declarer.declareStream("whatever-stream", fields);      //⬅
    }

    public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name){
        builder.setBolt(name, bolt, 2)
                .shuffleGrouping("spout", name + "-stream")
                .fieldsGrouping("split", name + "-stream", new Fields("word"))
                .shuffleGrouping("count", name + "-stream")
                .shuffleGrouping("print", name + "-stream")     //⬅
        ;
    }
}

Think we're done? If we drop PrinterBolt's output stream-id and fall back to the default stream:

public static class PrinterBolt extends BaseRichBolt {
    @Override
    public void execute(Tuple tuple) {
        String first = tuple.getString(0);
        int second = tuple.getInteger(1);
        System.out.println(first + "," + second);
        //collector.emit("whatever-stream", new Values(first + ":" + second));
        collector.emit(new Values(first + ":" + second));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        //declareStream(declarer, new Fields("word:count"));
        declarer.declare(new Fields("word:count"));
    }
}

public static void declareStream(OutputFieldsDeclarer declarer, Fields fields){
    declarer.declareStream("split-stream", fields);
    declarer.declareStream("count-stream", fields);
    declarer.declareStream("print-stream", fields);
    //declarer.declareStream("whatever-stream", fields);      //⬅
}

public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name){
    builder.setBolt(name, bolt, 2)
            .shuffleGrouping("spout", name + "-stream")
            .fieldsGrouping("split", name + "-stream", new Fields("word"))
            .shuffleGrouping("count", name + "-stream")
            .shuffleGrouping("print", name + "-stream")
    ;
}

Still an error: component [count] subscribes to a [count-stream] that does not exist on component [print].

Component: [count] subscribes from non-existent stream: [count-stream] of component [print]

So: when the upstream components all use custom stream-ids and the generic setBolt subscribes every bolt to every other bolt's streams, the last component must declare those custom stream-ids too, even if the stream it actually emits to looks meaningless!
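
If the meaningless stream bothers you, here is a sketch of an alternative (just a rearrangement of the earlier working version above, nothing new): run only the intermediate bolts through the generic setBolt helper and wire the terminal bolt by hand with the original, non-emitting PrinterBolt, so it never has to declare any output streams.

// Sketch: split and count go through the generic helper; print is wired directly,
// emits nothing, and can keep an empty declareOutputFields.
public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 1);

    setBolt(builder, new SplitSentenceBolt(), "split");
    setBolt(builder, new WordCountBolt(), "count");
    builder.setBolt("print", new PrinterBolt(), 1)
            .shuffleGrouping("count", "print-stream");
}

public static void setBolt(TopologyBuilder builder, IRichBolt bolt, String name) {
    builder.setBolt(name, bolt, 2)
            .shuffleGrouping("spout", name + "-stream")
            .fieldsGrouping("split", name + "-stream", new Fields("word"))
            .shuffleGrouping("count", name + "-stream");
}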

EOF.
