HBase Version: hbase-0.90.3-cdh3u1
org.apache.hadoop.hbase.client.HTablePool
is not very convenient to use, so I rewrote parts of HTablePool to fit my own business logic better. Discussion is welcome.
The rewrite improves and configures four points of the original source:
1. Allow a different poolSize per table. The stock HTablePool uses a single maxSize for every table:
private final int maxSize;
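The per-table sizing idea can be sketched in plain Java, independent of HBase: each table name maps to its own queue plus its own cap, instead of one shared maxSize. This is a minimal illustration, not the rewritten pool itself; `Resource` stands in for HTable, and the class and method names are mine.

```java
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch: a pool where every table has its own queue and its own maximum
// size, unlike HTablePool's single shared maxSize.
public class PerTableSizedPool<Resource> {

  private final ConcurrentMap<String, LinkedList<Resource>> pools =
      new ConcurrentHashMap<String, LinkedList<Resource>>();
  private final ConcurrentMap<String, Integer> maxSizes =
      new ConcurrentHashMap<String, Integer>();

  /** Register a table with its own maximum pool size. */
  public void register(String tableName, int maxSize) {
    pools.putIfAbsent(tableName, new LinkedList<Resource>());
    maxSizes.put(tableName, maxSize);
  }

  /** Return a resource to its table's queue; refuse if that table's cap is reached. */
  public boolean putBack(String tableName, Resource r) {
    LinkedList<Resource> queue = pools.get(tableName);
    int max = maxSizes.get(tableName);
    synchronized (queue) {
      if (queue.size() >= max) {
        return false; // over this table's cap; the caller should close it
      }
      queue.add(r);
      return true;
    }
  }

  /** Borrow an idle resource, or null if the table's queue is empty. */
  public Resource take(String tableName) {
    LinkedList<Resource> queue = pools.get(tableName);
    synchronized (queue) {
      return queue.poll();
    }
  }

  /** Number of idle resources currently pooled for the table. */
  public int idleCount(String tableName) {
    LinkedList<Resource> queue = pools.get(tableName);
    synchronized (queue) {
      return queue.size();
    }
  }
}
```

With this shape, a write-heavy table can be registered with a large cap while a rarely-used one gets a small cap, which is exactly what a single shared maxSize cannot express.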
2. HTablePool.getTable initializes only a single HTable, and the table's queue is itself created lazily at that point.
This design is weak, and it was my starting point for modifying the source:
/**
 * Get a reference to the specified table from the pool.<p>
 *
 * Create a new one if one is not available.
 * @param tableName table name
 * @return a reference to the specified table
 * @throws RuntimeException if there is a problem instantiating the HTable
 */
public HTableInterface getTable(String tableName) {
  LinkedList<HTableInterface> queue = tables.get(tableName);
  if (queue == null) {
    queue = new LinkedList<HTableInterface>();
    tables.putIfAbsent(tableName, queue);
    return createHTable(tableName);
  }
  HTableInterface table;
  synchronized (queue) {
    table = queue.poll();
  }
  if (table == null) {
    return createHTable(tableName);
  }
  return table;
}
3. There should be a createTablePool method so users can create an HTablePool explicitly. It is the counterpart of closeTablePool: create first, close later.
public void closeTablePool(final String tableName) {
  Queue<HTableInterface> queue = tables.get(tableName);
  synchronized (queue) {
    HTableInterface table = queue.poll();
    while (table != null) {
      this.tableFactory.releaseHTableInterface(table);
      table = queue.poll();
    }
  }
  HConnectionManager.deleteConnection(this.config, true);
}
4. HTable's Put path can be optimized by batching: with autoFlush off, multiple Puts accumulate in the client-side write buffer and are submitted together by flushCommits(). Loop over the pooled HTables and call flushCommits() on each.
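The flush loop described above can be sketched as a small scheduled task that periodically walks every pooled table and flushes it. This is an illustration under the assumption that each pooled table exposes a flushCommits() method; the `BufferedTable` interface and class names here are stand-ins for HTable, not part of the rewritten pool.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: a background loop that flushes the client-side write buffer of
// every pooled table at a fixed interval, so individual Puts need not
// pay a round trip each.
public class PoolFlusher {

  /** Stand-in for HTable's flushCommits(). */
  public interface BufferedTable {
    void flushCommits();
  }

  private final List<BufferedTable> pooled =
      new CopyOnWriteArrayList<BufferedTable>();
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  public void add(BufferedTable t) {
    pooled.add(t);
  }

  /** Flush every pooled table once. */
  public void flushAll() {
    for (BufferedTable t : pooled) {
      t.flushCommits();
    }
  }

  /** Flush all pooled tables every periodMillis milliseconds. */
  public void start(long periodMillis) {
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        flushAll();
      }
    }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  public void stop() {
    scheduler.shutdown();
  }
}
```

The trade-off is latency versus throughput: a short period (a few milliseconds, as suggested above) keeps writes nearly real-time while still amortizing the RPC cost across many Puts. Note that Puts buffered but not yet flushed are lost if the client crashes.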
The revised source:
package org.apache.hadoop.hbase.client;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * A rewritten pool of HTable instances.<p>
 *
 * Each MyHTablePool acts as a pool for all tables. To use, instantiate a
 * MyHTablePool, pre-create a table's pool with
 * {@link #createHTablePool(String, int, boolean)}, and use
 * {@link #getHTable(String)} to get an HTable from the pool. Once you are
 * done with it, return it with {@link #putHTableBack(HTableInterface)}.
 *
 * <p>Each table's pool is created with a <i>maxSize</i> which defines the
 * most HTable references that will ever be retained for that table.
 *
 * <p>The pool manages its own connection to the cluster. See
 * {@link HConnectionManager}.
 *
 * @author greatwqs
 * @update 2012-08-25
 */
public class MyHTablePool {

  public final static int DEFAULT_POOL_SIZE = 4;

  /**
   * Key:   table name.
   * Value: the LinkedList of pooled HTableInterface instances.
   * Each table's LinkedList can be created with a different size.
   */
  private final ConcurrentMap<String, LinkedList<HTableInterface>> tables =
      new ConcurrentHashMap<String, LinkedList<HTableInterface>>();

  /** Configuration from hbase-site.xml. */
  private final Configuration config;

  /** Factory whose createHTableInterface/releaseHTableInterface manage HTable instances. */
  private final HTableInterfaceFactory tableFactory;

  /** Default constructor. */
  public MyHTablePool() {
    this(HBaseConfiguration.create());
  }

  /**
   * Constructor that uses the specified configuration.
   *
   * @param config configuration
   */
  public MyHTablePool(final Configuration config) {
    this(config, null);
  }

  public MyHTablePool(final Configuration config,
      final HTableInterfaceFactory tableFactory) {
    // Make a new configuration instance so we can safely clean up when
    // done with the pool.
    this.config = config == null ? new Configuration()
        : new Configuration(config);
    this.tableFactory = tableFactory == null ? new HTableFactory()
        : tableFactory;
  }

  /**
   * Create all the HTable instances belonging to the given table.
   * <p>
   * Note: this 'creates' the pool for the given table.
   *
   * @param tableName
   * @param maxSize
   * @param isAutoFlush
   */
  public void createHTablePool(final String tableName, final int maxSize,
      boolean isAutoFlush) {
    LinkedList<HTableInterface> queue = tables.get(tableName);
    if (queue == null) {
      tables.putIfAbsent(tableName, new LinkedList<HTableInterface>());
      // Re-read so a concurrent creator's queue wins the race.
      queue = tables.get(tableName);
    }
    synchronized (queue) {
      int addHTableSize = maxSize - queue.size();
      if (addHTableSize <= 0) {
        return;
      }
      for (int i = 0; i < addHTableSize; i++) {
        HTable table = (HTable) createHTable(tableName);
        if (table != null) {
          table.setAutoFlush(isAutoFlush);
          queue.add(table);
        }
      }
    }
  }

  /**
   * Create the pools for all the given tables.
   * <p>
   * Note: this 'creates' the pool of each given table.
   *
   * @param tableNameArray
   * @param maxSize
   * @param isAutoFlush default false.
   *        false: for {@link Put} use with the client-side write buffer;
   *        design a thread (running e.g. every 3 ms) to loop over all
   *        pooled tables and call flushCommits() -- this performs well.
   *        true: for {@link Scan} and {@link Delete} use.
   */
  public void createHTablePool(final String[] tableNameArray,
      final int maxSize, boolean isAutoFlush) {
    for (String tableName : tableNameArray) {
      createHTablePool(tableName, maxSize, isAutoFlush);
    }
  }

  /**
   * Create the pools for all the given tables, with autoFlush off.
   *
   * @param tableNameArray
   * @param maxSize
   */
  public void createHTablePool(final String[] tableNameArray,
      final int maxSize) {
    createHTablePool(tableNameArray, maxSize, false);
  }

  /**
   * Get a reference to the specified table from the pool.<p>
   *
   * @param tableName table name
   * @return a reference to the specified table, or null if the pool is
   *         currently empty
   * @throws RuntimeException if no pool has been created for the table
   */
  public HTableInterface getHTable(String tableName) {
    LinkedList<HTableInterface> queue = tables.get(tableName);
    if (queue == null) {
      throw new RuntimeException("There is no pool for the HTable " + tableName);
    }
    HTableInterface table;
    synchronized (queue) {
      table = queue.poll();
    }
    return table;
  }

  /**
   * See {@link #getHTable(String)}.
   *
   * @param tableName table name
   * @return a reference to the specified table
   */
  public HTableInterface getHTable(byte[] tableName) {
    return getHTable(Bytes.toString(tableName));
  }

  /**
   * Puts the specified HTable back into the pool.
   * <p>
   * Do not use this method for an HTable that did not come from this pool.
   *
   * @param table table
   */
  public void putHTableBack(HTableInterface table) {
    LinkedList<HTableInterface> queue =
        tables.get(Bytes.toString(table.getTableName()));
    synchronized (queue) {
      queue.add(table);
    }
  }

  protected HTableInterface createHTable(String tableName) {
    return this.tableFactory.createHTableInterface(config,
        Bytes.toBytes(tableName));
  }

  /**
   * Closes all the HTable instances belonging to the given table.
   * <p>
   * Note: this is a 'shutdown' of the given table pool, unlike
   * {@link #putHTableBack(HTableInterface)}, which returns a table
   * instance to the pool for future re-use.
   *
   * @param tableName
   */
  public void closeHTablePool(final String tableName) {
    Queue<HTableInterface> queue = tables.get(tableName);
    if (queue == null) {
      return;
    }
    synchronized (queue) {
      HTableInterface table = queue.poll();
      while (table != null) {
        this.tableFactory.releaseHTableInterface(table);
        table = queue.poll();
      }
    }
    HConnectionManager.deleteConnection(this.config, true);
  }

  /**
   * See {@link #closeHTablePool(String)}.
   *
   * @param tableName
   */
  public void closeHTablePool(final byte[] tableName) {
    closeHTablePool(Bytes.toString(tableName));
  }

  /**
   * Closes the pools of all tables. See {@link #closeHTablePool(String)}.
   */
  public void closeHTablePool() {
    for (String tabName : tables.keySet()) {
      closeHTablePool(tabName);
    }
  }

  /**
   * Returns the number of idle HTable instances currently pooled for the
   * given table.
   *
   * @param tableName
   * @return current pool size
   */
  public int getCurrentPoolSize(String tableName) {
    Queue<HTableInterface> queue = tables.get(tableName);
    synchronized (queue) {
      return queue.size();
    }
  }
}
org.apache.hadoop.hbase.client.MyHTablePoolTest -- a test example:
package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;

/**
 * @author greatwqs
 * @update 2012-08-25
 */
public class MyHTablePoolTest {

  /**
   * Test method.
   *
   * @param args
   * @throws Exception
   */
  public static void main(String[] args) throws Exception {
    // 1. my config file
    String configFile = "conf/hbase-site.xml";
    Configuration config = new Configuration();
    config.addResource(new Path(configFile));

    // 2. init HTablePool
    MyHTablePool myPool = new MyHTablePool(config);

    // 3. create the HTablePool for a table
    myPool.createHTablePool("DCP_DataCenter_Base",
        MyHTablePool.DEFAULT_POOL_SIZE, false);

    // 4. get an already-created HTable from the HTablePool
    HTable table = (HTable) myPool.getHTable("DCP_DataCenter_Base");
    if (table != null) {
      System.out.println("get HTable from HTablePool Success!");
    }

    // 5. scan all data from the HTable and print it to the console
    Scan scan = new Scan();
    ResultScanner rs = table.getScanner(scan);
    try {
      for (Result result : rs) {
        KeyValue[] kv = result.raw();
        byte[] key = kv[0].getRow();
        System.out.println("RowKey: " + new String(key));
        for (int i = 0; i < kv.length; i++) {
          System.out.println("ColumnFamily: " + new String(kv[i].getFamily()));
          System.out.println("Qualifier: " + new String(kv[i].getQualifier()));
          System.out.println("Timestamp: " + String.valueOf(kv[i].getTimestamp()));
          System.out.println("Value: " + new String(kv[i].getValue()));
        }
        System.out.println();
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      rs.close();
    }

    // 6. after using the HTable, put it back into the HTablePool
    myPool.putHTableBack(table);

    // 7. close the HTablePool
    myPool.closeHTablePool();
  }
}