`
free9277
  • 浏览: 104583 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Hbase实战

阅读更多

1、 背景

        网站日趋成熟,用户量趋向稳定。客户想从网站产生的用户行为信息中获取一些有用的信息,以便支持决策。便有了今天《运营分析平台》这个项目,项目的主要内容包括:1、日志入库(数据库使用DB2),2、根据客户提供的规则进行一些简单的统计汇总,然后报表展示结果,这些规则一般很简单。3、对资源和 业务进行阀值实时监控预警。4、日志明细的查询

 

        如今网站一天产生1000多万条记录,面对暴涨的数据量。原先使用DB2的方案中遇到了一些瓶 颈,其中包括入库流程,统计过程与明细查询。经常收到运维同事的抱怨,今天存储不够了,明天统计速度慢了,数据查了老半天查不出来了,很是烦恼。正好之前 有过hadoop的相关工作经验,决定使用分布式的方案来缓解这些老调长谈的问题。现在把这个过程简单记录一下,以便日后学习。

2、 日志格式

         网站的日志格式如下,并且一条记录占据一行,以文本的形式存放在存储上面。以前的方法是写定时任务,按时将日志存入DB2中。现在也是采用这种方式,而将数据迁移到hbase中。

SERIAL,APPID,STARTTIME,USETIME,CHANNEL,MOBILENO,BRANCH,BRAND,LOGINTYPE,IP,OPERARESULT

         日志内容示例:

"sSOXmxAxWP3","ucs.server","2013-10-16-08.31.12.000000",192,0,"13724086484","GZ",3,1,"127.0.0.1",1

3、 hbase表设计

       hbase使用三维有序存储,三维是指:rowkey(行主键),column key(columnFamily+qualifier),timestamp(时间戳)。

 

       我们知道rowkey是行的主键,而且hbase只能指定rowkey,或者一个rowkey范围(即scan)来查找数据。所以rowkey的设计是至关重要的,关系到你应用层的查询效率。我们知道,rowkey是以字典顺序排序的。比如,有两个rowkey:aa、bb,因为按字典排序,那么rowkey1是排在 rowkey2前面的。这个理解了,我们在根据rowkey范围查询的时候,我们一般是知道startRowkey和endRowkey的,这样查询的范围就确定下来了,就可以快速查询到这个范围内的所有数据,而且不需要遍历,这也是hbase的优势之一。比如说rowkey设计为:用户ID-日期,那么查某个用户某天的数 据,startKEY为1234-20140729,endKey为:1234+20140730,那么你查到的就是用户为1234在20140729这一天的数据。
 

       按照这种规则以及日志格式,hbase表与rowkey设计如下:

 

       表名:T_LOGIN_DETAIL
       rowKey:CHANNEL-BRANCH-BRAND-OPERARESULT-STARTTIME

       如:00000000000000000000000000
       簇族名称:cotenxt

       qualifier:SERIAL,APPID,STARTTIME,USETIME,

       CHANNEL,MOBILENO,BRANCH,BRAND,LOGINTYPE,IP,OPERARESULT

4、 创建hbase表

          事先分割好regions,即确定每个regions的startKey和endKey。能够进行将region分好,而无需启用hbase的自split策略,这种方式能够提高集群稳定性,缺点是操作上较困难。代码如下:

private static org.apache.hadoop.conf.Configuration conf = HbaseHelper.getHbaseConfiguration();

	/**
	 * 创建hbase表
	 * @param tableName 表名
	 * @param familyNames 列族名
	 * @param regions
	 */
	public static boolean createTable(String tableName, String[] familyNames, byte[][] regions) {
		boolean result = false;
		try {
			HBaseAdmin admin = new HBaseAdmin(conf);
			
			// 初始化table,并设置名称
			HTableDescriptor table = new HTableDescriptor(tableName);
			
			// 添加列族
			for(String familyName : familyNames) {
				HColumnDescriptor col = new HColumnDescriptor(familyName);
				table.addFamily(col);
			}
			
			// 执行创建表命令
			admin.createTable(table, regions);
			result = true;
		} catch (TableExistsException e) {
			System.out.println("the table already exists...");
		} catch (MasterNotRunningException e) {
			e.printStackTrace();
		} catch (ZooKeeperConnectionException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
		return result;
	}
	
	public static void main(String[] args) {
		// 根据row分割好regions
		byte[][] regions = new byte[][] {   
				  Bytes.toBytes("00000000000000000000000000"),  
				  Bytes.toBytes("00019999999999999999999999"),
				  Bytes.toBytes("00029999999999999999999999"),
				  Bytes.toBytes("00039999999999999999999999"),
				  Bytes.toBytes("00049999999999999999999999"),
				  Bytes.toBytes("00059999999999999999999999"),
				  Bytes.toBytes("00069999999999999999999999"),
				  Bytes.toBytes("00079999999999999999999999"),
				  Bytes.toBytes("00089999999999999999999999"),
				  Bytes.toBytes("00099999999999999999999999"),
				  Bytes.toBytes("00109999999999999999999999"),
				  Bytes.toBytes("00119999999999999999999999"),
				  Bytes.toBytes("00129999999999999999999999"),
				  Bytes.toBytes("00139999999999999999999999"),
				  Bytes.toBytes("00149999999999999999999999"),
				  Bytes.toBytes("00159999999999999999999999"),
				  Bytes.toBytes("00169999999999999999999999"),
				  Bytes.toBytes("00179999999999999999999999"),
				  Bytes.toBytes("00189999999999999999999999"),
				  Bytes.toBytes("00199999999999999999999999"),
				  Bytes.toBytes("00209999999999999999999999"),
				  Bytes.toBytes("00219999999999999999999999"),
				  Bytes.toBytes("00229999999999999999999999"),
				  Bytes.toBytes("00239999999999999999999999"),
				  Bytes.toBytes("00249999999999999999999999"),
				  Bytes.toBytes("00259999999999999999999999"),
				  Bytes.toBytes("99999999999999999999999999")  
				};
		createTable("T_LOGIN_DETAIL", new String[]{"cotenxt"}, regions);
	}

       

        程序执行结束之后,打开控制台,可看到如下效果:


5、 入库

入库程序其中一个重要的步骤就是拼接rowKey(CHANNEL-BRANCH-BRAND-OPERARESULT-STARTTIME),还有从以下几方面优化写操作。

5.1 Auto Flush

      通过调用HTable.setAutoFlush(false)方法可以将HTable写客户端的自动flush关闭,这样可以批量写入数据到 HBase,而不是有一条put就执行一次更新,只有当put填满客户端写缓存时,才实际向HBase服务端发起写请求。默认情况下auto flush是开启的。

5.2 Write Buffer

      通过调用HTable.setWriteBufferSize(writeBufferSize)方法可以设置 HTable客户端的写buffer大小,如果新设置的buffer小于当前写buffer中的数据时,buffer将会被flush到服务端。其 中,writeBufferSize的单位是byte字节数,可以根据实际写入数据量的多少来设置该值。

5.3 WAL Flag

       在HBae中,客户端向集群中的RegionServer提交数据时(Put/Delete操作),首先会先写WAL(Write Ahead Log)日志(即HLog,一个RegionServer上的所有Region共享一个HLog),只有当WAL日志写成功后,再接着写 MemStore,然后客户端被通知提交数据成功;如果写WAL日志失败,客户端则被通知提交失败。这样做的好处是可以做到RegionServer宕机 后的数据恢复。
        因此,对于相对不太重要的数据,可以在Put/Delete操作时,通过调用Put.setWriteToWAL(false)或Delete.setWriteToWAL(false)函数,放弃写WAL日志,从而提高数据写入的性能。
值得注意的是:谨慎择关闭WAL日志,因为这样的话,一旦RegionServer宕机,Put/Delete的数据将会无法根据WAL日志进行恢复。
        
核心代码:
@Override
	public boolean doInLogSaveList(List<Object> dataList) throws SQLException {
		int listLen = dataList != null ? dataList.size() : 0;
		List<Put> puts = new ArrayList<Put>();
		
		for (int i = 0; i < listLen; i++) {
			LoginDetail loginDetail = (LoginDetail) dataList.get(i);
			String channel = String.valueOf(loginDetail.getChannel());
			String branch = loginDetail.getBranch();
			String brand = String.valueOf(loginDetail.getBrand());
			String operaresult = String.valueOf(loginDetail.getOperaResult());
			
			// 拼凑rowkey,形式为:CHANNEL-BRANCH-BRAND-OPERARESULT-STARTTIME
			StringBuffer sb = new StringBuffer();
			sb.append(LoginDetailHelper.trunChannel(channel)); // CHANNEL
			sb.append(LoginDetailHelper.trunBranch(branch)); // BRANCH
			sb.append(brand); // BRAND
			sb.append(LoginDetailHelper.trunValue(operaresult)); // OPERARESULT
			sb.append(loginDetail.getTime().replace(".", "").replace("-", "").replace("\"", "")); // STARTTIME
			String key = sb.toString();
			
			// 拼凑列族中的列名与值
			String[] items = new String[11];
			String columnsString = "SERIAL,APPID,STARTTIME,USETIME,CHANNEL,MOBILENO,BRANCH,BRAND,LOGINTYPE,IP,OPERARESULT";
			String[] columns = columnsString.split(",");
			items[0] = LoginDetailHelper.trunValue(loginDetail.getSerial()); // SERIAL
			items[1] = LoginDetailHelper.trunValue(loginDetail.getAppid()); // APPID
			items[2] = LoginDetailHelper.trunValue(loginDetail.getTime().toString()); // TIME
			items[3] = String.valueOf(loginDetail.getUseTime()); // USETIME
			items[4] = LoginDetailHelper.trunChannelChinese(channel); // CHANNEL
			items[5] = LoginDetailHelper.trunValue(loginDetail.getMobileNo()); // MOBILENO
			items[6] = LoginDetailHelper.trunBranchChinese(branch); // BRANCH
			items[7] = LoginDetailHelper.getBrandCode(brand); // BRAND
			items[8] = LoginDetailHelper.trunValue(""); // LOGINTYPE
			items[9] = LoginDetailHelper.trunValue(loginDetail.getIp()); // IP
			items[10] = LoginDetailHelper.getOperaResult(operaresult); // OPERARESULT

			for (int j = 0; j < items.length; j++) {
				Put put = new Put(key.getBytes());
				String value = items[j] == null ? "" : items[j];
				put.add("cotenxt".getBytes(), columns[j].getBytes(), value.getBytes());
				// 放弃写WAL日志
				put.setWriteToWAL(false);
				puts.add(put);
			}
			
			// 2000条数据提交一次
			if (puts.size() >= 2000) {
				try {
					table.put(puts);
					table.flushCommits();
					puts.clear();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}

		// 检查列表中是否还存在数据,如果有,则再提交一次
		if (puts.size() > 0) {
			try {
				table.put(puts);
				table.flushCommits();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
		return true;
	}

6、 查询数据

hbase的存储结构是key-value形式。相当于一个大map,于是无法做到在不遍历的情况下,多条件查询。

在查询开始之前,预先设置row的startKey和endKey。

public List<Map<String, String>> getHbaseData(String sdate, String sdate2, String channel, String branch, String brand, String mobileno) {
		List<Map<String, String>> result = new ArrayList<Map<String, String>>();
		
		try {
			HTable table = new HTable(conf, "T_LOGIN_DETAIL");
			
			// 拼接key值
			String prefix = channel + branch + brand + "0";
			String startKey = prefix + sdate.replace("-", "") + "000000000000";
			String stopKey = sdate2 == null || sdate2.trim().length() == 0 ? prefix + sdate.replace("-", "") + "999999999999": prefix + sdate2.replace("-", "") + "999999999999";
			
			Scan scan = new Scan();
			
			if(mobileno != null && mobileno.trim().length() != 0) {
				BinaryComparator comparable = new BinaryComparator(Bytes.toBytes(mobileno));
				SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("cotenxt"), Bytes.toBytes("MOBILENO"),CompareOp.EQUAL, comparable);
				scan.setFilter(filter1);
			} else {
				// 使用分页过滤器
				PageFilter filter = new PageFilter(100);
				scan.setFilter(filter);
			}
			
			// 设置起始key值
			scan.setStartRow(Bytes.toBytes(startKey));
			scan.setStopRow(Bytes.toBytes(stopKey));
			
			System.out.println("startKey:" + startKey);
			System.out.println(" stopKey:" + stopKey);
			
			ResultScanner resultScanner = table.getScanner(scan);
			for (Result r : resultScanner) {
				Map<String, String> map = new HashMap<String, String>();
				map.put("DATE", Bytes.toString(r.getRow()));
			     for (KeyValue kv : r.raw()) {
					map.put(Bytes.toString(kv.getQualifier()), Bytes.toString(kv.getValue()));
				}
			     result.add(map);
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		return result;
	}

 

  • 大小: 1.7 MB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics