`
无尘道长
  • 浏览: 157805 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

hbase读数据之client

阅读更多

   前言:研究的HBase版本是0.94.12,贴出的源代码可能是经过我删减或者加工过的(简化篇幅,更易读)

  

  以Scan查询为例介绍数据查询时HBase client端的实现

  

public static void main(String[] args) {

scan("product", "f", "", 2); //product表查询2条记录

}

public static void scan(String tableName, String fml, String startRow, int limit) {

                   HConnection conn = null;

                   HTableInterface table = null;

                   ResultScanner rs = null;

                   try {

                            Configuration conf = HBaseConfiguration.create(); //步骤1、加载配置文件

                            conn = HConnectionManager.createConnection(conf);//步骤2、创建连接对象

                            table = conn.getTable(tableName);//步骤3、获取表对象

                            PageFilter filter = new PageFilter(limit);//步骤4、创建过滤器(如果需要)

                            Scan scan = new Scan(startRow.getBytes(), filter);//步骤5、创建scan

                            rs = table.getScanner(scan);//步骤6、获取scanner

                            for (Result r : rs) { //步骤7、迭代结果集

                                     NavigableMap<byte[], byte[]> map = r.getFamilyMap(fml.getBytes());

                                     StringBuffer sb = new StringBuffer("id:"+new String(r.getRow())+" ");

                                     for(byte[] key : map.keySet()) {

                                               sb.append(new String(key)+":"+new String(map.get(key))+" ");

                                     }

                                     System.out.println(sb);

                            }

                   } catch (Exception e) {

                            e.printStackTrace();

                   } finally {

                            try {

                                     if ( rs != null ) rs.close(); //步骤8、关闭scanner

                                     if ( table != null ) table.close(); //步骤9、关闭table

                                     if ( conn != null ) conn.close(); //步骤10、关闭连接

                            } catch (Exception e) {

                                     e.printStackTrace();

                            }

                   }

         }

 

  以下将详细解析以上9步的执行过程:

  步骤1、加载hbase配置文件

  代码:Configuration conf = HBaseConfiguration.create();

  说明:该过程的核心方法是HBaseConfiguration类的addHbaseResources()方法

public static Configuration addHbaseResources(Configuration conf) {

    // hbase-default.xmlhbase的默认配置文件,如果有自定义的配置项则需要配置到hbase-site.xml文件,hbase-site.xml的配置会覆盖hbase-default.xml的配置

    conf.addResource("hbase-default.xml");

    conf.addResource("hbase-site.xml");

 

    checkDefaultsVersion(conf);

    checkForClusterFreeMemoryLimit(conf);

    return conf;

  }

//检查集群的memstoreblockcache的内存总大小,限制在hbase总内存的80%以内

// hbase.regionserver.global.memstore.upperLimit:控制memstore的内存比例,默认值0.4,影响写性能

// hfile.block.cache.size:控制blockcache的内存比例,默认值0.25,影响读性能

//在性能测试中发现HBase的以上两项默认配置基本上兼顾了读写,如果对某一方面没有太多的要求可以采用默认值

//该段源码里涉及的常量等被我替换成具体的数字并做了代码微调,以便更易读

private static void checkForClusterFreeMemoryLimit(Configuration conf) {

      float globalMemstoreLimit = conf.getFloat("hbase.regionserver.global.memstore.upperLimit", 0.4f);

      int gml = (int)(globalMemstoreLimit * 100);

      float blockCacheUpperLimit = conf.getFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.25f);

      int bcul = (int)(blockCacheUpperLimit * 100);

      if (100 - (gml + bcul) < 20) {

          throw new RuntimeException(…+"The combined value cannot exceed 0.8 " +…);

}

  }

 

  步骤2、创建连接对象

  代码:HConnection conn = HConnectionManager.createConnection(conf);

  说明:该步骤只是创建HConnectionImplementation的实例,并不会产生与Master、HRegionServer、zookeeper的连接,在调用getTable、getScanner等需要与hbase交互的方法时才会真正建立连接,这些连接以及region信息均会被共享,通常一个HBase集群的一个client端实例只需创建一个HConnection对象。

public static HConnection createConnection(Configuration conf)

  throws ZooKeeperConnectionException {

    return new HConnectionImplementation(conf, false, null);

  }

//只是对一些类属性赋值,不连接HRegionServer

public HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool)

    throws ZooKeeperConnectionException {

      this.conf = conf;

      this.batchPool = pool;

      …

}

可以通过HConnectionManager.getConnection(conf)方法获取一个受管理的连接,如果缓存中没有连接,则会自动创建一个连接,源码如下:

public static HConnection getConnection(Configuration conf) throws ZooKeeperConnectionException {

    HConnectionKey connectionKey = new HConnectionKey(conf);

synchronized (HBASE_INSTANCES) {

// connectionKeyhash值由hbase.zookeeper.quorumzookeeper.znode.parent

hbase.zookeeper.property.clientPort等配置属性决定,因此虽然HConnectionKey每次均是new一个新的对象,但是实际上如果Configuration对象中涉及hash计算的配置项的值相同则会定位到同一个connection对象,由此可见如果每次调用该方法时均是传入具有相同配置项的Configuration对象则还不如直接使用createConnection方法创建一个连接,然后一直使用。

      HConnectionImplementation connection = HBASE_INSTANCES.get(connectionKey);

      if (connection == null) {

        connection = new HConnectionImplementation(conf, true, null);

        HBASE_INSTANCES.put(connectionKey, connection);

      } else if (connection.isClosed()) {

        HConnectionManager.deleteConnection(connectionKey, true);

        connection = new HConnectionImplementation(conf, true, null);

        HBASE_INSTANCES.put(connectionKey, connection);

      }

      connection.incCount();

      return connection;

    }

  }

 

  步骤3、获取表对象

  代码:table = conn.getTable(tableName);

  说明:每次getTable均会创建一个HTable实例,HTable实例会共享HConnection连接对象,创建HTable并不消耗资源(第一次会比较耗时,如果没有没有建立zookeeper链接则需要建立,并会缓存所有的表涉及的region信息,这些信息是所有htable实例共享的)。

//该方法实现了创建HTable对象的核心逻辑

private void finishSetup() throws IOException {

    //定位表涉及的region,会一次性缓存表相关的所有region信息,在下次创建HTable时就直接从缓存中获取

this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);

//默认的client端写缓存是2M大小,通常超过2M发起一次批量提交,可适当调大该值,改进写性能,通过性能测试看,改为6M最佳(没有严格意义上的最佳,姑且根据性能测试的场景得出该最佳值),后续会有HBase性能测试报告放出

this.writeBufferSize = this.configuration.getLong("hbase.client.write.buffer", 2097152);

//scannercache大小,控制scanner一次从服务端获取的数据条数,可适当调大,减少与服务端交互的次数,提高批量读性能

this.scannerCaching = this.configuration.getInt("hbase.client.scanner.caching", 1);

//限制keyvalue的大小,默认不限制

    this.maxKeyValueSize = this.configuration.getInt("hbase.client.keyvalue.maxsize", -1);

   

}

 

//定位region

private HRegionLocation locateRegion(final byte [] tableName, final byte [] row, boolean useCache, boolean retry){

      //建立zookeeper连接(如果没有连接),从zookeeper中获取master-ROOT- 表的region信息,并且实例化HBaseClient对象

ensureZookeeperTrackers();

      if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {

//zookeeper中获取-ROOT-表信息

        ServerName servername = this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout);

        return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO,

            servername.getHostname(), servername.getPort());

      } else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {

//根据-ROOT-表定位.META.表,定位后缓存到本地

        return locateRegionInMeta(HConstants.ROOT_TABLE_NAME, tableName, row,

            useCache, metaRegionLock, retry);

      } else {

        //根据.META.表定位业务表,在定位过程中会缓存该表涉及的所有region信息

        return locateRegionInMeta(HConstants.META_TABLE_NAME, tableName, row,

            useCache, userRegionLock, retry);

      }

}

 

 

  步骤4、创建过滤器(如果需要)

  代码:PageFilter filter = new PageFilter(limit);

  说明:Hbase默认提供了很多过滤器,通过这些过滤器可以实现分页、条件查询等功能,在后续的博客中会对该章节的内容进行详细的讲解。

 

    步骤5、创建scan对象

    代码:Scan scan = new Scan(startRow.getBytes(), filter);

//查询所有数据

public Scan() {}

//查询startRow及之后的数据

public Scan(byte [] startRow)

//查询startRow及之后的数据,并采用filter进行过滤

public Scan(byte [] startRow, Filter filter)

//查询startRowstopRow之间的数据

public Scan(byte [] startRow, byte [] stopRow)

 

  步骤6、获取scanner和步骤7、迭代结果集

  代码:rs = table.getScanner(scan)for (Result r : rs){}

  说明:该过程会创建一个ClientScanner类(ResultScanner的实现类)的实例,体现该类核心属性和方法的类图如下:



 

caching:执行next()方法时缓存服务端数据的大小,先填满cache(如果数据足够多),再从cache中一条一条的取出,如果一次scan的数据只涉及一个region,则该值决定了一次从服务端批量读取数据的条数。可由hbase.client.scanner.caching配置项配置,默认值为1,可以通过ScansetCaching(int)方法制定每次查询的批量数,可适当调大该值,减少与服务端交互的次数,具体多大合适,或许没有一个明确的值,但是在设置该值时有两个关键点需要明确:

  1. 调大后会消耗更多的clientserver端的内存;
  2. 调大后可能会导致从server端读取时一次操作占用过大的网络带宽,出现波峰现象,影响到网络中的其它应用。

       cache:缓存从服务端批量加载的数据,在调用next方法迭代结果集时从cache中获取。

以下重点讲解ClientScanner的核心方法nextScanner()next(),这些方法中涉及的RPC调用内容,在后续的博客中会系统介绍。

//获取下一个regionscanner,从server端读取数据前均需先执行该方法,在该方法中会通过ScannerCallable进行RPC调用打开server端的scanner,打开成功后会获取到一个scannerIdclient端在scan过程中每次均需把该id传给server端,client端通过该id判断是否需要open scanner以及close scannerserver端通过该id找到对应的RegionScanner查询数据以及close一次scan,一个region只打开一个RegionScanner

private boolean nextScanner(int nbRows, final boolean done) throws IOException {

      if (this.callable != null) { // nextScanner被非首次调用时(即从不同的region获取数据时)

        this.callable.setClose();

        callable.withRetries();//RPC调用,关闭server端的RegionScanner

        this.callable = null;

      }

      byte [] localStartKey;

      if (this.currentRegion != null) { //nextScanner被非首次调用时

        byte [] endKey = this.currentRegion.getEndKey();

//上次scanregion是最后一个region或者已经到了end rowKey等时终止查询

        if (endKey == null ||     

            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||

            checkScanStopRow(endKey) ||

            done) {

          close();

          return false;

        }

//把上次查询regionendKey作为下一个regionstartKey,因为scan的数据是一个连续的区间,因此可如此实现

        localStartKey = endKey;

      } else {

        localStartKey = this.scan.getStartRow(); //从第一个region查询数据时

      }

 

      try {

        callable = getScannerCallable(localStartKey, nbRows);

        callable.withRetries(); //RPC调用,执行HRegionServeropenScanner方法

        this.currentRegion = callable.getHRegionInfo();

      } catch (IOException e) {

      }

      return true;

}

 

//返回下一条记录

public Result next() throws IOException {

      if (cache.size() == 0) {

        Result [] values = null;

        int countdown = this.caching;

        do {

          values = callable.withRetries();//RPC调用,执行HRegionServernext()方法

          if (values != null && values.length > 0) {

            for (Result rs : values) {

              cache.add(rs); //cache可存储多region的数据

              countdown--; //如果该region的数据没有达到caching值则会大于0,在while条件中使用

            }

          }

//如果没有查询到数据则valuesnull,这时nextScanner(int nbRows, final boolean done)将返回false,停止查询

        } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));

      }

 

      if (cache.size() > 0) {

        return cache.poll();

      }

    }

 

  步骤8、关闭RegionScanner

         该步骤会释放RegionServer端的scanner资源。

 

  步骤9、关闭table

  代码:if ( table != null ) table.close();

//HTableclose实现

public void close() throws IOException {

    if (this.closed) {

      return;

    }

    flushCommits(); //如果writeBuffer没有数据则不会真正提交

    if (cleanupPoolOnClose) {//通过不同的HTable构造函数创建的htable实例该值可能不同,参考后面的源码

      this.pool.shutdown();

    }

    if (cleanupConnectionOnClose) {//cleanupPoolOnClose

      if (this.connection != null) {

        this.connection.close();

      }

    }

    this.closed = true;

  }

//通过本博客获取HTable的方式采用的是该构造函数:conn.getTable(tableName)

public HTable(final byte[] tableName, final HConnection connection, final ExecutorService pool) {

    this.cleanupPoolOnClose = this.cleanupConnectionOnClose = false;

  }

 

public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool){

    this.cleanupPoolOnClose = false;

    this.cleanupConnectionOnClose = true;

  }

 

public HTable(Configuration conf, final byte [] tableName) {

    this.cleanupPoolOnClose = this.cleanupConnectionOnClose = true;

  }

 

  步骤9、关闭连接

  代码:if ( conn != null ) conn.close();

  说明:如果一个HBase client程序只访问一个hbase集群,则通常只需要一个HConnection实例,因此无需close

 

  • 大小: 3.4 KB
分享到:
评论

相关推荐

    HBase数据读取流程解析-1——Client-Server交互逻辑.pdf

    HBase数据读取流程解析-1——Client-Server交互逻辑.pdf 学习资料 复习资料 教学资源

    Hbase中文文档

    11.8. 从 HBase读取 11.9. 从 HBase删除 11.10. HDFS 11.11. Amazon EC2 11.12. 案例 12. 故障排除和调试 HBase 12.1. 通用指引 12.2. Logs 12.3. 资源 12.4. 工具 12.5. 客户端 12.6. MapReduce 12.7. NameNode ...

    hbase-1.2.2-Java测试最小依赖包(经过严格测试)

    hbase-1.2.2-Java测试最小依赖包,很多博客都写过,但是很多博客都没有经过验证,本依赖包是经过代码严格检测的,能够通过最简单的hbase数据写入,数据读取等操作的,请放心下载

    spark_hbase:Scala中的示例通过Spark读取保存在hbase中的数据,以及python的转换器示例

    并且仅在返回org.apache.hadoop.hbase.client.Result并执行.count()调用时停止。 在这里,我们提供了Scala中的一个新示例,该示例涉及通过Spark将hbase中保存的数据传输到String ,以及python转换器的新示例。 ...

    利用java api读取hbase数据遇到的一些坑及解决方法

    java.lang.RuntimeException: org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Fri Feb 14 10:30:36 CST 2020, null, java.net.SocketTimeoutException: ...

    hbase-solr-rest-client

    hbase-solr-rest-client 客户端,用于从solr读取数据,然后使用REST从HBase查询详细信息

    基于springboot+netty+mybatis+hbase+kafka实现的socket server+源代码+文档说明

    - [ ] socket client接受到请求数据的命令后,从mysql中读取假数据,伪造成真实设备传输的数据格式,并发送给socket server - [ ] socket server接收到返回的数据后,分别写入到hbase数据库和kafka队列中 - [ ] 最后...

    flume包,用于数据的采集

    同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。Client:Client生产数据,运行在一个独立的线程。  Event: 一个数据单元,消息头和消息体组成。(Events可以是日志...

    大数据面试题,唬住50k.pdf

    client通过hdfs的api并发读取数据。 3. 关闭连接。 1. 举⼀个简单的例⼦说明mapreduce是怎么来运⾏的 ? wordcount的例⼦ 1. 1. ⽤mapreduce来实现下⾯需求? 现在有10个⽂件夹,每个⽂件夹都有1000000个url.现在让你...

    2017最新大数据架构师精英课程

    133_hbase的bulkload命令实现hbase集群之间数据的传输2 D6 d; F6 S8 x+ I/ I0 B0 @ 134_hive同hbase集成,统计hbase数据表信息% Q/ R! Z1 J3 J) k+ H! {6 D# M 135_使用TableInputFormat进行MR编程! m& C6 B/ v6 N" ...

    XSQL:基于SparkSQL的统一SQL分析引擎

    1)首先,XSQL提供了一种使用标准SQL从NoSQL数据库读取数据的解决方案,因此大数据工程师可以专注于数据,而API具有特殊数据源。 2)XSQL致力于优化SQL执行的执行计划以及监视每个SQL的运行状态,从而使用户的工作...

    分布式协调工具-ZooKeeper实现动态负载均衡

    6、实时性,在一定时间范围内,client能读到最新数据 Zookeeper数据结构 1、层次化的目录结构,命名符合常规文件系统规范(类似文件系统)    2、每个节点在zookeeper中叫做znode,并且其有一个唯一的...

Global site tag (gtag.js) - Google Analytics