- 浏览: 54074 次
- 性别:
- 来自: 北京
文章分类
最新评论
1.分区事务spout
----------------------------分区事务与普通事务bolt处理类相同,不作过多改变--------------------------
2、统计当天数据
3、汇总批数据,更新数据库
4、topo类
6、测试结果
通过测试结果,得出结论:系统启动五个线程分别读取五个分区的前10条数据,然后用三个线程汇总每一批次数据,最后用一个线程全部汇总数据。
emitPartitionBatch partition:0
启动一个事务:0----10
emitPartitionBatch partition:1
启动一个事务:0----10
emitPartitionBatch partition:2
启动一个事务:0----10
emitPartitionBatch partition:3
启动一个事务:0----10
emitPartitionBatch partition:4
启动一个事务:0----10
1:716574575327675658--2014-01-07--13
1:716574575327675658--2014-01-07--19
1:716574575327675658--2014-01-07--18
total==========================:50
emitPartitionBatch partition:0
启动一个事务:10----10
emitPartitionBatch partition:1
启动一个事务:10----10
emitPartitionBatch partition:2
启动一个事务:10----10
emitPartitionBatch partition:3
启动一个事务:10----10
emitPartitionBatch partition:4
启动一个事务:10----10
2:2288688263701331016--2014-01-07--18
2:2288688263701331016--2014-01-07--13
2:2288688263701331016--2014-01-07--19
total==========================:100
public class MyPtTxSpout implements IPartitionedTransactionalSpout<MyMata>{ /** * 分区事务spout */ private static final long serialVersionUID = 1L; public static int BATCH_NUM = 10 ; public Map<Integer, Map<Long,String>> PT_DATA_MP = new HashMap<Integer, Map<Long,String>>(); public MyPtTxSpout() { Random random = new Random(); String[] hosts = { "www.taobao.com" }; String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34", "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" }; String[] time = { "2014-01-07 08:40:50", "2014-01-07 08:40:51", "2014-01-07 08:40:52", "2014-01-07 08:40:53", "2014-01-07 09:40:49", "2014-01-07 10:40:49", "2014-01-07 11:40:49", "2014-01-07 12:40:49" }; //建立五个分区数据 for (int j = 0; j < 5; j++) { HashMap<Long, String> dbMap = new HashMap<Long, String> (); for (long i = 0; i < 100; i++) { dbMap.put(i,hosts[0]+"\t"+session_id[random.nextInt(5)]+"\t"+time[random.nextInt(8)]); } PT_DATA_MP.put(j, dbMap); } } public org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout.Coordinator getCoordinator( Map conf, TopologyContext context) { return new MyCoordinator(); } public org.apache.storm.transactional.partitioned.IPartitionedTransactionalSpout.Emitter<MyMata> getEmitter( Map conf, TopologyContext context) { return new MyEmitter(); } public void declareOutputFields(OutputFieldsDeclarer declarer) { //发射数据格式 declarer.declare(new Fields("tx","log")); } public Map<String, Object> getComponentConfiguration() { return null; } public class MyCoordinator implements IPartitionedTransactionalSpout.Coordinator { public void close() { // TODO Auto-generated method stub } public boolean isReady() { // TODO Auto-generated method stub Utils.sleep(1000); return true; } //声明5个分区 public int numPartitions() { // TODO Auto-generated method stub return 5; } } public class MyEmitter implements IPartitionedTransactionalSpout.Emitter<MyMata> { public void close() { } public void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, MyMata partitionMeta) { // 与普通事务不同的是,将MyCoordinator方法的发射数据方在emitPartitionBatch方法中执行 System.err.println("emitPartitionBatch partition:"+partition); long beginPoint = partitionMeta.getBeginPoint() ; int num = partitionMeta.getNum() ; Map<Long, String> batchMap = PT_DATA_MP.get(partition); for (long i = beginPoint; i < num+beginPoint; i++) { if (batchMap.get(i)==null) { break; } collector.emit(new Values(tx,batchMap.get(i))); } } public MyMata emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, MyMata lastPartitionMeta) { // 获取批次的开始节点 long beginPoint = 0; if (lastPartitionMeta == null) { beginPoint = 0 ; }else { beginPoint = lastPartitionMeta.getBeginPoint() + lastPartitionMeta.getNum() ; } MyMata mata = new MyMata() ; mata.setBeginPoint(beginPoint); mata.setNum(BATCH_NUM); //调用上面emitPartitionBatch方法接着发射数据处理 emitPartitionBatch(tx,collector,partition,mata); System.err.println("启动一个事务:"+mata.toString()); return mata; } } }
----------------------------分区事务与普通事务bolt处理类相同,不作过多改变--------------------------
2、统计当天数据
/** * 分区事务bolt与普通事务bolt相同,不作更多改变 */ public class MyDailyBatchBolt implements IBatchBolt<TransactionAttempt> { private static final long serialVersionUID = 1L; //统计当天数据 Map<String, Integer> countMap = new HashMap<String, Integer>(); BatchOutputCollector collector ; Integer count = null; String today = null; TransactionAttempt tx = null; public void execute(Tuple tuple) { String log = tuple.getString(1); tx = (TransactionAttempt)tuple.getValue(0); if (log != null && log.split("\\t").length >=3 ) { today = DateFmt.getCountDate(log.split("\\t")[2], DateFmt.date_short) ; count = countMap.get(today); if(count == null) { count = 0; } count ++ ; countMap.put(today, count); } } public void finishBatch() { System.err.println(tx+"--"+today+"--"+count); collector.emit(new Values(tx,today,count)); } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("tx","date","count")); } public Map<String, Object> getComponentConfiguration() { return null; } }
3、汇总批数据,更新数据库
/** * 分区事务CommitterBolt与普通事务CommitterBolt相同,不作更多改变 */ public class MyDailyCommitterBolt extends BaseTransactionalBolt implements ICommitter{ private static final long serialVersionUID = 1L; //汇总批数据,更新数据库 public static final String GLOBAL_KEY = "GLOBAL_KEY"; public static Map<String, DbValue> dbMap = new HashMap<String, DbValue>() ; Map<String, Integer> countMap = new HashMap<String, Integer>(); TransactionAttempt id ; BatchOutputCollector collector; String today = null; public void execute(Tuple tuple) { today = tuple.getString(1) ; Integer count = tuple.getInteger(2); id = (TransactionAttempt)tuple.getValue(0); if (today !=null && count != null) { Integer batchCount = countMap.get(today) ; if (batchCount == null) { batchCount = 0; } batchCount += count ; countMap.put(today, batchCount); } } public void finishBatch() { if (countMap.size() > 0) { DbValue value = dbMap.get(GLOBAL_KEY); DbValue newValue ; if (value == null || !value.txid.equals(id.getTransactionId())) { //更新数据库 newValue = new DbValue(); newValue.txid = id.getTransactionId() ; newValue.dateStr = today; if (value == null) { newValue.count = countMap.get(today) ; }else { newValue.count = value.count + countMap.get("2014-01-07") ; } dbMap.put(GLOBAL_KEY, newValue); }else { newValue = value; } } System.out.println("total==========================:"+dbMap.get(GLOBAL_KEY).count); // collector.emit(tuple) } public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) { // TODO Auto-generated method stub this.id = id ; this.collector = collector; } public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub } public static class DbValue { BigInteger txid; int count = 0; String dateStr; } }
4、topo类
public class MyDailyTopo { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("ttbId","spoutid",new MyPtTxSpout(),1); builder.setBolt("bolt1", new MyDailyBatchBolt(),3).shuffleGrouping("spoutid"); builder.setBolt("committer", new MyDailyCommitterBolt(),1).shuffleGrouping("bolt1") ; Config conf = new Config() ; conf.setDebug(false); if (args.length > 0) { try { StormSubmitter.submitTopology(args[0], conf, builder.buildTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } catch (AuthorizationException e) { e.printStackTrace(); } }else { LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.buildTopology()); } } }
6、测试结果
通过测试结果,得出结论:系统启动五个线程分别读取五个分区的前10条数据,然后用三个线程汇总每一批次数据,最后用一个线程全部汇总数据。
引用
emitPartitionBatch partition:0
启动一个事务:0----10
emitPartitionBatch partition:1
启动一个事务:0----10
emitPartitionBatch partition:2
启动一个事务:0----10
emitPartitionBatch partition:3
启动一个事务:0----10
emitPartitionBatch partition:4
启动一个事务:0----10
1:716574575327675658--2014-01-07--13
1:716574575327675658--2014-01-07--19
1:716574575327675658--2014-01-07--18
total==========================:50
emitPartitionBatch partition:0
启动一个事务:10----10
emitPartitionBatch partition:1
启动一个事务:10----10
emitPartitionBatch partition:2
启动一个事务:10----10
emitPartitionBatch partition:3
启动一个事务:10----10
emitPartitionBatch partition:4
启动一个事务:10----10
2:2288688263701331016--2014-01-07--18
2:2288688263701331016--2014-01-07--13
2:2288688263701331016--2014-01-07--19
total==========================:100
发表评论
-
ITridentSpout、FirstN(取Top N)实现、 流合并和join
2017-05-25 10:01 1001一、ITridentSpout 基于事务 static int ... -
Trident实战之计算网站PV
2017-05-24 13:24 6221、Trident实战之计算网站PV /** * ... -
Trident API和概念
2017-05-23 10:57 708一、Trident API——Spout ITride ... -
Trident入门
2017-05-22 13:44 481英文原址:https://github.com/nathanm ... -
分布式远程调用drpc实例
2017-05-22 10:53 389一、DRPC定义 分布式dRPC(distributed RP ... -
不透明分区事务IOpaquePartitionedTransactional实例
2017-05-22 10:54 6511、spout public class MyOpaq ... -
普通事务ITransactionalSpout实例之按天统计数据
2017-05-20 16:56 4501、普通事务Spout /** * 普通事务Spou ... -
普通事务ITransactionalSpout实例
2017-05-20 15:45 7921、普通事务Spout /** * 普通事务Spou ... -
Storm事务API
2017-05-19 16:00 583Spout ITransactionalSpout<T& ... -
Storm批处理事务原理详解
2017-05-19 15:54 2064事务:Storm容错机制通 ... -
集群统一启动和停止shell脚本开发
2017-05-17 09:56 4181、cd 2、ls -al 显示隐藏目录 3、rm -rf ... -
storm高并发UV统计
2017-05-14 22:05 1087统计高并发UV可行的方案(类似WordCount的计算去重wo ... -
storm高并发PV统计,利用zookeeper锁输出汇总值
2017-05-14 14:42 869汇总型方案: 1、shuffleGrouping下,pv(单线 ... -
storm高并发PV统计
2017-04-16 17:54 650一、PV统计思考 方案需要考虑分析多线程下,注意线程安全问题。 ... -
Storm高并发运用WordSum
2017-04-16 14:21 10401、创建发射所有字符串统计总个数及去重个数处理类 pub ... -
storm分组策略介绍
2017-04-16 11:46 673一、storm数据来源 Spout的数据源: MQ:直接流数 ... -
Storm高并发介绍
2017-04-16 10:18 557并发度: worker:指的是component (spo ... -
Storm 字符统计Demo
2017-04-14 13:57 5051、数据源读取,字符发射spout类 /** * 字符 ... -
Storm 本地模式
2017-04-09 22:25 376本地模式,是在eclipse等编译器编写strom运行文件 ... -
Storm启动配置
2017-03-29 17:40 641一、安装Storm wget ...
相关推荐
sqlserver分区表制作实例.doc
oracle表分区实例.doc oracle表分区实例.doc oracle表分区实例.doc
在MSCS环境下实现DB2分区服务器群集实例
在MSCS环境下实现DB2分区服务器集群实例
SQL中,分区表的实例,通过此实例理解分区表,掌握分区表如何创建,如何存储数据等
oracle 数据库的表分区操作实例,适合学习操作对表进行分区。
适合大数据的分区保存,能以较快速度查找 这里边是一些实例,英文资料
unix AIX 环境下 exp 备份 Oracle 分区表实例
项目中有需求要垂直分表,即按照时间区间将数据拆分到n个表中,PostgreSQL提供了分区表的功能。分区表实际上是把逻辑上的一个大表分割成物理上的几小块,提供了很多好处,比如: 1、查询性能大幅提升 2、删除历史...
ORACLE 分区表 分区索引 索引分区 实例讲解
实例解说Linux中fdisk分区使用方法实例解说Linux中fdisk分区使用方法
MySQL交换分区的实例详解 前言 在介绍交换分区之前,我们先了解一下 mysql 分区。 数据库的分区有两种:水平分区和垂直分区。而MySQL暂时不支持垂直分区,因此接下来说的都是水平分区。水平分区即:以行为单位对表...
MS SQL Server:分区表、分区索引 详解 1. 分区表简介 使用分区表的主要目的,是为了改善大型表以及具有各种访问模式的表的可伸缩性和可管理性。 大型表:数据量巨大的表。 访问模式:因目的不同,需访问的...
本文将从扩展分区的物理结构、扩展分区的逻辑结构、EBR的两个磁盘分区表表项的差异、计算逻辑分区的起始绝对LBA地址等角度,结合实例,逐步分析和验证扩展的结构。总的来说,扩展分区的结构是以逻辑分区结构体为基本...
用分区助手移动分区把未分配空间集合在右边也就是userdata分区那一边保存修改(提交)然后用DiskGenius新建分区userdata把剩下的空间都给它 文件系统类型linux data partion 勾上对齐到下列扇区数的整数倍数值是8 ...
不止如此,分区助手基于调整分区功能出发,能无损数据地实现扩大分区,缩小分区,合并分区,拆分分区等操作。此外,它也能迁移系统到固态硬盘、实现硬盘克隆,是一个强大的硬盘分区工具软件。 1.调整硬盘分区大小 ...
无忧分区 无忧调整分区大小 如果你不想破坏你的数据,还想调整分区的容量,那么就请用“无忧分区”。
磁盘分区磁盘分区磁盘分区磁盘分区