`
hongs_yang
  • 浏览: 59750 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

日志重播分析

阅读更多

日志重播分析

 

Hbase的日志重播分为启动时的日志重播与rs下线时的日志重播操作。

 

通过hbase.master.distributed.log.replay来控制日志的split是在regionreopen前执行还是reopen后执行

 

如果是true表示在reopen后执行,否则相反

 



 

Rs下线时的日志重播分析

 

master监听下线

 

master通过RegionServerTracker监听rszk上的节点,当节点被删除时(rs下线)。触发nodeDeleted

 

publicvoidnodeDeleted(String path) {

 

if (path.startsWith(watcher.rsZNode)) {

 

解析出zkrs路径下的rs名称,并解析成ServerName实例。

 

String serverName = ZKUtil.getNodeName(path);

 

....................此处部分日志打印信息没有显示

 

ServerName sn = ServerName.parseServerName(serverName);

 

如果下线的rsServerManageronlineServers中已经不包含,不做处理,

 

if (!serverManager.isServerOnline(sn)) {

 

....................此处部分日志打印信息没有显示

 

return;

 

}

 

RegionServerTracker.onlineServers列表中移出此rs

 

remove(sn);

 

执行ServerManager.expireServer进行下线处理

 

this.serverManager.expireServer(sn);

 

}

 

}

 

执行ServerManager.expireServer进行下线处理

 

publicsynchronizedvoidexpireServer(final ServerName serverName) {

 

....................此处部分代码没有显示

 

 

 

rs添加到deadservers列表中。

 

this.deadservers.add(serverName);

 

onlineServers列表中移出此rs

 

this.onlineServers.remove(serverName);

 

synchronized (onlineServers) {

 

onlineServers.notifyAll();

 

}

 

rsAdmins(rs进行RPC调用的接口实现类)容器中移出此rs

 

this.rsAdmins.remove(serverName);

 

....................此处部分代码没有显示

 

 

 

检查此rs中是否包含metaregion,如果是,执行MetaServerShutdownHandler.否则执行ServerShutdownHandler

 

booleancarryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);

 

if (carryingMeta) {

 

this.services.getExecutorService().submit(newMetaServerShutdownHandler(this.master,

 

this.services, this.deadservers, serverName));

 

} else {

 

this.services.getExecutorService().submit(newServerShutdownHandler(this.master,

 

this.services, this.deadservers, serverName, true));

 

}

 

....................此处部分日志打印没有显示

 

}

 



 

MetaServerShutdownHandler.process方法处理流程:

 

 

 

publicvoidprocess() throws IOException {

 

booleangotException = true;

 

try {

 

AssignmentManager am = this.services.getAssignmentManager();

 

try {

 

检查是否需要做hlogsplit,生成此实例时,shouldSplitHlog的值为true

 

if (this.shouldSplitHlog) {

 

LOG.info("Splitting hbase:meta logs for " + serverName);

 

 

 

检查hbase.master.distributed.log.replay配置是否设置为true,默认值为false

 

if (this.distributedLogReplay) {

 

 

 

先对meta region执行prepareLogReplay处理。

 

MasterFileSystem.prepareLogReplay分析

 

Set<HRegionInfo> regions = newHashSet<HRegionInfo>();

 

regions.add(HRegionInfo.FIRST_META_REGIONINFO);

 

this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);

 

} else {

 

 

 

否则在没有配置distributedLogReplay时,执行splitMetaLogrs的日志进行split,等待split完成

 

MasterFileSystem.splitMetaLog分析

 

this.services.getMasterFileSystem().splitMetaLog(serverName);

 

}

 

 

 

AssignmentManager.RegionStates.lastAssignments中移出meta region的分配。

 

am.getRegionStates().logSplit(HRegionInfo.FIRST_META_REGIONINFO);

 

}

 

} catch (IOException ioe) {

 

....................此处部分代码没有显示

 

}

 

 

 

// Assign meta if we were carrying it.

 

// Check again: region may be assigned to other where because of RIT

 

// timeout

 

检查此server上还没有完成region open操作(regionInTransition还在)如果包含有meta region,

 

if (am.isCarryingMeta(serverName)) {

 

LOG.info("Server " + serverName + " was carrying META. Trying to assign.");

 

更新RegionStates中此region的状态为offline

 

regionsInTransition中移出此region,

 

serverHoldings中移出此servermeta region的分配信息

 

regionAssignments中移出此meta region的分配信息

 

regionsToReopen中移出此meta region

 

regionPlans中移出此meta region

 

am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);

 

等待meta region的分配,

 

通过hbase.catalog.verification.retries配置meta region分配的重试次数,默认10

 

通过hbase.catalog.verification.timeout配置每次分配重试的间隔时间,默认1000ms

 

verifyAndAssignMetaWithRetries();

 

 

 

如果metazk中的地址过期数据被删除,重新执行meta的分配,并等待meta分配完成

 

} elseif (!this.services.getCatalogTracker().isMetaLocationAvailable()) {

 

// the meta location as per master is null. This could happen in case when meta assignment

 

// in previous run failed, while meta znode has been updated to null. We should try to

 

// assign the meta again.

 

如果meta regionzk中的地址没有注册的数据,执行meta region的分配,并等待分配结束

 

通过hbase.catalog.verification.retries配置meta region分配的重试次数,默认10

 

通过hbase.catalog.verification.timeout配置每次分配重试的间隔时间,默认1000ms

 

verifyAndAssignMetaWithRetries();

 

} else {

 

LOG.info("META has been assigned to otherwhere, skip assigning.");

 

}

 

 

 

try {

 

如果distributedLogReplay配置为true,等待region replayregionintranstion事务完成

 

也就是RegionStates.regionsInTransition中不在包含此meta regionregionintransition

 

region replay的等待超时通过hbase.master.log.replay.wait.region.timeout配置,默认为15000ms

 

如果在超时的时间内没有完成regionintransition时,此方法返回false

 

if (this.shouldSplitHlog && this.distributedLogReplay) {

 

if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,

 

regionAssignmentWaitTimeout)) {

 

....................此处部分代码没有显示

 

}

 

 

 

执行log split,并等待split完成,如果是distributedLogReplay时,

 

此时region assign已经完成,开始splt log

 

MasterFileSystem.splitMetaLog分析

 

this.services.getMasterFileSystem().splitMetaLog(serverName);

 

}

 

} catch (Exception ex) {

 

....................此处部分代码没有显示

 

}

 

 

 

gotException = false;

 

} finally {

 

if (gotException){

 

// If we had an exception, this.deadServers.finish will be skipped in super.process()

 

this.deadServers.finish(serverName);

 

}

 

}

 

执行此rs中非meta region的日志重播与region assign

 

ServerShutdownHandler.process方法处理流程

 

super.process();

 

}

 

 

 

 

 

MasterFileSystem.prepareLogReplay分析

 

 

 

此方法在hbase.master.distributed.log.replay配置为true,分执行此操作

 

 

 

public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {

 

一些必要的检查,检查是否设置有日志的分布式replay模式,要进行分布式日志的region列表是否为空

 

if (!this.distributedLogReplay) {

 

return;

 

}

 

// mark regions in recovering state

 

if (regions == null || regions.isEmpty()) {

 

return;

 

}

 

try {

 

通过SplitLogManager.markRegionsRecoveringInZK/hbase/recovering-regions中添加region路径

 

this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);

 

} catch (KeeperExceptione) {

 

thrownewIOException(e);

 

}

 

}

 

 

 

执行distributedLogReplay

 

void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)

 

throws KeeperException {

 

一些必要的检查,检查是否设置有日志的分布式replay模式,要进行分布式日志的region列表是否为空

 

if (userRegions == null || !this.distributedLogReplay) {

 

return;

 

}

 

 

 

try {

 

this.recoveringRegionLock.lock();

 

// mark that we're creating recovering znodes

 

更新SplitLogManager中最后一次recoveringnode的时间为当前时间

 

this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();

 

 

 

开始迭代执行要replay的每一个region,如果是meta region,此时只有一个迭代

 

for (HRegionInfo region : userRegions) {

 

String regionEncodeName = region.getEncodedName();

 

得到hbase.splitlog.zk.retries配置的在zk中创建子路径的最大重试次数,默认为3

 

longretries = this.zkretries;

 

 

 

do {

 

zookeeper.znode.recovering.regions配置的路径下生成一个通过region name为名称的子路径

 

默认为/hbase/recovering-regions/region-name

 

String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);

 

longlastRecordedFlushedSequenceId = -1;

 

try {

 

得到region中的最大的seqid,seqidServerManager.flushedSequenceIdByRegion中存储,

 

记录着regionflush的最大的seqid

 

longlastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(

 

regionEncodeName.getBytes());

 

....................此处部分代码没有显示

 

 

 

 

 

检查在zk中的recovering-regions中是否已经包含此region,

 

byte[] data = ZKUtil.getData(this.watcher, nodePath);

 

if (data == null) {

 

如果recovering-regions中还不包含此regionreplay信息,

 

region的最后一次flushseqid写入到replay路径下

 

ZKUtil.createSetData(this.watcher, nodePath,

 

ZKUtil.positionToByteArray(lastSequenceId));

 

} else {

 

如果recovering-regions中已经包含有此regionreplay信息,

 

得到上一次regionrecoveringseqid,

 

如果上一次的seqid小于当前region的最后一次flushseqid,更新zk中此regionreplayseqid为最新的seqid

 

否则不做修改(上一次flushseqid比记录的flushseqid更加的新)

 

lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);

 

if (lastRecordedFlushedSequenceId < lastSequenceId) {

 

// update last flushed sequence id in the region level

 

ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));

 

}

 

}

 

// go one level deeper with server name

 

recovering-regions/region-name下生成当前下线的server子路径

 

nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());

 

如果当前regionflushseqid小于上一次的recoveringreplayseqid,

 

(上一次flushseqid比记录的flushseqid更加的新),更新regionlast flush seqid为上一次的seqid

 

if (lastSequenceId <= lastRecordedFlushedSequenceId) {

 

// the newly assigned RS failed even before any flush to the region

 

lastSequenceId = lastRecordedFlushedSequenceId;

 

}

 

 

 

/hbase/recovering-regions/region-name/server-name路径下记录最后一次flushseqid.

 

ZKUtil.createSetData(this.watcher, nodePath,

 

ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));

 

 

 

....................此处部分代码没有显示

 

break;

 

} catch (KeeperExceptione) {

 

....................此处部分代码没有显示

 

}

 

} while ((--retries) > 0 && (!this.stopper.isStopped()));

 

}

 

} finally {

 

this.recoveringRegionLock.unlock();

 

}

 

}

 

 

 

 

 

MasterFileSystem.splitMetaLog分析

 

splitMetaLog传入需要执行split操作的server(下线的server),方法去调用splitLog方法,

 

由于此时只针对meta regionhlog时行split,因此在splitLog方法时传入META_FILTER来区分是否meta split

 

 

 

public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {

 

longsplitTime = 0, splitLogSize = 0;

 

/hbase/WALs的日志路径下得到通过下线的servers命名的所有日志路径,老版本中.logs目录

 

如:/hbase/WALs/server-name1

 

并把下线的server路径名称更新为/hbase/WALs/server-name1-splitting路径

 

List<Path> logDirs = getLogDirs(serverNames);

 

把下线的所有rs server添加到SplitLogManager.deadWorkers中,

 

等待SplitLogManager.TimeoutMonitor线程定期去处理,

 

SplitLogManager.TimeoutMonitor线程分析

 

splitLogManager.handleDeadWorkers(serverNames);

 

splitTime = EnvironmentEdgeManager.currentTimeMillis();

 

执行hlog split操作,SplitLogManager.splitLogDistributed分析

 

splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);

 

splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;

 

 

 

....................此处部分代码没有显示,监控信息

 

}

 

 

 

 

 

MasterFileSystem.splitLog分析

 

splitLog传入需要执行split操作的server(下线的server),方法去调用splitLog方法,

 

由于此时只针对非meta regionhlog时行split,

 

因此在splitLog方法时传入NON_META_FILTER来区分是否非meta split

 

 

 

public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {

 

longsplitTime = 0, splitLogSize = 0;

 

/hbase/WALs的日志路径下得到通过下线的servers命名的所有日志路径,老版本中.logs目录

 

如:/hbase/WALs/server-name1

 

并把下线的server路径名称更新为/hbase/WALs/server-name1-splitting路径

 

List<Path> logDirs = getLogDirs(serverNames);

 

把下线的所有rs server添加到SplitLogManager.deadWorkers中,

 

等待SplitLogManager.TimeoutMonitor线程定期去处理,

 

SplitLogManager.TimeoutMonitor线程分析

 

splitLogManager.handleDeadWorkers(serverNames);

 

splitTime = EnvironmentEdgeManager.currentTimeMillis();

 

执行hlog split操作,SplitLogManager.splitLogDistributed分析

 

splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);

 

splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;

 

 

 

....................此处部分代码没有显示,监控信息

 

}

 

 

 

 

 

 

 

SplitLogManager.splitLogDistributed分析

 

 

 

此方法主要用于对server hlog根据region进行split操作,生成split task,并等待split完成。

 

public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,

 

PathFilterfilter) throws IOException {

 

 

 

....................此处部分代码没有显示,监控信息,日志信息

 

 

 

得到/hbase/WALs/server-name-splitting下的所有日志文件,

 

如果传入的filterMETA_FILTER,那么只获取.metahlog文件,否则获取全部hlog文件

 

 

 

FileStatus[] logfiles = getFileList(logDirs, filter);

 

 

 

....................此处部分代码没有显示,监控信息,日志信息

 

 

 

longtotalSize = 0;

 

TaskBatch batch = newTaskBatch();

 

Boolean isMetaRecovery = (filter == null) ? null : false;

 

for (FileStatus lf : logfiles) {

 

 

 

....................此处部分代码没有显示,监控信息,日志信息

 

 

 

totalSize += lf.getLen();

 

得到日志文件路径去掉/hbase的部分名称,如/WALs/server-name-splitting/aaa.meta

 

 

 

String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);

 

 

 

1.hlog的全路径去掉/hbase部分通过URLEncoder.encode进行转码(/会被转换成%2F)

 

2.hlog的全路径添加到zookeeper.znode.splitlog配置的路径下默认为splitWAL,作为其子路径存在。

 

3.SplitLogManager.tasks中添加一个Task实例,key2zk生成的pathvalue为生成的Task实例,

 

设置TaskstatusIN_PROGRESS并把taskbatch实例为上面生成的TaskBatch实例(batch),

 

batch中的installed加一,表示增加一个批量执行的Task

 

4.根据hbase.splitlog.zk.retries配置的zk重试次数,默认为3

 

生成SplitLogTask实例,设置其originServermasterServerName

 

设置其stateZooKeeperProtos.SplitLogTask.State.UNASSIGNED

 

zk中注册此地址,并把SplitLogTask写入到此zk的路径下。

 

5.regionserver中监听zksplitWAL的路径,

 

regionserver中处理splitlog

 

6.master中通过SplitLogManager.nodeDataChanged来监听rsSplitLogTask的状态修改。

 

SplitLogManager.nodeDataChanged分析

 

 

 

if (!enqueueSplitTask(pathToLog, batch)) {

 

thrownewIOException("duplicate log split scheduled for " + lf.getPath());

 

}

 

}

 

 

 

等待split操作完成,

 

a.batch中所有的Task.statusTerminationStatus.IN_PROGRESStask个数为0

 

b.splitWAL路径下的所有子路径的个数为0

 

c.每次迭代都需要等待batchnodeDataChanged或者其它地方对batch进行notify

 

 

 

waitForSplittingCompletion(batch, status);

 

// remove recovering regions from ZK

 

if (filter == MasterFileSystem.META_FILTER/* reference comparison */) {

 

....................此处部分代码没有显示,日志信息

 

isMetaRecovery = true;

 

}

 

 

 

删除zkrecovering-regions下对应的region路径下传入的servers子路径

 

(如果region下所有的servers子路径不存在,直接删除region子路径)

 

如果isMetaRecovery等于true表示只删除meta regionrecovering路径

 

this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);

 

 

 

如果有日志split出现错误,直接throw IOException

 

if (batch.done != batch.installed) {

 

batch.isDead = true;

 

....................此处部分代码没有显示,日志信息

 

thrownewIOException(msg);

 

}

 

for(Path logDir: logDirs){

 

status.setStatus("Cleaning up log directory...");

 

try {

 

删除WALs目录下对应的server-name-splitting的日志文件。

 

if (fs.exists(logDir) && !fs.delete(logDir, false)) {

 

LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);

 

}

 

} catch (IOException ioe) {

 

....................此处部分代码没有显示,日志信息

 

}

 

SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();

 

}

 

....................此处部分代码没有显示,监控信息,日志信息

 

returntotalSize;

 

}

 

 

 

 

 

ServerShutdownHandler.process方法处理流程:

 

ServerShutdownHandler的处理流程主要对非meta region的下线处理,region的重新分配,日志split

 

 

 

 

 

public void process() throws IOException {

 

booleanhasLogReplayWork = false;

 

final ServerName serverName = this.serverName;

 

try {

 

....................此处部分代码没有显示

 

 

 

AssignmentManager am = services.getAssignmentManager();

 

if (isCarryingMeta() // hbase:meta

 

|| !am.isFailoverCleanupDone()) {

 

this.services.getServerManager().processDeadServer(serverName, this.shouldSplitHlog);

 

return;

 

}

 

 

 

....................此处部分代码没有显示

 

 

 

NavigableMap<HRegionInfo, Result> hris = null;

 

while (!this.server.isStopped()) {

 

try {

 

meta表中进行scan,扫描出当前下线的regionserver中所有的user region 列表。

 

this.server.getCatalogTracker().waitForMeta();

 

hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),

 

this.serverName);

 

break;

 

} catch (InterruptedException e) {

 

Thread.currentThread().interrupt();

 

thrownewIOException("Interrupted", e)

 

} catch (IOException ioe) {

 

....................此处部分代码没有显示

 

}

 

}

 

if (this.server.isStopped()) {

 

thrownewIOException("Server is stopped");

 

}

 

 

 

try {

 

shouldSplitHlogServerShutdownHandler实例生成时默认为true

 

if (this.shouldSplitHlog) {

 

LOG.info("Splitting logs for " + serverName + " before assignment.");

 

 

 

检查hbase.master.distributed.log.replay配置是否设置为true,默认值为false

 

 

 

if (this.distributedLogReplay) {

 

LOG.info("Mark regions in recovery before assignment.");

 

Set<ServerName> serverNames = newHashSet<ServerName>();

 

serverNames.add(serverName);

 

如果设置有distributedLogReplay,执行log split的预处理,

 

MasterFileSystem.prepareLogReplay分析

 

this.services.getMasterFileSystem().prepareLogReplay(serverNames);

 

} else {

 

如果没有设置distributedLogReplay,执行log split,并等待split完成

 

MasterFileSystem.splitLog分析

 

this.services.getMasterFileSystem().splitLog(serverName);

 

}

 

RegionStates.lastAssignments中移出此server对应的所有region分配信息

 

RegionStates.processedServers中移出超出时间限制的

 

通过hbase.master.maximum.logsplit.keeptime配置的时间的server,默认为7200000ms(2 hour)

 

am.getRegionStates().logSplit(serverName);

 

} else {

 

LOG.info("Skipping log splitting for " + serverName);

 

}

 

} catch (IOException ioe) {

 

resubmit(serverName, ioe);

 

}

 

....................此处部分代码没有显示

 

 

 

1.AssignmentManager.regionPlans中移出包含此serverregion plan,

 

2.regionStates.serverHoldings中得到此server所有的region assign,

 

如果region的状态为online/splitting/merging,region的状态设置为offline

 

并从regionsInTransitionregionAssignments移出这些个region.

 

如果region的状态为splitting/merging时,删除regionzkregion-in-transition的注册信息

 

3.regionsInTransition中找到所有此servertransition

 

状态为PENDING_OPEN/OPENING/FAILED_OPEN/FAILED_CLOSE/OFFLINEregion,并返回

 

4.根据3返回的在regionsInTransition中的region,删除regionzkregion-in-transition的注册信息

 

zk中的路径通过zookeeper.znode.unassigned进行配置。

 

5.注意:3中返回的region是当前下线的serverassignments中不包含的,

 

同时在regionInTransition又包含,也就是这些个region准备在当前下线的server上启动,

 

但此时这个server挂掉了。此方法的主要作用是删除掉当前下线server中正在做transitionregionzk信息,

 

把这些个region的状态设置为offline,等待下面的代码逻辑重新执行分配.

 

 

 

List<HRegionInfo> regionsInTransition = am.processServerShutdown(serverName);

 

 

 

....................此处部分代码没有显示

 

 

 

把上面得到的正在做transitionregions添加到待分配的region列表中

 

 

 

List<HRegionInfo> toAssignRegions = newArrayList<HRegionInfo>();

 

toAssignRegions.addAll(regionsInTransition);

 

 

 

// Iterate regions that were on this server and assign them

 

if (hris != null) {

 

RegionStates regionStates = am.getRegionStates();

 

 

 

迭代从meta表中得到的所有当前下线serveruser region,

 

 

 

for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {

 

HRegionInfo hri = e.getKey();

 

 

 

如果此regiontransition中已经包含,重新迭代下一次

 

 

 

if (regionsInTransition.contains(hri)) {

 

continue;

 

}

 

String encodedName = hri.getEncodedName();

 

Locklock = am.acquireRegionLock(encodedName);

 

try {

 

RegionState rit = regionStates.getRegionTransitionState(hri);

 

 

 

检查region所在的table是否被删除/是否是disabletable,如果不是执行如下流程

 

 

 

if (processDeadRegion(hri, e.getValue(), am, server.getCatalogTracker())) {

 

ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);

 

if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {

 

....................此处部分代码没有显示

 

continue;

 

}

 

if (rit != null) {

 

if (rit.getServerName() != null && !rit.isOnServer(serverName)) {

 

....................此处部分代码没有显示

 

continue;

 

}

 

try{

 

....................此处部分代码没有显示

 

 

 

删除regionzkregion-in-transition的注册信息,zk中的路径通过zookeeper.znode.unassigned进行配置,

 

并更新region的状态为offline

 

 

 

ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), hri);

 

regionStates.updateRegionState(hri, State.OFFLINE);

 

} catch (KeeperExceptionke) {

 

this.server.abort("Unexpected ZK exception deleting unassigned node " + hri, ke);

 

return;

 

}

 

} elseif (regionStates.isRegionInState(

 

hri, State.SPLITTING_NEW, State.MERGING_NEW)) {

 

如果region的状态是准备split或者准备merge时,重新设置region状态为offline

 

regionStates.regionOffline(hri);

 

}

 

 

 

添加此region到待分配的region列表中

 

 

 

toAssignRegions.add(hri);

 

 

 

} elseif (rit != null) {

 

region所在的table现在是disabletable,设置region状态为offline,

 

a.如果regionzk中的eventTypeM_ZK_REGION_CLOSING/RS_ZK_REGION_CLOSED,zk中删除此region的路径

 

zkregion-in-transition的注册信息,zk中的路径通过zookeeper.znode.unassigned进行配置

 

b.如果regionzk中的eventtypeRS_ZK_REGION_CLOSED/M_ZK_REGION_OFFLINE,从zk中删除此region的路径

 

if (rit.isPendingCloseOrClosing()

 

&& am.getZKTable().isDisablingOrDisabledTable(hri.getTable())) {

 

....................此处部分代码没有显示

 

regionStates.updateRegionState(hri, State.OFFLINE);

 

am.deleteClosingOrClosedNode(hri, rit.getServerName());

 

am.offlineDisabledRegion(hri);

 

} else {

 

LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "

 

+ rit + " not to be assigned by SSH of server " + serverName);

 

}

 

}

 

} finally {

 

lock.unlock();

 

}

 

}

 

}

 

 

 

try {

 

执行region的批量assign操作

 

am.assign(toAssignRegions);

 

} catch (InterruptedException ie) {

 

LOG.error("Caught " + ie + " during round-robin assignment");

 

thrownewIOException(ie);

 

}

 

 

 

if (this.shouldSplitHlog && this.distributedLogReplay) {

 

// wait for region assignment completes

 

for (HRegionInfo hri : toAssignRegions) {

 

try {

 

此处只能是distributedLogReplay设置为true时,因为这时rs中不做log replay,

 

distributedLogReplay设置为true,region下不存在recovered.edits路径,因此openregionreplay不会执行,

 

所以此时等待regionopen完成是可行的。等待每一个regionassign完成,

 

也就是assign时的RegionInTransitionRegionStates.regionsInTransition的处理完成(列表中不包含此region)

 

或者说等待分配的超时时间hbase.master.log.replay.wait.region.timeout过期,默认15000ms

 

assign时会在zk中的region-in-transition注册一个region地址,等待rs处理,

 

此方法会不停止的迭代,直接timeout或者regionsInTransition中移出此regiontransition,

 

每次迭代会让regionstates处于wait状态,等待AssignmentManager.nodeDataChanged/nodeDeleted对其notify

 

完成后通过AssignmentManager中的相关nodeDataChanged处理事件方法对regionsInTransition更新,

 

通过nodeDeleted处理事件对regionsInTransition移出

 

if (!am.waitOnRegionToClearRegionsInTransition(hri, regionAssignmentWaitTimeout)) {

 

// Wait here is to avoid log replay hits current dead server and incur a RPC timeout

 

// when replay happens before region assignment completes.

 

LOG.warn("Region " + hri.getEncodedName()

 

+ " didn't complete assignment in time");

 

}

 

} catch (InterruptedException ie) {

 

thrownewInterruptedIOException("Caught " + ie

 

+ " during waitOnRegionToClearRegionsInTransition");

 

}

 

}

 

// submit logReplay work

 

如果设置distributedLogReplaytrue,此时region assign完成,执行log split,并等待split完成

 

MasterFileSystem.splitLog分析

 

this.services.getExecutorService().submit(

 

newLogReplayHandler(this.server, this.services, this.deadServers, this.serverName));

 

hasLogReplayWork = true;

 

}

 

} finally {

 

this.deadServers.finish(serverName);

 

}

 

 

 

if (!hasLogReplayWork) {

 

LOG.info("Finished processing of shutdown of " + serverName);

 

}

 

}

 

 

 

 

 

regionserver中处理splitlog

 

regionserver中通过regionserver启动时启动的SplitLogWorker线程,

 

通过其的run方法监听masterzk中生成splitWAL,一但masterzk中注册splitWAL路径成功,

 

执行taskLoop方法默认5s进行一次split log的检查(线程等待,timeout5000ms),

 

通过nodeChildrenChanged来监听zksplitWAL子路径的修改,notify此线程,

 

通过nodeDataChanged来更新每一个SplitLogTask的状态更新,

 

 

 

 

 

publicvoidrun() {

 

try {

 

....................此处部分代码没有显示

 

 

 

// wait for master to create the splitLogZnode

 

intres = -1;

 

while (res == -1 && !exitWorker) {

 

try {

 

监听masterzksplitWAL的注册

 

res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);

 

} catch (KeeperExceptione) {

 

// ignore

 

LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e);

 

}

 

if (res == -1) {

 

try {

 

....................此处部分代码没有显示

 

Thread.sleep(1000);

 

} catch (InterruptedException e) {

 

....................此处部分代码没有显示

 

exitWorker = true;

 

break;

 

}

 

}

 

}

 

 

 

if (!exitWorker) {

 

定期检查并启动执行split hlog的处理

 

taskLoop();

 

}

 

} catch (Throwable t) {

 

....................此处部分代码没有显示

 

} finally {

 

LOG.info("SplitLogWorker " + this.serverName + " exiting");

 

}

 

}

 

 

 

 

 

 

 

检查并执行split hlog

 

private void taskLoop() {

 

while (!exitWorker) {

 

intseq_start = taskReadySeq;

 

得到所有的需要进行log splitservername的路径

 

List<String> paths = getTaskList();

 

if (paths == null) {

 

LOG.warn("Could not get tasks, did someone remove " +

 

this.watcher.splitLogZNode + " ... worker thread exiting.");

 

return;

 

}

 

// pick meta wal firstly

 

首先定义一个先执行的servername hlog split的路径值,默认为随机取一个下标

 

如果要splitserver中包含有metaregion,那么先从metaserver开始执行

 

intoffset = (int) (Math.random() * paths.size());

 

for(inti = 0; i < paths.size(); i ++){

 

if(HLogUtil.isMetaFile(paths.get(i))) {

 

offset = i;

 

break;

 

}

 

}

 

intnumTasks = paths.size();

 

for (inti = 0; i < numTasks; i++) {

 

计算执行顺序,offset开始执行,如:paths.size()=6,offset=5,那么执行顺序为501234

 

intidx = (i + offset) % paths.size();

 

// don't call ZKSplitLog.getNodeName() because that will lead to

 

// double encoding of the path name

 

每一个server最大同时执行split hlogtask个数通过hbase.regionserver.wal.max.splitters配置,默认为2

 

得到现在活着的所有的regionserver列表,根据要splitserver个数,

 

平均下来后计算此server最多要执行多少个splt task,

 

最多同时执行个数不超过hbase.regionserver.wal.max.splitters配置,每次执行tasksInProgress值加一

 

if (this.calculateAvailableSplitters(numTasks) > 0) {

 

如果此server还有能力执行split hlog task

 

更新zksplitWAL中此servername(split)SplitLogTaskSplitLogTask.Owned,

 

并把当前执行splitrs更新到zk中。生成HLogSplitterHandler实例,并启动线程执行此处理程序

 

tasksInProgress的正在处理的splittask的值加一,HLogSplitterHandler.process流程分析

 

等待500-1000ms在重新执行下一次分配,这样能保证其它的rs也能分配到任务

 

注意:此部分逻辑第一次执行此方法时不会执行,因为第一次执行时zksplitWAL路径下可能为空,

 

直接进入下面部分,让此线程进入wait状态,等待nodeChildrenChanged来进行notify

 

grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));

 

} else {

 

LOG.debug("Current region server " + this.serverName + " has "

 

+ this.tasksInProgress.get() + " tasks in progress and can't take more.");

 

break;

 

}

 

if (exitWorker) {

 

return;

 

}

 

}

 

SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();

 

synchronized (taskReadyLock) {

 

此次任务执行完成,zksplitWAL在任务执行到此时还没有更新的rs下线被注册进来

 

while (seq_start == taskReadySeq) {

 

try {

 

线程进行等待状态,等待nodeChildrenChanged来进行notify

 

taskReadyLock.wait(checkInterval);

 

if (this.server != null) {

 

// check to see if we have stale recovering regions in our internal memory state

 

如果是设置有distributedLogReplay模式,此时在region open后才开始执行splitlog,

 

那么得到要进行splitlogregion列表。迭代每一个region

 

recovering-regions中检查是否此region需要splitlog,

 

如果recovering-regions中不存在此region,rs中的recoveringRegions列表中移出此region

 

并设置此Hregionrecovering的值为false.

 

开始回到taskLoop方法的顶部,重新对这部分region进行splitlog

 

Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();

 

if (!recoveringRegions.isEmpty()) {

 

// Make a local copy to prevent ConcurrentModificationException when other threads

 

// modify recoveringRegions

 

List<String> tmpCopy = newArrayList<String>(recoveringRegions.keySet());

 

for (String region : tmpCopy) {

 

String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);

 

try {

 

if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {

 

HRegion r = recoveringRegions.remove(region);

 

if (r != null) {

 

r.setRecovering(false);

 

}

 

LOG.debug("Mark recovering region:" + region + " up.");

 

} else {

 

....................此处部分代码没有显示

 

break;

 

}

 

} catch (KeeperExceptione) {

 

....................此处部分代码没有显示

 

break;

 

}

 

}

 

}

 

}

 

} catch (InterruptedException e) {

 

....................此处部分代码没有显示

 

exitWorker = true;

 

return;

 

}

 

}

 

}

 

 

 

}

 

}

 

 

 

 

 

HlogSplitterHandler.process处理流程分析

 

HlogSplitterHandler是具体对hlog进行处理的handler,通过其传入的TaskExecutor.exec方法执行,

 

TaskExecutor是在SplitLogWorker实例生成时在构造方法中生成的一个匿名实现类,

 

 

 

public HLogSplitterHandler(final Server server, String curTask,

 

final MutableInt curTaskZKVersion,

 

CancelableProgressablereporter,

 

AtomicInteger inProgressTasks, TaskExecutorsplitTaskExecutor) {

 

设置EventTypeRS_LOG_REPLAY

 

super(server, EventType.RS_LOG_REPLAY);

 

this.curTask = curTask;

 

this.wal = ZKSplitLog.getFileName(curTask);

 

this.reporter = reporter;

 

this.inProgressTasks = inProgressTasks;

 

regionserver中执行split log task的值加一,表示占用一个执行位置

 

this.inProgressTasks.incrementAndGet();

 

this.serverName = server.getServerName();

 

this.zkw = server.getZooKeeper();

 

this.curTaskZKVersion = curTaskZKVersion;

 

SplitLogWorker的构造方法最后一个参数

 

this.splitTaskExecutor = splitTaskExecutor;

 

}

 

 

 

public void process() throws IOException {

 

longstartTime = System.currentTimeMillis();

 

try {

 

执行split log的处理程序,见下面的SplitLog TaskExecutor.exec处理分析,并得到流程执行的返回状态

 

Statusstatus = this.splitTaskExecutor.exec(wal, reporter);

 

switch (status) {

 

caseDONE:

 

成功结束,调用endTask结束任务,

 

设置zksplitWAL路径的servernameSplitLogTask的状态为SplitLogTask.Done

 

endTask(zkw, new SplitLogTask.Done(this.serverName),

 

SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());

 

break;

 

casePREEMPTED:

 

如果split task是一个抢占的资源,不做处理

 

SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();

 

LOG.warn("task execution prempted " + wal);

 

break;

 

caseERR:

 

执行过程错误,调用endTask结束任务,

 

设置zksplitWAL路径的servernameSplitLogTask的状态为SplitLogTask.Err

 

if (server != null && !server.isStopped()) {

 

endTask(zkw, new SplitLogTask.Err(this.serverName),

 

SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());

 

break;

 

}

 

// if the RS is exiting then there is probably a tons of stuff

 

// that can go wrong. Resign instead of signaling error.

 

//$FALL-THROUGH$

 

caseRESIGNED:

 

如果资源的task执行被放弃,调用endTask结束任务,

 

设置zksplitWAL路径的servernameSplitLogTask的状态为SplitLogTask.Resigned

 

if (server != null && server.isStopped()) {

 

LOG.info("task execution interrupted because worker is exiting " + curTask);

 

}

 

endTask(zkw, new SplitLogTask.Resigned(this.serverName),

 

SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());

 

break;

 

}

 

} finally {

 

LOG.info("worker " + serverName + " done with task " + curTask + " in "

 

+ (System.currentTimeMillis() - startTime) + "ms");

 

regionserver中的split log task的值减一,表示有一个空闲的位置

 

this.inProgressTasks.decrementAndGet();

 

}

 

}

 

 

 

SplitLog TaskExecutor.exec处理分析:

 

 

 

public Status exec(String filename, CancelableProgressable p) {

 

Path rootdir;

 

FileSystemfs;

 

try {

 

rootdir = FSUtils.getRootDir(conf);

 

fs = rootdir.getFileSystem(conf);

 

} catch (IOException e) {

 

LOG.warn("could not find root dir or fs", e);

 

如果得到hdfs/hbase目录出错或生成/hbaseFileSystem出错时,返回RESIGNED(放弃)

 

returnStatus.RESIGNED;

 

}

 

// TODO have to correctly figure out when log splitting has been

 

// interrupted or has encountered a transient error and when it has

 

// encountered a bad non-retry-able persistent error.

 

Try {

 

执行split log操作,生成一个SplitLogFile实例,并执行其splitLogFile方法,

 

方法执行返回true or false,执行过程中定期向zk中此hlog replay的路径发送心跳,如果心跳发送失败返回false

 

发送心跳的间隔通过hbase.splitlog.report.period配置,默认为hbase.splitlog.manager.timeout(120000)/3

 

发送心跳其实就是定期在zk中重新注册此servername,并得到上一次注册的version,

 

如果上一次version小于1表示 PREEMPTED(此server有资源抢占)

 

1.通过hbase.regionserver.hlog.splitlog.buffersize配置读取源hlogbuffer大小,默认为128*1024*1024

 

2.通过hbase.regionserver.hlog.splitlog.writer.threads配置OutputSink的写入线程个数

 

3.配置hbase.regionserver.wal.logreplay.batch.size,默认为64

 

4.如果distributedLogReplay设置为true,生成的OutputSinkHLogSplitter.LogReplayOutputSink/

 

否则生成HLogSplitter.LogRecoveredEditsOutputSink实例

 

5.通过hbase.hlog.split.skip.errors配置是否跳过split error,默认为false

 

6.通过hbase.splitlog.report.interval.loglines配置每次读取的行数,默认为1024

 

读取过程中如果hlogentityseqid小于region中的seqid或者cocovering-regions中存储的seqid,continue.

 

数据在output时,根据regionname,在regionname下创建一个recovered.edits目录,并写入hlog数据到此目录下

 

具体请参见HLogSplitter.splitLogFile方法源代码。

 

if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(newPath(rootdir, filename)),

 

fs, conf, p, sequenceIdChecker, watcher)) {

 

server有资源抢占,主要是在zk上定期注册此serverhlogsplit

 

returnStatus.PREEMPTED;

 

}

 

} catch (InterruptedIOException iioe) {

 

LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);

 

returnStatus.RESIGNED;

 

} catch (IOException e) {

 

Throwable cause = e.getCause();

 

if (einstanceof RetriesExhaustedException

 

&& (causeinstanceof NotServingRegionException

 

|| causeinstanceof ConnectException

 

|| causeinstanceof SocketTimeoutException)) {

 

LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "

 

+ "resigning", e);

 

returnStatus.RESIGNED;

 

} elseif (causeinstanceof InterruptedException) {

 

LOG.warn("log splitting of " + filename + " interrupted, resigning", e);

 

returnStatus.RESIGNED;

 

} elseif(causeinstanceofKeeperException) {

 

LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);

 

returnStatus.RESIGNED;

 

}

 

LOG.warn("log splitting of " + filename + " failed, returning error", e);

 

returnStatus.ERR;

 

}

 

returnStatus.DONE;

 

}

 

}

 

 

 

 

 

SplogLogWorker.nodeDataChanged方法中监听到zk的状态修改时,如果状态不是如下状态是,调用stopTask结束线程

 

String taskpath = currentTask;

 

if (taskpath != null && taskpath.equals(path)) {

 

// have to compare data. cannot compare version because then there

 

// will be race with attemptToOwnTask()

 

// cannot just check whether the node has been transitioned to

 

// UNASSIGNED because by the time this worker sets the data watch

 

// the node might have made two transitions - from owned by this

 

// worker to unassigned to owned by another worker

 

if (! slt.isOwned(this.serverName) &&

 

! slt.isDone(this.serverName) &&

 

! slt.isErr(this.serverName) &&

 

! slt.isResigned(this.serverName)) {

 

LOG.info("task " + taskpath + " preempted from " +

 

serverName + ", current task state and owner=" + slt.toString());

 

stopTask();

 

}

 

}

 

结束线程的执行过程

 

void stopTask() {

 

LOG.info("Sending interrupt to stop the worker thread");

 

worker.interrupt(); // TODO interrupt often gets swallowed, do what else?

 

}

 

 

 

 

 

SplitLogManager.nodeDataChanged流程分析

 

regionserver中执行split log操作,并根据执行情况修改zksplitWALSplitLogTask的状态。

 

SplitLogManager.nodeDataChangedmaster端对zksplitWAL进行监听,

 

tasks列表中找到对应修改的task,task的状态从IN_PROGRESS修改为SUCCESS,

 

设置task对应的TaskBatchdoneerror的值加一。调用TaskBatch.notify方法叫醒线程的等待。

 

waitForSplittingCompletion方法中会每执行一次检查把TaskBatch.wait,因此需要对其做notify

 

 

 

 

 

Region open数据重播分析

 

HregionServer.openRegion-->OpenRegionHandler.process-->openRegion-->

 

Hregion.openRegion-->生成HRegion实例,并调用实例的r.openHRegion(reporter)-->initialize

 

-->initializeRegionInternals-->initializeRegionStores-->replayRecoveredEditsIfAny

 

注意:日志重播时传入的每一个store中最大的seqid是不包含bluk loadhfileseqid

 

region open时得到并计算next sequence id的所有store中最大的seqid是包含bluk loadhfileseqid

 

 

 

protectedlongreplayRecoveredEditsIfAny(final Path regiondir,

 

Map<byte[], Long> maxSeqIdInStores,

 

finalCancelableProgressablereporter, finalMonitoredTaskstatus)

 

throws UnsupportedEncodingException, IOException {

 

取出所有的storeflush到磁盘上的所有store中最小的一个seqid

 

longminSeqIdForTheRegion = -1;

 

for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {

 

if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {

 

minSeqIdForTheRegion = maxSeqIdInStore;

 

}

 

}

 

longseqid = minSeqIdForTheRegion;

 

 

 

FileSystemfs = this.fs.getFileSystem();

 

取出region目录下recovered.edits子路径下所有的文件,但不包含结尾是.temp的文件,并根据文件名称排序返回

 

hlogregion下的文件名称是此文件对应的最大seqid,也就是按seqid从小到大排序。

 

NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);

 

if (LOG.isDebugEnabled()) {

 

LOG.debug("Found " + (files == null ? 0 : files.size())

 

+ " recovered edits file(s) under " + regiondir);

 

}

 

没有需要重播的日志文件,直接返回当前所有的store中最小的seqid,如果是表示不需要进行replay

 

if (files == null || files.isEmpty()) returnseqid;

 

 

 

for (Path edits: files) {

 

检查日志文件是否存在

 

if (edits == null || !fs.exists(edits)) {

 

LOG.warn("Null or non-existent edits file: " + edits);

 

continue;

 

}

 

检查文件大小是否为空,如果是空文件直接删除,如果是表示不需要进行replay

 

if (isZeroLengthThenDelete(fs, edits)) continue;

 

 

 

longmaxSeqId = Long.MAX_VALUE;

 

String fileName = edits.getName();

 

检查此文件中最大的seqid是否小于region是所有store中最小的seqid,如果是表示此文件不需要进行replay

 

maxSeqId = Math.abs(Long.parseLong(fileName));

 

if (maxSeqId <= minSeqIdForTheRegion) {

 

if (LOG.isDebugEnabled()) {

 

String msg = "Maximum sequenceid for this log is " + maxSeqId

 

+ " and minimum sequenceid for the region is " + minSeqIdForTheRegion

 

+ ", skipped the whole file, path=" + edits;

 

LOG.debug(msg);

 

}

 

continue;

 

}

 

 

 

try {

 

得到replayedits中每一个kv,并根据kv得到对应的store,

 

如果kv中的seqid小于store中最大的seqid,此kv不需要replay,

 

否则把kv添加到store中,得到添加的kvsize,size添加到:

 

a.RegionServerAccounting.replayEditsPerRegion中对应的region的大小中,

 

表示此regionreplaymemory使用情况

 

b.RegionServerAccounting.atomicGlobalMemstoreSize中,表示全局的memstore使用情况

 

c.添加到此regionmemstore中,HRegion.memstoreSize,表示当前regionmemory使用情况

 

d.检查memstore是否达到flush的值,通过hbase.hregion.memstore.flush.size配置,默认1024*1024*128L

 

如果达到memstoreflush值,对memstore进行flush

 

f.返回最新的seqid

 

seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);

 

} catch (IOException e) {

 

出现replay错误,检查hbase.hregion.edits.replay.skip.errors是否配置为true

 

老版本使用hbase.skip.errors进行配置,默认值为false,表示不跳过error

 

booleanskipErrors = conf.getBoolean(

 

HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,

 

conf.getBoolean(

 

"hbase.skip.errors",

 

HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));

 

if (conf.get("hbase.skip.errors") != null) {

 

LOG.warn(

 

"The property 'hbase.skip.errors' has been deprecated. Please use " +

 

HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");

 

}

 

如果配置有跳过replay error,把此edits文件重命名为editsname.systime,removeregion的根目录下

 

if (skipErrors) {

 

Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);

 

LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS

 

+ "=true so continuing. Renamed " + edits +

 

" as " + p, e);

 

} else {

 

throwe;

 

}

 

}

 

}

 

// The edits size added into rsAccounting during this replaying will not

 

// be required any more. So just clear it.

 

RegionServerAccounting.replayEditsPerRegion中此region对应的replay kvsize清空

 

if (this.rsAccounting != null) {

 

this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());

 

}

 

如果进行了replay,那么当前replay后的seqid一定是大于原来的storeseqid,强制对region进行flush

 

if (seqid > minSeqIdForTheRegion) {

 

// Then we added some edits to memory. Flush and cleanup split edit files.

 

internalFlushcache(null, seqid, status);

 

}

 

// Now delete the content of recovered edits. We're done w/ them.

 

删除region下所有的recovered.edits下的文件

 

for (Path file: files) {

 

if (!fs.delete(file, false)) {

 

LOG.error("Failed delete of " + file);

 

} else {

 

LOG.debug("Deleted recovered.edits file=" + file);

 

}

 

}

 

returnseqid;

 

}

 

 

 

 

 

distributedLogReplaytrue的日志重播

 

通过hbase.master.distributed.log.replay配置的值为true时,在splitLog时,

 

生成HLogSplitter实例时OutputSink的实现会选择HLogSplitter.LogReplayOutputSink

 

此实现不经过recovered.edits目录,直接把数据replayregion中。具体实现请查看相关源代码

 

0
0
分享到:
评论

相关推荐

    基于大数据的数据分析系统架构.doc

    流式架构由于取消ETL操作,所以数据的处理效率非 常高,但是由于没有了数据批处理,没有很好的支撑数据统计和重播,不利于离线进行 数据分析。 (三)Lambda数据分析结构 在大数据分析系统中Lambda架构是比较重要的...

    panda:架构中性动态分析平台

    此外,重播日志文件是紧凑且可共享的,允许进行可重复的实验。 例如,FreeBSD的90亿个指令启动仅用几百MB来表示。 PANDA利用QEMU对13种不同CPU架构的支持,使得可以在LLVM IR中对这些不同的指令集进行分析。 这样,...

    bbjs-cli:Node.js Cleanflight黑盒库

    它将来自黑盒查看器的解析逻辑包装到一个简单的Node.js EventEmitter中,以便多个侦听器可以在重播日志时对其进行分析。入门安装node.js和npm。 运行npm install安装依赖项。 运行grunt将ES6转换为Node友好的...

    车辆识别代码matlab-Player:播放器

    RVPlayer:“假设分析”重播的机器人车辆取证 介绍 RVPlayer是一种机器人车辆(RV)取证工具,它支持使用模拟器内部的假设分析进行重放,以取代昂贵的基于现场测试的取证工具。 它具有高效的需求驱动的自适应日志...

    taint-entropy-panda

    此外,重播日志文件是紧凑且可共享的,允许进行可重复的实验。 例如,FreeBSD的90亿个指令启动仅用几百MB来表示。 PANDA利用QEMU对13种不同CPU架构的支持,使得可以在LLVM IR中对这些不同的指令集进行分析。 这样,...

    node-raft-algorithm:RAFT 共识算法的 node.js 实现

    看到一个抽象的和分析 这个模块是什么? 该库主要实现共识算法和与之交互的API。 它完全围绕 API 的概念实现。 这意味着您只能使用您想要和自定义的实现部分: 运输机制。 (默认情况下直接 TCP 并通过本地网络...

    twitch-chat-log-analyzer:Twitch Chat是否拥有神圣的知识? 可能不是..

    Twitch聊天日志分析器顾虑Twitch VOD停留60天脚步设置Twitch API-完成创建Twitch API的凭证文件拨打oauth电话打个电话获取VOD ID-完成使用“获取VOD ID 看一个样品要求从过去的广播中提取视频ID(可能在url字段中) ...

    SQL Server 2008管理员必备指南(超高清PDF)Part1

    14.6.5 重播跟踪 第15章 备份和还原SQL Server 2008 15.1 创建备份和还原计划 15.1.1 开始备份和还原计划 15.1.2 规划镜像与镜像数据库的备份 15.1.3 规划被复制数据库的备份 15.1.4 规划大型数据库的备份 15.1.5 ...

    SQL Server 2008管理员必备指南(超高清PDF)Part3

    14.6.5 重播跟踪 第15章 备份和还原SQL Server 2008 15.1 创建备份和还原计划 15.1.1 开始备份和还原计划 15.1.2 规划镜像与镜像数据库的备份 15.1.3 规划被复制数据库的备份 15.1.4 规划大型数据库的备份 15.1.5 ...

    SQL Server 2008管理员必备指南(超高清PDF)Part2

    14.6.5 重播跟踪 第15章 备份和还原SQL Server 2008 15.1 创建备份和还原计划 15.1.1 开始备份和还原计划 15.1.2 规划镜像与镜像数据库的备份 15.1.3 规划被复制数据库的备份 15.1.4 规划大型数据库的备份 15.1.5 ...

    SQL.Server.2008管理员必备指南.part2.rar(2/4)

     提供实际场景案例分析和故障诊断实验  SQL Server专家的呕心力作,数据库管理员的实战宝典,全面、深入地剖析SQL Server2008新特性,结构独特,实例丰富,操作性强。 编辑本段 目录  第Ⅰ部分 SQL Server 2008...

    SQL.Server.2008管理员必备指南.part1.rar(1/4)

     提供实际场景案例分析和故障诊断实验  SQL Server专家的呕心力作,数据库管理员的实战宝典,全面、深入地剖析SQL Server2008新特性,结构独特,实例丰富,操作性强。 编辑本段 目录  第Ⅰ部分 SQL Server 2008...

    SQL.Server.2008管理员必备指南.part3.rar(3/4)

     提供实际场景案例分析和故障诊断实验  SQL Server专家的呕心力作,数据库管理员的实战宝典,全面、深入地剖析SQL Server2008新特性,结构独特,实例丰富,操作性强。 编辑本段 目录  第Ⅰ部分 SQL Server 2008...

    SQL.Server.2008管理员必备指南.part4.rar(4/4)

     提供实际场景案例分析和故障诊断实验  SQL Server专家的呕心力作,数据库管理员的实战宝典,全面、深入地剖析SQL Server2008新特性,结构独特,实例丰富,操作性强。 编辑本段 目录  第Ⅰ部分 SQL Server 2008...

    ubi_reader:Python脚本集合,用于读取有关UBI和UBIFS图像的信息并从中提取数据

    这不会重播日志,因此将不会检索未提交的数据。 数据可以同时包含干净和不干净的关闭日志。 取决于NAND的转储方式,可能无法对数据位进行纠错。 套接字文件将被忽略,您可以更改ubireader / settings.py使其在其...

    matlab代码输入如何换行符-starcraft2-ai:学士学位项目的资料库,目标是使用机器学习开发StarCraft2AI

    matlab用于数据分析和图形生成的Matlab代码。 还包含生成的数字。 publishing -一些与出版有关的材料,例如徽标和海报。 replays -从训练DQN代理几个小时后重播文件。 scripts -用于执行诸如处理日志数据之类的次要...

    Record/Replay-crx插件

    报告-提供有关用户界面交互执行的反馈,包括用户界面交互失败的详细日志记录和屏幕快照的可视回归分析。 为什么要安装记录/重播? 在开发过程中,尤其是在部署到生产之前,您想知道应用程序的所有关键功能确实有效。...

    新版Android开发教程.rar

    也有分析认为,谷歌并不想做一个简单的手机终端制造商或者软件平台开发商,而意在一统传统互联网和 移 动互联网。----------------------------------- Android 编程基础 4 Android Android Android Android 手机新...

Global site tag (gtag.js) - Google Analytics