`
hongs_yang
  • 浏览: 59973 次
  • 性别: Icon_minigender_1
  • 来自: 西安
社区版块
存档分类
最新评论

HMBASE的REGION分配

阅读更多

HMBASEREGION分配

Region assign分为meta的分配与userregion assign,同时包含hbase启动时与rs下线,因此从4个方面来说明regionassign

启动时的metaregion assign

针对master启动时的补充说明:

Hmaster.run.finishInitialization方法中:

得到WALs目录下所有子目录,如果WALs目录下的日志目录的名称为ServerName-splitting,去掉-splitting部分

log目录格式:hdfs://<namenode>/hbase/WALs/<servername>-splitting/...

orhdfs://<namenode>/hbase/WALs/<servername>/...

取出日志目录的ServerName,检查现在masterServerManager.onlineServers中不包含的ServerName,

并返回这些个ServerName列表

通过hbase.hlog.split.skip.errors配置如果在加载log目录时出现错误(IOException)是否跳过重试,

默认为false,也就是需要重试

通过hbase.hlog.split.failure.retry.interval配置重试的间隔,默认为30*1000ms

主要作用metaregion log split

Set<ServerName>previouslyFailedServers=this.fileSystemManager

.getFailedServersFromLogFolders();

 

//remove stale recovering regions from previous run

通过SplitLogManagerzk中的路径下删除过期的replay日志路径

如果分布式日志重播hbase.master.distributed.log.replay配置为false,

直接删除recovering-regions路径(zookeeper.znode.recovering.regions配置)下的内容,

并结束此方法调用

如果分布式日志重播hbase.master.distributed.log.replay配置为true,

得到zookeeper.znode.splitlog配置的路径下的所有子路径(存放的是splittask名称),默认为splitWAL

每一个splittask的任务名称的最后一个”/”线后面部分是WALsServerNameURLDecoder.encode

读取并检查子路径下的内容,转换成SplitLogTask任务检查是否ZooKeeperProtos.SplitLogTask.State.DONE状态

如果不是,把此splittask对应的ServerName添加到方法开始时定义的列表中

读取recovering-regions路径(zookeeper.znode.recovering.regions配置)下的所有子路径,

每一个子路径通过regionencodename命名,同时检查每一个region下是否包含没重播日志的server子路径

检查没重播日志的server是否在刚才记录的servers

(调用此方法传入的servers(未启动)splittaskDONE状态的servers)中,

如果不包含,删除此regionsplitlog路径,并同时删除下面的子路径

this.fileSystemManager.removeStaleRecoveringRegionsFromZK(previouslyFailedServers);

 

//log splitting for hbase:meta server

通过metaRegionTracker在配置zookeeper.znode.metaserver的路径下

(默认meta-region-server)得到上一次metaregion运行的servername,

如果servername不为空(也就是cluster不是第一次运行),同时未启动的server列表中包含上一次运动的server

metaregion进行splitlog操作

splitlog先检查分布式日志重播hbase.master.distributed.log.replay配置,

如果配置为true,recovering-regions路径(zookeeper.znode.recovering.regions配置)

并在路径下创建此metaregionencodename的子路径,

同时在region子路径下把上一次运行metaregionserver也创建为子路径

示例:1->/hbase/recovering-regions/regionname,2->/hbase/recovering-regions/regionname/servername

如果配置为false,首先得到/hbase/WALs/servername路径重命名为/hbase/WALs/servername-splitting

zookeeper.znode.splitlog配置的路径下,默认为splitWAL,生成servername-splitting子路径

并在生成的路径下写入生成的SplitLogTask数据,

SplitLogManager.tasks中添加到一个Task,keysplitlogpath,value为此task,

同时在SplitLogManager.deadWorkers中添加此server(要做splitlog),等待task执行完成,

针对split的具体分析请点击查看hbasereplay分析

ServerNameoldMetaServerLocation=this.catalogTracker.getMetaLocation();

if(oldMetaServerLocation!=null&&previouslyFailedServers.contains(oldMetaServerLocation)){

splitMetaLogBeforeAssignment(oldMetaServerLocation);

//Note: we can't remove oldMetaServerLocation frompreviousFailedServers list because it

//may also host user regions

}

得到recovering-regions路径(zookeeper.znode.recovering.regions配置)下的metaregion的所有server

这些server表示需要日志重播,针对metaregion,主要作用在对metalogsplit

Set<ServerName>previouslyFailedMetaRSs=getPreviouselyFailedMetaServersFromZK();

 

this.initializationBeforeMetaAssignment=true;

 

//initializeload balancer

初始化loadbalancer

this.balancer.setClusterStatus(getClusterStatus());

this.balancer.setMasterServices(this);

this.balancer.initialize();

 

//Make sure meta assigned before proceeding.

status.setStatus("AssigningMeta Region");

执行metaassign操作

assignMeta(status);



Hmaster启动时执行metaassign

启动时hmaster.run.finishInitialization调用assignMeta方法



voidassignMeta(MonitoredTaskstatus)

throwsInterruptedException, IOException, KeeperException {

//Work on meta region

通过hbase.catalog.verification.timeout配置超时时间,默认为1000ms

intassigned =0;

longtimeout =this.conf.getLong("hbase.catalog.verification.timeout",1000);

status.setStatus("Assigninghbase:meta region");

ServerNamelogReplayFailedMetaServer=null;

 

得到AssignmentManager中生成的RegionStates实例,此实例中保存所有region的相关状态

RegionStatesregionStates=assignmentManager.getRegionStates();

 

添加metaregionregionStates中,设置metaregion的状态为offline或者split(如果需要split)

regionStates.createRegionState(HRegionInfo.FIRST_META_REGIONINFO);

 

 

第一步:

执行processRegionInTransition

通过zookeeper.znode.unassigned配置的路径,默认为region-in-transition

检查region是否在此路径下存在子路径,不存在表示region不需要事物处理,直接返回方法调用返回false,

否则执行如下流程:

此时表示/hbase/region-in-transition/region-name有值,取出路径下的值,转换成RegionTransition对象

检查regionStates中的regionsInTransition列表中是否已经包含此region

如果包含表示region已经在做transition,直接返回true,否则执行如下流程

a1.检查ServerManager.onlineServers如果不包含此RegionTransition对象所在的server时,

a2.接下来检查regionRegionStatesregionsInTransition中是否不包含,并在regionAssignments中包含此region

a3.更新region的状态为offline,regionStates中得到region原来的RegionStateServerName

并修改RegionState状态为offline添加到RegionStates.regionsInTransitionregionStates

a3.1regionsInTransition中移出此region

a3.2regionAssignments中移出此region,此处返回为region所有的server

a3.3检查serverHoldingsserver所分配的所有region中是否包含a3.2中移出此region所在的server

a3.4serverHoldings得到的所有的此server对应的region中移出此region

a3.5检查此server对应的region是否为空(server中不在包含分配的region),在serverHoldings移出此server

b1.重新把region/RegionTransition对应的ServerRegionState状态设置为offline

b2.如果regionmetaregion,把server设置到meta-region-server路径的值中

b3.如果region不是meta,把regionserver更新到RegionStates.lastAssignments

添加到serverregion的最后一次分配server

b4.如果serverServerManager中还不在isServerDead中存在,

通过ServerManager.expireServer设置此server下线

c.返回false.

根据RegionTransition实例的EventType操作分配流程,并返回true给方法调用:

通过AssignmentManager.assign去创建/hbase/region-in-transition/region-name

通过RegionPlan来实现REGION分配的RS选择,在ASSIGN时,通过balancer来控制选择那一个RS

调用RSopenRegion方法,执行region分配,

master这端通过AssignmentManager.nodeDataChanged来监听rsregionasign状态修改

RS实现AdminProtos.AdminService.BlockingInterface接口

待分析

第二步:

检查processRegionInTransition方法返回结果,如果是false,不做处理,直接返回false.

否则表示processRegionInTransition方法返回为true,一直等待regiontransition的处理完成。并返回true

booleanrit =this.assignmentManager

.processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);

 

通过RSRPC接口调用getRegionInfo方法,检查RS中是否有META的注册REGION,如果上一步返回false,

此调用也null,也就是false,因为上一步调用返回false表示没有分配meta,

因此通过getRegionInfo调用时meta还没有注册,因此返回null,否则分配到RS成功,此处返回也就是true

booleanmetaRegionLocation=this.catalogTracker.verifyMetaRegionLocation(timeout);

 

得到metaregion最后一次分配的RS,如果上一步返回false,此时拿到的RS不是最新的RS

ServerNamecurrentMetaServer=this.catalogTracker.getMetaLocation();

如果metaregion没有分配成功

if(!metaRegionLocation){

assigned++;

此时rit表示不需要regiontransition

if(!rit) {

如果上次分配METARS不为空,表示这个RS需要日志split,执行splitlog,并重新执行assignMeta

if(currentMetaServer!=null){

if(serverManager.isServerOnline(currentMetaServer)){

LOG.info("Forcingexpire of " + currentMetaServer);

serverManager.expireServer(currentMetaServer);

}

splitMetaLogBeforeAssignment(currentMetaServer);

if(this.distributedLogReplay){

logReplayFailedMetaServer=currentMetaServer;

}

}

assignmentManager.assignMeta();

}

}else{

//Region already assigned. We didn't assign it. Add to in-memory state.

Region分配成功,设置region状态为open

RegionStates.lastAssignments中添加到此region对应的server

regionsInTransition中添加region状态为open

regionStates中添加region状态为open

regionStates.updateRegionState(

HRegionInfo.FIRST_META_REGIONINFO,State.OPEN,currentMetaServer);

 

RegionStates.regionAssignments添加region对应的server

RegionStates.serverHoldings中指定server中添加一个openregion

this.assignmentManager.regionOnline(

HRegionInfo.FIRST_META_REGIONINFO,currentMetaServer);

}

 

启动metatable

enableMeta(TableName.META_TABLE_NAME);

.........此处省去一些处理代码

}

 

master中监听rsregionopen的状态修改:

通过AssignmentManager.nodeDataChanged,

rs把状态修改为opening(RS_ZK_REGION_OPENING)时,更新RegionStates中此region的状态为opening

rs把状态修改为openedRS_ZK_REGION_OPENED)时,更新RegionStates中此region的状态为open

同时生成一个OpenedRegionHandler删除zk中此regiontransition,具体请自行查看代码。

 

 

regionserver启动时执行metaassign

Master通过AssignmentManager.assignMeta

publicOpenRegionResponseopenRegion(finalRpcControllercontroller,

finalOpenRegionRequestrequest)throwsServiceException {

.........此处省去一些处理代码

OpenRegionResponse.Builderbuilder =OpenRegionResponse.newBuilder();

finalintregionCount=request.getOpenInfoCount();

finalMap<TableName,HTableDescriptor>htds =

newHashMap<TableName,HTableDescriptor>(regionCount);

finalbooleanisBulkAssign=regionCount> 1;

for(RegionOpenInforegionOpenInfo:request.getOpenInfoList()){

finalHRegionInforegion= HRegionInfo.convert(regionOpenInfo.getRegion());

 

intversionOfOfflineNode= -1;

if(regionOpenInfo.hasVersionOfOfflineNode()){

versionOfOfflineNode=regionOpenInfo.getVersionOfOfflineNode();

}

HTableDescriptorhtd;

try{

检查提交的region是否在rs中的onlineRegions中存在,

finalHRegiononlineRegion=getFromOnlineRegions(region.getEncodedName());

if(onlineRegion!=null){

.........此处省去一些处理代码

如果region已经存在,meta表中得到此region对应的server

Pair<HRegionInfo,ServerName>p =MetaReader.getRegion(

this.catalogTracker,region.getRegionName());

if(this.getServerName().equals(p.getSecond())){

检查regiontransition是否完成,

BooleanclosingregionsInTransitionInRS.get(region.getEncodedNameAsBytes());

.........此处省去一些注释

if(!Boolean.FALSE.equals(closing)

&&getFromOnlineRegions(region.getEncodedName())!=null){

.........此处省去一些日志打印代码

如果regiontransitionn为非false,

同时onlineRegions中包含此region,设置response为已经打开的region

builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);

continue;

}

}else{

.........此处省去一些日志打印代码

否则从onlineRegions中移出此region

regionFavoredNodesMapregion转换地址列表中移出此region

removeFromOnlineRegions(onlineRegion,null);

}

}

LOG.info("Open" +region.getRegionNameAsString());

htd=htds.get(region.getTable());

if(htd ==null){

htd=this.tableDescriptors.get(region.getTable());

htds.put(region.getTable(),htd);

}

 

finalBooleanpreviousthis.regionsInTransitionInRS.putIfAbsent(

region.getEncodedNameAsBytes(),Boolean.TRUE);

 

.........此处省去一些代码

 

//We are opening this region. If it moves back and forth for whateverreason, we don't

//want to keep returning the stale moved record while we are opening/ifwe close again.

movedRegions列表中移出此region

removeFromMovedRegions(region.getEncodedName());

 

if(previous==null){

//check if the region to be opened is marked in recovering state in ZK

如果此region有需要日志重播,添加到recoveringRegions列表中

if(this.distributedLogReplay

&&SplitLogManager.isRegionMarkedRecoveringInZK(this.getZooKeeper(),

region.getEncodedName())){

this.recoveringRegions.put(region.getEncodedName(),null);

}

//If there is no action in progress, we can submit a specific handler.

//Need to pass the expected version in the constructor.

执行metaregion的打开操作。通过handler.process方法

通过hbase.assignment.timeout.management配置是否需要assign超时管理,默认为false

通过hbase.master.assignment.timeoutmonitor.timeout配置assign超时时行,默认为600000ms=10minute

检查是否在rs中的onlineRegions中存在,如果存在,不在向下执行

检查region是否开启RIT(也就是在rs中的regionsInTransitionInRSregiontrue),如果没有开启,不在向下执行

zkregion的状态从offline更新为opening

执行openRegion();方法操作,得到一个HRegion实例,rsOpenRegionHandleropenRegion()方法分析

如果regionrecoveringRegions列表中存在值,更新region.setRecoveringtrue,

HRegion实例添加到recoveringRegions对应的regionvalue中。需要做日志重播的region实例

调用updateMeta(finalHRegionr)更新meta(meta表更新zk),此方法调用会一直等待,直到openRegion完成。

UpdateMeta会通过一个线程支调用rspostOpenDeployTasks方法,检查store是否需要compact,并发起compact请求

得到region中所有的store中最小的seqid的值,检查zkrecoveringseqid是否小过此值,如果是更新zk中的seqid

如果是metaregion,更新zkmetaserver地址为当前server的地址。

如果是用户region,更新meta表中此region现在对应的server,与打开regionopenSeqNum(当前最大的storeseqid+1).

通过handler.transitionToOpened设置region的状态从openingopened

region添加到onlineRegions列表中,移出regionsInTransitionInRS中的此region

设置response的响应为opend

if(region.isMetaRegion()){

this.service.submit(newOpenMetaHandler(this,this,region,htd,

versionOfOfflineNode));

}else{

用户region的打开处理

updateRegionFavoredNodesMapping(region.getEncodedName(),

regionOpenInfo.getFavoredNodesList());

this.service.submit(newOpenRegionHandler(this,this,region,htd,

versionOfOfflineNode));

}

}

 

builder.addOpeningState(RegionOpeningState.OPENED);

 

}catch(KeeperExceptionzooKeeperEx){

.........此处省去一些处理代码

}catch(IOExceptionie){

.........此处省去一些处理代码

}

}

 

returnbuilder.build();

}

 

rsOpenRegionHandleropenRegion()方法分析:

此方法中生成HRegion实例,此实例的类通过hbase.hregion.impl进行配置,默认为HRegion

执行HRegion.openRegion方法,initialize-->initializeRegionInternals

此方法会调用并加载region中所有的store,并得到所有store中最大的storeseqid,

把最大的seqid+1就表示现在regionopenSeqNum

store(columnfamily)的加载通过一个独立的线程池去完成,

这个线程池的大小通过hbase.hstore.open.and.close.threads.max进行配置,默认为1

每一个store加载时生成HStore实例,生成HStore实例时会加载此cf下所有的storefile,

storefile的加载线程个数:

a.通过hbase.hstore.open.and.close.threads.max进行配置,默认为1

b.取出store中所有的storefile个数,

a/b=storefile的加载线程个数

 

 

启动时的userregion assign

master端的处理

Hmaster.run.finishInitialization方法中:

 

//Fix up assignment manager status

status.setStatus("Startingassignment manager");

执行userregion的分配操作,

this.assignmentManager.joinCluster();

 

//setcluster status again after user regions are assigned

this.balancer.setClusterStatus(getClusterStatus());

 

if(!masterRecovery){

//Start balancer and meta catalog janitor after meta and regions have

//been assigned.

status.setStatus("Startingbalancer and catalog janitor");

this.clusterStatusChore=getAndStartClusterStatusChore(this);

loadbalance的定时线程,通过hbase.balancer.period进行配置,默认值300000ms

this.balancerChore=getAndStartBalancerChore(this);

this.catalogJanitorChore=newCatalogJanitor(this,this);

startCatalogJanitorChore();

}

 

AssignmentManager.joincluster()方法:

voidjoinCluster()throwsIOException,

KeeperException,InterruptedException {

.........此处省去一些注释

a.得到zk中通过zookeeper.znode.tableEnableDisable配置的地址,默认为table路径下的所有ENABLINGtable.

b.得到zk中通过zookeeper.znode.tableEnableDisable配置的地址,默认为table路径下的所有DISABLEDtable.

c.得到zk中通过zookeeper.znode.tableEnableDisable配置的地址,默认为table路径下的所有DISABLINGtable.

代码如下:

Set<TableName>enablingTables = ZKTable.getEnablingTables(watcher);

Set<TableName>disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);

disabledOrEnablingTables.addAll(enablingTables);

Set<TableName>disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);

disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);

d.通过MetaReader.fullScanmeta下所有的region全部scan出来,

metascancache通过hbase.meta.scanner.caching配置,默认100

e.通过ServerManager得到所有的onlineServer,

f.如果onlineServer列表中不包含metaregionserver地址,

添加此region到一个key=servername,value=list<region>map容器中。

g.如果region对应的table不是disabled/disabling的表,设置tableenabled

h.如果table不是disabled/disabling的表,同时region所在的serveronlineserver

设置region的状态为open,同时把region添加到RegionStates.onlineRegions列表中。

j.返回f中生成的map,这些个map中的region表示server还没有启动起来,需要重新对region进行分配。

Map<ServerName,List<HRegionInfo>>deadServers=rebuildUserRegions();

 

//This method will assign all user regions if a clean server startup or

//it will reconstruct master state and cleanup any leftovers from

//previous master process.

1.得到region-in-transition路径下所有目前有transitionregionpath,

2.检查是否有server下线或请求下线,启动时正常为false.

3.2的检查为false的前提下,检查regionStates是否有已经分配的,metaregion,如果不包含为false

4.2/3的检查为false的前提下,检查1中的所有regionpath是否有非metaregionpath,如果不包含为false

5.如果2/3/4任何一个检查为true,调用processDeadServersAndRecoverLostRegions处理region的分配。

6.否则调用assignAllUserRegions,此处我们分析这个部分。一个新的集群启动

7.通过hbase.master.startup.retainassign配置的值检查是否需要保留分配,默认为true

8.如果7true,调用assign(Map),否则调用assign(List)

9.检查region的个数是否超过可以批量assign的条件,同时检查server个数是否超过批量assign的条件

region的批量条件通过hbase.bulk.assignment.threshold.regions配置,默认为7

server的批量条件通过hbase.bulk.assignment.threshold.servers配置,默认为3

10.迭代执行每一个regionzk中的region-in-transition路径下创建子路径。设置region状态为PENDING_OPEN

11.调用rs中的openRegion

12.在调用RSopenRegion的过程中会有重试,如果失败,

最大超时时间通过hbase.regionserver.rpc.startup.waittime配置,默认为60000ms

processDeadServersAndRegionsInTransition(deadServers);

 

recoverTableInDisablingState();

recoverTableInEnablingState();

}

 

rs中处理userregionopen操作:

publicOpenRegionResponseopenRegion(finalRpcControllercontroller,

finalOpenRegionRequestrequest)throwsServiceException {

try{

checkOpen();

}catch(IOExceptionie){

thrownewServiceException(ie);

}

requestCount.increment();

OpenRegionResponse.Builderbuilder =OpenRegionResponse.newBuilder();

finalintregionCount=request.getOpenInfoCount();

finalMap<TableName,HTableDescriptor>htds =

newHashMap<TableName,HTableDescriptor>(regionCount);

finalbooleanisBulkAssign=regionCount> 1;

for(RegionOpenInforegionOpenInfo:request.getOpenInfoList()){

.........此处省去一些处理代码,这部分代码与metaregionopen相同

if(region.isMetaRegion()){

this.service.submit(newOpenMetaHandler(this,this,region,htd,

versionOfOfflineNode));

}else{

用户region的分配

通过handler.process方法

通过hbase.assignment.timeout.management配置是否需要assign超时管理,默认为false

通过hbase.master.assignment.timeoutmonitor.timeout配置assign超时时行,默认为600000ms=10minute

检查是否在rs中的onlineRegions中存在,如果存在,不在向下执行

检查region是否开启RIT(也就是在rs中的regionsInTransitionInRSregiontrue),如果没有开启,不在向下执行

zkregion的状态从offline更新为opening

执行openRegion();方法操作,得到一个HRegion实例,rsOpenRegionHandleropenRegion()方法分析

如果regionrecoveringRegions列表中存在值,更新region.setRecoveringtrue,

HRegion实例添加到recoveringRegions对应的regionvalue中。需要做日志重播的region实例

调用updateMeta(finalHRegionr)更新meta(用户表更新meta表的数据),此方法调用会一直等待,直到openRegion完成。

UpdateMeta会通过一个线程支调用rspostOpenDeployTasks方法,检查store是否需要compact,并发起compact请求

得到region中所有的store中最小的seqid的值,检查zkrecoveringseqid是否小过此值,如果是更新zk中的seqid

如果是metaregion,更新zkmetaserver地址为当前server的地址。

如果是用户region,更新meta表中此region现在对应的server,与打开regionopenSeqNum(当前最大的storeseqid+1).

通过handler.transitionToOpened设置region的状态从openingopened

region添加到onlineRegions列表中,移出regionsInTransitionInRS中的此region

设置response的响应为opend

updateRegionFavoredNodesMapping(region.getEncodedName(),

regionOpenInfo.getFavoredNodesList());

this.service.submit(newOpenRegionHandler(this,this,region,htd,

versionOfOfflineNode));

}

}

 

builder.addOpeningState(RegionOpeningState.OPENED);

 

}catch(KeeperExceptionzooKeeperEx){

......................................

}

 

returnbuilder.build();

}

 

 

RS下线的regionassign





分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics