Hlog的相关处理流程:
在对hbase中数据进行更新操作put/delete/append/increment操作时,记录操作日志供日志重播的相关处理。
Hlog的写入通过regionserver实例生成时生成的FSLog为的实例。
通过LogRoller线程定期去检查并删除过期的日志文件。
定期检查的时间间隔通过hbase.regionserver.logroll.period进行配置,默认为3600000ms
如果检查时间还没有达到上面的间隔时间时,线程等待的wake(唤醒)时间,hbase.server.thread.wakefrequency,
默认为10*1000ms
FSLog实例内部有一个FSLog.LogSyncer线程实例,并启动此实例。此线程主要用来把log写入到hdfs中
此线程的执行间隔通过hbase.regionserver.optionallogflushinterval配置,默认为1000ms
如果此值配置为小于0的值,表示实时写入hlog日志
在执行更新操作时如果开启有日志记录,调用appendNoSync-->append写入日志,
每一个日志中记录的seqid默认第一次时为当前rs中所有region中最大的seqid加一(FSHLog.logSeqNum),
每append一次后,logSeqNum的值为加一。同时此值也是flsuh时hfile中的fileinfo中记录的最大的seqid值。
此方法把要记录的日志写入到FSLog.LogSyncer.pendingWrites队列中。等待LogSyncer的run方法去sync
注意:如果是对meta的log写入时,每一次写入都会执行sync操作,保证meta数据的不丢失。
publicvoidrun() {
try {
// awaiting with a timeout doesn't always
// throw exceptions on interrupt
while(!this.isInterrupted() && !closeLogSyncer.get()) {
try {
如果没有最新的数据要写入到HDFS,现在未flush的log的值小于或等于上一次提交的值,表示没有新记录
unflushedEntries在每append一次值会加一。
if (unflushedEntries.get() <= syncedTillHere) {
synchronized (closeLogSyncer) {
线程等待
closeLogSyncer.wait(this.optionalFlushInterval);
}
}
// Calling sync since we waited or had unflushed entries.
// Entries appended but not sync'd are taken care of here AKA
// deferred log flush
否则,执行log的写入HDFS操作。使用unflushedEntries的值当txid
sync();
} catch (IOException e) {
LOG.error("Error while syncing, requesting close of hlog ", e);
requestLogRoll();
Threads.sleep(this.optionalFlushInterval);
}
}
} catch (InterruptedException e) {
LOG.debug(getName() + " interrupted while waiting for sync requests");
} finally {
LOG.info(getName() + " exiting");
}
}
使用unflushedEntries的值当成txid
private void syncer() throws IOException {
syncer(this.unflushedEntries.get()); // sync all pending items
}
private void syncer(long txid) throws IOException {
// if the transaction that we are interested in is already
// synced, then return immediately.
检查日志是否有更新,如果没有,直接返回,减少检查过程
if (txid <= this.syncedTillHere) {
return;
}
WritertempWriter;
synchronized (this.updateLock) {
if (this.closed) return;
// Guaranteed non-null.
// Note that parallel sync can close tempWriter.
// The current method of dealing with this is to catch exceptions.
// See HBASE-4387, HBASE-5623, HBASE-7329.
tempWriter = this.writer;
}
try {
longdoneUpto;
longnow = EnvironmentEdgeManager.currentTimeMillis();
// First flush all the pending writes to HDFS. Then
// issue the sync to HDFS. If sync is successful, then update
// syncedTillHere to indicate that transactions till this
// number has been successfully synced.
IOException ioe = null;
List<Entry> pending = null;
synchronized (flushLock) {
if (txid <= this.syncedTillHere) {
return;
}
得到当前未flush的最新的txid
doneUpto = this.unflushedEntries.get();
得到需要写入到HDFS的日志记录列表
pending = logSyncer.getPendingWrites();
try {
添加到HLOG的写入OUTPUT中
logSyncer.hlogFlush(tempWriter, pending);
postAppend(pending);
} catch(IOException io) {
ioe = io;
LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
}
}
if (ioe != null && pending != null) {
synchronized (this.updateLock) {
synchronized (flushLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
tempWriter = this.writer;
logSyncer.hlogFlush(tempWriter, pending);
postAppend(pending);
}
}
}
// another thread might have sync'ed avoid double-sync'ing
if (txid <= this.syncedTillHere) {
return;
}
try {
if (tempWriter != null) {
tempWriter.sync();
postSync();
}
} catch(IOException ex) {
synchronized (this.updateLock) {
// HBASE-4387, HBASE-5623, retry with updateLock held
// TODO: we don't actually need to do it for concurrent close - what is the point
// of syncing new unrelated writer? Keep behavior for now.
tempWriter = this.writer;
if (tempWriter != null) {
执行HDFS的文件写入sync,表示数据持久化
tempWriter.sync();
postSync();
}
}
}
更新当前syncedTillHere的值为unflushedEntries的值,主要用来检查下次是否需要更新记录
this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
// TODO: preserving the old behavior for now, but this check is strange. It's not
// protected by any locks here, so for all we know rolling locks might start
// as soon as we enter the "if". Is this best-effort optimization check?
if (!this.logRollRunning) {
checkLowReplication();
try {
如果当前HLOG文件的大小超过了指定的log文件大小,
通过hbase.regionserver.hlog.blocksize配置,默认hdfs的blocksize大小 *
hbase.regionserver.logroll.multiplier的值,默认0.95
curLogSize = tempWriter.getLength();
if (curLogSize > this.logrollsize) {
调用LogRoller.logRollRequested方法
requestLogRoll();
}
} catch (IOException x) {
LOG.debug("Log roll failed and will be retried. (This is not an error)");
}
}
} catch (IOException e) {
LOG.fatal("Could not sync. Requesting roll of hlog", e);
requestLogRoll();
throwe;
}
}
LogRoller.logRollRequested方法流程:
此方法主要用来叫醒LogRoller线程本身。
public void logRollRequested() {
synchronized (rollLog) {
rollLog.set(true);
rollLog.notifyAll();
}
}
LogRoller.run方法:
public void run() {
while (!server.isStopped()) {
longnow = System.currentTimeMillis();
booleanperiodic = false;
如果是Hlog写入的时候显示调用了 logRollRequested方法,下面的if 不进去
if (!rollLog.get()) {
检查上次更新时间是否大过更新间隔时间,如果没有,线程等待
通过hbase.regionserver.logroll.period进行配置,默认为3600000ms
periodic = (now - this.lastrolltime) > this.rollperiod;
如果是定期检查,同时上一次roll log的时间还不到间隔的时间,线程等待
线程等待超时通过hbase.server.thread.wakefrequency,默认为10*1000ms
if (!periodic) {
synchronized (rollLog) {
try {
rollLog.wait(this.threadWakeFrequency);
} catch (InterruptedException e) {
// Fall through
}
}
continue;
}
// Time for periodic roll
if (LOG.isDebugEnabled()) {
LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
}
} elseif (LOG.isDebugEnabled()) {
LOG.debug("HLog roll requested");
}
rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
try {
更新最后一次roll log的时间为当前时间
this.lastrolltime = now;
// This is array of actual region names.
调用FSHLog.rollWriter方法,检查文件个数是否达到指定的值,
如果是:把所有region已经flush的hlog文件移动到oldWALs目录下。
并取出未移动同时已经关闭writer的hlog file的最小的一个seqid的hlog path,
把所有未flush的region中seqid小于此值的region返回回来。
byte [][] regionsToFlush = getWAL().rollWriter(rollLog.get());
if (regionsToFlush != null) {
发起flush请求,强制所有的region进行flush操作。
for (byte [] r: regionsToFlush) scheduleFlush(r);
}
} catch (FailedLogCloseException e) {
server.abort("Failed log close in log roller", e);
} catch (java.net.ConnectException e) {
server.abort("Failed log close in log roller", e);
} catch (IOException ex) {
// Abort if we get here. We probably won't recover an IOE. HBASE-1132
server.abort("IOE in log roller",
RemoteExceptionHandler.checkIOException(ex));
} catch (Exception ex) {
LOG.error("Log rolling failed", ex);
server.abort("Log rolling failed", ex);
} finally {
try {
rollLog.set(false);
} finally {
rollLock.unlock();
}
}
}
LOG.info("LogRoller exiting.");
}
FSHLog.rollWriter方法流程分析:
把所有的region中已经flush完成的最小的seqid与old的hlog文件进行比对,
如果old hlog的seqid小于指定的flushid把这些hlog移动到oldWALs目录下去。
如果关闭的hlog文件大于hbase.regionserver.maxlogs配置的的值,默认为32个,
返回所有的已经关闭的hlog中所有region未进行flush的seqid,
publicbyte [][] rollWriter(booleanforce)
throws FailedLogCloseException, IOException {
synchronized (rollWriterLock) {
// Return if nothing to flush.
FSHLog中的numEntries的值每次append时,都会加一
if (!force && this.writer != null && this.numEntries.get() <= 0) {
returnnull;
}
byte [][] regionsToFlush = null;
if (closed) {
LOG.debug("HLog closed. Skipping rolling of writer");
returnnull;
}
try {
....................此处部分代码没有显示
filenum在第一次是值为-1,第一次进行入是在FSHLog实例生成时通过此方法得到Writer实例
longcurrentFilenum = this.filenum;
Path oldPath = null;
如果是第二次指定此方法,也就是LogRoller显示调用时
if (currentFilenum > 0) {
//computeFilename will take care of meta hlog filename
得到上一个hlog文件的路径,也就是准备关闭的文件路径
oldPath = computeFilename(currentFilenum);
}
生成一个新的HLOG文件名称,并创建文件路径
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
while (fs.exists(newPath)) {
this.filenum++;
newPath = computeFilename();
}
....................此处部分代码没有显示
生成一个新的Writer实例,默认实现是ProtobufLogWriter类,通过hbase.regionserver.hlog.writer.impl配置
FSHLog.WriternextWriter = this.createWriterInstance(fs, newPath, conf);
// Can we get at the dfsclientoutputstream?
FSDataOutputStream nextHdfsOut = null;
if (nextWriterinstanceof ProtobufLogWriter) {
得到writer对应的HDFS路径的OutputStream
nextHdfsOut = ((ProtobufLogWriter)nextWriter).getStream();
// perform the costly sync before we get the lock to roll writers.
try {
nextWriter.sync();
} catch (IOException e) {
// optimization failed, no need to abort here.
LOG.warn("pre-sync failed", e);
}
}
Path oldFile = null;
intoldNumEntries = 0;
synchronized (updateLock) {
// Clean up current writer.
oldNumEntries = this.numEntries.get();
关闭上一个HLOG在HDFS的文件流,并把writer设置为null
同时把关闭的hlog文件的的seqid与路径写入到outputfiles的map列表中
oldFile = cleanupCurrentWriter(currentFilenum);
把当前新的hlog文件的writer与outputstream设置给FSHLOG实例
this.writer = nextWriter;
this.hdfs_out = nextHdfsOut;
this.numEntries.set(0);
}
....................此处部分代码没有显示
....................此处部分代码没有显示
// Can we delete any of the old log files?
如果outputfiles列表中有值,检查log的个数是否达到指定的个数,
if (getNumRolledLogFiles() > 0) {
检查log目录下的文件个数是否达到指定的个数,hbase.regionserver.maxlogs配置的的值,默认为32个
如果达到指定的个数,找到所有region中已经flush完成的最小的一个seqid,
并从outputfiles列表中取出比此seqid小的hlog文件,把这些文件移动到oldWALs路径下。
CleanOldLogs();
取出outputfiles列表中的最小的一个seqid,
检查如果未flush的region的seqid是否小于此seqid,如果是返回这些region
regionsToFlush = getRegionsToForceFlush();
}
} finally {
this.logRollRunning = false;
closeBarrier.endOp();
}
returnregionsToFlush;
}
}
相关推荐
hlog,日志,调试
hlog是一个超级简单的 Hakka Logs CLI 客户端。 只需设置您的~/.hakkarc文件,然后运行hlog 来发布日志。 配置文件 hlog 在$HOME/.hakkarc .hakkarc 中查找其配置,格式如下: [logs] token = your_hakka_logs_...
海康威视摄像机参数远程配置程序,可以实现更改ip和叠加字幕等
多线程的日志纪录插件程序
日志 一个简单的c日志工具
HLog Burpsuite HTTP 插件,主要用于内网测试,可定制Content-Type和Response Content
4- 远程配置_hlog.dll_海康威视相机参数远程配置_远程配置海康_源码.zip
有的朋友要问为什么需要关联这两者,那笔者这里提出三个相关问题:1.Memstore中的数据flush到HDFS文件中后HLog对应的数据是不是就可以被删除了?不然HLog会无限增长!那问题来了,Memstore中被flush到HDFS的数据,如何...
4- 远程配置_hlog.dll_海康威视相机参数远程配置_远程配置海康_源码.rar.rar
简单来 说,observer服务器只提供⾮事物请求服务,通常在于不影响集群事务处理能⼒的前提下提升集群⾮事物处理的能⼒。 10.Spark Stage的数量有什么决定? 答案:Partition ⼆、填空题 ⼆、填空题 1.Spark的三种部署...
1. 更新设备网络SDK时,SDK开发包【库文件】里的HCNetSDK.dll、HCCore.dll、HCNetSDKCom文件夹、libssl-1_1.dll、libcrypto-1_1.dll、hlog.dll、hpr.dll、zlib1.dll、PlayCtrl.dll、SuperRender.dll、AudioRender....
Region中包含HLOG、Store。若一张表有几个列族,就有几个Store。Store中有多个MemStore及StoreFile。StoreFile是对HFile的封装。StoreFile真正存储在HDFS上。 所以写数据时先往HLog上写一份,再往MemStore上写一份。...
1. 更新设备网络SDK时,SDK开发包【库文件】里的HCNetSDK.dll、HCCore.dll、HCNetSDKCom文件夹、PlayCtrl.dll、SuperRender.dll、AudioRender.dll、ssleay32.dll、libeay32.dll、hlog.dll、hpr.dll、zlib1.dll、log4...
Hbase HLog源代码阅读笔记 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)...
和读相比,HBase写数据流程倒是显得很简单:数据先顺序写入HLog,再写入对应的缓存Memstore,当Memstore中数据大小达到一定阈值(128M)之后,系统会异步将Memstore中数据flush到HDFS形成小文件。 HBase数据写入通常...
近年来,已经从压缩感测(CS)理论的角度解决了有关稀疏连续信号恢复的各种应用,例如源定位,雷达成像,通信信道估计等。 但是,在考虑任何实际使用时,有两个主要缺陷... 实验结果证明了所提算法和相关分析的有效性。
答:无噪声信道最大数据传输率公式:最大数据传输率=2Hlog2V b/s。因此最大数据传输率决定于每次采样所产生的比特数,如果每次采样产生16bits,那么数据传输率可达128kbps;如果每次采样产生1024bits,那么可达8.2...
写入数据时数据会先写入Hlog中成功后在写入MemStore中。Memstore中的数据因为空间有限,所以需要定期flush到文件StoreFile中,每次flush都是生成新的StoreFile。HRegionServer在处理Flush请求时,将数据写成HFile...