`

hbase初探之三(小结)

阅读更多

经过几天来的看资料,写代码,终于对这个东东有点眉目了。

package linhon.crud;

import java.util.Date;
import java.util.Map.Entry;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/***
 * test hbase crud operations
 * @author leibnitz
 * @create jan,12,11
 */
public class TestHbaseCrud {
	
	/**
	 * Inserts a row, or updates column(s) of an existing row when the row key
	 * is already present. The original author notes that such updates are
	 * atomic at row level.
	 *
	 * <p>Cells are written to the {@code info} family: {@code info:content}
	 * holds {@code content} and {@code info:add_time} holds
	 * {@code addTime.getTime()} encoded with {@code Bytes.toBytes(long)}.
	 * When {@code addTime} is given it is also used as the cell timestamp
	 * (version) of both cells.
	 *
	 * @param tableName name of the table to write to
	 * @param rowkey    row key; stored as the bytes of the int
	 * @param content   value for {@code info:content}; skipped when {@code null}
	 * @param addTime   value for {@code info:add_time} and cell timestamp; when
	 *                  {@code null} the {@code add_time} cell is skipped and the
	 *                  content cell is written without an explicit timestamp
	 * @throws Exception if the table cannot be opened or the write fails
	 */
	public static void add(String tableName,int rowkey,String content,Date addTime) throws Exception{
		HTable htable = new HTable(new HBaseConfiguration(), tableName);
		try{
			htable.setAutoFlush(false);
			htable.setWriteBufferSize(1024 * 5);

			byte[] family = Bytes.toBytes("info");
			Put put = new Put(Bytes.toBytes(rowkey));
			if(content != null){
				byte[] qualifier = Bytes.toBytes("content");
				if(addTime != null){
					put.add(family, qualifier, addTime.getTime(), Bytes.toBytes(content));
				}else{
					// No explicit version available: use the overload without a timestamp
					// instead of dereferencing a null Date.
					put.add(family, qualifier, Bytes.toBytes(content));
				}
			}
			if(addTime != null){		//more than one column can be added in the same Put
				put.add(family, Bytes.toBytes("add_time"), addTime.getTime(), Bytes.toBytes(addTime.getTime()));
			}
			// NOTE(review): when both content and addTime are null the Put carries no
			// cells; how HTable.put treats that is not visible here - confirm.

			htable.put(put);
			htable.flushCommits();
		}finally{
			htable.close();		//per the original author's note, close() also invokes flushCommits()
		}
	}
	
	/**
	 * Adds a column (family member) to the specified row by writing an
	 * empty-string value to {@code family:column}. Only the given row receives
	 * the cell.
	 *
	 * @param tableName name of the table to write to
	 * @param rowkey    row key; stored as the bytes of the int
	 * @param family    column family name
	 * @param column    column qualifier to create
	 * @throws Exception if the table cannot be opened or the write fails
	 */
	public static void addColumnOnly(String tableName,int rowkey,String family,String column) throws Exception{
		HTable htable = new HTable(new HBaseConfiguration(), tableName);
		try{
			htable.setAutoFlush(false);
			htable.setWriteBufferSize(1024 * 5);

			Put put = new Put(Bytes.toBytes(rowkey));
			put.add(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(""));

			htable.put(put);
			htable.flushCommits();
		}finally{
			// Release the table even when the write fails.
			htable.close();		//per the original author's note, close() also invokes flushCommits()
		}
	}
	
	/**
	 * Scans every row of the table and prints, for each requested column, its
	 * value in {@code family}. Columns whose name contains "time" or "date" are
	 * formatted as a date when the stored value has the length of a long;
	 * everything else is printed as a string.
	 *
	 * <p>For each column it also prints whether the row identified by
	 * {@code rowKey} has a cell at one hard-coded sample timestamp.
	 *
	 * @param tblName name of the table to scan
	 * @param rowKey  row key used only for the version-existence check
	 * @param family  column family to read from
	 * @param columns column qualifiers to print; {@code null} is treated as none
	 * @throws Exception if the table cannot be opened or a read fails
	 */
	public static void query(String tblName,int rowKey,String family,String... columns) throws Exception{
		// Sample version taken from the author's test data.
		final long ts = 1295977940837L;
		// Loop-invariant conversions, hoisted out of the scan.
		final byte[] familyBytes = Bytes.toBytes(family);
		final byte[] key = Bytes.toBytes(rowKey);
		final String[] cols = (columns == null) ? new String[0] : columns;

		HTable htbl = new HTable(new HBaseConfiguration(), tblName);
		try{
			ResultScanner scan = htbl.getScanner(new Scan());	//add a filter param if necessary
			try{
				Result rst;
				while((rst = scan.next()) != null){		//scan by row
					System.out.println("row:" + Bytes.toInt(rst.getRow()));
					for(String col : cols){				//NOTE: rst.list() could be used to show all columns
						byte[] qualifier = Bytes.toBytes(col);
						byte[] value = rst.getValue(familyBytes, qualifier);
						boolean timeLike = col.contains("time") || col.contains("date");
						if(timeLike && value != null && value.length == Bytes.SIZEOF_LONG){
							System.out.printf("  %s:%2$tF %2$tH:%2$tM:%2$tS  ",
									col, Bytes.toLong(value));
						}else{
							// Missing cells and values that are not a long (e.g. the empty
							// placeholder written by addColumnOnly) are printed as text.
							System.out.printf("  %s:%s  ", col, Bytes.toString(value));
						}

						final Get g = new Get(key);
						g.addColumn(familyBytes, qualifier);
						g.setTimeStamp(ts);	//query by time range; this means time range [ts,ts+1)
						boolean b = htbl.exists(g);
						System.out.println("  has versions:" + ts + "," + b);
					}
				}
			}finally{
				scan.close();
			}
		}finally{
			htbl.close();
		}
	}
	
	/**
	 * Retrieves up to {@code maxVersions} versions of the requested cells of one
	 * row and prints them grouped by family and qualifier. Prints nothing when
	 * the row has no matching data.
	 *
	 * <p>Sample output from the author's run with all columns and
	 * {@code maxVersions >= 2} (versions come back newest first):
	 * <pre>
	 * family: info
	 *   qualifier: add_time
	 *     version: 1295977940837,value:1295977940837
	 *     version: 1295977489609,value:1295977488769
	 *   qualifier: content
	 *     version: 1295977940837,value:linhon
	 *     version: 1295976774855,value:bye,linhon
	 * </pre>
	 *
	 * @param tblName     name of the table to read
	 * @param rowKey      row key; encoded as the bytes of the int
	 * @param family      column family to read from
	 * @param maxVersions maximum number of versions to fetch; per the author's
	 *                    note the test table keeps only two versions, so asking
	 *                    for more than 2 still yields at most two
	 * @param columns     qualifiers to fetch; when {@code null} or empty the
	 *                    whole family is fetched
	 * @throws Exception if the table cannot be opened or the read fails
	 */
	public static void queryByMaxVersions(String tblName,int rowKey,String family,int maxVersions,String...columns) throws Exception{
		HTable htbl = new HTable(new HBaseConfiguration(), tblName);
		try{
			final byte[] familyBytes = Bytes.toBytes(family);
			final Get g = new Get(Bytes.toBytes(rowKey));
			if(columns == null || columns.length == 0){
				// Request the whole family explicitly rather than through the
				// old-format single-argument addColumn("family:qualifier").
				g.addFamily(familyBytes);
			}else{
				for(String col : columns){
					g.addColumn(familyBytes, Bytes.toBytes(col));
				}
			}
			g.setMaxVersions(maxVersions);

			Result rst = htbl.get(g);
			NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = rst.getMap();
			if(families == null){
				return;		//row missing or no matching cells: nothing to print
			}

			for(Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyEntry : families.entrySet()){
				System.out.println("family: " + Bytes.toString(familyEntry.getKey()));
				for(Entry<byte[], NavigableMap<Long, byte[]>> qualifierEntry : familyEntry.getValue().entrySet()){
					String col = Bytes.toString(qualifierEntry.getKey());
					System.out.println("  qualifier: " + col);
					boolean timeLike = col.contains("time") || col.contains("date");
					for(Entry<Long, byte[]> versionEntry : qualifierEntry.getValue().entrySet()){
						byte[] value = versionEntry.getValue();
						if(timeLike && value != null && value.length == Bytes.SIZEOF_LONG){
							System.out.println("    version: " + versionEntry.getKey() +
									",value:" + Bytes.toLong(value));
						}else{
							// Values that are not a long (e.g. an empty placeholder) are shown as text.
							System.out.println("    version: " + versionEntry.getKey() +
									",value:" + Bytes.toString(value));
						}
					}
				}
			}
		}finally{
			htbl.close();
		}
	}
		
	/**
	 * Placeholder with no behavior of its own: modifying a cell is done by
	 * writing it again, see {@code add(String, int, String, Date)}.
	 */
	public static void modify(){
		// intentionally empty
	}
	
	/**
	 * Deletes data of one column of one row, passing {@code timestamp} to
	 * {@code Delete.deleteColumn(family, qualifier, timestamp)}.
	 *
	 * <p>The original author notes that HBase deletes can be selected by:
	 * <ol>
	 *   <li>family, or family + column</li>
	 *   <li>timestamp range</li>
	 *   <li>regexp</li>
	 * </ol>
	 *
	 * @param tblName   name of the table to modify
	 * @param rowKey    row key; encoded as the bytes of the int
	 * @param family    column family name
	 * @param column    column qualifier
	 * @param timestamp cell timestamp (version) handed to the delete
	 * @throws Exception if the table cannot be opened or the delete fails
	 */
	public static void deleteColumnData(String tblName,int rowKey,String family,String column,long timestamp) throws Exception{
		HTable htbl = new HTable(new HBaseConfiguration(), tblName);
		try{
			Delete dlt = new Delete(Bytes.toBytes(rowKey));
			dlt.deleteColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp);

			htbl.delete(dlt);
			htbl.flushCommits();
		}finally{
			// Release the table even when the delete fails.
			htbl.close();
		}
	}
	/**
	 * Removes a column family (and its data) from a table's schema. The table
	 * is disabled first if it is currently enabled, then the family is deleted
	 * and a flush of the table is requested.
	 *
	 * <p>NOTE(review): the table is not re-enabled afterwards (the
	 * {@code enableTable} call was commented out by the original author), so
	 * callers presumably have to enable it themselves - confirm.
	 *
	 * @param tblName name of the table to alter
	 * @param family  column family to delete
	 * @param column  currently unused; only the family name is passed on
	 * @throws Exception if an admin operation fails
	 */
	public static void deleteColumnFamily(String tblName,String family,String column) throws Exception{
		HBaseAdmin hbaseAdmin = new HBaseAdmin(new HBaseConfiguration());

		// The table must be disabled before its schema is changed.
		boolean enabled = hbaseAdmin.isTableEnabled(tblName);
		if(enabled){
			hbaseAdmin.disableTable(tblName);
		}

		// Per the author's note the column-name argument may be "family" or
		// "family:qualifier"; the qualifier is optional, and only the family is passed.
		hbaseAdmin.deleteColumn(tblName, family);

		// Deliberately left out by the original author ("this is a trick"):
		// hbaseAdmin.enableTable(tblName);

		hbaseAdmin.flush(tblName);
	}

	/**
	 * Demo driver. Runs one experiment at a time; earlier experiments are kept
	 * as commented-out calls for reference.
	 *
	 * @param args ignored
	 * @throws Exception if the HBase operation fails
	 */
	public static void main(String[] args) throws Exception{
		// --- inserts / updates ---
		//   add("test_user",1,"linhon",new Date());
		//   add("test_user",1,"hello,linhon",new Date());
		//   add("test_user",1,"bye,linhon",new Date());
		//   add("test_user",1,null,new Date());
		//   System.out.println(System.currentTimeMillis());

		// --- reads ---
		//   query("test_user",1,"info",new String[]{"content","add_time"});
		//   queryByMaxVersions("test_user",1,"info",3,new String[]{"content","add_time"});
		//   queryByMaxVersions("test_user",1,"info",3,new String[]{"content"/*,"add_time"*/});

		// --- column add / delete on test_user ---
		//   addColumnOnly("test_user", 1, "info", "age");
		//   deleteColumnData("test_user",1,"info","age",1296030610746L);
		//   deleteColumnFamily("test_user","info","age");

		// --- current experiment, on the scratch table ---
		final String scratchTable = "test_user2";
		final int row = 1;
		addColumnOnly(scratchTable, row, "num", "age");
		//   deleteColumnFamily("test_user2","num","age");	//test table
	}

}




我觉得既然它有横向切分(书上是这样说的,但我没有在真正的分布式环境下跑过,只在伪分布式下试过,所以不知是否正确),非结构化
存储,支持版本化,那么就不应该只是进行普通表那样简单的CRUD操作,所以我挖掘了一些新功能点出来。
注意问题:
1.旧版本的:exists(final byte [] row, final byte [] column,long timestamp),其中的timestamp代表是从0开始到timestamp
的time range;新版本的exists(Get)可以指定一个具体的timestamp范围而不是使用从0开始的范围。
hbase(main):014:0> scan 'test_user'
ROW                          COLUMN+CELL                                                                      
 \x00\x00\x00\x01            column=info:add_time, timestamp=1294813460625, value=\x00\x00\x01-x\xE5uw        
 \x00\x00\x00\x01            column=info:content, timestamp=1295976774855, value=bye,linhon
 
2.put或get中的addColumn(column)如果只有一个参数,代表这是old format column,that means the form is:<family:column>
3.HTable是对表数据的修改查询操作;HBaseAdmin是对表结构操作;
4.在shell下进行的scan操作,各cell只输出最后一个version的value
5.添加数据时,row key是必须指定的。
6.在已有数据情况下添加新column,HTable中需要指定一个rowkey,代表只添加到该行上,其它行是没有这列数据的。
7.deleteColumn(tbl,col)使用family+":"+column作为col时删除全部列(family)
8.hbase无法做到动态增加/删除列族(要先disable);删除只能删除列族,不能单独删除column成员


分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics