First, the HMaster discovers through ZK that some RS has died. The HMaster uses its private ServerManager serverManager field to handle the RS's information:
public synchronized void expireServer(final HServerInfo hsi) {
  // First look up the HServerInfo in onlineServers;
  // serverName is <hostname>,<port>,<startcode>
  String serverName = hsi.getServerName();
  HServerInfo info = this.onlineServers.get(serverName);
  // If info is null, warn that the server is not online and return
  if (info == null) {
    LOG.warn("Received expiration of " + hsi.getServerName() +
      " but server is not currently online");
    return;
  }
  // Even the author doubts this can happen; I also cannot think of a
  // scenario that would satisfy this condition
  if (this.deadservers.contains(serverName)) {
    // TODO: Can this happen? It shouldn't be online in this case?
    LOG.warn("Received expiration of " + hsi.getServerName() +
      " but server shutdown is already in progress");
    return;
  }
  // Add to the dead list, remove from the online list
  this.deadservers.add(serverName);
  this.onlineServers.remove(serverName);
  this.serverConnections.remove(serverName);
  // If the cluster is shutting down, do nothing and return
  if (this.clusterShutdown) {
    LOG.info("Cluster shutdown set; " + hsi.getServerName() +
      " expired; onlineServers=" + this.onlineServers.size());
    if (this.onlineServers.isEmpty()) {
      master.stop("Cluster shutdown set; onlineServer=0");
    }
    return;
  }
  // Use the CatalogTracker to check whether the dead RS was carrying
  // root or meta; note that the CatalogTracker is read-only
  CatalogTracker ct = this.master.getCatalogTracker();
  // Was this server carrying root?
  boolean carryingRoot;
  try {
    HServerAddress address = ct.getRootLocation();
    // Check whether the dead RS held the root region
    carryingRoot = address != null && hsi.getServerAddress().equals(address);
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    LOG.info("Interrupted");
    return;
  }
  HServerAddress address = ct.getMetaLocation();
  // Check whether the dead RS held the meta region
  boolean carryingMeta = address != null && hsi.getServerAddress().equals(address);
  // MetaServerShutdownHandler extends ServerShutdownHandler; the former
  // additionally needs to assign root and/or meta during recovery
  if (carryingRoot || carryingMeta) {
    this.services.getExecutorService().submit(
      new MetaServerShutdownHandler(this.master, this.services,
        this.deadservers, info, carryingRoot, carryingMeta));
  } else {
    this.services.getExecutorService().submit(
      new ServerShutdownHandler(this.master, this.services,
        this.deadservers, info));
  }
  LOG.debug("Added=" + serverName +
    " to dead servers, submitted shutdown handler to be executed, root=" +
    carryingRoot + ", meta=" + carryingMeta);
}
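For completeness, here is roughly how the master notices the death in the first place: every live RS registers an ephemeral znode under the rs directory in ZK, and when the RS's session expires that znode disappears and a watcher on the master fires. Below is a minimal sketch of such a watcher using the raw ZooKeeper client. The znode path and the DeadServerWatcher class are illustrative, not HBase's actual code; in HBase this logic lives in RegionServerTracker, which ultimately calls ServerManager.expireServer().

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: watch the RS registration znodes and report any server
// whose ephemeral node has disappeared (i.e. whose ZK session expired).
public class DeadServerWatcher implements Watcher {
  private final ZooKeeper zk;
  private final String rsZNode; // e.g. "/hbase/rs" (path is an assumption)
  private Set<String> knownServers = new HashSet<String>();

  public DeadServerWatcher(ZooKeeper zk, String rsZNode) throws Exception {
    this.zk = zk;
    this.rsZNode = rsZNode;
    refresh();
  }

  // Re-list children with the watch re-armed, then diff against the
  // previous snapshot to find servers that vanished.
  private synchronized void refresh() throws Exception {
    List<String> children = zk.getChildren(rsZNode, this);
    Set<String> current = new HashSet<String>(children);
    for (String server : knownServers) {
      if (!current.contains(server)) {
        // Ephemeral node gone => RS session expired; in the real master
        // this is where serverManager.expireServer(...) would be invoked.
        System.out.println("Server expired: " + server);
      }
    }
    knownServers = current;
  }

  @Override
  public void process(WatchedEvent event) {
    if (event.getType() == Event.EventType.NodeChildrenChanged) {
      try {
        refresh();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}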
Next we arrive at ServerShutdownHandler, which extends EventHandler; execution eventually reaches its process method:
public void process() throws IOException {
  final String serverName = this.hsi.getServerName();
  LOG.info("Splitting logs for " + serverName);
  try {
    // Split the logs; the split-out edits follow their regions. Log
    // splitting differs a lot between versions (0.92 introduced a
    // distributed approach); see the code a few sections below
    this.services.getMasterFileSystem().splitLog(serverName);
    // Essentially returns the region states for this server; internally it
    // touches regionPlans, deadRegions and the like, all derived from the
    // hsi passed in
    List<RegionState> regionsInTransition =
      this.services.getAssignmentManager().processServerShutdown(this.hsi);
    // If root needs recovery, assign it first; before assigning, verify
    // whether root's location is already settled
    if (isCarryingRoot()) { // -ROOT-
      LOG.info("Server " + serverName + " was carrying ROOT. Trying to assign.");
      verifyAndAssignRootWithRetries();
    }
    // If meta needs recovery, assign it next, to a randomly chosen RS
    if (isCarryingMeta()) {
      LOG.info("Server " + serverName + " was carrying META. Trying to assign.");
      this.services.getAssignmentManager().assignMeta();
    }
    // Wait until all meta-holding regions are open before doing the rest
    NavigableMap<HRegionInfo, Result> hris = null;
    while (!this.server.isStopped()) {
      try {
        this.server.getCatalogTracker().waitForMeta();
        hris = MetaReader.getServerUserRegions(
          this.server.getCatalogTracker(), this.hsi);
        break;
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted", e);
      } catch (IOException ioe) {
        LOG.info("Received exception accessing META during server shutdown of " +
          serverName + ", retrying META read", ioe);
      }
    }
    // Skip regions that were in transition unless CLOSING or PENDING_CLOSE
    for (RegionState rit : regionsInTransition) {
      if (!rit.isClosing() && !rit.isPendingClose()) {
        LOG.debug("Removed " + rit.getRegion().getRegionNameAsString() +
          " from list of regions to assign because in RIT");
        hris.remove(rit.getRegion());
      }
    }
    LOG.info("Reassigning " + (hris == null ? 0 : hris.size()) +
      " region(s) that " + serverName + " was carrying (skipping " +
      regionsInTransition.size() + " regions(s) that are already in transition)");
    // Start assigning regions. Before each assignment, check whether the
    // region's table is being disabled or the cluster is shutting down,
    // then force the region's state to offline. For the final assign call,
    // see the analysis of that method below.
    for (Map.Entry<HRegionInfo, Result> e : hris.entrySet()) {
      if (processDeadRegion(e.getKey(), e.getValue(),
          this.services.getAssignmentManager(),
          this.server.getCatalogTracker())) {
        this.services.getAssignmentManager().assign(e.getKey(), true);
      }
    }
  } finally {
    this.deadServers.finish(serverName);
  }
  LOG.info("Finished processing of shutdown of " + serverName);
}
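One detail worth pulling out of process() above: regions that were already in transition are skipped unless their state was CLOSING or PENDING_CLOSE. A close targeted at the now dead server can never complete, so those regions must be rescued and reassigned, while every other in-transition region is already being driven somewhere else. Restated as a tiny predicate (a hypothetical restatement, with state names matching RegionState in this era of the code):

// Should a region that was in transition on the dead RS be reassigned?
// Only closes that can no longer finish need rescuing; everything else
// is already being handled by some other ongoing transition.
public class RitFilterSketch {
  enum State { CLOSING, PENDING_CLOSE, OPENING, PENDING_OPEN, OFFLINE, OPEN }

  static boolean needsReassign(State ritState) {
    return ritState == State.CLOSING || ritState == State.PENDING_CLOSE;
  }
}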
The code related to log splitting:
public void splitLog(final String serverName) {
  // First acquire the lock
  this.splitLogLock.lock();
  long splitTime = 0, splitLogSize = 0;
  // Locate this RS's WALs in HDFS, normally under /hbase/.logs
  Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
  try {
    // Create the splitter object responsible for the split work
    HLogSplitter splitter = HLogSplitter.createLogSplitter(
      conf, rootdir, logDir, oldLogDir, this.fs);
    try {
      // If HDFS is in safe mode, block until the file system leaves it
      FSUtils.waitOnSafeMode(conf,
        conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 1000));
      // Perform the split
      splitter.splitLog();
    } catch (OrphanHLogAfterSplitException e) {
      LOG.warn("Retrying splitting because of:", e);
      // On failure, obtain a fresh instance and try once more
      splitter = HLogSplitter.createLogSplitter(
        conf, rootdir, logDir, oldLogDir, this.fs);
      splitter.splitLog();
    }
    // The two values below are used for metrics
    splitTime = splitter.getTime();
    splitLogSize = splitter.getSize();
  } catch (IOException e) {
    checkFileSystem();
    LOG.error("Failed splitting " + logDir.toString(), e);
  } finally {
    this.splitLogLock.unlock();
  }
  if (this.metrics != null) {
    this.metrics.addSplit(splitTime, splitLogSize);
  }
}
// This method mostly performs sanity checks
public List<Path> splitLog() throws IOException {
  Preconditions.checkState(!hasSplit,
    "An HLogSplitter instance may only be used once");
  hasSplit = true;
  long startTime = System.currentTimeMillis();
  List<Path> splits = null;
  // Check that the log directory exists; if not, return immediately
  if (!fs.exists(srcDir)) {
    // Nothing to do
    return splits;
  }
  FileStatus[] logfiles = fs.listStatus(srcDir);
  // Check whether the directory actually contains files
  if (logfiles == null || logfiles.length == 0) {
    // Nothing to do
    return splits;
  }
  LOG.info("Splitting " + logfiles.length + " hlog(s) in " + srcDir.toString());
  // Perform the split
  splits = splitLog(logfiles);
  splitTime = System.currentTimeMillis() - startTime;
  LOG.info("hlog file splitting completed in " + splitTime +
    " ms for " + srcDir.toString());
  return splits;
}
private List<Path> splitLog(final FileStatus[] logfiles) throws IOException {
  // These two lists track logs processed successfully and logs found corrupted
  List<Path> processedLogs = new ArrayList<Path>();
  List<Path> corruptedLogs = new ArrayList<Path>();
  List<Path> splits = null;
  // Configures how errors are handled. Defaults to true: logs that fail to
  // read are moved to the .corrupt directory. If set to false, any problem
  // during the split throws an IOException and the whole split aborts.
  boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
  splitSize = 0;
  // Start the writer threads
  outputSink.startWriterThreads(entryBuffers);
  try {
    int i = 0;
    // Iterate over the files to be processed
    for (FileStatus log : logfiles) {
      Path logPath = log.getPath();
      long logLength = log.getLen();
      splitSize += logLength;
      LOG.debug("Splitting hlog " + (i++ + 1) + " of " + logfiles.length +
        ": " + logPath + ", length=" + logLength);
      try {
        recoverFileLease(fs, logPath, conf);
        // Parse the log and put its entries into entryBuffers. This involves
        // HLogKey, which stores the region name and table name, and the Entry
        // class, which wraps a WALEdit together with an HLogKey. Everything
        // ends up in a map keyed by region, with that region's edits as the value.
        parseHLog(log, entryBuffers, fs, conf);
        // Collect the logs that were processed successfully
        processedLogs.add(logPath);
      } catch (EOFException eof) {
        // truncated files are expected if a RS crashes (see HBASE-2643)
        LOG.info("EOF from hlog " + logPath + ". Continuing");
        processedLogs.add(logPath);
      } catch (FileNotFoundException fnfe) {
        // A file may be missing if the region server was able to archive it
        // before shutting down. This means the edits were persisted already
        LOG.info("A log was missing " + logPath +
          ", probably because it was moved by the" +
          " now dead region server. Continuing");
        processedLogs.add(logPath);
      } catch (IOException e) {
        // If the IOE resulted from bad file format,
        // then this problem is idempotent and retrying won't help
        if (e.getCause() instanceof ParseException) {
          LOG.warn("Parse exception from hlog " + logPath + ". continuing", e);
          processedLogs.add(logPath);
        } else {
          if (skipErrors) {
            LOG.info("Got while parsing hlog " + logPath +
              ". Marking as corrupted", e);
            corruptedLogs.add(logPath);
          } else {
            throw e;
          }
        }
      }
    }
    if (fs.listStatus(srcDir).length >
        processedLogs.size() + corruptedLogs.size()) {
      throw new OrphanHLogAfterSplitException(
        "Discovered orphan hlog after split. Maybe the " +
        "HRegionServer was not dead when we started");
    }
    // From here on it is just moving files around: archive the log files,
    // moving corrupted or skipped logs into .corrupt and the processed ones
    // into .oldlogs, then delete the dead RS's directory under /hbase/.logs
    archiveLogs(srcDir, corruptedLogs, processedLogs, oldLogDir, fs, conf);
  } finally {
    // Wait for the writer threads to finish
    splits = outputSink.finishWritingAndClose();
  }
  return splits;
}
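The heart of parseHLog above is just a group-by: each log Entry pairs an HLogKey (region name plus table name) with a WALEdit, and entries are bucketed per region so the writer threads can emit one recovered-edits file per region. Below is a stripped-down sketch of that bucketing, with the HBase types replaced by hypothetical stand-ins:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for HLogKey/WALEdit, just to show the shape of the data.
class LogEntry {
  final String regionName; // from the HLogKey
  final String tableName;  // from the HLogKey
  final byte[] edit;       // the serialized WALEdit payload
  LogEntry(String regionName, String tableName, byte[] edit) {
    this.regionName = regionName;
    this.tableName = tableName;
    this.edit = edit;
  }
}

public class EntryBufferSketch {
  // region name -> ordered edits for that region; writer threads drain
  // this map and write one recovered-edits file per region.
  private final Map<String, List<LogEntry>> buffers =
      new HashMap<String, List<LogEntry>>();

  public synchronized void append(LogEntry entry) {
    List<LogEntry> regionEdits = buffers.get(entry.regionName);
    if (regionEdits == null) {
      regionEdits = new ArrayList<LogEntry>();
      buffers.put(entry.regionName, regionEdits);
    }
    regionEdits.add(entry); // order within a region must be preserved
  }
}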
When assigning the regions:
private void assign(final RegionState state, final boolean setOfflineInZK,
    final boolean forceNewPlan) {
  for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
    if (setOfflineInZK && !setOfflineInZooKeeper(state)) return;
    // The master has stopped; nothing left to do, just exit
    if (this.master.isStopped()) {
      LOG.debug("Server stopped; skipping assign of " + state);
      return;
    }
    // Get a RegionPlan. Note LoadBalancer.randomAssignment(servers): the
    // region is assigned to a randomly chosen server
    RegionPlan plan = getRegionPlan(state, forceNewPlan);
    // If no plan could be produced, give up
    if (plan == null) {
      debugLog(state.getRegion(),
        "Unable to determine a plan to assign " + state);
      return; // Should get reassigned later when RIT times out.
    }
    try {
      debugLog(state.getRegion(), "Assigning region " +
        state.getRegion().getRegionNameAsString() + " to " +
        plan.getDestination().getServerName());
      // Set the region's state to PENDING_OPEN
      state.update(RegionState.State.PENDING_OPEN);
      // Send the RS a request asking it to open the region
      serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
      break;
    } catch (Throwable t) {
      LOG.warn("Failed assignment of " +
        state.getRegion().getRegionNameAsString() + " to " +
        plan.getDestination() + ", trying to assign elsewhere instead; " +
        "retry=" + i, t);
      // Clean out plan we failed execute and one that doesn't look like it'll
      // succeed anyways; we need a new plan!
      // Transition back to OFFLINE
      state.update(RegionState.State.OFFLINE);
      // Force a new plan and reassign. Will return null if no servers.
      if (getRegionPlan(state, plan.getDestination(), true) == null) {
        LOG.warn("Unable to find a viable location to assign region " +
          state.getRegion().getRegionNameAsString());
        return;
      }
    }
  }
}
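The getRegionPlan call above ultimately bottoms out in LoadBalancer.randomAssignment(servers), meaning the destination RS is picked uniformly at random from the live servers. Here is a sketch of just that choice, with servers simplified to strings; it mirrors the idea rather than the actual HBase method:

import java.util.List;
import java.util.Random;

public class RandomAssignmentSketch {
  private static final Random RANDOM = new Random();

  // Pick a destination uniformly at random from the live servers,
  // returning null when there is no viable destination (the caller
  // then abandons the plan).
  public static String randomAssignment(List<String> servers) {
    if (servers == null || servers.isEmpty()) {
      return null;
    }
    return servers.get(RANDOM.nextInt(servers.size()));
  }
}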
Finally, the regionserver runs an OpenRegionHandler to open the region; this is an asynchronous process. It also involves the region's state transitions in ZK, as well as HLog recovery before the region is opened: recovery works just like a normal write to HBase, going into the memstore first, and at the end the meta table must be updated.
Of course, if the region carries meta or root information, the handling differs somewhat:
@Override
@QosPriority(priority = HIGH_QOS)
public void openRegion(HRegionInfo region) throws IOException {
  if (this.regionsInTransitionInRS.contains(region.getEncodedNameAsBytes())) {
    throw new RegionAlreadyInTransitionException("open", region.getEncodedName());
  }
  LOG.info("Received request to open region: " + region.getRegionNameAsString());
  if (this.stopped) throw new RegionServerStoppedException();
  this.regionsInTransitionInRS.add(region.getEncodedNameAsBytes());
  // Enter a different branch depending on the region's type
  if (region.isRootRegion()) {
    this.service.submit(new OpenRootHandler(this, this, region));
  } else if (region.isMetaRegion()) {
    this.service.submit(new OpenMetaHandler(this, this, region));
  } else {
    this.service.submit(new OpenRegionHandler(this, this, region));
  }
}
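To make the asynchronous open concrete, here is a schematic of the steps OpenRegionHandler drives through, written as hypothetical pseudocode rather than the real implementation: the region's znode moves from OFFLINE to OPENING to OPENED, recovered edits are replayed into the memstore along the way, and .META. is updated before the final transition.

// A schematic of what OpenRegionHandler.process() does, not the actual code;
// only the order of operations is taken from the source walkthrough above.
public class OpenRegionSketch {
  enum ZkRegionState { OFFLINE, OPENING, OPENED }

  public void process() {
    // 1. OFFLINE -> OPENING in ZK; if this transition fails, another
    //    actor touched the node and the open is aborted.
    transition(ZkRegionState.OFFLINE, ZkRegionState.OPENING);

    // 2. Open the region: initialize its stores, then replay any recovered
    //    edits left by log splitting into the memstore. Long replays
    //    periodically re-touch the OPENING node so the master does not
    //    time the transition out.
    openRegionAndReplayRecoveredEdits();

    // 3. Update the region's row in .META. with this server's location.
    updateMeta();

    // 4. OPENING -> OPENED; the master's AssignmentManager observes this,
    //    clears the region from RIT, and deletes the znode.
    transition(ZkRegionState.OPENING, ZkRegionState.OPENED);
  }

  private void transition(ZkRegionState from, ZkRegionState to) { /* ZK update */ }
  private void openRegionAndReplayRecoveredEdits() { /* open + memstore replay */ }
  private void updateMeta() { /* write the location to .META. */ }
}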