日志重播分析
Hbase的日志重播分为启动时的日志重播与rs下线时的日志重播操作。
通过hbase.master.distributed.log.replay来控制日志的split是在region的reopen前执行还是reopen后执行
如果是true表示在reopen后执行,否则相反
Rs下线时的日志重播分析
master监听下线
master通过RegionServerTracker监听rs在zk上的节点,当节点被删除时(rs下线)。触发nodeDeleted
publicvoidnodeDeleted(String path) {
if (path.startsWith(watcher.rsZNode)) {
解析出zk中rs路径下的rs名称,并解析成ServerName实例。
String serverName = ZKUtil.getNodeName(path);
....................此处部分日志打印信息没有显示
ServerName sn = ServerName.parseServerName(serverName);
如果下线的rs在ServerManager的onlineServers中已经不包含,不做处理,
if (!serverManager.isServerOnline(sn)) {
....................此处部分日志打印信息没有显示
return;
}
从RegionServerTracker.onlineServers列表中移出此rs
remove(sn);
执行ServerManager.expireServer进行下线处理
this.serverManager.expireServer(sn);
}
}
执行ServerManager.expireServer进行下线处理
publicsynchronizedvoidexpireServer(final ServerName serverName) {
....................此处部分代码没有显示
把rs添加到deadservers列表中。
this.deadservers.add(serverName);
从onlineServers列表中移出此rs
this.onlineServers.remove(serverName);
synchronized (onlineServers) {
onlineServers.notifyAll();
}
从rsAdmins(对rs进行RPC调用的接口实现类)容器中移出此rs
this.rsAdmins.remove(serverName);
....................此处部分代码没有显示
检查此rs中是否包含meta的region,如果是,执行MetaServerShutdownHandler.否则执行ServerShutdownHandler
booleancarryingMeta = services.getAssignmentManager().isCarryingMeta(serverName);
if (carryingMeta) {
this.services.getExecutorService().submit(newMetaServerShutdownHandler(this.master,
this.services, this.deadservers, serverName));
} else {
this.services.getExecutorService().submit(newServerShutdownHandler(this.master,
this.services, this.deadservers, serverName, true));
}
....................此处部分日志打印没有显示
}
MetaServerShutdownHandler.process方法处理流程:
publicvoidprocess() throws IOException {
booleangotException = true;
try {
AssignmentManager am = this.services.getAssignmentManager();
try {
检查是否需要做hlog的split,生成此实例时,shouldSplitHlog的值为true
if (this.shouldSplitHlog) {
LOG.info("Splitting hbase:meta logs for " + serverName);
检查hbase.master.distributed.log.replay配置是否设置为true,默认值为false
if (this.distributedLogReplay) {
先对meta region执行prepareLogReplay处理。
见MasterFileSystem.prepareLogReplay分析
Set<HRegionInfo> regions = newHashSet<HRegionInfo>();
regions.add(HRegionInfo.FIRST_META_REGIONINFO);
this.services.getMasterFileSystem().prepareLogReplay(serverName, regions);
} else {
否则在没有配置distributedLogReplay时,执行splitMetaLog对rs的日志进行split,等待split完成
见MasterFileSystem.splitMetaLog分析
this.services.getMasterFileSystem().splitMetaLog(serverName);
}
从AssignmentManager.RegionStates.lastAssignments中移出meta region的分配。
am.getRegionStates().logSplit(HRegionInfo.FIRST_META_REGIONINFO);
}
} catch (IOException ioe) {
....................此处部分代码没有显示
}
// Assign meta if we were carrying it.
// Check again: region may be assigned to other where because of RIT
// timeout
检查此server上还没有完成region open操作(regionInTransition还在)如果包含有meta region,
if (am.isCarryingMeta(serverName)) {
LOG.info("Server " + serverName + " was carrying META. Trying to assign.");
更新RegionStates中此region的状态为offline
从regionsInTransition中移出此region,
从serverHoldings中移出此server中meta region的分配信息
从regionAssignments中移出此meta region的分配信息
从regionsToReopen中移出此meta region
从regionPlans中移出此meta region
am.regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
等待meta region的分配,
通过hbase.catalog.verification.retries配置meta region分配的重试次数,默认10次
通过hbase.catalog.verification.timeout配置每次分配重试的间隔时间,默认1000ms
verifyAndAssignMetaWithRetries();
如果meta在zk中的地址过期数据被删除,重新执行meta的分配,并等待meta分配完成
} elseif (!this.services.getCatalogTracker().isMetaLocationAvailable()) {
// the meta location as per master is null. This could happen in case when meta assignment
// in previous run failed, while meta znode has been updated to null. We should try to
// assign the meta again.
如果meta region在zk中的地址没有注册的数据,执行meta region的分配,并等待分配结束
通过hbase.catalog.verification.retries配置meta region分配的重试次数,默认10次
通过hbase.catalog.verification.timeout配置每次分配重试的间隔时间,默认1000ms
verifyAndAssignMetaWithRetries();
} else {
LOG.info("META has been assigned to otherwhere, skip assigning.");
}
try {
如果distributedLogReplay配置为true,等待region replay的regionintranstion事务完成
也就是RegionStates.regionsInTransition中不在包含此meta region的regionintransition
region replay的等待超时通过hbase.master.log.replay.wait.region.timeout配置,默认为15000ms
如果在超时的时间内没有完成regionintransition时,此方法返回false
if (this.shouldSplitHlog && this.distributedLogReplay) {
if (!am.waitOnRegionToClearRegionsInTransition(HRegionInfo.FIRST_META_REGIONINFO,
regionAssignmentWaitTimeout)) {
....................此处部分代码没有显示
}
执行log split,并等待split完成,如果是distributedLogReplay时,
此时region assign已经完成,开始splt log
见MasterFileSystem.splitMetaLog分析
this.services.getMasterFileSystem().splitMetaLog(serverName);
}
} catch (Exception ex) {
....................此处部分代码没有显示
}
gotException = false;
} finally {
if (gotException){
// If we had an exception, this.deadServers.finish will be skipped in super.process()
this.deadServers.finish(serverName);
}
}
执行此rs中非meta region的日志重播与region assign,
见ServerShutdownHandler.process方法处理流程
super.process();
}
MasterFileSystem.prepareLogReplay分析
此方法在hbase.master.distributed.log.replay配置为true时,分执行此操作
public void prepareLogReplay(ServerName serverName, Set<HRegionInfo> regions) throws IOException {
一些必要的检查,检查是否设置有日志的分布式replay模式,要进行分布式日志的region列表是否为空
if (!this.distributedLogReplay) {
return;
}
// mark regions in recovering state
if (regions == null || regions.isEmpty()) {
return;
}
try {
通过SplitLogManager.markRegionsRecoveringInZK在/hbase/recovering-regions中添加region路径
this.splitLogManager.markRegionsRecoveringInZK(serverName, regions);
} catch (KeeperExceptione) {
thrownewIOException(e);
}
}
执行distributedLogReplay
void markRegionsRecoveringInZK(final ServerName serverName, Set<HRegionInfo> userRegions)
throws KeeperException {
一些必要的检查,检查是否设置有日志的分布式replay模式,要进行分布式日志的region列表是否为空
if (userRegions == null || !this.distributedLogReplay) {
return;
}
try {
this.recoveringRegionLock.lock();
// mark that we're creating recovering znodes
更新SplitLogManager中最后一次recoveringnode的时间为当前时间
this.lastRecoveringNodeCreationTime = EnvironmentEdgeManager.currentTimeMillis();
开始迭代执行要replay的每一个region,如果是meta region,此时只有一个迭代
for (HRegionInfo region : userRegions) {
String regionEncodeName = region.getEncodedName();
得到hbase.splitlog.zk.retries配置的在zk中创建子路径的最大重试次数,默认为3
longretries = this.zkretries;
do {
在zookeeper.znode.recovering.regions配置的路径下生成一个通过region name为名称的子路径
默认为/hbase/recovering-regions/region-name
String nodePath = ZKUtil.joinZNode(watcher.recoveringRegionsZNode, regionEncodeName);
longlastRecordedFlushedSequenceId = -1;
try {
得到region中的最大的seqid,此seqid在ServerManager.flushedSequenceIdByRegion中存储,
记录着region中flush的最大的seqid
longlastSequenceId = this.master.getServerManager().getLastFlushedSequenceId(
regionEncodeName.getBytes());
....................此处部分代码没有显示
检查在zk中的recovering-regions中是否已经包含此region,
byte[] data = ZKUtil.getData(this.watcher, nodePath);
if (data == null) {
如果recovering-regions中还不包含此region的replay信息,
把region的最后一次flush的seqid写入到replay路径下
ZKUtil.createSetData(this.watcher, nodePath,
ZKUtil.positionToByteArray(lastSequenceId));
} else {
如果recovering-regions中已经包含有此region的replay信息,
得到上一次region的recovering的seqid,
如果上一次的seqid小于当前region的最后一次flush的seqid,更新zk中此region的replay的seqid为最新的seqid
否则不做修改(上一次flush的seqid比记录的flush的seqid更加的新)
lastRecordedFlushedSequenceId = SplitLogManager.parseLastFlushedSequenceIdFrom(data);
if (lastRecordedFlushedSequenceId < lastSequenceId) {
// update last flushed sequence id in the region level
ZKUtil.setData(this.watcher, nodePath, ZKUtil.positionToByteArray(lastSequenceId));
}
}
// go one level deeper with server name
在recovering-regions/region-name下生成当前下线的server子路径
nodePath = ZKUtil.joinZNode(nodePath, serverName.getServerName());
如果当前region的flush的seqid小于上一次的recovering中replay的seqid,
(上一次flush的seqid比记录的flush的seqid更加的新),更新region的last flush seqid为上一次的seqid
if (lastSequenceId <= lastRecordedFlushedSequenceId) {
// the newly assigned RS failed even before any flush to the region
lastSequenceId = lastRecordedFlushedSequenceId;
}
在/hbase/recovering-regions/region-name/server-name路径下记录最后一次flush的seqid.
ZKUtil.createSetData(this.watcher, nodePath,
ZKUtil.regionSequenceIdsToByteArray(lastSequenceId, null));
....................此处部分代码没有显示
break;
} catch (KeeperExceptione) {
....................此处部分代码没有显示
}
} while ((--retries) > 0 && (!this.stopper.isStopped()));
}
} finally {
this.recoveringRegionLock.unlock();
}
}
MasterFileSystem.splitMetaLog分析
splitMetaLog传入需要执行split操作的server(下线的server),方法去调用splitLog方法,
由于此时只针对meta region的hlog时行split,因此在splitLog方法时传入META_FILTER来区分是否meta split
public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
longsplitTime = 0, splitLogSize = 0;
从/hbase/WALs的日志路径下得到通过下线的servers命名的所有日志路径,老版本中.logs目录
如:/hbase/WALs/server-name1
并把下线的server路径名称更新为/hbase/WALs/server-name1-splitting路径
List<Path> logDirs = getLogDirs(serverNames);
把下线的所有rs server添加到SplitLogManager.deadWorkers中,
等待SplitLogManager.TimeoutMonitor线程定期去处理,
见SplitLogManager.TimeoutMonitor线程分析
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTimeMillis();
执行hlog split操作,见SplitLogManager.splitLogDistributed分析
splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
....................此处部分代码没有显示,监控信息
}
MasterFileSystem.splitLog分析
splitLog传入需要执行split操作的server(下线的server),方法去调用splitLog方法,
由于此时只针对非meta region的hlog时行split,
因此在splitLog方法时传入NON_META_FILTER来区分是否非meta split
public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
longsplitTime = 0, splitLogSize = 0;
从/hbase/WALs的日志路径下得到通过下线的servers命名的所有日志路径,老版本中.logs目录
如:/hbase/WALs/server-name1
并把下线的server路径名称更新为/hbase/WALs/server-name1-splitting路径
List<Path> logDirs = getLogDirs(serverNames);
把下线的所有rs server添加到SplitLogManager.deadWorkers中,
等待SplitLogManager.TimeoutMonitor线程定期去处理,
见SplitLogManager.TimeoutMonitor线程分析
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTimeMillis();
执行hlog split操作,见SplitLogManager.splitLogDistributed分析
splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
splitTime = EnvironmentEdgeManager.currentTimeMillis() - splitTime;
....................此处部分代码没有显示,监控信息
}
SplitLogManager.splitLogDistributed分析
此方法主要用于对server hlog根据region进行split操作,生成split task,并等待split完成。
public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
PathFilterfilter) throws IOException {
....................此处部分代码没有显示,监控信息,日志信息
得到/hbase/WALs/server-name-splitting下的所有日志文件,
如果传入的filter为META_FILTER,那么只获取.meta的hlog文件,否则获取全部hlog文件
FileStatus[] logfiles = getFileList(logDirs, filter);
....................此处部分代码没有显示,监控信息,日志信息
longtotalSize = 0;
TaskBatch batch = newTaskBatch();
Boolean isMetaRecovery = (filter == null) ? null : false;
for (FileStatus lf : logfiles) {
....................此处部分代码没有显示,监控信息,日志信息
totalSize += lf.getLen();
得到日志文件路径去掉/hbase的部分名称,如/WALs/server-name-splitting/aaa.meta
String pathToLog = FSUtils.removeRootPath(lf.getPath(), conf);
1.把hlog的全路径去掉/hbase部分通过URLEncoder.encode进行转码(/会被转换成%2F)
2.把hlog的全路径添加到zookeeper.znode.splitlog配置的路径下默认为splitWAL,作为其子路径存在。
3.在SplitLogManager.tasks中添加一个Task实例,key为2中zk生成的path,value为生成的Task实例,
设置Task的status为IN_PROGRESS,并把task的batch实例为上面生成的TaskBatch实例(batch),
把batch中的installed加一,表示增加一个批量执行的Task
4.根据hbase.splitlog.zk.retries配置的zk重试次数,默认为3,
生成SplitLogTask实例,设置其originServer为master的ServerName
设置其state为ZooKeeperProtos.SplitLogTask.State.UNASSIGNED
在zk中注册此地址,并把SplitLogTask写入到此zk的路径下。
5.regionserver中监听zk的splitWAL的路径,
6.master中通过SplitLogManager.nodeDataChanged来监听rs中SplitLogTask的状态修改。
见SplitLogManager.nodeDataChanged分析
if (!enqueueSplitTask(pathToLog, batch)) {
thrownewIOException("duplicate log split scheduled for " + lf.getPath());
}
}
等待split操作完成,
a.batch中所有的Task.status为TerminationStatus.IN_PROGRESS的task个数为0
b.splitWAL路径下的所有子路径的个数为0
c.每次迭代都需要等待batch被nodeDataChanged或者其它地方对batch进行notify
waitForSplittingCompletion(batch, status);
// remove recovering regions from ZK
if (filter == MasterFileSystem.META_FILTER/* reference comparison */) {
....................此处部分代码没有显示,日志信息
isMetaRecovery = true;
}
删除zk的recovering-regions下对应的region路径下传入的servers子路径
(如果region下所有的servers子路径不存在,直接删除region子路径)
如果isMetaRecovery等于true表示只删除meta region的recovering路径
this.removeRecoveringRegionsFromZK(serverNames, isMetaRecovery);
如果有日志split出现错误,直接throw IOException
if (batch.done != batch.installed) {
batch.isDead = true;
....................此处部分代码没有显示,日志信息
thrownewIOException(msg);
}
for(Path logDir: logDirs){
status.setStatus("Cleaning up log directory...");
try {
删除WALs目录下对应的server-name-splitting的日志文件。
if (fs.exists(logDir) && !fs.delete(logDir, false)) {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
}
} catch (IOException ioe) {
....................此处部分代码没有显示,日志信息
}
SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet();
}
....................此处部分代码没有显示,监控信息,日志信息
returntotalSize;
}
ServerShutdownHandler.process方法处理流程:
ServerShutdownHandler的处理流程主要对非meta region的下线处理,region的重新分配,日志split
public void process() throws IOException {
booleanhasLogReplayWork = false;
final ServerName serverName = this.serverName;
try {
....................此处部分代码没有显示
AssignmentManager am = services.getAssignmentManager();
if (isCarryingMeta() // hbase:meta
|| !am.isFailoverCleanupDone()) {
this.services.getServerManager().processDeadServer(serverName, this.shouldSplitHlog);
return;
}
....................此处部分代码没有显示
NavigableMap<HRegionInfo, Result> hris = null;
while (!this.server.isStopped()) {
try {
从meta表中进行scan,扫描出当前下线的regionserver中所有的user region 列表。
this.server.getCatalogTracker().waitForMeta();
hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
this.serverName);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
thrownewIOException("Interrupted", e)
} catch (IOException ioe) {
....................此处部分代码没有显示
}
}
if (this.server.isStopped()) {
thrownewIOException("Server is stopped");
}
try {
shouldSplitHlog在ServerShutdownHandler实例生成时默认为true
if (this.shouldSplitHlog) {
LOG.info("Splitting logs for " + serverName + " before assignment.");
检查hbase.master.distributed.log.replay配置是否设置为true,默认值为false
if (this.distributedLogReplay) {
LOG.info("Mark regions in recovery before assignment.");
Set<ServerName> serverNames = newHashSet<ServerName>();
serverNames.add(serverName);
如果设置有distributedLogReplay,执行log split的预处理,
见MasterFileSystem.prepareLogReplay分析
this.services.getMasterFileSystem().prepareLogReplay(serverNames);
} else {
如果没有设置distributedLogReplay,执行log split,并等待split完成
this.services.getMasterFileSystem().splitLog(serverName);
}
从RegionStates.lastAssignments中移出此server对应的所有region分配信息
从RegionStates.processedServers中移出超出时间限制的
通过hbase.master.maximum.logsplit.keeptime配置的时间的server,默认为7200000ms(2 hour)
am.getRegionStates().logSplit(serverName);
} else {
LOG.info("Skipping log splitting for " + serverName);
}
} catch (IOException ioe) {
resubmit(serverName, ioe);
}
....................此处部分代码没有显示
1.从AssignmentManager.regionPlans中移出包含此server的region plan,
2.从regionStates.serverHoldings中得到此server所有的region assign,
如果region的状态为online/splitting/merging,把region的状态设置为offline
并从regionsInTransition与regionAssignments移出这些个region.
如果region的状态为splitting/merging时,删除region在zk中region-in-transition的注册信息
3.从regionsInTransition中找到所有此server中transition的
状态为PENDING_OPEN/OPENING/FAILED_OPEN/FAILED_CLOSE/OFFLINE的region,并返回
4.根据3返回的在regionsInTransition中的region,删除region在zk中region-in-transition的注册信息
zk中的路径通过zookeeper.znode.unassigned进行配置。
5.注意:3中返回的region是当前下线的server在assignments中不包含的,
同时在regionInTransition又包含,也就是这些个region准备在当前下线的server上启动,
但此时这个server挂掉了。此方法的主要作用是删除掉当前下线server中正在做transition的region的zk信息,
把这些个region的状态设置为offline,等待下面的代码逻辑重新执行分配.
List<HRegionInfo> regionsInTransition = am.processServerShutdown(serverName);
....................此处部分代码没有显示
把上面得到的正在做transition的regions添加到待分配的region列表中
List<HRegionInfo> toAssignRegions = newArrayList<HRegionInfo>();
toAssignRegions.addAll(regionsInTransition);
// Iterate regions that were on this server and assign them
if (hris != null) {
RegionStates regionStates = am.getRegionStates();
迭代从meta表中得到的所有当前下线server的user region,
for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
HRegionInfo hri = e.getKey();
如果此region在transition中已经包含,重新迭代下一次
if (regionsInTransition.contains(hri)) {
continue;
}
String encodedName = hri.getEncodedName();
Locklock = am.acquireRegionLock(encodedName);
try {
RegionState rit = regionStates.getRegionTransitionState(hri);
检查region所在的table是否被删除/是否是disable的table,如果不是执行如下流程
if (processDeadRegion(hri, e.getValue(), am, server.getCatalogTracker())) {
ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
if (addressFromAM != null && !addressFromAM.equals(this.serverName)) {
....................此处部分代码没有显示
continue;
}
if (rit != null) {
if (rit.getServerName() != null && !rit.isOnServer(serverName)) {
....................此处部分代码没有显示
continue;
}
try{
....................此处部分代码没有显示
删除region在zk中region-in-transition的注册信息,zk中的路径通过zookeeper.znode.unassigned进行配置,
并更新region的状态为offline
ZKAssign.deleteNodeFailSilent(services.getZooKeeper(), hri);
regionStates.updateRegionState(hri, State.OFFLINE);
} catch (KeeperExceptionke) {
this.server.abort("Unexpected ZK exception deleting unassigned node " + hri, ke);
return;
}
} elseif (regionStates.isRegionInState(
hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
如果region的状态是准备split或者准备merge时,重新设置region状态为offline
regionStates.regionOffline(hri);
}
添加此region到待分配的region列表中
toAssignRegions.add(hri);
} elseif (rit != null) {
region所在的table现在是disable的table,设置region状态为offline,
a.如果region在zk中的eventType为M_ZK_REGION_CLOSING/RS_ZK_REGION_CLOSED,从zk中删除此region的路径
zk中region-in-transition的注册信息,zk中的路径通过zookeeper.znode.unassigned进行配置
b.如果region在zk中的eventtype为RS_ZK_REGION_CLOSED/M_ZK_REGION_OFFLINE,从zk中删除此region的路径
if (rit.isPendingCloseOrClosing()
&& am.getZKTable().isDisablingOrDisabledTable(hri.getTable())) {
....................此处部分代码没有显示
regionStates.updateRegionState(hri, State.OFFLINE);
am.deleteClosingOrClosedNode(hri, rit.getServerName());
am.offlineDisabledRegion(hri);
} else {
LOG.warn("THIS SHOULD NOT HAPPEN: unexpected region in transition "
+ rit + " not to be assigned by SSH of server " + serverName);
}
}
} finally {
lock.unlock();
}
}
}
try {
执行region的批量assign操作
am.assign(toAssignRegions);
} catch (InterruptedException ie) {
LOG.error("Caught " + ie + " during round-robin assignment");
thrownewIOException(ie);
}
if (this.shouldSplitHlog && this.distributedLogReplay) {
// wait for region assignment completes
for (HRegionInfo hri : toAssignRegions) {
try {
此处只能是distributedLogReplay设置为true时,因为这时rs中不做log replay,
distributedLogReplay设置为true时,region下不存在recovered.edits路径,因此openregion时replay不会执行,
所以此时等待region的open完成是可行的。等待每一个region的assign完成,
也就是assign时的RegionInTransition在RegionStates.regionsInTransition的处理完成(列表中不包含此region)
或者说等待分配的超时时间hbase.master.log.replay.wait.region.timeout过期,默认15000ms
在assign时会在zk中的region-in-transition注册一个region地址,等待rs处理,
此方法会不停止的迭代,直接timeout或者regionsInTransition中移出此region的transition,
每次迭代会让regionstates处于wait状态,等待AssignmentManager.nodeDataChanged/nodeDeleted对其notify
完成后通过AssignmentManager中的相关nodeDataChanged处理事件方法对regionsInTransition更新,
通过nodeDeleted处理事件对regionsInTransition移出
if (!am.waitOnRegionToClearRegionsInTransition(hri, regionAssignmentWaitTimeout)) {
// Wait here is to avoid log replay hits current dead server and incur a RPC timeout
// when replay happens before region assignment completes.
LOG.warn("Region " + hri.getEncodedName()
+ " didn't complete assignment in time");
}
} catch (InterruptedException ie) {
thrownewInterruptedIOException("Caught " + ie
+ " during waitOnRegionToClearRegionsInTransition");
}
}
// submit logReplay work
如果设置distributedLogReplay为true,此时region assign完成,执行log split,并等待split完成
this.services.getExecutorService().submit(
newLogReplayHandler(this.server, this.services, this.deadServers, this.serverName));
hasLogReplayWork = true;
}
} finally {
this.deadServers.finish(serverName);
}
if (!hasLogReplayWork) {
LOG.info("Finished processing of shutdown of " + serverName);
}
}
regionserver中处理splitlog
regionserver中通过regionserver启动时启动的SplitLogWorker线程,
通过其的run方法监听master在zk中生成splitWAL,一但master在zk中注册splitWAL路径成功,
执行taskLoop方法默认5s进行一次split log的检查(线程等待,timeout为5000ms),
通过nodeChildrenChanged来监听zk中splitWAL子路径的修改,并notify此线程,
通过nodeDataChanged来更新每一个SplitLogTask的状态更新,
publicvoidrun() {
try {
....................此处部分代码没有显示
// wait for master to create the splitLogZnode
intres = -1;
while (res == -1 && !exitWorker) {
try {
监听master对zk中splitWAL的注册
res = ZKUtil.checkExists(watcher, watcher.splitLogZNode);
} catch (KeeperExceptione) {
// ignore
LOG.warn("Exception when checking for " + watcher.splitLogZNode + " ... retrying", e);
}
if (res == -1) {
try {
....................此处部分代码没有显示
Thread.sleep(1000);
} catch (InterruptedException e) {
....................此处部分代码没有显示
exitWorker = true;
break;
}
}
}
if (!exitWorker) {
定期检查并启动执行split hlog的处理
taskLoop();
}
} catch (Throwable t) {
....................此处部分代码没有显示
} finally {
LOG.info("SplitLogWorker " + this.serverName + " exiting");
}
}
检查并执行split hlog
private void taskLoop() {
while (!exitWorker) {
intseq_start = taskReadySeq;
得到所有的需要进行log split的servername的路径
List<String> paths = getTaskList();
if (paths == null) {
LOG.warn("Could not get tasks, did someone remove " +
this.watcher.splitLogZNode + " ... worker thread exiting.");
return;
}
// pick meta wal firstly
首先定义一个先执行的servername hlog split的路径值,默认为随机取一个下标
如果要split的server中包含有meta的region,那么先从meta的server开始执行
intoffset = (int) (Math.random() * paths.size());
for(inti = 0; i < paths.size(); i ++){
if(HLogUtil.isMetaFile(paths.get(i))) {
offset = i;
break;
}
}
intnumTasks = paths.size();
for (inti = 0; i < numTasks; i++) {
计算执行顺序,从offset开始执行,如:paths.size()=6,offset=5,那么执行顺序为501234
intidx = (i + offset) % paths.size();
// don't call ZKSplitLog.getNodeName() because that will lead to
// double encoding of the path name
每一个server最大同时执行split hlog的task个数通过hbase.regionserver.wal.max.splitters配置,默认为2
得到现在活着的所有的regionserver列表,根据要split的server个数,
平均下来后计算此server最多要执行多少个splt task,
最多同时执行个数不超过hbase.regionserver.wal.max.splitters配置,每次执行tasksInProgress值加一
if (this.calculateAvailableSplitters(numTasks) > 0) {
如果此server还有能力执行split hlog task,
更新zk中splitWAL中此servername(待split)的SplitLogTask为SplitLogTask.Owned,
并把当前执行split的rs更新到zk中。生成HLogSplitterHandler实例,并启动线程执行此处理程序
把tasksInProgress的正在处理的splittask的值加一,见HLogSplitterHandler.process流程分析
等待500-1000ms在重新执行下一次分配,这样能保证其它的rs也能分配到任务
注意:此部分逻辑第一次执行此方法时不会执行,因为第一次执行时zk中splitWAL路径下可能为空,
直接进入下面部分,让此线程进入wait状态,等待nodeChildrenChanged来进行notify
grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
} else {
LOG.debug("Current region server " + this.serverName + " has "
+ this.tasksInProgress.get() + " tasks in progress and can't take more.");
break;
}
if (exitWorker) {
return;
}
}
SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
synchronized (taskReadyLock) {
此次任务执行完成,zk中splitWAL在任务执行到此时还没有更新的rs下线被注册进来
while (seq_start == taskReadySeq) {
try {
线程进行等待状态,等待nodeChildrenChanged来进行notify
taskReadyLock.wait(checkInterval);
if (this.server != null) {
// check to see if we have stale recovering regions in our internal memory state
如果是设置有distributedLogReplay模式,此时在region open后才开始执行splitlog,
那么得到要进行splitlog的region列表。迭代每一个region,
从recovering-regions中检查是否此region需要splitlog,
如果recovering-regions中不存在此region,从rs中的recoveringRegions列表中移出此region
并设置此Hregion的recovering的值为false.
开始回到taskLoop方法的顶部,重新对这部分region进行splitlog
Map<String, HRegion> recoveringRegions = this.server.getRecoveringRegions();
if (!recoveringRegions.isEmpty()) {
// Make a local copy to prevent ConcurrentModificationException when other threads
// modify recoveringRegions
List<String> tmpCopy = newArrayList<String>(recoveringRegions.keySet());
for (String region : tmpCopy) {
String nodePath = ZKUtil.joinZNode(this.watcher.recoveringRegionsZNode, region);
try {
if (ZKUtil.checkExists(this.watcher, nodePath) == -1) {
HRegion r = recoveringRegions.remove(region);
if (r != null) {
r.setRecovering(false);
}
LOG.debug("Mark recovering region:" + region + " up.");
} else {
....................此处部分代码没有显示
break;
}
} catch (KeeperExceptione) {
....................此处部分代码没有显示
break;
}
}
}
}
} catch (InterruptedException e) {
....................此处部分代码没有显示
exitWorker = true;
return;
}
}
}
}
}
HlogSplitterHandler.process处理流程分析
HlogSplitterHandler是具体对hlog进行处理的handler,通过其传入的TaskExecutor.exec方法执行,
TaskExecutor是在SplitLogWorker实例生成时在构造方法中生成的一个匿名实现类,
public HLogSplitterHandler(final Server server, String curTask,
final MutableInt curTaskZKVersion,
CancelableProgressablereporter,
AtomicInteger inProgressTasks, TaskExecutorsplitTaskExecutor) {
设置EventType为RS_LOG_REPLAY
super(server, EventType.RS_LOG_REPLAY);
this.curTask = curTask;
this.wal = ZKSplitLog.getFileName(curTask);
this.reporter = reporter;
this.inProgressTasks = inProgressTasks;
把regionserver中执行split log 的task的值加一,表示占用一个执行位置
this.inProgressTasks.incrementAndGet();
this.serverName = server.getServerName();
this.zkw = server.getZooKeeper();
this.curTaskZKVersion = curTaskZKVersion;
见SplitLogWorker的构造方法最后一个参数
this.splitTaskExecutor = splitTaskExecutor;
}
public void process() throws IOException {
longstartTime = System.currentTimeMillis();
try {
执行split log的处理程序,见下面的SplitLog TaskExecutor.exec处理分析,并得到流程执行的返回状态
Statusstatus = this.splitTaskExecutor.exec(wal, reporter);
switch (status) {
caseDONE:
成功结束,调用endTask结束任务,
设置zk中splitWAL路径的servername中SplitLogTask的状态为SplitLogTask.Done
endTask(zkw, new SplitLogTask.Done(this.serverName),
SplitLogCounters.tot_wkr_task_done, curTask, curTaskZKVersion.intValue());
break;
casePREEMPTED:
如果split task是一个抢占的资源,不做处理
SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
LOG.warn("task execution prempted " + wal);
break;
caseERR:
执行过程错误,调用endTask结束任务,
设置zk中splitWAL路径的servername中SplitLogTask的状态为SplitLogTask.Err
if (server != null && !server.isStopped()) {
endTask(zkw, new SplitLogTask.Err(this.serverName),
SplitLogCounters.tot_wkr_task_err, curTask, curTaskZKVersion.intValue());
break;
}
// if the RS is exiting then there is probably a tons of stuff
// that can go wrong. Resign instead of signaling error.
//$FALL-THROUGH$
caseRESIGNED:
如果资源的task执行被放弃,调用endTask结束任务,
设置zk中splitWAL路径的servername中SplitLogTask的状态为SplitLogTask.Resigned
if (server != null && server.isStopped()) {
LOG.info("task execution interrupted because worker is exiting " + curTask);
}
endTask(zkw, new SplitLogTask.Resigned(this.serverName),
SplitLogCounters.tot_wkr_task_resigned, curTask, curTaskZKVersion.intValue());
break;
}
} finally {
LOG.info("worker " + serverName + " done with task " + curTask + " in "
+ (System.currentTimeMillis() - startTime) + "ms");
把regionserver中的split log task的值减一,表示有一个空闲的位置
this.inProgressTasks.decrementAndGet();
}
}
SplitLog TaskExecutor.exec处理分析:
public Status exec(String filename, CancelableProgressable p) {
Path rootdir;
FileSystemfs;
try {
rootdir = FSUtils.getRootDir(conf);
fs = rootdir.getFileSystem(conf);
} catch (IOException e) {
LOG.warn("could not find root dir or fs", e);
如果得到hdfs中/hbase目录出错或生成/hbase的FileSystem出错时,返回RESIGNED(放弃)
returnStatus.RESIGNED;
}
// TODO have to correctly figure out when log splitting has been
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
Try {
执行split log操作,生成一个SplitLogFile实例,并执行其splitLogFile方法,
方法执行返回true or false,执行过程中定期向zk中此hlog replay的路径发送心跳,如果心跳发送失败返回false
发送心跳的间隔通过hbase.splitlog.report.period配置,默认为hbase.splitlog.manager.timeout(120000)/3
发送心跳其实就是定期在zk中重新注册此servername,并得到上一次注册的version,
如果上一次version小于1表示 PREEMPTED(此server有资源抢占)
1.通过hbase.regionserver.hlog.splitlog.buffersize配置读取源hlog的buffer大小,默认为128*1024*1024
2.通过hbase.regionserver.hlog.splitlog.writer.threads配置OutputSink的写入线程个数
3.配置hbase.regionserver.wal.logreplay.batch.size,默认为64
4.如果distributedLogReplay设置为true,生成的OutputSink为HLogSplitter.LogReplayOutputSink/
否则生成HLogSplitter.LogRecoveredEditsOutputSink实例
5.通过hbase.hlog.split.skip.errors配置是否跳过split error,默认为false
6.通过hbase.splitlog.report.interval.loglines配置每次读取的行数,默认为1024
读取过程中如果hlog的entity的seqid小于region中的seqid或者cocovering-regions中存储的seqid,continue.
数据在output时,根据regionname,在regionname下创建一个recovered.edits目录,并写入hlog数据到此目录下
具体请参见HLogSplitter.splitLogFile方法源代码。
if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(newPath(rootdir, filename)),
fs, conf, p, sequenceIdChecker, watcher)) {
此server有资源抢占,主要是在zk上定期注册此server对hlog的split
returnStatus.PREEMPTED;
}
} catch (InterruptedIOException iioe) {
LOG.warn("log splitting of " + filename + " interrupted, resigning", iioe);
returnStatus.RESIGNED;
} catch (IOException e) {
Throwable cause = e.getCause();
if (einstanceof RetriesExhaustedException
&& (causeinstanceof NotServingRegionException
|| causeinstanceof ConnectException
|| causeinstanceof SocketTimeoutException)) {
LOG.warn("log replaying of " + filename + " can't connect to the target regionserver, "
+ "resigning", e);
returnStatus.RESIGNED;
} elseif (causeinstanceof InterruptedException) {
LOG.warn("log splitting of " + filename + " interrupted, resigning", e);
returnStatus.RESIGNED;
} elseif(causeinstanceofKeeperException) {
LOG.warn("log splitting of " + filename + " hit ZooKeeper issue, resigning", e);
returnStatus.RESIGNED;
}
LOG.warn("log splitting of " + filename + " failed, returning error", e);
returnStatus.ERR;
}
returnStatus.DONE;
}
}
SplogLogWorker.nodeDataChanged方法中监听到zk的状态修改时,如果状态不是如下状态是,调用stopTask结束线程
String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) {
// have to compare data. cannot compare version because then there
// will be race with attemptToOwnTask()
// cannot just check whether the node has been transitioned to
// UNASSIGNED because by the time this worker sets the data watch
// the node might have made two transitions - from owned by this
// worker to unassigned to owned by another worker
if (! slt.isOwned(this.serverName) &&
! slt.isDone(this.serverName) &&
! slt.isErr(this.serverName) &&
! slt.isResigned(this.serverName)) {
LOG.info("task " + taskpath + " preempted from " +
serverName + ", current task state and owner=" + slt.toString());
stopTask();
}
}
结束线程的执行过程
void stopTask() {
LOG.info("Sending interrupt to stop the worker thread");
worker.interrupt(); // TODO interrupt often gets swallowed, do what else?
}
SplitLogManager.nodeDataChanged流程分析
regionserver中执行split log操作,并根据执行情况修改zk中splitWAL中SplitLogTask的状态。
SplitLogManager.nodeDataChanged在master端对zk中splitWAL进行监听,
从tasks列表中找到对应修改的task,把task的状态从IN_PROGRESS修改为SUCCESS,
设置task对应的TaskBatch的done或error的值加一。调用TaskBatch.notify方法叫醒线程的等待。
在waitForSplittingCompletion方法中会每执行一次检查把TaskBatch.wait,因此需要对其做notify
Region open数据重播分析
HregionServer.openRegion-->OpenRegionHandler.process-->openRegion-->
Hregion.openRegion-->生成HRegion实例,并调用实例的r.openHRegion(reporter)-->initialize
-->initializeRegionInternals-->initializeRegionStores-->replayRecoveredEditsIfAny
注意:日志重播时传入的每一个store中最大的seqid是不包含bluk load的hfile的seqid,
而region open时得到并计算next sequence id的所有store中最大的seqid是包含bluk load的hfile的seqid
protectedlongreplayRecoveredEditsIfAny(final Path regiondir,
Map<byte[], Long> maxSeqIdInStores,
finalCancelableProgressablereporter, finalMonitoredTaskstatus)
throws UnsupportedEncodingException, IOException {
取出所有的store中flush到磁盘上的所有store中最小的一个seqid
longminSeqIdForTheRegion = -1;
for (Long maxSeqIdInStore : maxSeqIdInStores.values()) {
if (maxSeqIdInStore < minSeqIdForTheRegion || minSeqIdForTheRegion == -1) {
minSeqIdForTheRegion = maxSeqIdInStore;
}
}
longseqid = minSeqIdForTheRegion;
FileSystemfs = this.fs.getFileSystem();
取出region目录下recovered.edits子路径下所有的文件,但不包含结尾是.temp的文件,并根据文件名称排序返回
hlog在region下的文件名称是此文件对应的最大seqid,也就是按seqid从小到大排序。
NavigableSet<Path> files = HLogUtil.getSplitEditFilesSorted(fs, regiondir);
if (LOG.isDebugEnabled()) {
LOG.debug("Found " + (files == null ? 0 : files.size())
+ " recovered edits file(s) under " + regiondir);
}
没有需要重播的日志文件,直接返回当前所有的store中最小的seqid,如果是表示不需要进行replay
if (files == null || files.isEmpty()) returnseqid;
for (Path edits: files) {
检查日志文件是否存在
if (edits == null || !fs.exists(edits)) {
LOG.warn("Null or non-existent edits file: " + edits);
continue;
}
检查文件大小是否为空,如果是空文件直接删除,如果是表示不需要进行replay
if (isZeroLengthThenDelete(fs, edits)) continue;
longmaxSeqId = Long.MAX_VALUE;
String fileName = edits.getName();
检查此文件中最大的seqid是否小于region是所有store中最小的seqid,如果是表示此文件不需要进行replay
maxSeqId = Math.abs(Long.parseLong(fileName));
if (maxSeqId <= minSeqIdForTheRegion) {
if (LOG.isDebugEnabled()) {
String msg = "Maximum sequenceid for this log is " + maxSeqId
+ " and minimum sequenceid for the region is " + minSeqIdForTheRegion
+ ", skipped the whole file, path=" + edits;
LOG.debug(msg);
}
continue;
}
try {
得到replay的edits中每一个kv,并根据kv得到对应的store,
如果kv中的seqid小于store中最大的seqid,此kv不需要replay,
否则把kv添加到store中,得到添加的kvsize,把size添加到:
a.RegionServerAccounting.replayEditsPerRegion中对应的region的大小中,
表示此region中replay的memory使用情况
b.RegionServerAccounting.atomicGlobalMemstoreSize中,表示全局的memstore使用情况
c.添加到此region的memstore中,HRegion.memstoreSize,表示当前region的memory使用情况
d.检查memstore是否达到flush的值,通过hbase.hregion.memstore.flush.size配置,默认1024*1024*128L
如果达到memstore的flush值,对memstore进行flush
f.返回最新的seqid
seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
} catch (IOException e) {
出现replay错误,检查hbase.hregion.edits.replay.skip.errors是否配置为true
老版本使用hbase.skip.errors进行配置,默认值为false,表示不跳过error
booleanskipErrors = conf.getBoolean(
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
conf.getBoolean(
"hbase.skip.errors",
HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS));
if (conf.get("hbase.skip.errors") != null) {
LOG.warn(
"The property 'hbase.skip.errors' has been deprecated. Please use " +
HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead.");
}
如果配置有跳过replay error,把此edits文件重命名为editsname.systime,并remove到region的根目录下
if (skipErrors) {
Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
+ "=true so continuing. Renamed " + edits +
" as " + p, e);
} else {
throwe;
}
}
}
// The edits size added into rsAccounting during this replaying will not
// be required any more. So just clear it.
把RegionServerAccounting.replayEditsPerRegion中此region对应的replay kvsize清空
if (this.rsAccounting != null) {
this.rsAccounting.clearRegionReplayEditsSize(this.getRegionName());
}
如果进行了replay,那么当前replay后的seqid一定是大于原来的store的seqid,强制对region进行flush
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, status);
}
// Now delete the content of recovered edits. We're done w/ them.
删除region下所有的recovered.edits下的文件
for (Path file: files) {
if (!fs.delete(file, false)) {
LOG.error("Failed delete of " + file);
} else {
LOG.debug("Deleted recovered.edits file=" + file);
}
}
returnseqid;
}
distributedLogReplay为true的日志重播
通过hbase.master.distributed.log.replay配置的值为true时,在splitLog时,
生成HLogSplitter实例时OutputSink的实现会选择HLogSplitter.LogReplayOutputSink,
此实现不经过recovered.edits目录,直接把数据replay到region中。具体实现请查看相关源代码
相关推荐
流式架构由于取消ETL操作,所以数据的处理效率非 常高,但是由于没有了数据批处理,没有很好的支撑数据统计和重播,不利于离线进行 数据分析。 (三)Lambda数据分析结构 在大数据分析系统中Lambda架构是比较重要的...
此外,重播日志文件是紧凑且可共享的,允许进行可重复的实验。 例如,FreeBSD的90亿个指令启动仅用几百MB来表示。 PANDA利用QEMU对13种不同CPU架构的支持,使得可以在LLVM IR中对这些不同的指令集进行分析。 这样,...
它将来自黑盒查看器的解析逻辑包装到一个简单的Node.js EventEmitter中,以便多个侦听器可以在重播日志时对其进行分析。入门安装node.js和npm。 运行npm install安装依赖项。 运行grunt将ES6转换为Node友好的...
RVPlayer:“假设分析”重播的机器人车辆取证 介绍 RVPlayer是一种机器人车辆(RV)取证工具,它支持使用模拟器内部的假设分析进行重放,以取代昂贵的基于现场测试的取证工具。 它具有高效的需求驱动的自适应日志...
此外,重播日志文件是紧凑且可共享的,允许进行可重复的实验。 例如,FreeBSD的90亿个指令启动仅用几百MB来表示。 PANDA利用QEMU对13种不同CPU架构的支持,使得可以在LLVM IR中对这些不同的指令集进行分析。 这样,...
看到一个抽象的和分析 这个模块是什么? 该库主要实现共识算法和与之交互的API。 它完全围绕 API 的概念实现。 这意味着您只能使用您想要和自定义的实现部分: 运输机制。 (默认情况下直接 TCP 并通过本地网络...
Twitch聊天日志分析器顾虑Twitch VOD停留60天脚步设置Twitch API-完成创建Twitch API的凭证文件拨打oauth电话打个电话获取VOD ID-完成使用“获取VOD ID 看一个样品要求从过去的广播中提取视频ID(可能在url字段中) ...
14.6.5 重播跟踪 第15章 备份和还原SQL Server 2008 15.1 创建备份和还原计划 15.1.1 开始备份和还原计划 15.1.2 规划镜像与镜像数据库的备份 15.1.3 规划被复制数据库的备份 15.1.4 规划大型数据库的备份 15.1.5 ...
14.6.5 重播跟踪 第15章 备份和还原SQL Server 2008 15.1 创建备份和还原计划 15.1.1 开始备份和还原计划 15.1.2 规划镜像与镜像数据库的备份 15.1.3 规划被复制数据库的备份 15.1.4 规划大型数据库的备份 15.1.5 ...
14.6.5 重播跟踪 第15章 备份和还原SQL Server 2008 15.1 创建备份和还原计划 15.1.1 开始备份和还原计划 15.1.2 规划镜像与镜像数据库的备份 15.1.3 规划被复制数据库的备份 15.1.4 规划大型数据库的备份 15.1.5 ...
提供实际场景案例分析和故障诊断实验 SQL Server专家的呕心力作,数据库管理员的实战宝典,全面、深入地剖析SQL Server2008新特性,结构独特,实例丰富,操作性强。 编辑本段 目录 第Ⅰ部分 SQL Server 2008...
提供实际场景案例分析和故障诊断实验 SQL Server专家的呕心力作,数据库管理员的实战宝典,全面、深入地剖析SQL Server2008新特性,结构独特,实例丰富,操作性强。 编辑本段 目录 第Ⅰ部分 SQL Server 2008...
提供实际场景案例分析和故障诊断实验 SQL Server专家的呕心力作,数据库管理员的实战宝典,全面、深入地剖析SQL Server2008新特性,结构独特,实例丰富,操作性强。 编辑本段 目录 第Ⅰ部分 SQL Server 2008...
提供实际场景案例分析和故障诊断实验 SQL Server专家的呕心力作,数据库管理员的实战宝典,全面、深入地剖析SQL Server2008新特性,结构独特,实例丰富,操作性强。 编辑本段 目录 第Ⅰ部分 SQL Server 2008...
这不会重播日志,因此将不会检索未提交的数据。 数据可以同时包含干净和不干净的关闭日志。 取决于NAND的转储方式,可能无法对数据位进行纠错。 套接字文件将被忽略,您可以更改ubireader / settings.py使其在其...
matlab用于数据分析和图形生成的Matlab代码。 还包含生成的数字。 publishing -一些与出版有关的材料,例如徽标和海报。 replays -从训练DQN代理几个小时后重播文件。 scripts -用于执行诸如处理日志数据之类的次要...
报告-提供有关用户界面交互执行的反馈,包括用户界面交互失败的详细日志记录和屏幕快照的可视回归分析。 为什么要安装记录/重播? 在开发过程中,尤其是在部署到生产之前,您想知道应用程序的所有关键功能确实有效。...
也有分析认为,谷歌并不想做一个简单的手机终端制造商或者软件平台开发商,而意在一统传统互联网和 移 动互联网。----------------------------------- Android 编程基础 4 Android Android Android Android 手机新...