
2. HBase CRUD: Read (Scan) operations (server side)

 

 

  I have just recovered from an illness, so it is time to finish the remaining part of this work.

  As you can see, the Scan/Get operation in HBase is rather tricky: kvs are multi-dimensional, and a read has to combine data from both the storefiles and the memstore.

 

----Part 1:Abstract

  First, have a glance at the query flow below:




 
There are two main loops in a Scan operation:

1. Iterate over row keys: controls how each row's kvs are filtered and limits the number of rows returned to the client.

2. Iterate over the qualifiers of the same row: determines which qualifiers match and how many versions of the same qualifier are returned.
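The two loops can be illustrated with a minimal sketch over an already sorted list of cells. This is not HBase code: `TwoLoopScanSketch`, `Cell`, `maxRows` and `maxVersions` are hypothetical names chosen for illustration, and the sketch assumes the input is sorted by row, then qualifier, then timestamp descending.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoLoopScanSketch {
    /** Hypothetical, simplified cell: row key, qualifier, timestamp (value omitted). */
    static final class Cell {
        final String row;
        final String qualifier;
        final long ts;

        Cell(String row, String qualifier, long ts) {
            this.row = row;
            this.qualifier = qualifier;
            this.ts = ts;
        }
    }

    /** Input must be sorted by row, then qualifier, then timestamp descending. */
    static List<List<Cell>> scan(List<Cell> sorted, int maxRows, int maxVersions) {
        List<List<Cell>> rows = new ArrayList<>();
        int i = 0;
        // Loop 1: one iteration per row key, bounded by the row limit.
        while (i < sorted.size() && rows.size() < maxRows) {
            String row = sorted.get(i).row;
            List<Cell> kept = new ArrayList<>();
            String lastQualifier = null;
            int versions = 0;
            // Loop 2: walk the qualifiers (and their versions) of this row.
            while (i < sorted.size() && sorted.get(i).row.equals(row)) {
                Cell c = sorted.get(i++);
                if (!c.qualifier.equals(lastQualifier)) {
                    lastQualifier = c.qualifier; // a new qualifier starts
                    versions = 0;
                }
                if (versions < maxVersions) {    // keep only the newest versions
                    kept.add(c);
                    versions++;
                }
            }
            rows.add(kept);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Cell> cells = Arrays.asList(
            new Cell("r1", "q1", 5), new Cell("r1", "q1", 3),
            new Cell("r1", "q2", 4), new Cell("r2", "q1", 1));
        for (List<Cell> row : scan(cells, 10, 1)) {
            System.out.println(row.get(0).row + " -> " + row.size() + " cell(s)");
        }
    }
}
```

The real server code spreads these two loops over several classes, as the following parts show; the sketch only mirrors their nesting.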

 

 Second, here is the heap-tree search model of HBase:



There are three levels of scanners in HBase:

1. Region scanner: combines all kvs from the store scanners.

2. Store scanner: integrates all underlying storefile scanners and one memstore scanner.

3. Storefile/memstore scanner: the leaf scanners in HBase; these are the data sources.

 

  Every child scanner exposes its smallest kv to its parent. Each parent holds a KeyValueHeap, which implements a "heap merge": it chooses the scanner whose next kv is the smallest as the current scanner. The heap uses a built-in PriorityQueue to find that minimum scanner.
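A minimal sketch of the heap-merge idea, assuming each leaf scanner iterates over an already sorted source. `HeapMergeSketch` and `SortedScanner` are hypothetical names, and plain strings stand in for kvs; the real KeyValueHeap works on KeyValue objects and additionally handles seeking and closing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

class HeapMergeSketch {
    /** Hypothetical leaf scanner over a pre-sorted list of keys. */
    static final class SortedScanner {
        private final List<String> keys;
        private int pos = 0;

        SortedScanner(List<String> keys) {
            this.keys = keys;
        }

        /** Returns the next key without consuming it, or null when exhausted. */
        String peek() {
            return pos < keys.size() ? keys.get(pos) : null;
        }

        /** Returns the next key and advances, or null when exhausted. */
        String next() {
            return pos < keys.size() ? keys.get(pos++) : null;
        }
    }

    /** Merges several sorted scanners into one sorted stream. */
    static List<String> merge(List<SortedScanner> scanners) {
        // The queue orders scanners by the key each one would return next.
        PriorityQueue<SortedScanner> heap =
            new PriorityQueue<>((a, b) -> a.peek().compareTo(b.peek()));
        for (SortedScanner s : scanners) {
            if (s.peek() != null) {
                heap.add(s);
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            SortedScanner current = heap.poll(); // scanner holding the smallest key
            out.add(current.next());
            if (current.peek() != null) {
                heap.add(current);               // re-insert with its new top key
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<SortedScanner> scanners = Arrays.asList(
            new SortedScanner(Arrays.asList("a", "d")),
            new SortedScanner(Arrays.asList("b", "c", "e")));
        System.out.println(merge(scanners));
    }
}
```

A scanner is only in the queue while it still has data, which matches the idea of closing exhausted scanners instead of keeping them in the heap.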

 

----Part 2:Some utility classes

class | usage
RegionScannerImpl | implementation of the region scanner
ScanQueryMatcher | decides whether a qualifier is filtered out, kept, or whether to seek to the next row/column; also checks the number of columns and the number of versions
ColumnTracker | tracks the expected qualifiers and versions; used together with ScanQueryMatcher
HFileReaderV2 | how to read a kv from an HFile or a data block
MemStore | the write cache of a store

 

 

----Part 3:Implementations

 The loop below corresponds to the first loop above:

public Result[] next(final long scannerId, int nbRows) throws IOException {
  ...
  //2 -- FIRST LOOP: row keys
      for (int i = 0; i < nbRows	// limited by the row count the client asked for
          && currentScanResultSize < maxScannerResultSize; i++) {	// limited by the returned byte size
        requestCount.incrementAndGet();
        // Collect values to be returned here
        boolean moreRows = s.next(values, SchemaMetrics.METRIC_NEXTSIZE); // goes into RegionScannerImpl
        if (!values.isEmpty()) {
          for (KeyValue kv : values) {
            currentScanResultSize += kv.heapSize();
          }
          results.add(new Result(values));
        }
        if (!moreRows) {
          break;
        }
        values.clear();
      }
  ....
}
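The two limits in this loop interact in a way that is easy to miss: the byte-size check runs before each row is fetched, so a batch can exceed the size limit by the last row added. A minimal sketch of just that bookkeeping, where `RowBatchSketch`, `rowsReturned` and the `rowSizes` array are hypothetical stand-ins for the scanner and the per-row heap sizes:

```java
class RowBatchSketch {
    /**
     * Counts how many rows one next() call would return, given the heap size
     * of each available row, the client's row limit and the byte-size limit.
     */
    static int rowsReturned(long[] rowSizes, int nbRows, long maxResultSize) {
        long currentSize = 0;
        int returned = 0;
        for (int i = 0; i < nbRows && currentSize < maxResultSize; i++) {
            if (i >= rowSizes.length) {
                break;                  // no more rows in the region
            }
            currentSize += rowSizes[i]; // size is only known after fetching the row
            returned++;
        }
        return returned;
    }

    public static void main(String[] args) {
        long[] sizes = {40, 40, 40, 40};
        // With a 100-byte limit the third row is still fetched (80 < 100),
        // so the batch ends at 120 bytes.
        System.out.println(rowsReturned(sizes, 10, 100));
        System.out.println(rowsReturned(sizes, 2, 100));
    }
}
```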

 

 

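Second loop:

/** Based on the scan position recorded for this scanner id, fetch up to limit (actually the batch size) fields/cols of a row; this method wraps the scan of the underlying sources (memstore, hfiles).
     * @return true if more rows exist (only used by a real Scan to stop the remaining iterations)
     */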
    private boolean nextInternal(int limit, String metric) throws IOException {
      RpcCallContext rpcCall = HBaseServer.getCurrentCall();
      while (true) {	// outer loop: skip unmatched rows
        if (rpcCall != null) {
          // If a user specifies a too-restrictive or too-slow scanner, the
          // client might time out and disconnect while the server side
          // is still processing the request. We should abort aggressively
          // in that case.
          rpcCall.throwExceptionIfCallerDisconnected();
        }
        // NOTE: this compares the row key only, not the composite key (which the heap uses to switch scanners)
        byte [] currentRow = peekRow(); // so the second loop still has to compare qualifiers
        if (isStopRow(currentRow)) { // NOTE: when a stop row is given, this avoids iterating over the remaining rows
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results);
          }
          if (filter != null && filter.filterRow()) { //filterRow:true to exclude row
            results.clear();
          }

          return false;
        } else if (filterRowKey(currentRow)) { // NOTE: skip unneeded rows, e.g. PrefixFilter when currentRow sorts before the prefix
          nextRow(currentRow);
        } else {
          byte [] nextRow;
          do { // 1. how is the expected row located? 2. how are all kvs of this row retrieved? see StoreScanner#next(...) @A
            this.storeHeap.next(results, limit - results.size(), metric); 
            if (limit > 0 && results.size() == limit) {
              if (this.filter != null && filter.hasFilterRow()) {
                throw new IncompatibleFilterException( // the client checks this as well, see Scan#setBatch()
                  "Filter with filterRow(List<KeyValue>) incompatible with scan with limit!");
              }
              return true; // we are expecting more yes, but also limited to how many we can return.
            } // this loop exists because once one store scanner is done with this row, the next one takes over
          } while (Bytes.equals(currentRow, nextRow = peekRow())); // NOTE: compares the row key only; keeps retrieving the fields (cols) of the same row key

          final boolean stopRow = isStopRow(nextRow); // true for a Get; may be false for a real Scan

          // now that we have an entire row, lets process with a filters:

          // first filter with the filterRow(List)
          if (filter != null && filter.hasFilterRow()) {
            filter.filterRow(results); // exclude or adjust the results before returning them to the client (to improve performance?)
          }

          if (results.isEmpty() || filterRow()) {
            // this seems like a redundant step - we already consumed the row
            // there're no left overs.
            // the reasons for calling this method are:
            // 1. reset the filters.
            // 2. provide a hook to fast forward the row (used by subclasses)
            nextRow(currentRow);

            // This row was totally filtered out, if this is NOT the last row,
            // we should continue on. So continue with the next round and seek the next expected row.

            if (!stopRow) continue; // not a Get (i.e. a Scan): iterate to the next row
          }
          return !stopRow; // if this is the stop row (no more rows), return false
        } // else
      } // while
    }
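The role of isStopRow in the method above can be sketched as follows. This is a simplified assumption rather than the actual HBase implementation: it assumes that a Scan's stop row is exclusive, that a Get reads exactly its own row (so only rows after it count as "stop"), and that an absent or empty stop row means "scan to the end". `StopRowSketch` and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

class StopRowSketch {
    /** Lexicographic comparison of row keys as unsigned bytes. */
    static int compareRows(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int x = a[i] & 0xff;
            int y = b[i] & 0xff;
            if (x != y) {
                return x - y;
            }
        }
        return a.length - b.length;
    }

    /**
     * Assumed semantics: currentRow == null means the scanners are exhausted;
     * for a Scan the stop row itself is excluded, for a Get it is included.
     */
    static boolean isStopRow(byte[] currentRow, byte[] stopRow, boolean isGet) {
        if (currentRow == null) {
            return true;
        }
        if (stopRow == null || stopRow.length == 0) {
            return false; // no stop row: keep scanning
        }
        int cmp = compareRows(stopRow, currentRow);
        return isGet ? cmp < 0 : cmp <= 0;
    }

    public static void main(String[] args) {
        byte[] stop = "row3".getBytes(StandardCharsets.UTF_8);
        byte[] cur = "row3".getBytes(StandardCharsets.UTF_8);
        System.out.println(isStopRow(cur, stop, false)); // Scan reaching its stop row
        System.out.println(isStopRow(cur, stop, true));  // Get positioned on its own row
    }
}
```

Under these assumptions, a Get sees the row after its own as a stop row, which is consistent with the note above that stopRow is true for a Get once the row has been read.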

 KeyValueHeap#next(xxx)

/**
   * Gets the next row of keys from the top-most scanner (invoked in a loop for the same row key).
   * <p>
   * This method takes care of updating the heap.
   * <p>
   * This can ONLY be called when you are using Scanners that implement
   * InternalScanner as well as KeyValueScanner (a {@link StoreScanner}).
   * @param result output result list
   * @param limit limit on row count to get
   * @param metric the metric name
   * @return true if there are more keys, false if all scanners are done
   */
  public boolean next(List<KeyValue> result, int limit, String metric) throws IOException {
    if (this.current == null) {
      return false;
    }
    InternalScanner currentAsInternal = (InternalScanner)this.current; // a StoreScanner when invoked from the region level
    boolean mayContainMoreRows = currentAsInternal.next(result, limit, metric);
    KeyValue pee = this.current.peek(); // does this scanner have more data? if not, it is closed below
    /*
     * By definition, any InternalScanner must return false only when it has no
     * further rows to be fetched. So, we can close a scanner if it returns
     * false. All existing implementations seem to be fine with this. It is much
     * more efficient to close scanners which are not needed than keep them in
     * the heap. This is also required for certain optimizations. (This description may refer to older versions, since the scanner is closed below.)
     */
    if (pee == null || !mayContainMoreRows) { // no more data
      this.current.close();
    } else {
      this.heap.add(this.current); // put it back into the priority queue so that the scanner with the smallest kv can be chosen
    }
    this.current = pollRealKV(); // then take the scanner with the smallest kv as current again
    return (this.current != null);
  }

KeyValueHeap#next(): may switch to the min scanner on each invocation

/** Returns the next kv and updates the current scanner if needed.
   * When the heap holds memstore/storefile scanners, this method *may* switch the current scanner to whichever one holds the smallest kv.
   * Example: the same row and the same qualifier, updated several times:
   * -------------------------------------
   * order | ts | ms1   || order | ts |ms2
   * -------------------------------------
   * 1	   | 5  | q1    || 1     | 4  | q1	 
   * 2     | 3  | q1    || 2     | 2  | q1	-- for the same qualifier, newer timestamps sort first: after ts 5 is returned from ms1, its next kv (ts 3) is older than the top of ms2 (ts 4), so ms2 becomes the current scanner
   */
  public KeyValue next()  throws IOException {
    if(this.current == null) { // may be null if closed, or in some cases after pollRealKV() below
      return null;
    }
    KeyValue kvReturn = this.current.next(); // @A returns the current scanner's kv and advances it; a scanner that was put back into the heap keeps its position
    KeyValue kvNext = this.current.peek();   // prepare: look at this scanner's next kv for the next invocation of this method
    if (kvNext == null) {
      this.current.close();
      this.current = pollRealKV(); 
    } else {
      KeyValueScanner topScanner = this.heap.peek();
      if (topScanner == null || // with an empty heap, the add/poll below effectively returns the same scanner
          this.comparator.compare(kvNext, topScanner.peek()) >= 0) { // NOTE: with several memstore/storefile scanners, switch to the one with the smallest kv (for the same column, the latest version)
        this.heap.add(this.current); // put back into the heap. NOTE: the next call to next() continues from the correct kv, see @A
        this.current = pollRealKV(); // then take the scanner with the smallest (newest) kv
      }
    }
    return kvReturn;
  }
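The switch between scanners depends entirely on how kvs are ordered. A minimal sketch of that ordering, assuming the usual composite-key rule of row ascending, then qualifier ascending, then timestamp descending (column family and type are left out for brevity). `KeyOrderSketch` and `Key` are hypothetical names, not HBase classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class KeyOrderSketch {
    /** Hypothetical, simplified composite key. */
    static final class Key {
        final String row;
        final String qualifier;
        final long ts;

        Key(String row, String qualifier, long ts) {
            this.row = row;
            this.qualifier = qualifier;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return row + "/" + qualifier + "@" + ts;
        }
    }

    /** Row ascending, qualifier ascending, timestamp descending (newest first). */
    static final Comparator<Key> ORDER = (a, b) -> {
        int c = a.row.compareTo(b.row);
        if (c != 0) {
            return c;
        }
        c = a.qualifier.compareTo(b.qualifier);
        if (c != 0) {
            return c;
        }
        return Long.compare(b.ts, a.ts); // reversed operands: larger ts sorts first
    };

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>(Arrays.asList(
            new Key("r1", "q1", 3), new Key("r1", "q2", 4),
            new Key("r1", "q1", 5), new Key("r1", "q1", 4)));
        keys.sort(ORDER);
        System.out.println(keys);
    }
}
```

Because the timestamp is only the last tie-breaker, two scanners alternate by timestamp only when they hold versions of the same row and qualifier; different qualifiers are ordered by qualifier first.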

  

 

StoreScanner#next()

/** Second loop: retrieves the kvs of one row.
   * Most methods here are synchronized, but this class is instantiated once per Get/Scan, so I think this does not hurt performance.
   * Get the next row of values from this Store. How is it guaranteed that all kvs belong to the same row? See @A below.
   * @param outResult
   * @param limit -1 means all kvs are returned
   * @return true if there are more rows, false if scanner is done
   */
  @Override
  public synchronized boolean next(List<KeyValue> outResult, int limit,
      String metric) throws IOException {
	  // preconditions checked on every invocation of this method, for the same or a different row key
    if (checkReseek()) {
      return true;
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return false;
    }

    KeyValue peeked = this.heap.peek(); // actually served by the heap's this.current
    if (peeked == null) {
      close();
      return false;
    }

    // only call setRow if the row changes; avoids confusing the query matcher. NOTE: this keeps the resulting kvs all within this row
    // if scanning intra-row, that is, within the same row key @A
    if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
      matcher.setRow(peeked.getRow());
    }

    KeyValue kv;
    KeyValue prevKV = null;

    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    long cumulativeMetric = 0;
    int count = 0;
    try {  // NOTE SECOND LOOP: qualifiers
      LOOP: while((kv = this.heap.peek()) != null) { // iterate through the current row's kvs
        // Check that the heap gives us KVs in an increasing order.
        assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
          "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
        prevKV = kv;
        ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
        switch(qcode) {
          case INCLUDE: // in all three cases below the kv is returned to the client
          case INCLUDE_AND_SEEK_NEXT_ROW:
          case INCLUDE_AND_SEEK_NEXT_COL:

            Filter f = matcher.getFilter();
            outResult.add(f == null ? kv : f.transform(kv)); // may apply a simplifying conversion to the kv, e.g. KeyOnlyFilter keeps the key only
            count++;

            if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
              if (!matcher.moreRowsMayExistAfter(kv)) { // a simple, effective way to return directly without another loop iteration to check again
                return false;
              }
              reseek(matcher.getKeyForNextRow(kv)); // construct a fake kv to quickly seek to the start of the next row
            } else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
              reseek(matcher.getKeyForNextColumn(kv)); // doing it here is more efficient than in the outer loop
            } else { // case INCLUDE
              this.heap.next(); // the returned kv is not missed: it was already read via peek() before this next()
            }

            cumulativeMetric += kv.getLength();
            if (limit > 0 && (count == limit)) { // limit reached, return
              break LOOP; // the label is required: a plain break would only leave the switch
            }
            continue; // for the include cases

          case DONE:
            return true;

          case DONE_SCAN: // this scan is complete
            close();

            return false;

          case SEEK_NEXT_ROW: // like INCLUDE_AND_SEEK_NEXT_ROW, but without including the kv
            // This is just a relatively simple end of scan fix, to short-cut end
            // us if there is an endKey in the scan.
            if (!matcher.moreRowsMayExistAfter(kv)) { // a quick, effective way to terminate this get/scan
              return false;
            }

            reseek(matcher.getKeyForNextRow(kv));
            break;

          case SEEK_NEXT_COL: // like INCLUDE_AND_SEEK_NEXT_COL, but without including the kv
            reseek(matcher.getKeyForNextColumn(kv));
            break;

          case SKIP: // ignore this kv and move on to the next one (the final return value depends on count)
            this.heap.next(); 
            break;

          case SEEK_NEXT_USING_HINT:
            KeyValue nextKV = matcher.getNextKeyHint(kv);
            if (nextKV != null) {
              reseek(nextKV);
            } else {
              heap.next();
            }
            break;

          default:
            throw new RuntimeException("UNEXPECTED");
        }//switch
      }//while
    } finally {
      if (cumulativeMetric > 0 && metric != null) {
        RegionMetricsStorage.incrNumericMetric(this.metricNamePrefix + metric,
            cumulativeMetric);
      }
    }

    if (count > 0) {
      return true;
    }

    // No more keys
    close();
    return false;
  }
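The match-code dispatch above can be illustrated with a much smaller model. The sketch below is an assumption-laden simplification, not ScanQueryMatcher itself: it keeps only four codes, treats "reseek" as skipping forward in a sorted list, and assumes `maxVersions >= 1`. `MatchLoopSketch`, `Cell`, `match`, `seekNextColumn` and `nextRow` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MatchLoopSketch {
    enum MatchCode { INCLUDE, INCLUDE_AND_SEEK_NEXT_COL, SEEK_NEXT_COL, DONE }

    /** Hypothetical, simplified cell. */
    static final class Cell {
        final String row;
        final String qualifier;
        final long ts;

        Cell(String row, String qualifier, long ts) {
            this.row = row;
            this.qualifier = qualifier;
            this.ts = ts;
        }
    }

    /** Decides what to do with one cell of the row being assembled. */
    static MatchCode match(Cell c, String row, Set<String> wanted,
                           Map<String, Integer> seen, int maxVersions) {
        if (!c.row.equals(row)) {
            return MatchCode.DONE;              // the row changed: this row is complete
        }
        if (!wanted.contains(c.qualifier)) {
            return MatchCode.SEEK_NEXT_COL;     // unwanted column: jump over all its versions
        }
        int n = seen.merge(c.qualifier, 1, Integer::sum);
        return n == maxVersions ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL : MatchCode.INCLUDE;
    }

    /** Stand-in for reseek(getKeyForNextColumn(kv)): first index past this column. */
    static int seekNextColumn(List<Cell> sorted, int pos) {
        Cell c = sorted.get(pos);
        int p = pos + 1;
        while (p < sorted.size() && sorted.get(p).row.equals(c.row)
                && sorted.get(p).qualifier.equals(c.qualifier)) {
            p++;
        }
        return p;
    }

    /** Collects the matching cells of the row that starts at index start. */
    static List<Cell> nextRow(List<Cell> sorted, int start, Set<String> wanted, int maxVersions) {
        List<Cell> out = new ArrayList<>();
        if (start >= sorted.size()) {
            return out;
        }
        String row = sorted.get(start).row;
        Map<String, Integer> seen = new HashMap<>();
        int pos = start;
        while (pos < sorted.size()) {
            Cell c = sorted.get(pos);
            switch (match(c, row, wanted, seen, maxVersions)) {
                case INCLUDE:
                    out.add(c);
                    pos++;
                    break;
                case INCLUDE_AND_SEEK_NEXT_COL:
                    out.add(c);
                    pos = seekNextColumn(sorted, pos);
                    break;
                case SEEK_NEXT_COL:
                    pos = seekNextColumn(sorted, pos);
                    break;
                case DONE:
                    return out;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Cell> cells = Arrays.asList(
            new Cell("r1", "q1", 5), new Cell("r1", "q1", 3), new Cell("r1", "q2", 4),
            new Cell("r1", "q3", 9), new Cell("r1", "q3", 8), new Cell("r2", "q1", 1));
        Set<String> wanted = new HashSet<>(Arrays.asList("q1", "q3"));
        System.out.println(nextRow(cells, 0, wanted, 1).size());
    }
}
```

Seeking past a column as soon as the version quota is met is the point of the combined INCLUDE_AND_SEEK codes: older versions are never handed to the matcher at all.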

 

 

 

----Part 4: FAQs

 

 

 

 


 
