
HBase put source code analysis


 

This post walks through the source code that HRegionServer executes for a put operation:

 

The HRegionServer multi method:

 

  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
  throws ServiceException {
  ....
  try {
        region = getRegion(regionAction.getRegion()); // look up the HRegion targeted by this RegionAction
      } catch (IOException e) {
        regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
        continue;  // For this region it's a failure.
      }
  ....
   if (regionAction.hasAtomic() && regionAction.getAtomic()) { // do these actions have to be applied atomically?
        // How does this call happen?  It may need some work to play well w/ the surroundings.
        // Need to return an item per Action along w/ Action index.  TODO.
        try {
          mutateRows(region, regionAction.getActionList(), cellScanner); // described in detail below
        } catch (IOException e) {
          // As it's atomic, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } else {
        // doNonAtomicRegionMutation manages the exception internally
        cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup);
      }
  ....
  }
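
For reference, a plain client-side put reaches this multi() RPC through the non-atomic branch (doNonAtomicRegionMutation): the 0.96/0.98-era client packs Puts into a MultiRequest per region server. A minimal usage sketch (the configuration, table and column names are made-up examples):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  Configuration conf = HBaseConfiguration.create();
  HTable table = new HTable(conf, "test_table");   // hypothetical table name
  try {
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("value1"));
    table.put(p);   // sent to the region server as a MultiRequest and handled by multi()
  } finally {
    table.close();
  }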

 The HRegionServer mutateRows method:

 

 

  protected void mutateRows(final HRegion region, final List<ClientProtos.Action> actions,
      final CellScanner cellScanner)
  throws IOException {
    if (!region.getRegionInfo().isMetaTable()) {
      cacheFlusher.reclaimMemStoreMemory();
    }
    RowMutations rm = null;
    for (ClientProtos.Action action: actions) {
      if (action.hasGet()) {
        throw new DoNotRetryIOException("Atomic put and/or delete only, not a Get=" +
          action.getGet());
      }
      MutationType type = action.getMutation().getMutateType();
      if (rm == null) {
        rm = new RowMutations(action.getMutation().getRow().toByteArray());
      }
      switch (type) {
      case PUT:
        rm.add(ProtobufUtil.toPut(action.getMutation(), cellScanner));
        break;
      case DELETE:
        rm.add(ProtobufUtil.toDelete(action.getMutation(), cellScanner));
        break;
      default:
          throw new DoNotRetryIOException("Atomic put and/or delete only, not " + type.name());
      }
    }
    region.mutateRow(rm); // HRegion applies the mutations
  }
  

 This calls into HRegion:

 

 

    public void mutateRow(RowMutations rm) throws IOException {
    // Don't need nonces here - RowMutations only supports puts and deletes
    mutateRowsWithLocks(rm.getMutations(), Collections.singleton(rm.getRow()));
  }
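
On the client side, this atomic path is what HTable.mutateRow(RowMutations) ends up driving (the client marks the RegionAction as atomic in the multi RPC). A minimal sketch assuming the 0.96/0.98 client API; the table, family and qualifier names are made up:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Delete;
  import org.apache.hadoop.hbase.client.HTable;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.client.RowMutations;
  import org.apache.hadoop.hbase.util.Bytes;

  Configuration conf = HBaseConfiguration.create();
  HTable table = new HTable(conf, "test_table");      // hypothetical table name
  byte[] row = Bytes.toBytes("row1");

  RowMutations rm = new RowMutations(row);
  Put p = new Put(row);
  p.add(Bytes.toBytes("cf"), Bytes.toBytes("a"), Bytes.toBytes("1"));
  rm.add(p);
  Delete d = new Delete(row);
  d.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes("b"));
  rm.add(d);

  table.mutateRow(rm);   // put + delete applied atomically via HRegion.mutateRow
  table.close();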

 The detailed flow of HRegion's mutateRow (the mutateRowsWithLocks code that drives these steps is sketched right after the list):

 

  1. Run the coprocessorHost hooks before the operation

   MultiRowMutationProcessor class

  2. Acquire the row lock for the row key

  3. Acquire the HRegion updatesLock read lock

  4. Process the row changes (e.g. set timestamps) and build the WALEdit object needed by the WAL

  MultiRowMutationProcessor class

  5. Build a WriteEntry (carrying a new write number) and add it to the writeQueue of the MultiVersionConsistencyControl class

  6. Add the KeyValues to the memstore of the corresponding HStore (requires the store's read lock)

  7. Write the WAL: build the HLogKey (used to decide which WAL entries can be dropped after a memstore flush; unrelated to MVCC) and put the log edit into the pending-write queue (written asynchronously)

  FSHLog class

  8. Release the HRegion updatesLock read lock

  9. Release the row lock (count down its latch)

  10. Sync this HRegionServer's HLog up to the current edit, using FSHLog's unflushedEntries (an AtomicLong that is incremented for every appended log edit) as the transaction id (txid)

  FSHLog class

  11. In the MultiVersionConsistencyControl class, use the entries in the writeQueue to advance the read point to the largest completed write number, then wait until the read point is at least this entry's write number (the work of other writers completes asynchronously, so the read point may already be larger than our write point, or earlier write points may not yet be completed; we wait until readpoint >= the current writepoint)

  12. MultiRowMutationProcessor runs postProcess

  (invokes the coprocessor post-hooks)
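
All twelve steps are carried out inside HRegion.processRowsWithLocks; mutateRowsWithLocks just wraps the mutations in a MultiRowMutationProcessor (the RowProcessor used in steps 1, 4 and 12) and delegates to it. Roughly, per the 0.96/0.98 source (argument names may differ slightly):

  public void mutateRowsWithLocks(Collection<Mutation> mutations,
      Collection<byte[]> rowsToLock) throws IOException {
    // Wrap the mutations in a RowProcessor; processRowsWithLocks then drives
    // steps 1-12 (coprocessors, locking, timestamps, memstore, WAL, sync, MVCC).
    MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
    processRowsWithLocks(proc, -1, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }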

 

Below is the same walkthrough with the relevant source code:

1. Run the coprocessorHost hooks before the operation

   MultiRowMutationProcessor class
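
preProcess simply offers every Put/Delete to the region's coprocessor host before any locks are taken. A simplified sketch of the step (not the literal 0.96/0.98 source; bypass handling is omitted):

   // Simplified sketch of MultiRowMutationProcessor.preProcess (illustrative only).
   @Override
   public void preProcess(HRegion region, WALEdit walEdit) throws IOException {
     RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
     if (coprocessorHost == null) {
       return; // no coprocessors registered on this region
     }
     for (Mutation m : mutations) {
       if (m instanceof Put) {
         coprocessorHost.prePut((Put) m, walEdit, m.getDurability());
       } else if (m instanceof Delete) {
         coprocessorHost.preDelete((Delete) m, walEdit, m.getDurability());
       }
     }
   }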

2. Acquire the row lock for the row key

 

  /**
   * Tries to acquire a lock on the given row.
   * @param waitForLock if true, will block until the lock is available.
   *        Otherwise, just tries to obtain the lock and returns
   *        false if unavailable.
   * @return the row lock if acquired,
   *   null if waitForLock was false and the lock was not acquired
   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
   */
  public RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException {
    checkRow(row, "row lock");
    startRegionOperation();
    try {
      HashedBytes rowKey = new HashedBytes(row);
      RowLockContext rowLockContext = new RowLockContext(rowKey);

      // loop until we acquire the row lock (unless !waitForLock)
      while (true) { // loop until we acquire the lock for this row
        RowLockContext existingContext = lockedRows.putIfAbsent(rowKey, rowLockContext); // try to register our lock context for this row
        if (existingContext == null) { // no other thread holds this row
          // Row is not already locked by any thread, use newly created context.
          break;
        } else if (existingContext.ownedByCurrentThread()) { // the current thread already holds this row
          // Row is already locked by current thread, reuse existing context instead.
          rowLockContext = existingContext;
          break;
        } else { // another thread holds this row
          // Row is already locked by some other thread, give up or wait for it
          if (!waitForLock) {
            return null;
          }
          try {
            if (!existingContext.latch.await(this.rowLockWaitDuration, TimeUnit.MILLISECONDS)) { // wait on the CountDownLatch for the owning thread to release the row
              throw new IOException("Timed out waiting for lock for row: " + rowKey);
            }
          } catch (InterruptedException ie) {
            LOG.warn("Thread interrupted waiting for lock on row: " + rowKey);
            InterruptedIOException iie = new InterruptedIOException();
            iie.initCause(ie);
            throw iie;
          }
        }
      }

      // allocate new lock for this thread
      return rowLockContext.newLock();
    } finally {
      closeRegionOperation();
    }
  }

 

3. Acquire the HRegion updatesLock read lock
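
Conceptually, updatesLock is a ReentrantReadWriteLock inside HRegion: every mutation takes the read side (so mutations proceed concurrently), while a memstore flush takes the write side to briefly exclude all writers. A self-contained sketch of the pattern (field and method names are illustrative, not the actual HRegion members):

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  class UpdatesLockSketch {
    // Illustrative stand-in for HRegion's updatesLock.
    private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();

    void applyMutation(Runnable mutation) {
      updatesLock.readLock().lock();        // step 3: many writers can hold the read side at once
      try {
        mutation.run();                     // steps 4-7 run while the read lock is held
      } finally {
        updatesLock.readLock().unlock();    // step 8
      }
    }

    void flushMemstore(Runnable snapshot) {
      updatesLock.writeLock().lock();       // a flush waits for in-flight mutations to drain
      try {
        snapshot.run();                     // snapshot the memstore with writers excluded
      } finally {
        updatesLock.writeLock().unlock();
      }
    }
  }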

  

  

  4. Process the row changes (e.g. set timestamps) and build the WALEdit object needed by the WAL

  MultiRowMutationProcessor class

 

   @Override
  public void process(long now,
                      HRegion region,
                      List<KeyValue> mutationKvs,
                      WALEdit walEdit) throws IOException {
    byte[] byteNow = Bytes.toBytes(now);
    // Check mutations and apply edits to a single WALEdit
    for (Mutation m : mutations) { // check each mutation and update its timestamps
      if (m instanceof Put) {
        Map<byte[], List<Cell>> familyMap = m.getFamilyCellMap();
        region.checkFamilies(familyMap.keySet());
        region.checkTimestamps(familyMap, now);
        region.updateKVTimestamps(familyMap.values(), byteNow);
      } else if (m instanceof Delete) {
        Delete d = (Delete) m;
        region.prepareDelete(d);
        region.prepareDeleteTimestamps(d.getFamilyCellMap(), byteNow);
      } else {
        throw new DoNotRetryIOException(
            "Action must be Put or Delete. But was: "
            + m.getClass().getName());
      }
      for (List<Cell> cells: m.getFamilyCellMap().values()) {
        boolean writeToWAL = m.getDurability() != Durability.SKIP_WAL;
        for (Cell cell : cells) { // add every cell of each column family to the WALEdit
          KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
          mutationKvs.add(kv);
          if (writeToWAL) {
            walEdit.add(kv);
          }
        }
      }
    }
  }

 

 

5. Build a WriteEntry (carrying a new write number) and add it to the writeQueue of the MultiVersionConsistencyControl class

  writeEntry = mvcc.beginMemstoreInsert();

 

  /**
   * Generate and return a {@link WriteEntry} with a new write number.
   * To complete the WriteEntry and wait for it to be visible,
   * call {@link #completeMemstoreInsert(WriteEntry)}.
   */
  public WriteEntry beginMemstoreInsert() {
    synchronized (writeQueue) {
      long nextWriteNumber = ++memstoreWrite; // increment the region's MVCC write point and use it as this entry's write number
      WriteEntry e = new WriteEntry(nextWriteNumber);
      writeQueue.add(e);
      return e;
    }
  }

   6. Add the KeyValues to the memstore of the corresponding HStore (requires the store's read lock)

 

   @Override
  public long add(final KeyValue kv) {
    lock.readLock().lock();
    try {
      return this.memstore.add(kv);
    } finally {
      lock.readLock().unlock();
    }
  }
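
memstore.add itself is essentially an insert into a concurrent skip list ordered by KeyValue, returning the change in heap size so the region can decide when a flush is needed. A rough self-contained model of the idea (this is not the MemStore source; the real class also manages the snapshot set, the MemStoreLAB and exact heap accounting):

  import java.util.concurrent.ConcurrentSkipListMap;
  import java.util.concurrent.atomic.AtomicLong;
  import org.apache.hadoop.hbase.util.Bytes;

  // Toy model of the memstore: a sorted, concurrent map of cells plus a size counter.
  class MemStoreSketch {
    private final ConcurrentSkipListMap<byte[], byte[]> cells =
        new ConcurrentSkipListMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
    private final AtomicLong heapSize = new AtomicLong();

    long add(byte[] key, byte[] value) {
      byte[] old = cells.put(key, value);
      long oldSize = (old == null) ? 0 : key.length + old.length;
      long newSize = key.length + value.length;
      long delta = newSize - oldSize;   // returned like MemStore.add's heap-size delta
      heapSize.addAndGet(delta);
      return delta;
    }
  }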

   7. Write the WAL: build the HLogKey (used to decide which WAL entries can be dropped after a memstore flush; unrelated to MVCC) and put the log edit into the pending-write queue (written asynchronously)

  FSHLog class

  /**
   * Append a set of edits to the log. Log edits are keyed by (encoded)
   * regionName, rowname, and log-sequence-id.
   *
   * Later, if we sort by these keys, we obtain all the relevant edits for a
   * given key-range of the HRegion (TODO). Any edits that do not have a
   * matching COMPLETE_CACHEFLUSH message can be discarded.
   *
   * <p>
   * Logs cannot be restarted once closed, or once the HLog process dies. Each
   * time the HLog starts, it must create a new log. This means that other
   * systems should process the log appropriately upon each startup (and prior
   * to initializing HLog).
   *
   * synchronized prevents appends during the completion of a cache flush or for
   * the duration of a log roll.
   *
   * @param info
   * @param tableName
   * @param edits
   * @param clusterIds that have consumed the change (for replication)
   * @param now
   * @param doSync shall we sync?
   * @param sequenceId of the region.
   * @return txid of this transaction
   * @throws IOException
   */
  @SuppressWarnings("deprecation")
  private long append(HRegionInfo info, TableName tableName, WALEdit edits, List<UUID> clusterIds,
      final long now, HTableDescriptor htd, boolean doSync, boolean isInMemstore, 
      AtomicLong sequenceId, long nonceGroup, long nonce) throws IOException {
      if (edits.isEmpty()) return this.unflushedEntries.get();
      if (this.closed) {
        throw new IOException("Cannot append; log is closed");
      }
      TraceScope traceScope = Trace.startSpan("FSHlog.append");
      try {
        long txid = 0;
        synchronized (this.updateLock) { // take the HLog update lock
          // get the sequence number from the passed Long. In normal flow, it is coming from the
          // region.
          long seqNum = sequenceId.incrementAndGet();
          // generate the region's seqNum; once a memstore flush has covered seqNums >= the ones recorded in the HLog, those HLog entries can be deleted
          // The 'lastSeqWritten' map holds the sequence number of the oldest
          // write for each region (i.e. the first edit added to the particular
          // memstore). . When the cache is flushed, the entry for the
          // region being flushed is removed if the sequence number of the flush
          // is greater than or equal to the value in lastSeqWritten.
          // Use encoded name.  Its shorter, guaranteed unique and a subset of
          // actual  name.
          byte [] encodedRegionName = info.getEncodedNameAsBytes(); // encoded region name
          if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); // if this region has no entry yet, record this seqNum (the oldest unflushed edit of the region)
          HLogKey logKey = makeKey(
            encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);

          synchronized (pendingWritesLock) {
            doWrite(info, logKey, edits, htd); // put the log edit into the pending-write queue
            txid = this.unflushedEntries.incrementAndGet();
          }
          this.numEntries.incrementAndGet();
          this.asyncWriter.setPendingTxid(txid);

          if (htd.isDeferredLogFlush()) {
            lastUnSyncedTxid = txid;
          }
          this.latestSequenceNums.put(encodedRegionName, seqNum);
        }
        // TODO: note that only tests currently call append w/sync.
        //       Therefore, this code here is not actually used by anything.
        // Sync if catalog region, and if not then check if that table supports
        // deferred log flushing
        if (doSync &&
            (info.isMetaRegion() ||
            !htd.isDeferredLogFlush())) {
          // sync txn to file system
          this.sync(txid);
        }
        return txid;
      } finally {
        traceScope.close();
      }
    }

 8. Release the HRegion updatesLock read lock

  

  9. Release the row lock (count down its latch)
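
Steps 8 and 9 are the unlock side of steps 3 and 2: step 8 is simply updatesLock.readLock().unlock(), and step 9 removes the row's context from lockedRows and counts down its latch so that a thread blocked in getRowLock (step 2) can take over the row. A self-contained toy version of that row-lock lifecycle (illustrative only, not the HRegion source):

  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.CountDownLatch;

  class RowLockSketch {
    static class RowLockContext {
      final CountDownLatch latch = new CountDownLatch(1);
    }

    private final ConcurrentHashMap<String, RowLockContext> lockedRows =
        new ConcurrentHashMap<String, RowLockContext>();

    // Step 2 (simplified; see getRowLock above for the real loop with timeouts and reentrancy).
    RowLockContext lockRow(String rowKey) throws InterruptedException {
      while (true) {
        RowLockContext context = new RowLockContext();
        RowLockContext existing = lockedRows.putIfAbsent(rowKey, context);
        if (existing == null) {
          return context;          // we now own the row
        }
        existing.latch.await();    // wait for the current owner to release it
      }
    }

    // Step 9: make the row available again and wake any waiters.
    void releaseRow(String rowKey, RowLockContext context) {
      lockedRows.remove(rowKey, context);
      context.latch.countDown();
    }
  }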

  

  10. Sync this HRegionServer's HLog up to the current edit, using FSHLog's unflushedEntries (an AtomicLong that is incremented for every appended log edit) as the transaction id (txid)

  FSHLog class

 // sync all transactions up to the specified txid
  private void syncer(long txid) throws IOException {
    synchronized (this.syncedTillHere) {
      while (this.syncedTillHere.get() < txid) {
        try {
          this.syncedTillHere.wait();

          if (txid <= this.failedTxid.get()) {
            assert asyncIOE != null :
              "current txid is among(under) failed txids, but asyncIOE is null!";
            throw asyncIOE;
          }
        } catch (InterruptedException e) {
          LOG.debug("interrupted while waiting for notification from AsyncNotifier");
        }
      }
    }
  }
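
The coordination between append and sync is a counter-and-wait pattern: each append bumps unflushedEntries and hands that txid to the asynchronous writer/syncer threads; once the syncer has made the edits durable it advances syncedTillHere and notifies waiters, which releases the syncer(txid) call above. A self-contained toy version of the pattern (the field names mirror FSHLog, but this is not the FSHLog source):

  import java.util.concurrent.atomic.AtomicLong;

  // Toy model of the append/sync handshake used by FSHLog.
  class WalSyncSketch {
    private final AtomicLong unflushedEntries = new AtomicLong(0); // last txid handed to the writer
    private final AtomicLong syncedTillHere = new AtomicLong(0);   // last txid known durable on disk

    // Called for every appended edit (like append() above); returns the edit's txid.
    long append() {
      return unflushedEntries.incrementAndGet();
    }

    // Called by the mutation thread after releasing its locks (like syncer(txid) above).
    void waitUntilSynced(long txid) throws InterruptedException {
      synchronized (syncedTillHere) {
        while (syncedTillHere.get() < txid) {
          syncedTillHere.wait();
        }
      }
    }

    // Called by the background sync thread once edits up to txid are on disk.
    void markSynced(long txid) {
      synchronized (syncedTillHere) {
        if (txid > syncedTillHere.get()) {
          syncedTillHere.set(txid);
        }
        syncedTillHere.notifyAll();
      }
    }
  }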

 

  11. In the MultiVersionConsistencyControl class, use the entries in the writeQueue to advance the read point to the largest completed write number, then wait until the read point is at least this entry's write number (the work of other writers completes asynchronously, so the read point may already be larger than our write point, or earlier write points may not yet be completed; we wait until readpoint >= the current writepoint)

 /**
   * Complete a {@link WriteEntry} that was created by {@link #beginMemstoreInsert()}.
   *
   * At the end of this call, the global read point is at least as large as the write point
   * of the passed in WriteEntry.  Thus, the write is visible to MVCC readers.
   */
  public void completeMemstoreInsert(WriteEntry e) {
    advanceMemstore(e);
    waitForRead(e);
  }
  
  /**
   * Mark the {@link WriteEntry} as complete and advance the read point as
   * much as possible.
   *
   * How much is the read point advanced?
   * Let S be the set of all write numbers that are completed and where all previous write numbers
   * are also completed.  Then, the read point is advanced to the supremum of S.
   *
   * @param e
   * @return true if e is visible to MVCC readers (that is, readpoint >= e.writeNumber)
   */
  boolean advanceMemstore(WriteEntry e) {
    synchronized (writeQueue) {
      e.markCompleted();

      long nextReadValue = -1;
      boolean ranOnce=false;
      while (!writeQueue.isEmpty()) { // advance the read point to the largest write number whose entry (and all earlier entries) are completed
        ranOnce=true;
        WriteEntry queueFirst = writeQueue.getFirst();

        if (nextReadValue > 0) {
          if (nextReadValue+1 != queueFirst.getWriteNumber()) {
            throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
                + nextReadValue + " next: " + queueFirst.getWriteNumber());
          }
        }

        if (queueFirst.isCompleted()) {
          nextReadValue = queueFirst.getWriteNumber();
          writeQueue.removeFirst();
        } else {
          break;
        }
      }

      if (!ranOnce) {
        throw new RuntimeException("never was a first");
      }

      if (nextReadValue > 0) {
        synchronized (readWaiters) {
          memstoreRead = nextReadValue;
          readWaiters.notifyAll();
        }
      }
      if (memstoreRead >= e.getWriteNumber()) {
        return true;
      }
      return false;
    }
  }
  ----------------------------
  /**
   * Wait for the global readPoint to advance upto
   * the specified transaction number.
   */
  public void waitForRead(WriteEntry e) {
    boolean interrupted = false;
    synchronized (readWaiters) {
      while (memstoreRead < e.getWriteNumber()) { // wait until the read point reaches this entry's write number
        try {
          // (entries complete asynchronously, so earlier write numbers may not be completed yet; wait here until readpoint >= our write number)
          readWaiters.wait(0);
        } catch (InterruptedException ie) {
          // We were interrupted... finish the loop -- i.e. cleanup --and then
          // on our way out, reset the interrupt flag.
          interrupted = true;
        }
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
  }

 

  12. MultiRowMutationProcessor runs postProcess

  (invokes the coprocessor post-hooks)
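
postProcess mirrors step 1: once the write is durable and visible, it gives the coprocessor host its post-hooks for each Put/Delete. A simplified sketch of the step (not the literal 0.96/0.98 source):

  // Simplified sketch of MultiRowMutationProcessor.postProcess (illustrative only).
  @Override
  public void postProcess(HRegion region, WALEdit walEdit, boolean success) throws IOException {
    RegionCoprocessorHost coprocessorHost = region.getCoprocessorHost();
    if (coprocessorHost == null) {
      return;
    }
    for (Mutation m : mutations) {
      if (m instanceof Put) {
        coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
      } else if (m instanceof Delete) {
        coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
      }
    }
  }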

 

 

That concludes the walkthrough of the put path.
