`

Storm 使用示例

 
阅读更多

 

Storm 程序开发需要几个组件

1 Topology 即程序的主要入口 main,配置文件要通过主函数加载后放到conf中,然后bolt才能拿到,./conf可以到jar包中得到配置文件。

2 Spout 程序的数据来源 

3 bolt程序处理节点 ,一个程序可能n个bolt节点 。 

 

一 Topology  

 

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

/**
 * Topology entry point: wires a RandomWordSpout into a longBolt and submits
 * the topology either to a real cluster (when a topology name is passed as
 * args[0]) or to an in-process LocalCluster for development.
 *
 * Bug fixed: the original submitted the topology to a LocalCluster
 * unconditionally and then submitted it a second time inside the if/else,
 * running the topology twice. The topology is now submitted exactly once.
 */
public class MyTopology1 {
	public static void main(String[] args) {
		try {
			// Wire up the topology: one spout feeding one bolt.
			TopologyBuilder builder = new TopologyBuilder();
			builder.setSpout("spout", new RandomWordSpout(), 1).setNumTasks(8);
			builder.setBolt("longBolt", new longBolt(), 1).shuffleGrouping("spout");

			// Load the external config file once, then copy the values the
			// components need into the Storm Config so every bolt/spout can
			// read them from the topology conf. "./conf" resolves inside the
			// submitted jar.
			ConfigFactory.init("./conf/config-kafka.xml");
			Config config = new Config();
			config.put("KafkaConfig", KafkaConfig.getProducerPro());
			config.put("redis.url", ConfigFactory.getString("redis.url"));
			config.setDebug(true);

			if (args != null && args.length > 0) {
				// Cluster mode: args[0] is the topology name.
				config.setNumWorkers(2);
				StormSubmitter.submitTopology(args[0], config, builder.createTopology());
			} else {
				// Local mode for development/testing.
				config.setMaxTaskParallelism(3);
				LocalCluster cluster = new LocalCluster();
				cluster.submitTopology("MyTopology1", config, builder.createTopology());

				Thread.sleep(1000);

				// cluster.shutdown(); // left commented out so the local topology keeps running
			}
		} catch (Exception e) {
			// Top-level boundary of a demo main(): print and exit.
			e.printStackTrace();
		}
	}
}

二、 Spout 数据来源

  

import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * Spout that simulates a data source (e.g. a Kafka topic) by emitting a
 * randomly chosen product name on each nextTuple() call.
 *
 * Fixes: the RNG is now a field instead of being re-allocated on every
 * nextTuple() invocation, and the sleep comment now matches the actual
 * 10 000 ms throttle (the original comment claimed 500 ms).
 */
public class RandomWordSpout extends BaseRichSpout{

	private static final long serialVersionUID = 1L;

	// Handed to us in open(); used to emit tuples downstream.
	private SpoutOutputCollector collector;

	// Reused RNG — java.util.Random is Serializable, so a field survives
	// Storm's serialization of the spout instance.
	private final Random random = new Random();

	// Sample data standing in for a real external source.
	String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};

	/**
	 * Core spout loop: called repeatedly by Storm. Picks one product name at
	 * random, wraps it in a tuple, and emits it to the next component.
	 */
	@Override
	public void nextTuple() {

		// Pick a random product name from the sample array.
		String godName = words[random.nextInt(words.length)];

		// Emit it as a single-field tuple (field name declared below).
		collector.emit(new Values(godName));

		// Throttle emission: sleep 10 000 ms between tuples.
		Utils.sleep(10000);
	}

	/** Initialization hook; called once when the spout instance is set up. */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		this.collector = collector;
	}

	/** Declares the single output field carried by each emitted tuple. */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("orignname"));
	}

}

 三、bolt业务处理代码

 

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/**
 * Skeleton bolt: a processing node in the topology. The execute() body is a
 * placeholder — real business logic goes there.
 *
 * Fixes: added the missing @Override annotations on execute() and
 * declareOutputFields() so the compiler verifies these actually override
 * the BaseBasicBolt / IBasicBolt contract.
 */
public class longBolt2 extends BaseBasicBolt {

	private static final long serialVersionUID = -4561045897494106937L;

	/** One-time initialization hook; this bolt has no resources to set up. */
	@Override
	public void prepare(Map stormConf, TopologyContext context) {
		// intentionally empty — nothing to initialize
	}

	/** Per-tuple business logic; currently a no-op placeholder. */
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		// TODO: process the incoming tuple and emit via collector
	}

	/** Declares the single output field "word" this bolt would emit. */
	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("word"));
	}

}

 

启动一个storm程序: 

 storm jar storm-kafka-0.0.2_wzt.jar   com.jusfoun.testStrom.MyTopology1 MyTopology1

杀死之前的程序:

storm kill MyTopology1

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics