Entry point for starting the HLog threads:
HRegionServer starts its service threads:
private void startServiceThreads() throws IOException {
  .....
  // logRoller daemon thread; by default it rolls a new hlog every hour
  Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
      uncaughtExceptionHandler);
  this.splitLogWorker = new SplitLogWorker(this.zooKeeper, sinkConf, this, this);
  splitLogWorker.start(); // start the SplitLogWorker
  .....
}
HRegionServer instantiates FSHLog:
protected void handleReportForDutyResponse(final RegionServerStartupResponse c) throws IOException {
  .....
  this.hlog = setupWALAndReplication(); // the RegionServer's HLog
  .....
}
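Judging by its name and arguments, the `Threads.setDaemonThreadRunning` call above names a thread, marks it as a daemon, attaches an uncaught-exception handler and starts it. The sketch below reproduces that pattern with plain `java.lang.Thread`. `DaemonThreads.startDaemon` is a hypothetical helper written for illustration, not the HBase utility itself.

```java
import java.lang.Thread.UncaughtExceptionHandler;

// Hypothetical helper illustrating the "name + daemon + handler + start" pattern.
final class DaemonThreads {
  private DaemonThreads() {}

  static Thread startDaemon(Runnable task, String name, UncaughtExceptionHandler handler) {
    Thread t = new Thread(task);
    t.setName(name);                        // e.g. n + ".logRoller"
    t.setDaemon(true);                      // daemon threads do not keep the JVM alive
    t.setUncaughtExceptionHandler(handler); // called if the task throws
    t.start();
    return t;
  }

  public static void main(String[] args) throws InterruptedException {
    UncaughtExceptionHandler handler =
        (thread, e) -> System.err.println(thread.getName() + " died: " + e);
    Thread roller = startDaemon(
        () -> System.out.println("running on " + Thread.currentThread().getName()),
        "rs1.logRoller", handler);
    roller.join();
  }
}
```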
FSHLog then starts its HLog worker threads:
asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
asyncWriter.start(); // thread that writes log edits from the queue into the output stream
int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
asyncSyncers = new AsyncSyncer[syncerNums];
for (int i = 0; i < asyncSyncers.length; ++i) {
  asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
  asyncSyncers[i].start(); // flushes the log edits in the stream to HDFS and syncs them
}
asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
asyncNotifier.start(); // dedicated notifier thread (so the syncer threads need not notify the waiters themselves?)
coprocessorHost = new WALCoprocessorHost(this, conf);
this.metrics = new MetricsWAL();
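The resulting thread layout is one AsyncWriter, `hbase.hlog.asyncer.number` AsyncSyncers (5 unless configured otherwise) and one AsyncNotifier. A minimal sketch of that counting and naming, assuming `java.util.Properties` as a stand-in for HBase's `Configuration`; the class `WalThreadNames` is hypothetical.

```java
import java.util.Properties;

// Hypothetical sketch of the WAL thread layout: 1 writer, N syncers, 1 notifier.
final class WalThreadNames {
  // Mirrors conf.getInt("hbase.hlog.asyncer.number", 5); Properties is a stand-in.
  static int syncerCount(Properties conf) {
    return Integer.parseInt(conf.getProperty("hbase.hlog.asyncer.number", "5"));
  }

  static String[] threadNames(String n, int syncers) {
    String[] names = new String[syncers + 2];
    names[0] = n + "-WAL.AsyncWriter";
    for (int i = 0; i < syncers; ++i) {
      names[i + 1] = n + "-WAL.AsyncSyncer" + i;
    }
    names[syncers + 1] = n + "-WAL.AsyncNotifier";
    return names;
  }

  public static void main(String[] args) {
    int syncers = syncerCount(new Properties()); // falls back to the default of 5
    for (String name : threadNames("rs1", syncers)) {
      System.out.println(name);
    }
  }
}
```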
The HLog WAL write path:
In step 7 of the HBase put source analysis (http://blackproof.iteye.com/blog/2197710), the HLog edit is appended to the log-edit queue.
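A simplified, hypothetical model of that producer side: the handler takes the next txid, appends the edit to `pendingWrites`, and wakes the writer blocked on `writeLock`. The field names mirror the AsyncWriter code quoted below; the class `PendingQueue`, the `String` edit type and the method bodies are assumptions, not the FSHLog source. `drain()` shows the swap-and-clear that the writer performs in its step 2.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of the edit queue shared by put handlers and the AsyncWriter.
final class PendingQueue {
  final Object pendingWritesLock = new Object();
  final Object writeLock = new Object();
  final AtomicLong unflushedEntries = new AtomicLong(0);
  List<String> pendingWrites = new LinkedList<>(); // guarded by pendingWritesLock
  long pendingTxid = 0;                            // guarded by writeLock

  // Producer side: assign a txid and queue the edit under one lock, then wake the writer.
  long append(String edit) {
    long txid;
    synchronized (pendingWritesLock) {
      txid = unflushedEntries.incrementAndGet();
      pendingWrites.add(edit);
    }
    synchronized (writeLock) {
      // Handlers may reach this block out of txid order, so only move forward.
      if (txid > pendingTxid) {
        pendingTxid = txid;
      }
      writeLock.notifyAll(); // writer waits while pendingTxid <= lastWrittenTxid
    }
    return txid;
  }

  // Writer side (step 2): swap the buffer out and leave an empty one behind.
  List<String> drain() {
    synchronized (pendingWritesLock) {
      List<String> batch = pendingWrites;
      pendingWrites = new LinkedList<>();
      return batch;
    }
  }

  public static void main(String[] args) {
    PendingQueue q = new PendingQueue();
    q.append("put row1");
    q.append("put row2");
    System.out.println("batch for the writer: " + q.drain()); // [put row1, put row2]
  }
}
```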
FSHLog's AsyncWriter thread then writes the queue to HDFS:
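1. Wait for new writes to enter the queue.
2. Lock the queue, swap out the pending list, and clear it.
3. Write the log edits to HDFS (into the output stream).
4. Update the latest written txid.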
public void run() {
  try {
    while (!this.isInterrupted()) {
      // 1. wait until there are new writes in local buffer
      synchronized (this.writeLock) { // lock for new writes
        // wait until the queued txid is greater than the last written txid
        while (this.pendingTxid <= this.lastWrittenTxid) {
          this.writeLock.wait(); // otherwise wait for a new write edit
        }
      }
      // 2. get all buffered writes and update 'real' pendingTxid
      // since maybe newer writes enter buffer as AsyncWriter wakes
      // up and holds the lock
      // NOTE! can't hold 'updateLock' here since rollWriter will pend
      // on 'sync()' with 'updateLock', but 'sync()' will wait for
      // AsyncWriter/AsyncSyncer/AsyncNotifier series. without updateLock
      // it can lead to pendWrites more than pendingTxid, but not a problem
      List<Entry> pendWrites = null;
      synchronized (pendingWritesLock) { // lock the queue, swap it out, and clear it
        this.txidToWrite = unflushedEntries.get();
        pendWrites = pendingWrites;
        pendingWrites = new LinkedList<Entry>();
      }
      // 3. write all buffered writes to HDFS(append, without sync)
      try {
        for (Entry e : pendWrites) {
          writer.append(e); // write the hlog edit to HDFS
        }
      } catch(IOException e) {
        LOG.error("Error while AsyncWriter write, request close of hlog ", e);
        requestLogRoll();
        asyncIOE = e;
        failedTxid.set(this.txidToWrite);
      }
      // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync'
      this.lastWrittenTxid = this.txidToWrite; // update the latest written txid
      boolean hasIdleSyncer = false;
      for (int i = 0; i < asyncSyncers.length; ++i) {
        if (!asyncSyncers[i].isSyncing()) {
          hasIdleSyncer = true;
          asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
          break;
        }
      }
      if (!hasIdleSyncer) {
        int idx = (int)this.lastWrittenTxid % asyncSyncers.length;
        asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
      }
    }
  } catch (InterruptedException e) {
    LOG.debug(getName() + " interrupted while waiting for " +
        "newer writes added to local buffer");
  } catch (Exception e) {
    LOG.error("UNEXPECTED", e);
  } finally {
    LOG.info(getName() + " exiting");
  }
}
}
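Step 4 hands `lastWrittenTxid` to the first idle syncer and, if every syncer is busy, falls back to a txid-modulo choice. The pure function below is a sketch of that rule, with a `boolean[]` standing in for `asyncSyncers[i].isSyncing()`; `SyncerChooser` is hypothetical. One detail worth noting: in Java a cast binds tighter than `%`, so the quoted `(int)this.lastWrittenTxid % asyncSyncers.length` truncates the txid to `int` before taking the remainder. Once txids exceed `Integer.MAX_VALUE`, that expression can become negative; the sketch takes the remainder on the `long` first.

```java
// Hypothetical sketch of the AsyncWriter's syncer hand-off rule.
final class SyncerChooser {
  static int choose(boolean[] syncing, long lastWrittenTxid) {
    for (int i = 0; i < syncing.length; ++i) {
      if (!syncing[i]) {
        return i; // first idle syncer wins
      }
    }
    // All busy: remainder on the long first, then cast (non-negative for txid >= 0).
    return (int) (lastWrittenTxid % syncing.length);
  }

  public static void main(String[] args) {
    System.out.println(choose(new boolean[] {true, false, true}, 7)); // 1 (idle syncer)
    System.out.println(choose(new boolean[] {true, true, true}, 8));  // 2 (8 % 3)
  }
}
```

For example, with 5 syncers and txid 3,000,000,000, `(int) txid % 5` evaluates to -1, while `(int) (txid % 5)` is 0.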
FSHLog's AsyncSyncer threads flush the data buffered in the output stream to HDFS and sync it, which is what makes the edits durable:
public void run() {
  try {
    while (!this.isInterrupted()) {
      // 1. wait until AsyncWriter has written data to HDFS and
      // called setWrittenTxid to wake up us
      synchronized (this.syncLock) {
        while (this.writtenTxid <= this.lastSyncedTxid) {
          this.syncLock.wait();
        }
        this.txidToSync = this.writtenTxid;
      }
      // if this syncer's writes have been synced by other syncer:
      // 1. just set lastSyncedTxid
      // 2. don't do real sync, don't notify AsyncNotifier, don't logroll check
      // regardless of whether the writer is null or not
      if (this.txidToSync <= syncedTillHere.get()) {
        this.lastSyncedTxid = this.txidToSync;
        continue;
      }
      // 2. do 'sync' to HDFS to provide durability
      long now = EnvironmentEdgeManager.currentTimeMillis();
      try {
        if (writer == null) {
          // the only possible case where writer == null is as below:
          // 1. t1: AsyncWriter append writes to hdfs,
          //        invokes AsyncSyncer 1 with writtenTxid==100
          // 2. t2: AsyncWriter append writes to hdfs,
          //        invokes AsyncSyncer 2 with writtenTxid==200
          // 3. t3: rollWriter starts, it grabs the updateLock which
          //        prevents further writes entering pendingWrites and
          //        wait for all items(200) in pendingWrites to append/sync
          //        to hdfs
          // 4. t4: AsyncSyncer 2 finishes, now syncedTillHere==200
          // 5. t5: rollWriter close writer, set writer=null...
          // 6. t6: AsyncSyncer 1 starts to use writer to do sync... before
          //        rollWriter set writer to the newly created Writer
          //
          // Now writer == null and txidToSync > syncedTillHere here:
          // we need fail all the writes with txid <= txidToSync to avoid
          // 'data loss' where user get successful write response but can't
          // read the writes!
          LOG.fatal("should never happen: has unsynced writes but writer is null!");
          asyncIOE = new IOException("has unsynced writes but writer is null!");
          failedTxid.set(this.txidToSync);
        } else {
          this.isSyncing = true;
          // the sync call. Author's note: this HLog path may need retries,
          // a longer file-lease expiry, and a decision on whether the hlog is rewritten
          writer.sync();
          this.isSyncing = false;
        }
        postSync();
      } catch (IOException e) {
        LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
        requestLogRoll();
        asyncIOE = e;
        failedTxid.set(this.txidToSync);
        this.isSyncing = false;
      }
      metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
      // 3. wake up AsyncNotifier to notify(wake-up) all pending 'put'
      // handler threads on 'sync()'
      this.lastSyncedTxid = this.txidToSync;
      asyncNotifier.setFlushedTxid(this.lastSyncedTxid);
      // 4. check and do logRoll if needed
      boolean logRollNeeded = false;
      if (rollWriterLock.tryLock()) {
        try {
          logRollNeeded = checkLowReplication();
        } finally {
          rollWriterLock.unlock();
        }
        try {
          if (logRollNeeded || writer != null && writer.getLength() > logrollsize) {
            requestLogRoll();
          }
        } catch (IOException e) {
          LOG.warn("writer.getLength() failed,this failure won't block here");
        }
      }
    }
  } catch (InterruptedException e) {
    LOG.debug(getName() + " interrupted while waiting for " +
        "notification from AsyncWriter thread");
  } catch (Exception e) {
    LOG.error("UNEXPECTED", e);
  } finally {
    LOG.info(getName() + " exiting");
  }
}
}
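The `syncedTillHere` check works like a shared watermark. A syncer whose `txidToSync` is already covered skips the real sync. Because syncers can finish out of order (syncer 2 before syncer 1 in the t1..t6 comment), a completed sync must only move the watermark forward. Put handlers block until their txid is covered. The class below is a simplified, hypothetical model of that bookkeeping using `wait`/`notifyAll`. It does not reproduce where exactly FSHLog updates `syncedTillHere` or how AsyncNotifier is wired in.

```java
// Hypothetical model of a monotonic "synced up to txid" watermark.
final class SyncTracker {
  private long syncedTillHere = 0; // guarded by this

  // A syncer can skip its sync if another syncer already covered this txid.
  synchronized boolean alreadySynced(long txidToSync) {
    return txidToSync <= syncedTillHere;
  }

  // Called after a successful sync; never moves the watermark backwards.
  synchronized void markSynced(long txid) {
    if (txid > syncedTillHere) {
      syncedTillHere = txid;
      notifyAll(); // wake handlers blocked in awaitSynced
    }
  }

  // What a put handler waiting in sync(txid) needs; returns false if interrupted.
  synchronized boolean awaitSynced(long txid) {
    while (syncedTillHere < txid) {
      try {
        wait();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  synchronized long syncedTillHere() {
    return syncedTillHere;
  }

  public static void main(String[] args) throws InterruptedException {
    SyncTracker tracker = new SyncTracker();
    Thread handler = new Thread(
        () -> System.out.println("txid 200 durable: " + tracker.awaitSynced(200)));
    handler.start();
    tracker.markSynced(200); // syncer 2 finishes first (t4 above)
    System.out.println("syncer 1 can skip txid 100: " + tracker.alreadySynced(100));
    handler.join();
  }
}
```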
LogRoller periodically calls FSHLog's rollWriter method, which creates a new HLog file and pushes the data in the stream out to HDFS:
@Override
public byte [][] rollWriter(boolean force)
    throws FailedLogCloseException, IOException {
  rollWriterLock.lock();
  try {
    // Return if nothing to flush.
    if (!force && this.writer != null && this.numEntries.get() <= 0) {
      return null;
    }
    byte [][] regionsToFlush = null;
    if (closed) {
      LOG.debug("HLog closed. Skipping rolling of writer");
      return null;
    }
    try {
      if (!closeBarrier.beginOp()) {
        LOG.debug("HLog closing. Skipping rolling of writer");
        return regionsToFlush;
      }
      // Do all the preparation outside of the updateLock to block
      // the incoming writes as little as possible
      long currentFilenum = this.filenum;
      Path oldPath = null;
      if (currentFilenum > 0) {
        // computeFilename will take care of meta hlog filename
        oldPath = computeFilename(currentFilenum);
      }
      this.filenum = System.currentTimeMillis();
      Path newPath = computeFilename();
      while (fs.exists(newPath)) {
        this.filenum++;
        newPath = computeFilename();
      }
      // Tell our listeners that a new log is about to be created
      if (!this.listeners.isEmpty()) {
        for (WALActionsListener i : this.listeners) {
          i.preLogRoll(oldPath, newPath);
        }
      }
      FSHLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
      // Can we get at the dfsclient outputstream?
      FSDataOutputStream nextHdfsOut = null;
      if (nextWriter instanceof ProtobufLogWriter) {
        nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
        // perform the costly sync before we get the lock to roll writers.
        try {
          // flush the protobuf writer to HDFS and sync the sequential write,
          // so the data block and the HDFS image stay in sync
          nextWriter.sync();
        } catch (IOException e) {
          // optimization failed, no need to abort here.
          LOG.warn("pre-sync failed", e);
        }
      }
      Path oldFile = null;
      int oldNumEntries = 0;
      synchronized (updateLock) {
        // Clean up current writer.
        oldNumEntries = this.numEntries.get();
        oldFile = cleanupCurrentWriter(currentFilenum);
        this.writer = nextWriter;
        this.hdfs_out = nextHdfsOut;
        this.numEntries.set(0);
        if (oldFile != null) {
          this.hlogSequenceNums.put(oldFile, this.latestSequenceNums);
          this.latestSequenceNums = new HashMap<byte[], Long>();
        }
      }
      if (oldFile == null) LOG.info("New WAL " + FSUtils.getPath(newPath));
      else {
        long oldFileLen = this.fs.getFileStatus(oldFile).getLen();
        this.totalLogSize.addAndGet(oldFileLen);
        LOG.info("Rolled WAL " + FSUtils.getPath(oldFile) + " with entries="
            + oldNumEntries + ", filesize="
            + StringUtils.humanReadableInt(oldFileLen) + "; new WAL "
            + FSUtils.getPath(newPath));
      }
      // Tell our listeners that a new log was created
      if (!this.listeners.isEmpty()) {
        for (WALActionsListener i : this.listeners) {
          i.postLogRoll(oldPath, newPath);
        }
      }
      // Can we delete any of the old log files?
      if (getNumRolledLogFiles() > 0) {
        cleanOldLogs();
        regionsToFlush = findRegionsToForceFlush();
      }
    } finally {
      closeBarrier.endOp();
    }
    return regionsToFlush;
  } finally {
    rollWriterLock.unlock();
  }
}
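Two small decisions at the top of rollWriter are easy to isolate. First, an unforced roll is skipped when a writer exists but has no entries. Second, the new file number starts at the current time in milliseconds and is bumped until no file with that number exists. The sketch below models both as pure functions, assuming a `Set<Long>` of taken file numbers in place of `fs.exists(computeFilename())`. `RollDecisions` is hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of rollWriter's skip check and file-number choice.
final class RollDecisions {
  // Mirrors: !force && this.writer != null && this.numEntries.get() <= 0
  static boolean shouldSkipRoll(boolean force, boolean hasWriter, int numEntries) {
    return !force && hasWriter && numEntries <= 0;
  }

  // Mirrors: filenum = currentTimeMillis(); while (exists) filenum++;
  static long nextFilenum(long nowMillis, Set<Long> existing) {
    long filenum = nowMillis;
    while (existing.contains(filenum)) {
      filenum++; // a file with this number already exists, try the next one
    }
    return filenum;
  }

  public static void main(String[] args) {
    Set<Long> existing = new HashSet<>();
    existing.add(1000L);
    existing.add(1001L);
    System.out.println(nextFilenum(1000L, existing));    // 1002
    System.out.println(shouldSkipRoll(false, true, 0));  // true
  }
}
```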