`

分区事务IPartitionedTransactionalSpout实例

 
阅读更多
1、分区事务spout

/**
 * Partitioned transactional spout backed by in-memory sample data.
 *
 * The constructor fills {@link #PT_DATA_MP} with {@code PARTITION_NUM} partitions of
 * {@code ROWS_PER_PARTITION} tab-separated log lines ({@code host \t session id \t time}).
 * Each transaction emits up to {@link #BATCH_NUM} consecutive rows from every partition
 * as {@code ("tx", "log")} tuples.
 */
public class MyPtTxSpout implements IPartitionedTransactionalSpout<MyMata>{

	private static final long serialVersionUID = 1L;

	/** Number of partitions; shared by the data generator and the coordinator so they cannot drift apart. */
	private static final int PARTITION_NUM = 5;

	/** Number of sample rows generated for each partition. */
	private static final int ROWS_PER_PARTITION = 100;

	/** Maximum number of rows emitted per partition in one batch. */
	public static int BATCH_NUM = 10 ;

	/** Sample data: partition index -> (row offset -> log line). */
	public Map<Integer, Map<Long,String>> PT_DATA_MP = new HashMap<Integer, Map<Long,String>>();

	/**
	 * Generates the random sample rows for every partition.
	 */
	public MyPtTxSpout()
	{
		Random random = new Random();
		String[] hosts = { "www.taobao.com" };
		String[] sessionIds = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7",
				"CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };
		String[] times = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", 
				"2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" };
		// Build the data of every partition.
		for (int j = 0; j < PARTITION_NUM; j++) {
			HashMap<Long, String> dbMap = new HashMap<Long, String> ();
			for (long i = 0; i < ROWS_PER_PARTITION; i++) {
				// Bounds come from the array lengths so adding or removing samples cannot go out of range.
				dbMap.put(i,hosts[0]+"\t"+sessionIds[random.nextInt(sessionIds.length)]+"\t"+times[random.nextInt(times.length)]);
			}
			PT_DATA_MP.put(j, dbMap);
		}
		
	}
	
	/**
	 * @return a new {@link MyCoordinator}; {@code conf} and {@code context} are not used
	 */
	public org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout.Coordinator getCoordinator(
			Map conf, TopologyContext context) {
		return new MyCoordinator();
	}

	/**
	 * @return a new {@link MyEmitter}; {@code conf} and {@code context} are not used
	 */
	public org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout.Emitter<MyMata> getEmitter(
			Map conf, TopologyContext context) {
		return new MyEmitter();
	}
	
	/**
	 * Declares the emitted tuple format: the transaction attempt and one log line.
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tx","log"));
	}

	/**
	 * @return {@code null}; this spout supplies no component-specific configuration
	 */
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

	/**
	 * Coordinator that reports the partition count and throttles batch creation.
	 */
	public class MyCoordinator implements IPartitionedTransactionalSpout.Coordinator
	{
		
		/** Nothing to release. */
		public void close() {
		}
		
		/**
		 * Sleeps for one second and then reports ready, which spaces out the batches.
		 *
		 * @return always {@code true}
		 */
		public boolean isReady() {
			Utils.sleep(1000);
			return true;
		}
		
		/**
		 * @return the number of partitions generated by the enclosing spout's constructor
		 */
		public int numPartitions() {
			return PARTITION_NUM;
		}
		
	}

	/**
	 * Emitter that reads rows from {@link #PT_DATA_MP} of the enclosing spout.
	 */
	public class MyEmitter implements IPartitionedTransactionalSpout.Emitter<MyMata>
	{

		/** Nothing to release. */
		public void close() {

		}
		
		/**
		 * Emits the rows of {@code partition} whose offsets lie in
		 * {@code [beginPoint, beginPoint + num)} as described by {@code partitionMeta}.
		 * Emission stops at the first offset that has no row, and nothing is emitted
		 * for a partition that has no data.
		 *
		 * @param tx            transaction attempt placed in the first field of every tuple
		 * @param collector     collector the tuples are emitted to
		 * @param partition     index of the partition to read
		 * @param partitionMeta begin offset and row count of the batch
		 */
		public void emitPartitionBatch(TransactionAttempt tx,
				BatchOutputCollector collector, int partition,
				MyMata partitionMeta) {
			// Unlike a plain transactional spout, the data is emitted here rather than by the coordinator.
			System.err.println("emitPartitionBatch partition:"+partition);
			Map<Long, String> batchMap = PT_DATA_MP.get(partition);
			if (batchMap == null) {
				// No data was generated for this partition index; avoid a NullPointerException.
				return;
			}
			long beginPoint = partitionMeta.getBeginPoint() ;
			int num = partitionMeta.getNum() ;
			
			for (long i = beginPoint; i < num+beginPoint; i++) {
				String log = batchMap.get(i);
				if (log == null) {
					// Ran past the last row of the partition.
					break;
				}
				collector.emit(new Values(tx,log));
			}
		}

		/**
		 * Starts a new batch for {@code partition}: computes the metadata that follows
		 * {@code lastPartitionMeta}, emits the batch and returns the new metadata.
		 *
		 * @param tx                transaction attempt of the new batch
		 * @param collector         collector the tuples are emitted to
		 * @param partition         index of the partition to read
		 * @param lastPartitionMeta metadata of the previous batch, or {@code null} for the first batch
		 * @return metadata describing the batch that was just emitted
		 */
		public MyMata emitPartitionBatchNew(TransactionAttempt tx,
				BatchOutputCollector collector, int partition,
				MyMata lastPartitionMeta) {
			// The new batch begins right after the previous one, or at offset 0 for the first batch.
			long beginPoint = 0;
			if (lastPartitionMeta != null) {
				beginPoint = lastPartitionMeta.getBeginPoint() + lastPartitionMeta.getNum() ;
			}
			
			MyMata mata = new MyMata() ;
			mata.setBeginPoint(beginPoint);
			mata.setNum(BATCH_NUM);
			
			// Reuse emitPartitionBatch to actually emit the rows.
			emitPartitionBatch(tx,collector,partition,mata);
			
			System.err.println("启动一个事务:"+mata.toString());
			return mata;
		}
		
	}
	
}


----------------------------分区事务与普通事务bolt处理类相同,不作过多改变--------------------------

2、统计当天数据

/**
 * Batch bolt that counts the log lines of one transaction batch per date.
 *
 * The bolt for a partitioned transaction is the same as for a plain transaction.
 * Input tuples are {@code (tx, log)} where {@code log} is a tab-separated line whose
 * third field is a timestamp; output tuples are {@code (tx, date, count)}.
 */
public class MyDailyBatchBolt implements IBatchBolt<TransactionAttempt> {

	
	private static final long serialVersionUID = 1L;
	
	/** Number of log lines seen in this batch, keyed by date. */
	Map<String, Integer> countMap = new HashMap<String, Integer>();
	BatchOutputCollector collector ;
	/** Running count of the most recently processed date. */
	Integer count = null;
	/** Date of the most recently processed log line. */
	String today = null;
	TransactionAttempt tx = null;
	
	/**
	 * Adds one log line to the per-date counters. Lines that are {@code null} or have
	 * fewer than three tab-separated fields are ignored.
	 *
	 * @param tuple input tuple; field 0 is the transaction attempt, field 1 the log line
	 */
	public void execute(Tuple tuple) {
		String log = tuple.getString(1);
		tx = (TransactionAttempt)tuple.getValue(0);
		if (log == null) {
			return;
		}
		// Split once and reuse the result.
		String[] fields = log.split("\\t");
		if (fields.length < 3) {
			return;
		}
		today = DateFmt.getCountDate(fields[2], DateFmt.date_short) ;
		count = countMap.get(today);
		if(count == null)
		{
			count = 0;
		}
		count ++ ;
		
		countMap.put(today, count);
	}

	/**
	 * Emits one {@code (tx, date, count)} tuple for every date seen in the batch.
	 * Nothing is emitted for a batch that contained no valid log line, so downstream
	 * bolts never receive null fields.
	 */
	public void finishBatch() {
		for (Map.Entry<String, Integer> entry : countMap.entrySet()) {
			System.err.println(tx+"--"+entry.getKey()+"--"+entry.getValue());
			collector.emit(new Values(tx,entry.getKey(),entry.getValue()));
		}
	}

	/**
	 * Stores the collector and the batch id.
	 *
	 * @param conf      topology configuration (unused)
	 * @param context   topology context (unused)
	 * @param collector collector used by {@link #finishBatch()}
	 * @param id        transaction attempt of the batch this bolt instance handles
	 */
	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, TransactionAttempt id) {
		this.collector = collector;
		// Keep the batch id so tx is known even before the first tuple arrives.
		this.tx = id;
	}

	/**
	 * Declares the output tuple format.
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tx","date","count"));
	}

	/**
	 * @return {@code null}; this bolt supplies no component-specific configuration
	 */
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}

}


3、汇总批数据,更新数据库
/**
 * Committer bolt that sums the per-batch counts and folds them into a global total.
 *
 * The committer bolt for a partitioned transaction is the same as for a plain transaction.
 * The static {@link #dbMap} stands in for a database: the stored value records the id of
 * the last applied transaction, so a batch whose id equals the stored id is not applied again.
 */
public class MyDailyCommitterBolt extends BaseTransactionalBolt implements ICommitter{

	
	private static final long serialVersionUID = 1L;
	
	/** Key under which the global total is stored in {@link #dbMap}. */
	public static final String GLOBAL_KEY = "GLOBAL_KEY";
	/** In-memory stand-in for the database holding the global total. */
	public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>() ;
	
	/** Counts received in the current batch, keyed by date. */
	Map<String, Integer> countMap = new HashMap<String, Integer>();
	TransactionAttempt id ;
	BatchOutputCollector collector;
	/** Date of the most recent valid input tuple. */
	String today = null;
	
	/**
	 * Accumulates one partial count. Tuples with a {@code null} date or count are ignored
	 * and do not change any state.
	 *
	 * @param tuple input tuple {@code (tx, date, count)}
	 */
	public void execute(Tuple tuple) {
		String date = tuple.getString(1) ;
		Integer count = tuple.getInteger(2);
		TransactionAttempt attempt = (TransactionAttempt)tuple.getValue(0);
		
		if (date !=null && count != null) {
			// Only valid tuples may update today/id, so finishBatch always finds today in countMap.
			today = date;
			if (attempt != null) {
				id = attempt;
			}
			Integer batchCount = countMap.get(date) ;
			if (batchCount == null) {
				batchCount = 0;
			}
			batchCount += count ;
			countMap.put(date, batchCount);
		}
	}

	/**
	 * Adds the batch count of {@link #today} to the global total, unless the stored value
	 * already carries this transaction id, then prints the total.
	 */
	public void finishBatch() {
		Integer batchCount = countMap.get(today);
		if (batchCount != null) {
			DbValue value = dbMap.get(GLOBAL_KEY);
			if (value == null || !value.txid.equals(id.getTransactionId())) {
				// Update the "database" with a value tagged by the current transaction id.
				DbValue newValue = new DbValue();
				newValue.txid = id.getTransactionId() ;
				newValue.dateStr = today;
				// Use the batch's own date instead of a hard-coded one.
				newValue.count = (value == null) ? batchCount : value.count + batchCount ;
				dbMap.put(GLOBAL_KEY, newValue);
			}
		}
		
		// Nothing may have been stored yet (e.g. an empty first batch).
		DbValue total = dbMap.get(GLOBAL_KEY);
		System.out.println("total==========================:"+(total == null ? 0 : total.count));
	}

	/**
	 * Stores the collector and the transaction attempt of this batch.
	 *
	 * @param conf      topology configuration (unused)
	 * @param context   topology context (unused)
	 * @param collector batch output collector
	 * @param id        transaction attempt of the batch this bolt instance handles
	 */
	public void prepare(Map conf, TopologyContext context,
			BatchOutputCollector collector, TransactionAttempt id) {
		this.id = id ;
		this.collector = collector;
	}

	/**
	 * Declares no output fields; this bolt emits nothing.
	 */
	public void declareOutputFields(OutputFieldsDeclarer declarer) {

	}

	/**
	 * Value stored in {@link #dbMap}: the total together with the id of the last
	 * transaction that updated it.
	 */
	public static class DbValue
	{
		BigInteger txid;
		int count = 0;
		String dateStr;
	}

}



4、topo类

public class MyDailyTopo {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		// TODO Auto-generated method stub

		TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutid",new MyPtTxSpout(),1);
		builder.setBolt("bolt1", new MyDailyBatchBolt(),3).shuffleGrouping("spoutid");
		builder.setBolt("committer", new MyDailyCommitterBolt(),1).shuffleGrouping("bolt1") ;
		
		Config conf = new Config() ;
		conf.setDebug(false);

		if (args.length > 0) {
			try {
				StormSubmitter.submitTopology(args[0], conf, builder.buildTopology());
			} catch (AlreadyAliveException e) {
				e.printStackTrace();
			} catch (InvalidTopologyException e) {
				e.printStackTrace();
			} catch (AuthorizationException e) {
				e.printStackTrace();
			}
		}else {
			LocalCluster localCluster = new LocalCluster();
			localCluster.submitTopology("mytopology", conf, builder.buildTopology());
		}
		
		
		
		
	}

}




5、测试结果
从测试结果可以看出:每个事务批次中,spout 从五个分区各读取10条数据(第一批为各分区的前10条,第二批为接下来的10条),共50条;这些数据由三个 MyDailyBatchBolt 任务分别统计,最后由一个 MyDailyCommitterBolt 任务汇总并累加到总数中。
引用

emitPartitionBatch partition:0
启动一个事务:0----10
emitPartitionBatch partition:1
启动一个事务:0----10
emitPartitionBatch partition:2
启动一个事务:0----10
emitPartitionBatch partition:3
启动一个事务:0----10
emitPartitionBatch partition:4
启动一个事务:0----10
1:716574575327675658--2014-01-07--13
1:716574575327675658--2014-01-07--19
1:716574575327675658--2014-01-07--18
total==========================:50
emitPartitionBatch partition:0
启动一个事务:10----10
emitPartitionBatch partition:1
启动一个事务:10----10
emitPartitionBatch partition:2
启动一个事务:10----10
emitPartitionBatch partition:3
启动一个事务:10----10
emitPartitionBatch partition:4
启动一个事务:10----10
2:2288688263701331016--2014-01-07--18
2:2288688263701331016--2014-01-07--13
2:2288688263701331016--2014-01-07--19
total==========================:100

















分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics