`
半点玻璃心
  • 浏览: 26673 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

HBASE 代码阅读笔记-1 - PUT-2-定位RS和R-1(0.96-HADOOP2)

阅读更多
按照94的阅读进度,这里该看如何定位RS和Region了
先回顾下94,原来的做法是遍历操作,然后根据每个操作来定位region,按后加入region的任务队列,没有则创建。定位region的操作由HConnectionManager.HConnectionImplementation.locateRegion方法完成,这里由AsyncProcessor.findDestLocation完成

这里依然是循环,不过循环体有些变化

private HRegionLocation findDestLocation(Row row, int numAttempt, int posInList) {
    if (row == null) throw new IllegalArgumentException("row cannot be null");
    HRegionLocation loc = null;
    IOException locationException = null;
    try {
      loc = hConnection.locateRegion(this.tableName, row.getRow());
      if (loc == null) {
        locationException = new IOException("No location found, aborting submit for" +
            " tableName=" + tableName +
            " rowkey=" + Arrays.toString(row.getRow()));
      }
    } catch (IOException e) {
      locationException = e;
    }
    if (locationException != null) {
      // There are multiple retries in locateRegion already. No need to add new.
      // We can't continue with this row, hence it's the last retry.
      manageError(numAttempt, posInList, row, false, locationException, null);
      return null;
    }

    return loc;
  }


可以看出其实还是调用HConnectionImplementation.locateRegion,但是HConnectionImplementation.locateRegion方法本身的内容又不一样了
private HRegionLocation locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry)
    throws IOException {
      if (this.closed) throw new IOException(toString() + " closed");
      if (tableName== null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");
      }

      if (tableName.equals(TableName.META_TABLE_NAME)) {
        return this.registry.getMetaRegionLocation();
      } else {
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(TableName.META_TABLE_NAME, tableName, row,
          useCache, userRegionLock, retry);
      }
    }

嗯,ROOT表的特殊处理流程已经不见了,这里有两个值得注意的地方,ROOT表已经改名为hbase:namespace,META则是hbase:meta,META表的处理流程也发生了变化,唯一不变的是user表
先看看变化的部分:调用了Registry的一个方法,具体由ZooKeeperRegistry实现,我很高兴的告诉亲,both of 他们都是新增的,94版本中没有。

public HRegionLocation getMetaRegionLocation() throws IOException {
    ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();

    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
      }
      ServerName servername = MetaRegionTracker.blockUntilAvailable(zkw, hci.rpcTimeout);
      if (LOG.isTraceEnabled()) {
        LOG.trace("Looked up meta region location, connection=" + this +
          "; serverName=" + ((servername == null) ? "null" : servername));
      }
      if (servername == null) return null;
      return new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO, servername, 0);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    } finally {
      zkw.close();
    }
  }

  public static ServerName blockUntilAvailable(final ZooKeeperWatcher zkw,
      final long timeout)
  throws InterruptedException {
    byte [] data = ZKUtil.blockUntilAvailable(zkw, zkw.metaServerZNode, timeout);
    if (data == null) return null;
    try {
      return ServerName.parseFrom(data);
    } catch (DeserializationException e) {
      LOG.warn("Failed parse", e);
      return null;
    }
  }


我勒个去。。。这也太简单了。。。这是不需要ROOT表了么,直接从ZK取?说好的zk存root表的region信息,root表存meta表的region信息的,怎么能够这么改,节操呢。。。不过这样看的话,少了一层IO明显变得NB了

不过locateRegionInMeta明显也瘦身了。。。嗯。。先贴出来,明天接着看
private HRegionLocation locateRegionInMeta(final TableName parentTable,
      final TableName tableName, final byte [] row, boolean useCache,
      Object regionLockObject, boolean retry)
    throws IOException {
      HRegionLocation location;
      // If we are supposed to be using the cache, look in the cache to see if
      // we already have the region.
      // 上来还是首先从缓存里面读取,方法接口没有变。备注【1】
      if (useCache) {
        location = getCachedLocation(tableName, row);
        if (location != null) {
          return location;
        }
      }
      int localNumRetries = retry ? numTries : 1;
      // build the key of the meta region we should be looking for.
      // the extra 9's on the end are necessary to allow "exact" matches
      // without knowing the precise region names.
      // 还是定义region信息查询键
      byte [] metaKey = HRegionInfo.createRegionName(tableName, row,
        HConstants.NINES, false);
      for (int tries = 0; true; tries++) {
        if (tries >= localNumRetries) {
          throw new NoServerForRegionException("Unable to find region for "
            + Bytes.toStringBinary(row) + " after " + numTries + " tries.");
        }

        HRegionLocation metaLocation = null;
        try {
          // locate the meta region
          // 还是先查询父表。不过如上所示,父表查询接口有了很大的变化,已经没有root表的查询了,直接从ZK获取meta表信息
          metaLocation = locateRegion(parentTable, metaKey, true, false);
          // If null still, go around again.
          if (metaLocation == null) continue;
          // RPC的调用方有了些变化,原来是使用HRgionInterface实例进行查询,实际上由委托代理,调用HBaseclient方法实现。这里先记下来备注【2】
          ClientService.BlockingInterface service = getClient(metaLocation.getServerName());

          Result regionInfoRow;
          // This block guards against two threads trying to load the meta
          // region at the same time. The first will load the meta region and
          // the second will use the value that the first one found.
          synchronized (regionLockObject) {
            // Check the cache again for a hit in case some other thread made the
            // same query while we were waiting on the lock.
            // 和之前一样,类似于读写双锁检查,进入写模块之后,需要再次确认缓存,避免重复请求
            if (useCache) {
              location = getCachedLocation(tableName, row);
              if (location != null) {
                return location;
              }
              // If the parent table is META, we may want to pre-fetch some
              // region info into the global region cache for this table.
              if (parentTable.equals(TableName.META_TABLE_NAME)
                  && (getRegionCachePrefetch(tableName))) {
                prefetchRegionCache(tableName, row);//和之前一样,父表为meta表则进行预抓取。这里新增了一个TableName的类型。直接使用该类型的比较方法,去掉了之前的直接byte比较。备注【3】
              }
              location = getCachedLocation(tableName, row);//还是一样,第三次确认
              if (location != null) {
                return location;
              }
            } else {
              // If we are not supposed to be using the cache, delete any existing cached location
              // so it won't interfere.
              forceDeleteCachedLocation(tableName, row);//这里只是方法名变了,不过内部实现,内部的内部实现跟之前完全是一模一样的。
            }
            // Query the meta region for the location of the meta region
            // 查询Region meta,有变化。
            regionInfoRow = ProtobufUtil.getRowOrBefore(service,
              metaLocation.getRegionInfo().getRegionName(), metaKey,
              HConstants.CATALOG_FAMILY);//备注【4】,这属于读操作,后续再看
          }
          if (regionInfoRow == null) {
            throw new TableNotFoundException(tableName);
          }

          // convert the row result into the HRegionLocation we need!
          // 将查询结果转化为一个HRegionInfo,这里的调用有些改变,之前是先获取value,然后委托Writables.getWritable(  
                    value, new HRegionInfo()),这里语义更明显。备注【5】,这里委托了protobuf,只能先放放了
          HRegionInfo regionInfo = MetaScanner.getHRegionInfo(regionInfoRow);
          // 跟之前的判断一样,检查regionInfo是否为空,是否属于当前表,是否正在split或者是否已经下线。多了一个down机检查。
          if (regionInfo == null) {
            throw new IOException("HRegionInfo was null or empty in " +
              parentTable + ", row=" + regionInfoRow);
          }

          // possible we got a region of a different table...
          if (!regionInfo.getTable().equals(tableName)) {
            throw new TableNotFoundException(
                  "Table '" + tableName + "' was not found, got: " +
                  regionInfo.getTable() + ".");
          }
          if (regionInfo.isSplit()) {
            throw new RegionOfflineException("the only available region for" +
              " the required row is a split parent," +
              " the daughters should be online soon: " +
              regionInfo.getRegionNameAsString());
          }
          if (regionInfo.isOffline()) {
            throw new RegionOfflineException("the region is offline, could" +
              " be caused by a disable table call: " +
              regionInfo.getRegionNameAsString());
          }
      
          ServerName serverName = HRegionInfo.getServerName(regionInfoRow);//这里将原有的byte[]信息转化为对象,代码可读性更好了,94中这里很多byte[]解析。
          if (serverName == null) {
            throw new NoServerForRegionException("No server address listed " +
              "in " + parentTable + " for region " +
              regionInfo.getRegionNameAsString() + " containing row " +
              Bytes.toStringBinary(row));
          }

          if (isDeadServer(serverName)){
            throw new RegionServerStoppedException("hbase:meta says the region "+
                regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
                ", but it is dead.");
          }

          // Instantiate the location
          // HRegionLocation的构造稍微变复杂了,其实也就是封装了下,唯一多出来的就是一个seqNum,不知道具体作用还  
          location = new HRegionLocation(regionInfo, serverName,
            HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
          cacheLocation(tableName, null, location);//缓存,备注【6】
          return location;
        } catch (TableNotFoundException e) {
          // if we got this error, probably means the table just plain doesn't
          // exist. rethrow the error immediately. this should always be coming
          // from the HTable constructor.
          throw e;
        } catch (IOException e) {
          if (e instanceof RemoteException) {
            e = ((RemoteException)e).unwrapRemoteException();
          }
          if (tries < numTries - 1) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("locateRegionInMeta parentTable=" +
                parentTable + ", metaLocation=" +
                ((metaLocation == null)? "null": "{" + metaLocation + "}") +
                ", attempt=" + tries + " of " +
                this.numTries + " failed; retrying after sleep of " +
                ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
            }
          } else {
            throw e;
          }
          // Only relocate the parent region if necessary
          if(!(e instanceof RegionOfflineException ||
              e instanceof NoServerForRegionException)) {
            relocateRegion(parentTable, metaKey);
          }
        }
        try{
          Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Giving up trying to location region in " +
            "meta: thread is interrupted.");
        }
      }
    }

    

总的来看,流程和思路和94是完全一模一样的,只是具体的实现有了变化。

备注【1】【6】:HRegionLocation缓存和读取缓存,其中【1】读取缓存完全没有任何变化,但是【6】缓存HRegionlocation变化很大。94版的流程很简单,将region对应的服务器信息拿出来并缓存,然后将当前region加入到table的region缓存中。

96版的首先是获取table的所有region信息有了变化,以前是按照byte[]型tableName的hash值当键做的缓存索引,这里使用了TableName类型直接作为键,执行效率会有细微的降低,但是逻辑就更清楚了。(1)
然后serverName缓存也有了细微的变换,以前是缓存一个串,现在是缓存了一个ServerName类型。(2)

最后多了一个source参数,也是HRegionLocation类型,非空表示不是直接从meta表中查询到的。一般来说为空。(3)

private void cacheLocation(final TableName tableName, final HRegionLocation source,
        final HRegionLocation location) {
      boolean isFromMeta = (source == null);
      byte [] startKey = location.getRegionInfo().getStartKey();
      Map<byte[], HRegionLocation> tableLocations = getTableLocations(tableName);//(1)
      boolean isNewCacheEntry = false;
      boolean isStaleUpdate = false;
      HRegionLocation oldLocation = null;
      synchronized (this.cachedRegionLocations) {
        cachedServers.add(location.getServerName());//(2)
        oldLocation = tableLocations.get(startKey);// 还是用startKey做索引
        isNewCacheEntry = (oldLocation == null);//如果能查询出来,则表示更新
        // If the server in cache sends us a redirect, assume it's always valid.
        if (!isNewCacheEntry && !oldLocation.equals(source)) {
        //如果是更新,而且确实需要更新,这个equals方法只是判断了ServerName对象是否相等。代码就不贴了
          long newLocationSeqNum = location.getSeqNum();

          // 这里解释了两个判断是否为陈旧信息的判断标准,有可能服务器自己关闭了老的region,当我们请求的时候,告诉我们一个新的region信息,类似于我们用一个url信息请求一个网页,但是收到一个302
          // 或者服务器关闭了一个region,但是用相同的序列号又打开了一个新的。之前HRegionlocation的构造函数多了一个seqNum在这里就用上了,看来96版的region信息管理跟之前的版本相比较有较大的变化 
          // Meta record is stale - some (probably the same) server has closed the region
          // with later seqNum and told us about the new location.
          boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
          // Same as above for redirect. However, in this case, if the number is equal to previous
          // record, the most common case is that first the region was closed with seqNum, and then
          // opened with the same seqNum; hence we will ignore the redirect.
          // There are so many corner cases with various combinations of opens and closes that
          // an additional counter on top of seqNum would be necessary to handle them all.
          boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
          isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
        }
        if (!isStaleUpdate) {
          tableLocations.put(startKey, location);
        }
      }
      if (isNewCacheEntry) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Cached location for " +
            location.getRegionInfo().getRegionNameAsString() +
            " is " + location.getHostnamePort());
        }
      } else if (isStaleUpdate && !location.equals(oldLocation)) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("Ignoring stale location update for "
            + location.getRegionInfo().getRegionNameAsString() + ": "
            + location.getHostnamePort() + " at " + location.getSeqNum() + "; local "
            + oldLocation.getHostnamePort() + " at " + oldLocation.getSeqNum());
        }
      }
    }


搞定,主要是对是否需要缓存当前拿到的region信息多了很多判断,看来服务端的代码也有很多不同的坑要填了。

然后看备注【2】的代码,获取RPC调用连接。之前是是一个HRegionInterface的实例,这里变成了ClientService.BlockingInterface实例。不过创建方式差不多的呢。

温故知新,先回顾下94版本的实现:
首先需要一个RPCEngine,默认为WritableRpcEngine,一个RPCInterface,默认是HRegionInterface。然后创建一个Invocation托管的实例,这个实例再委托HBaseclient通过传统的Socket完成RPC。其中RPCEngine完成对参数和方法的序列化--反序列化。

这里从表象上看,原有的方式被完全抛弃了,首先不再需要RPCEngine,而是使用了一个RpcClient,这是一个全新的类。RPC也不再直接使用传统的socket,而是使用了google的protobuf组件。表示不熟悉这个组件,暂时先把代码码出来吧。

public ClientService.BlockingInterface getClient(final ServerName sn)
    throws IOException {
      if (isDeadServer(sn)) {
        throw new RegionServerStoppedException(sn + " is dead.");
      }
      String key = getStubKey(ClientService.BlockingInterface.class.getName(), sn.getHostAndPort());
      // 这个key简单,这俩玩意中间加个@连起来
      this.connectionLock.putIfAbsent(key, key);
      ClientService.BlockingInterface stub = null;
      synchronized (this.connectionLock.get(key)) {
        stub = (ClientService.BlockingInterface)this.stubs.get(key);
        if (stub == null) {
          BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
            user, this.rpcTimeout);
          stub = ClientService.newBlockingStub(channel);
          // In old days, after getting stub/proxy, we'd make a call.  We are not doing that here.
          // Just fail on first actual call rather than in here on setup.
          this.stubs.put(key, stub);
        }
      }
      return stub;
    }


备注【2】只能这么多了,protobuf这个框架确实不熟悉。看看备注备注【3】吧,预抓取

private void prefetchRegionCache(final TableName tableName,
        final byte[] row) {
      // Implement a new visitor for MetaScanner, and use it to walk through
      // the hbase:meta
      // 还是用MetaScannerVisitorBase来遍历meta信息,跟之前一样,用MetaScanner来简化了之前基于Writable的HRegionInfo解析
      MetaScannerVisitor visitor = new MetaScannerVisitorBase() {
        public boolean processRow(Result result) throws IOException {
          try {
            HRegionInfo regionInfo = MetaScanner.getHRegionInfo(result);
            if (regionInfo == null) {
              return true;
            }

            // possible we got a region of a different table...
            if (!regionInfo.getTable().equals(tableName)) {
              return false; // stop scanning
            }
            if (regionInfo.isOffline()) {
              // don't cache offline regions
              return true;
            }

            ServerName serverName = HRegionInfo.getServerName(result);
            if (serverName == null) {
              return true; // don't cache it
            }
            // instantiate the location
            long seqNum = HRegionInfo.getSeqNumDuringOpen(result);//知道HRegionlocation的seqNum哪儿来的了。MS META表加料了哦。info:seqnumDuringOpen,没有则为-1
            HRegionLocation loc = new HRegionLocation(regionInfo, serverName, seqNum);
            // cache this meta entry
            cacheLocation(tableName, null, loc);
            return true;
          } catch (RuntimeException e) {
            throw new IOException(e);
          }
        }
      };
      try {
        // pre-fetch certain number of regions info at region cache.
        MetaScanner.metaScan(conf, this, visitor, tableName, row,
            this.prefetchRegionLimit, TableName.META_TABLE_NAME);
      } catch (IOException e) {
        LOG.warn("Encountered problems when prefetch hbase:meta table: ", e);
      }
    }

    public static ServerName getServerName(final Result r) {
    byte[] value = r.getValue(HConstants.CATALOG_FAMILY,
      HConstants.SERVER_QUALIFIER);
    if (value == null || value.length == 0) return null;
    String hostAndPort = Bytes.toString(value);
    value = r.getValue(HConstants.CATALOG_FAMILY,
      HConstants.STARTCODE_QUALIFIER);//还记得94版本中的那个坑么,最后取出来这个cell的值但是没有使用,没想到在这里用上了,这是下一盘很大的棋么
    if (value == null || value.length == 0) return null;
    return new ServerName(hostAndPort, Bytes.toLong(value));
  }

MetaScanner.metaScan跟之前的版本几乎完全一样,不贴了

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics