
HBase HLog source code


  Where the HLog threads are started:

  

  HRegionServer starts the service threads:

    private void startServiceThreads() throws IOException {.....
    Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
        uncaughtExceptionHandler); // LogRoller daemon thread: rolls a new HLog periodically (hbase.regionserver.logroll.period, one hour by default)
    this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
    splitLogWorker.start(); // start the SplitLogWorker thread (distributed log splitting for dead region servers)
  .....}
  HRegionServer instantiates the FSHLog:
    protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
  throws IOException {.....
  this.hlog = setupWALAndReplication(); // the region server's HLog (an FSHLog instance)
  .....}

 

  FSHLog starts the HLog worker threads:

    asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
    asyncWriter.start(); // thread that drains log edits from the pending-writes queue and appends them to the output stream
   
    int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
    asyncSyncers = new AsyncSyncer[syncerNums];
    for (int i = 0; i < asyncSyncers.length; ++i) {
      asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
      asyncSyncers[i].start(); // threads that sync (hflush) the appended edits from the stream to HDFS, making them durable
    }

    asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
    asyncNotifier.start(); // dedicated notifier thread: it wakes up handlers blocked in sync(), so the syncer threads don't have to do the notification themselves

    coprocessorHost = new WALCoprocessorHost(this, conf);

    this.metrics = new MetricsWAL();
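
  All three thread types hand work to one another with the same txid handshake: the upstream stage publishes a transaction id and calls notify, and the downstream stage sleeps on its own lock until the published txid moves past the last one it has processed. A minimal, self-contained sketch of that idiom (class and field names here are illustrative, not the actual FSHLog members):

    // Simplified illustration of the txid handshake used between
    // AsyncWriter -> AsyncSyncer -> AsyncNotifier. Names are made up
    // for the sketch; only the pattern matches FSHLog.
    class TxidStage extends Thread {
      private final Object lock = new Object();
      private long pendingTxid = 0;    // advanced by the upstream stage
      private long processedTxid = 0;  // advanced by this stage

      // the upstream stage calls this after finishing its part of the work
      void wake(long txid) {
        synchronized (lock) {
          if (txid > pendingTxid) {
            pendingTxid = txid;
            lock.notify();
          }
        }
      }

      @Override
      public void run() {
        try {
          while (!isInterrupted()) {
            long txidToProcess;
            synchronized (lock) {
              while (pendingTxid <= processedTxid) {
                lock.wait();               // nothing new to do yet
              }
              txidToProcess = pendingTxid;
            }
            // ... do this stage's work (append / sync / notify) up to txidToProcess ...
            processedTxid = txidToProcess; // record progress, then loop
          }
        } catch (InterruptedException ie) {
          // exit quietly, as the real threads do
        }
      }
    }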

 

 The HLog WAL write path:

  

  In the HBase put source code analysis (http://blackproof.iteye.com/blog/2197710),

  step 7 appends the WAL edit to the log-edit (pending-writes) queue.
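
  The producer side of that queue (the append path taken by the put handlers) is not shown in this post. Roughly, each handler adds its edit to the pending-writes list, takes a new txid from an atomic counter, and wakes the AsyncWriter; it later blocks in sync(txid) until its txid has been synced. A hedged sketch of that step, reusing the field names from AsyncWriter.run() below but with an illustrative class and method shape rather than the real FSHLog API:

    // Hedged sketch of the producer that feeds AsyncWriter. pendingWrites and
    // unflushedEntries mirror the fields referenced in AsyncWriter.run(); the
    // class itself and the String stand-in for HLog.Entry are illustrative.
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    class WalProducer {
      private final Object pendingWritesLock = new Object();
      private final AtomicLong unflushedEntries = new AtomicLong(0);
      private List<String> pendingWrites = new LinkedList<String>();

      // called by a put handler for every WAL edit
      long append(String edit) {
        long txid;
        synchronized (pendingWritesLock) {
          txid = unflushedEntries.incrementAndGet(); // assign the next txid
          pendingWrites.add(edit);                   // queue it for AsyncWriter
        }
        // in FSHLog the handler then wakes AsyncWriter (a setPendingTxid-style
        // call) and later waits in sync(txid) until syncedTillHere >= txid
        return txid;
      }
    }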

  FSHLog's AsyncWriter thread then drains that queue and writes it to HDFS:

  1. Wait until new writes have entered the local buffer.

  2. Under the pending-writes lock, swap in a fresh buffer and take the queued edits.

  3. Append the log edits to HDFS (into the output stream, without sync).

  4. Update the last written txid and hand it to an idle AsyncSyncer.

  public void run() {
      try {
        while (!this.isInterrupted()) {
          // 1. wait until there is new writes in local buffer
          synchronized (this.writeLock) { // synchronize with the threads publishing new writes
            while (this.pendingTxid <= this.lastWrittenTxid) { // wait until the pending txid is newer than the last written one
              this.writeLock.wait(); // otherwise sleep until a new write edit arrives
            }
            }
          }

          // 2. get all buffered writes and update 'real' pendingTxid
          //    since maybe newer writes enter buffer as AsyncWriter wakes
          //    up and holds the lock
          // NOTE! can't hold 'updateLock' here since rollWriter will pend
          // on 'sync()' with 'updateLock', but 'sync()' will wait for
          // AsyncWriter/AsyncSyncer/AsyncNotifier series. without updateLock
          // can leads to pendWrites more than pendingTxid, but not problem
          List<Entry> pendWrites = null;
          synchronized (pendingWritesLock) { // swap out and clear the pending-writes buffer
            this.txidToWrite = unflushedEntries.get();
            pendWrites = pendingWrites;
            pendingWrites = new LinkedList<Entry>();
          }

          // 3. write all buffered writes to HDFS(append, without sync)
          try {
            for (Entry e : pendWrites) {
              writer.append(e); // append the HLog edit to HDFS (no sync yet)
            }
          } catch(IOException e) {
            LOG.error("Error while AsyncWriter write, request close of hlog ", e);
            requestLogRoll();

            asyncIOE = e;
            failedTxid.set(this.txidToWrite);
          }

          // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync'
          this.lastWrittenTxid = this.txidToWrite; // update the latest written txid
          boolean hasIdleSyncer = false;
          for (int i = 0; i < asyncSyncers.length; ++i) {
            if (!asyncSyncers[i].isSyncing()) {
              hasIdleSyncer = true;
              asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
              break;
            }
          }
          if (!hasIdleSyncer) {
            int idx = (int)this.lastWrittenTxid % asyncSyncers.length;
            asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "newer writes added to local buffer");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }
  

 

 

  FSHLog's AsyncSyncer threads flush (sync) the data buffered in the output stream down to HDFS, making the appended edits durable and visible:

   public void run() {
      try {
        while (!this.isInterrupted()) {
          // 1. wait until AsyncWriter has written data to HDFS and
          //    called setWrittenTxid to wake up us
          synchronized (this.syncLock) {
            while (this.writtenTxid <= this.lastSyncedTxid) {
              this.syncLock.wait();
            }
            this.txidToSync = this.writtenTxid;
          }

          // if this syncer's writes have been synced by other syncer:
          // 1. just set lastSyncedTxid
          // 2. don't do real sync, don't notify AsyncNotifier, don't logroll check
          // regardless of whether the writer is null or not
          if (this.txidToSync <= syncedTillHere.get()) {
            this.lastSyncedTxid = this.txidToSync;
            continue;
          }

          // 2. do 'sync' to HDFS to provide durability
          long now = EnvironmentEdgeManager.currentTimeMillis();
          try {
            if (writer == null) {
              // the only possible case where writer == null is as below:
              // 1. t1: AsyncWriter append writes to hdfs,
              //        envokes AsyncSyncer 1 with writtenTxid==100
              // 2. t2: AsyncWriter append writes to hdfs,
              //        envokes AsyncSyncer 2 with writtenTxid==200
              // 3. t3: rollWriter starts, it grabs the updateLock which
              //        prevents further writes entering pendingWrites and
              //        wait for all items(200) in pendingWrites to append/sync
              //        to hdfs
              // 4. t4: AsyncSyncer 2 finishes, now syncedTillHere==200
              // 5. t5: rollWriter close writer, set writer=null...
              // 6. t6: AsyncSyncer 1 starts to use writer to do sync... before
              //        rollWriter set writer to the newly created Writer
              //
              // Now writer == null and txidToSync > syncedTillHere here:
              // we need fail all the writes with txid <= txidToSync to avoid
              // 'data loss' where user get successful write response but can't
              // read the writes!
              LOG.fatal("should never happen: has unsynced writes but writer is null!");
              asyncIOE = new IOException("has unsynced writes but writer is null!");
              failedTxid.set(this.txidToSync);
            } else {
              this.isSyncing = true;            
              writer.sync(); // the actual sync; author's note: this spot may need retry logic, a longer file lease expiry, and a decision on whether the HLog should be rewritten
              this.isSyncing = false;
            }
            postSync();
          } catch (IOException e) {
            LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
            requestLogRoll();

            asyncIOE = e;
            failedTxid.set(this.txidToSync);

            this.isSyncing = false;
          }
          metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);

          // 3. wake up AsyncNotifier to notify(wake-up) all pending 'put'
          // handler threads on 'sync()'
          this.lastSyncedTxid = this.txidToSync;
          asyncNotifier.setFlushedTxid(this.lastSyncedTxid);

          // 4. check and do logRoll if needed
          boolean logRollNeeded = false;
          if (rollWriterLock.tryLock()) {
            try {
              logRollNeeded = checkLowReplication();
            } finally {
              rollWriterLock.unlock();
            }            
            try {
              if (logRollNeeded || writer != null && writer.getLength() > logrollsize) {
                requestLogRoll();
              }
            } catch (IOException e) {
              LOG.warn("writer.getLength() failed,this failure won't block here");
            }
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "notification from AsyncWriter thread");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }
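
  The AsyncNotifier thread itself is not listed in this post. Roughly, it waits until an AsyncSyncer passes it a newly synced txid via setFlushedTxid, advances syncedTillHere, and wakes all handler threads blocked in sync(txid). A hedged sketch of that last stage (simplified; the real class also handles interruption and uses the actual FSHLog fields):

    // Simplified stand-in for FSHLog's AsyncNotifier. syncedTillHere and
    // setFlushedTxid follow the snippets above; the class body here is an
    // approximation of the real thread, not a copy of it.
    import java.util.concurrent.atomic.AtomicLong;

    class Notifier extends Thread {
      private final Object notifyLock = new Object();
      private long flushedTxid = 0;            // set by AsyncSyncer via setFlushedTxid()
      private long notifiedTxid = 0;
      private final AtomicLong syncedTillHere; // shared with the sync() callers

      Notifier(AtomicLong syncedTillHere) {
        this.syncedTillHere = syncedTillHere;
      }

      void setFlushedTxid(long txid) {
        synchronized (notifyLock) {
          if (txid > flushedTxid) {
            flushedTxid = txid;
            notifyLock.notify();
          }
        }
      }

      @Override
      public void run() {
        try {
          while (!isInterrupted()) {
            long txid;
            synchronized (notifyLock) {
              while (flushedTxid <= notifiedTxid) {
                notifyLock.wait();             // wait for a newly synced txid
              }
              txid = flushedTxid;
            }
            // advance the globally visible sync point and wake blocked sync() callers
            synchronized (syncedTillHere) {
              syncedTillHere.set(txid);
              syncedTillHere.notifyAll();      // handlers wait on this object in sync()
            }
            notifiedTxid = txid;
          }
        } catch (InterruptedException ie) {
          // exit quietly
        }
      }
    }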

 

 

  LogRoller periodically calls FSHLog's rollWriter method, which creates a new HLog file and pushes the data still buffered in the stream out to HDFS:

   @Override
  public byte [][] rollWriter(boolean force)
      throws FailedLogCloseException, IOException {
    rollWriterLock.lock();
    try {
      // Return if nothing to flush.
      if (!force && this.writer != null && this.numEntries.get() <= 0) {
        return null;
      }
      byte [][] regionsToFlush = null;
      if (closed) {
        LOG.debug("HLog closed. Skipping rolling of writer");
        return null;
      }
      try {
        if (!closeBarrier.beginOp()) {
          LOG.debug("HLog closing. Skipping rolling of writer");
          return regionsToFlush;
        }
        // Do all the preparation outside of the updateLock to block
        // as less as possible the incoming writes
        long currentFilenum = this.filenum;
        Path oldPath = null;
        if (currentFilenum > 0) {
          //computeFilename  will take care of meta hlog filename
          oldPath = computeFilename(currentFilenum);
        }
        this.filenum = System.currentTimeMillis();
        Path newPath = computeFilename();
        while (fs.exists(newPath)) {
          this.filenum++;
          newPath = computeFilename();
        }

        // Tell our listeners that a new log is about to be created
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.preLogRoll(oldPath, newPath);
          }
        }
        FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
        // Can we get at the dfsclient outputstream?
        FSDataOutputStream nextHdfsOut = null;
        if (nextWriter instanceof ProtobufLogWriter) {
          nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
          // perform the costly sync before we get the lock to roll writers.
          try {
            nextWriter.sync(); // pre-sync: flush the protobuf writer's header to HDFS so the new file's blocks and metadata are consistent before it takes over
          } catch (IOException e) {
            // optimization failed, no need to abort here.
            LOG.warn("pre-sync failed", e);
          }
        }

        Path oldFile = null;
        int oldNumEntries = 0;
        synchronized (updateLock) {
          // Clean up current writer.
          oldNumEntries = this.numEntries.get();
          oldFile = cleanupCurrentWriter(currentFilenum);
          this.writer = nextWriter;
          this.hdfs_out = nextHdfsOut;
          this.numEntries.set(0);
          if (oldFile != null) {
            this.hlogSequenceNums.put(oldFile, this.latestSequenceNums);
            this.latestSequenceNums = new HashMap<byte[], Long>();
          }
        }
        if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
        else {
          long oldFileLen = this.fs.getFileStatus(oldFile).getLen();
          this.totalLogSize.addAndGet(oldFileLen);
          LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries="
              + oldNumEntries + ", filesize="
              + StringUtils.humanReadableInt(oldFileLen) + "; new WAL "
              + FSUtils.getPath(newPath));
        }

        // Tell our listeners that a new log was created
        if (!this.listeners.isEmpty()) {
          for (WALActionsListener i : this.listeners) {
            i.postLogRoll(oldPath, newPath);
          }
        }

        // Can we delete any of the old log files?
        if (getNumRolledLogFiles() > 0) {
          cleanOldLogs();
          regionsToFlush = findRegionsToForceFlush();
        }
      } finally {
        closeBarrier.endOp();
      }
      return regionsToFlush;
    } finally {
      rollWriterLock.unlock();
    }
  }
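
  For context, LogRoller itself (not listed here) is a simple periodic thread: it sleeps until either the roll period elapses (hbase.regionserver.logroll.period, one hour by default) or a requestLogRoll() from the WAL threads wakes it, then calls rollWriter() and asks the region server to flush the regions that rollWriter returned so the old files can be archived. A hedged sketch of that loop, with illustrative names rather than the exact LogRoller source:

    // Simplified stand-in for LogRoller's main loop. rollPeriodMs corresponds
    // to hbase.regionserver.logroll.period (default 3600000 ms); the call to
    // rollWriter() is the method shown above. The class itself is illustrative.
    class RollerLoop implements Runnable {
      private final Object rollLog = new Object();   // requestLogRoll() notifies this
      private volatile boolean rollRequested = false;
      private final long rollPeriodMs = 3600000L;    // hbase.regionserver.logroll.period

      // called from the WAL threads via requestLogRoll()
      void requestRoll() {
        synchronized (rollLog) {
          rollRequested = true;
          rollLog.notifyAll();
        }
      }

      @Override
      public void run() {
        long lastRoll = System.currentTimeMillis();
        while (!Thread.currentThread().isInterrupted()) {
          try {
            synchronized (rollLog) {
              while (!rollRequested
                  && System.currentTimeMillis() - lastRoll < rollPeriodMs) {
                rollLog.wait(rollPeriodMs / 10);     // wake up periodically to re-check
              }
              rollRequested = false;
            }
            // byte[][] regionsToFlush = wal.rollWriter(true);  // roll the WAL
            // ... ask the region server to flush regionsToFlush so the
            //     corresponding old WAL files can be archived ...
            lastRoll = System.currentTimeMillis();
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();      // leave the loop
          }
        }
      }
    }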

 
