`
ganliang13
  • 浏览: 249571 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hbase关于对表操作API实现

阅读更多
package com.gan.hbase;

import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import com.gan.util.Const;
public class HbaseTest  {
	
	   public static Configuration configuration;

	/**
	 * a.配置host, 例如:ganbjc2 192.168.11.72 
	 * b.参考:hbase-site.xml: 
	 * <property> <name>hbase.zookeeper.quorum</name>
	 *			  <value>ganbjc2,ganbjc3,ganbjc4</value>
	 * </property> 
	 * <property> <name>hbase.rootdir</name> 
	 * 			  <value>hdfs://ganbjc1:12000/hbase</value>
	 * </property>
	 */
	    static {  
	        configuration = HBaseConfiguration.create(); 
		    //String ZOOKEEPER_QUORAM =  "zk-1:2181,zk-2:2181,zk-3:2181,zk-4:2181,zk-5:2181,zk-6:2181";
	        configuration.set("hbase.zookeeper.quorum","ganbjc2:2181,ganbjc3:2181,ganbjc4:2181");  
	        configuration.set("hbase.rootdir", "hdfs://ganbjc1:12000/hbase");  
	    }  
	    
	    
	    public static void main(String[] args) throws Exception {  
	       // createTable("gangliang13");   
	        //insertData("gangliang13");
	    	//deleteRow("gangliang13", "112");
	        //queryAllLimit("gangliang13",2); 
	    	//queryByRowKey("GidCross","0100020a000056c100004e374e267993");
	    	//queryByColumn("gangliang13","aaa1");
	    	//testLikeQuery("gangliang13","11");
	    	//queryByManyColumn("gangliang13");
	    	//queryAll("gangliang13");
	    	//System.out.println("input GidCross rawkey:"+args[0]);
	    	queryByRowKey("Gid_Cat_Visit", "0100020a00000353000005b64e1f00a8");
	    }  
	    
		public static void createTable(Configuration configuration,
				String tableName, String columnDescriptor) {
			System.out.println("start create table ......");
			try {
				HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);
				if (hBaseAdmin.tableExists(tableName)) {
					hBaseAdmin.disableTable(tableName);
					hBaseAdmin.deleteTable(tableName);
				}
				HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
				tableDescriptor.addFamily(new HColumnDescriptor(columnDescriptor));
				hBaseAdmin.createTable(tableDescriptor,getHexSplits("0","FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",16));
			} catch (Exception e) {
				e.printStackTrace();
			}
			System.out.println("end create table ......");
		}
		
		
		public static void createTable(Configuration configuration,String tableName, String columnDescriptor,List<String> regionKeyList) {
			System.out.println("start create table ......");
			try {
				HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);
				if (hBaseAdmin.tableExists(tableName)) {
					hBaseAdmin.disableTable(tableName);
					hBaseAdmin.deleteTable(tableName);
				}
				HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
				tableDescriptor.addFamily(new HColumnDescriptor(columnDescriptor));
				hBaseAdmin.createTable(tableDescriptor,getHexSplits(regionKeyList));
			} catch (Exception e) {
				e.printStackTrace();
			}
			System.out.println("end create table ......");
		}
		
		public static byte[][] getHexSplits(String startKey, String endKey,
				int numRegions) {
			byte[][] splits = new byte[numRegions - 1][];
			BigInteger lowestKey = new BigInteger(startKey, 16);
			BigInteger highestKey = new BigInteger(endKey, 16);
			BigInteger range = highestKey.subtract(lowestKey);
			BigInteger regionIncrement = range.divide(BigInteger
					.valueOf(numRegions));
			lowestKey = lowestKey.add(regionIncrement);
			for (int i = 0; i < numRegions - 1; i++) {
				BigInteger key = lowestKey.add(regionIncrement.multiply(BigInteger
						.valueOf(i)));
				byte[] b = String.format("%016x", key).getBytes();
				splits[i] = b;
			}
			return splits;
		}
		
		
		public static byte[][] getHexSplits(List<String> regionStartKeyList) {
			if(regionStartKeyList ==null || regionStartKeyList.size()==0){
				return getHexSplits("0","FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF",16);
			}else{
				byte[][] splits = new byte[regionStartKeyList.size()][];
				for (int i = 0; i <regionStartKeyList.size(); i++) {
					BigInteger key = new BigInteger(regionStartKeyList.get(i), 16);
					byte[] b = String.format("%016x", key).getBytes();
					splits[i] = b;
				}
				return splits;
			}
		}
		
		public StringBuffer compactKV(String k, String v) {
			StringBuffer retval = new StringBuffer();
			retval.append(k).append(Const.KV_SPLIT).append(v)
					.append(Const.COLUMN_SPLIT);
			return retval;
		}
	    
	    /**
	     * 如果存在要创建的表,那么先删除,再创建
	     * @param tableName
	     */
	    public static void createTable(String tableName) {  
	        System.out.println("start create table ......");  
	        try {  
	            HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration);  
	            if (hBaseAdmin.tableExists(tableName)) {  
	                hBaseAdmin.disableTable(tableName);  
	                hBaseAdmin.deleteTable(tableName);  
	                System.out.println(tableName + " is exist,detele....");  
	            }  
	            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);  
	            tableDescriptor.addFamily(new HColumnDescriptor("name"));  
	            tableDescriptor.addFamily(new HColumnDescriptor("age"));  
	            tableDescriptor.addFamily(new HColumnDescriptor("sex"));  
	            hBaseAdmin.createTable(tableDescriptor);  
	        } catch (MasterNotRunningException e) {  
	            e.printStackTrace();  
	        } catch (ZooKeeperConnectionException e) {  
	            e.printStackTrace();  
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	        System.out.println("end create table ......");  
	    }  
	    
	    /**
	     * 插入数据
	     * @param tableName
	     */
	    public static void insertData(String tableName) {  
	        System.out.println("start insert data ......");  
	        HTablePool pool = new HTablePool(configuration, 1000);  
	        HTableInterface table =  pool.getTable(tableName);  
	        Put put = new Put("111".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值  
	        put.add("name".getBytes(), null, "aaa1".getBytes());// 本行数据的第一列  
	        put.add("age".getBytes(), null, "bbb1".getBytes());// 本行数据的第三列  
	        put.add("sex".getBytes(), null, "ccc1".getBytes());// 本行数据的第三列  
	        
	        Put put2 = new Put("222".getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY,此处rowkey为put构造方法中传入的值  
	        put2.add("name".getBytes(), null, "aaa2".getBytes());// 本行数据的第一列  
	        put2.add("name".getBytes(), "nickname".getBytes(), "aaabbbbbnick".getBytes());
	        put2.add("age".getBytes(), null, "bbb2".getBytes());// 本行数据的第三列  
	        put2.add("sex".getBytes(), null, "ccc2".getBytes());// 本行数据的第三列  
	        try {  
	            table.put(put);  
	            table.put(put2);
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	        System.out.println("end insert data ......");  
	    }  
	    
	    /**
	     * 
	     * @param tableName 取前面limit条
	     */
	    public static void queryAllLimit(String tableName,int limit) {  
	        HTablePool pool = new HTablePool(configuration, 1000); 
	        HTableInterface table = pool.getTable(tableName);
	        try {  
	            Scan scan = new Scan();  
	            scan.setCaching(1);  
	            Filter filter = new PageFilter(limit); 
	            scan.setFilter(filter);  
	            ResultScanner scanner = table.getScanner(scan);// 执行扫描查找 int num = 0;  
	            Iterator<Result> res = scanner.iterator();// 返回查询遍历器  
	            while (res.hasNext()) {  
	                Result result = res.next();  
	                table.setWriteBufferSize(1024*1024*1);
	                KeyValue[] kv = result.raw();
	                for (int i = 0; i < kv.length; i++) {
		                System.out.print(new String(kv[i].getRow()) + "  ");
	                    System.out.print(new String(kv[i].getFamily()) + ":");
	                    System.out.print(new String(kv[i].getQualifier()) + "  ");
	                    System.out.print(kv[i].getTimestamp() + "  ");
	                    System.out.println(new String(kv[i].getValue()));
	                }
	            }  
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    
	    
	    
	    /**
	     * 对行key进行like查询
	     * @param table
	     * @param likeQuery
	     * @throws Exception
	     */
	    public static void testLikeQuery(String table,String likeQuery) throws Exception {  
	        Scan scan = new Scan();  
	        RegexStringComparator comp = new RegexStringComparator(likeQuery);  
	        RowFilter filter = new RowFilter(CompareOp.EQUAL, comp);  
	        scan.setFilter(filter);  
	        scan.setCaching(200);  
	        scan.setCacheBlocks(false);  
	        HTable hTable = new HTable(configuration, table);  
	        ResultScanner scanner = hTable.getScanner(scan);  
	        for (Result r : scanner) {  
	        	KeyValue[] kv = r.raw();
                for (int i = 0; i < kv.length; i++) {
	                System.out.print(new String(kv[i].getRow()) + "  ");
                    System.out.print(new String(kv[i].getFamily()) + ":");
                    System.out.print(new String(kv[i].getQualifier()) + "  ");
                    System.out.print(kv[i].getTimestamp() + "  ");
                    System.out.println(new String(kv[i].getValue()));
                } 
            }  
	    }  
	    
	    /**
	     * 查询表所有行
	     * @param tableName
	     */
	    public static void queryAll(String tableName) {  
	        HTablePool pool = new HTablePool(configuration, 1000); 
	        HTableInterface table = pool.getTable(tableName);
	        try {  
	            ResultScanner rs = table.getScanner(new Scan());  
	            for (Result r : rs) {  
	            	KeyValue[] kv = r.raw();
	                for (int i = 0; i < kv.length; i++) {
		                System.out.print(new String(kv[i].getRow()) + "  ");
	                    System.out.print(new String(kv[i].getFamily()) + ":");
	                    System.out.print(new String(kv[i].getQualifier()) + "  ");
	                    System.out.print(kv[i].getTimestamp() + "  ");
	                    System.out.println(new String(kv[i].getValue()));
	                } 
	            }  
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    
	    /**
	     * 根据行记录值删除
	     * @param tablename
	     * @param rowkey
	     */
	    public static void deleteRow(String tablename, String rowkey)  {  
	        try {  
	            HTable table = new HTable(configuration, tablename);  
	            List<Delete> list = new ArrayList<Delete>();  
	            Delete d1 = new Delete(rowkey.getBytes());  
	            list.add(d1);  
	            table.delete(list);  
	            System.out.println("删除行成功!");  
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    
	    /**
	     * 根据行记录索引
	     * @param tableName
	     * @param row
	     * @throws IOException
	     */
	    public static void queryByRowKey(String tableName,String row) throws IOException {  
	    	HTable table = new HTable(configuration, tableName); 
	    	//System.err.println(table.getRegionLocation(row.getBytes()));
	        try {  
	            Get scan = new Get(row.getBytes());// 根据rowkey查询  
	            Result r = table.get(scan);  
	        	KeyValue[] kv = r.raw();
                for (int i = 0; i < kv.length; i++) {
	               // System.out.print(new String(kv[i].getRow()) + "  ");
                    //System.out.print(new String(kv[i].getFamily()) + ":");
                    System.out.print(new String(kv[i].getQualifier()) + "  ");
                   // System.out.print(kv[i].getTimestamp() + "  ");
                    System.out.println(new String(kv[i].getValue()));
                }
                System.out.println("==============");
            	for (KeyValue kv2 : r.raw()) {
            		  System.out.println(new String(kv2.getQualifier()) + "  ");
            		  System.out.println(new String(kv2.getValue()) + "  ");
                      
					
				}
	        } catch (IOException e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    
	    /** 
	     * 按列查询,查询多条记录 
	     * @param tableName 
	     */  
	    public static void queryByColumn(String tableName,String columnValue) {  
	        try {  
	            HTable table = new HTable(configuration,tableName);  
	            Filter filter = new SingleColumnValueFilter(Bytes.toBytes("name"), null, CompareOp.EQUAL, Bytes.toBytes(columnValue)); // 当列column1的值为aaa时进行查询  
	            Scan s = new Scan();  
	            s.setFilter(filter);  
	            ResultScanner rs = table.getScanner(s);  
	            for (Result r : rs) {  
	            	KeyValue[] kv = r.raw();
	                for (int i = 0; i < kv.length; i++) {
		                System.out.print(new String(kv[i].getRow()) + "  ");
	                    System.out.print(new String(kv[i].getFamily()) + ":");
	                    System.out.print(new String(kv[i].getQualifier()) + "  ");
	                    System.out.print(kv[i].getTimestamp() + "  ");
	                    System.out.println(new String(kv[i].getValue()));
	                }
	            }  
	        } catch (Exception e) {  
	            e.printStackTrace();  
	        }  
	    }  
	    
	    /** 
	     * 按多列查询,查询多条记录 
	     * @param tableName 
	     */  
	    public static void queryByManyColumn(String tableName) {  
	    	 try {  
	             HTable table =  new HTable(configuration,tableName); 
	             List<Filter> filters = new ArrayList<Filter>();  
	   
	             Filter filter1 = new SingleColumnValueFilter(Bytes  
	                     .toBytes("age"), null, CompareOp.EQUAL, Bytes.toBytes("bbb1"));  
	             filters.add(filter1);  
	   
	             Filter filter2 = new SingleColumnValueFilter(Bytes  
	                     .toBytes("name"), null, CompareOp.EQUAL, Bytes.toBytes("aaa1")); 
	             filters.add(filter2);  
	   
	             FilterList filterList1 = new FilterList(filters);  
	   
	             Scan scan = new Scan();  
	             scan.setFilter(filterList1);  
	             ResultScanner rs = table.getScanner(scan);  
	             for (Result r : rs) {  
            		KeyValue[] kv = r.raw();
                    for (int i = 0; i < kv.length; i++) {
    	                System.out.print(new String(kv[i].getRow()) + "  ");
                        System.out.print(new String(kv[i].getFamily()) + ":");
                        System.out.print(new String(kv[i].getQualifier()) + "  ");
                        System.out.print(kv[i].getTimestamp() + "  ");
                        System.out.println(new String(kv[i].getValue()));
                    }  
	             }  
	             rs.close();  
	   
	         } catch (Exception e) {  
	             e.printStackTrace();  
	         }  
	    }  
}

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics