`

storm高并发PV统计

 
阅读更多
一、PV统计思考
方案需要考虑多线程(多并发)下的情况,注意线程安全问题。
线程安全:多线程处理的结果与单线程处理的结果一致。


以下方案是否可行?
不可行方案:
  定义 static long pv,用 synchronized 控制累加操作。synchronized 和 Lock 只在单个 JVM 内有效;而 Storm 的多个 worker 运行在不同的 JVM 进程中,每个进程各自持有一份 static 变量,因此在多 JVM 下无效。


可行的方案有两个:
1、shuffleGrouping 下各 executor 分到的 tuple 数量大致均衡,可用单个 executor 的 pv × executor 并发数来估算总 PV(只是近似值)
2、bolt1 多并发进行局部汇总,bolt2 单线程进行全局汇总

二、实现
注意:每个 bolt 实例的 execute 方法每收到一条 tuple 就会被调用一次,因此会被反复执行多次,类似一个 while 循环。


1、bolt1进行多并发(局部)汇总处理类
/**
 * bolt1: local (partial) PV count, intended to run with several parallel executors.
 *
 * <p>Each input tuple carries one raw log line in field 0. A line is counted when it has a
 * second tab-separated column (the session id). After every tuple this bolt emits
 * {@code (threadId, count)}: field 0 identifies this task and field 1 is its running total.
 *
 * <p>Field 0 keeps its historical name {@code threadId} but carries the Storm task id.
 * Thread ids are only unique inside one JVM, so executors running in different worker JVMs
 * could report the same thread id. A downstream bolt that keys partial counts by field 0
 * would then overwrite one task's count with another's. Task ids are unique across the
 * whole topology.
 */
public class PVBolt1 implements IRichBolt{

	private static final long serialVersionUID = 1L;

	OutputCollector collector = null;

	/** Topology-wide unique id of this task, emitted as the key of the partial count. */
	long taskId;

	/** Running PV total seen by this task. */
	long pv = 0;

	/**
	 * Stores the collector and records this task's id.
	 */
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
		this.collector = collector;
		this.taskId = context.getThisTaskId();
	}

	/**
	 * Counts one log line and emits this task's updated partial total.
	 *
	 * @param input tuple whose field 0 is a tab-separated log line
	 */
	public void execute(Tuple input) {
		String logString = input.getString(0);
		String[] columns = logString.split("\t");
		// split() never yields null elements, so a null check on the session id cannot
		// filter anything. The case that needs guarding is a line without a second column,
		// where indexing [1] would throw ArrayIndexOutOfBoundsException.
		if (columns.length > 1) {
			pv++;
		}

		collector.emit(new Values(taskId, pv));
		System.err.println("threadId = "+ Thread.currentThread().getId()+"; pv="+pv);
		// IRichBolt does not ack automatically.
		collector.ack(input);
	}

	/** No resources to release. */
	public void cleanup() {
	}

	/** Declares the output fields: the partial-count key and the count itself. */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("threadId", "count"));
	}

	/** No component-specific configuration. */
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

}


2、bolt2单线程进行全局汇总处理类
/**
 * bolt2: global PV total, meant to run as a single task.
 *
 * <p>Remembers the latest partial count reported under each key (tuple field 0), then prints
 * the sum of all remembered partial counts after every update. It declares no output fields
 * and emits nothing.
 */
public class PVBolt2 implements IRichBolt{

	private static final long serialVersionUID = 1L;

	/** Latest partial count per sender key (tuple field 0). */
	Map<Long,Long>counts = new HashMap<Long,Long>();

	/** Nothing to set up: the collector is not needed because this bolt never emits. */
	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
	}

	/**
	 * Records a sender's latest partial count and prints the new global total.
	 *
	 * @param input tuple of {@code (sender key, partial count)}, both longs
	 */
	public void execute(Tuple input) {
		Long senderKey = input.getLong(0);
		Long partialCount = input.getLong(1);
		counts.put(senderKey, partialCount);
		System.err.println("  threadId="+senderKey+"-------------pv="+partialCount);

		long total = 0;
		for (Long latest : counts.values()) {
			total += latest;
		}
		System.err.println("PVBolt2-------------pv="+total+"\r");
	}

	/** No resources to release. */
	public void cleanup() {
	}

	/** Terminal bolt: declares no output fields. */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
	}

	/** No component-specific configuration. */
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

}


3、topology运行main类
/**
 * Entry point that builds and runs the two-stage PV-count topology:
 * spout -> bolt1 (4 executors, partial counts) -> bolt2 (1 executor, global sum).
 */
public class Main {

	/**
	 * Builds the topology and runs it.
	 *
	 * @param args if non-empty, {@code args[0]} is the name under which the topology is
	 *             submitted to the cluster; otherwise it runs in-process on a LocalCluster
	 *             as "mytopology"
	 * @throws IllegalStateException if submitting to the cluster fails (cause preserved)
	 */
	public static void main(String[] args) {

		TopologyBuilder builder = new TopologyBuilder();

		builder.setSpout("spout", new MySpout(), 1);

		// Log lines are spread across the bolt1 executors, each keeping a partial count.
		builder.setBolt("bolt1", new PVBolt1(),4).shuffleGrouping("spout");
		// globalGrouping sends every partial count to a single bolt2 task, so the global
		// sum stays correct even if bolt2's parallelism is raised later.
		builder.setBolt("bolt2", new PVBolt2(),1).globalGrouping("bolt1");

		Config conf = new Config();
		conf.setNumWorkers(4);

		if (args.length > 0) {
			try {
				StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
			} catch (AlreadyAliveException e) {
				// Rethrow instead of printing so a failed submission does not exit normally.
				throw new IllegalStateException("Topology already running: " + args[0], e);
			} catch (InvalidTopologyException e) {
				throw new IllegalStateException("Invalid topology: " + args[0], e);
			} catch (AuthorizationException e) {
				throw new IllegalStateException("Not authorized to submit topology: " + args[0], e);
			}
		} else {
			// The local cluster is never shut down explicitly here.
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.createTopology());
		}

	}

}


-------------------------------其它辅助类---------------------------

4、数据读取spout处理类

/**
 * Spout that reads the file {@code track.log} (decoded as UTF-8) and emits one line per
 * tuple on the single output field {@code "log"}.
 *
 * <p>Tuples are emitted without a message id, so they are not tracked for ack/fail.
 */
public class MySpout implements IRichSpout{

	private static final long serialVersionUID = 1L;

	FileInputStream fis;
	InputStreamReader isr;
	BufferedReader br;

	SpoutOutputCollector collector = null;

	/**
	 * Emits at most one line of the log file per call.
	 *
	 * <p>Storm calls nextTuple, ack and fail from the same spout thread, and the ISpout
	 * contract asks nextTuple to be non-blocking. For that reason this reads a single line
	 * instead of draining the whole file in a loop. Read errors are rethrown rather than
	 * silently dropped.
	 */
	public void nextTuple() {
		if (br == null) {
			// Input is not open (open() did not complete or close() has already run).
			return;
		}
		String line;
		try {
			line = br.readLine();
		} catch (java.io.IOException e) {
			throw new RuntimeException("Failed to read track.log", e);
		}
		if (line == null) {
			// End of file: nothing left to emit.
			return;
		}
		// Filtering of unwanted lines would go here, before emitting.
		collector.emit(new Values(line));
	}

	/**
	 * Saves the collector and opens {@code track.log} for reading.
	 *
	 * @throws RuntimeException if the file cannot be opened, because without input the
	 *         spout has nothing to emit
	 */
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.collector = collector;
		try {
			this.fis = new FileInputStream("track.log");
			this.isr = new InputStreamReader(fis, "UTF-8");
			this.br = new BufferedReader(isr);
		} catch (java.io.IOException e) {
			throw new RuntimeException("Failed to open track.log", e);
		}
	}

	/** Declares the single output field; it must match what the downstream bolt reads. */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("log"));
	}

	/** No component-specific configuration (this would be merged into the conf given to open). */
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

	/** No-op: tuples are emitted without a message id, so nothing is tracked. */
	public void ack(Object msgId) {
	}

	/** No-op. */
	public void activate() {
	}

	/** Releases the file handle opened in {@link #open}. */
	public void close() {
		if (br == null) {
			return;
		}
		try {
			// Closing the BufferedReader also closes the wrapped reader and stream.
			br.close();
		} catch (java.io.IOException e) {
			e.printStackTrace();
		} finally {
			br = null;
			isr = null;
			fis = null;
		}
	}

	/** No-op. */
	public void deactivate() {
	}

	/** No-op: tuples are emitted without a message id, so nothing is replayed. */
	public void fail(Object msgId) {
	}

}


5、pom 文件请参考前几篇文章

6、处理结果
引用
  threadId=156-------------pv=44
PVBolt2-------------pv=44

  threadId=156-------------pv=45
PVBolt2-------------pv=45

  threadId=156-------------pv=46
PVBolt2-------------pv=46

  threadId=156-------------pv=47
PVBolt2-------------pv=47

  threadId=152-------------pv=1
PVBolt2-------------pv=48

  threadId=215-------------pv=1
PVBolt2-------------pv=49

9234 [Thread-62-bolt1-executor[5 5]]
threadId = 227; pv=1
  threadId=227-------------pv=1
PVBolt2-------------pv=50
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics