`

namenode任务线程之FSNamesystem$ReplicationMonitor

阅读更多

终于可以开始分析ReplicationMonitor,他依赖了其他的任务数据,我们先来看看他的类的java doc

Java代码   收藏代码
  1. /**  
  2.   * Periodically calls computeReplicationWork()  
  3.   */   

 说明反而没有说具体的工作内容,而是直接说call某个方法,看方法名字也好理解,就是执行一些复制的工作,那么在复制决策执行前肯定需要做调研了,调研完了再按照一定的准则执行特定的复制操作了。

 

好我们首先看下复制检测的时间间隔

Js代码   收藏代码
  1. this .replicationRecheckInterval = conf.getInt( "dfs.replication.interval" , 3) * 1000L;    
  2.  //dfs.replication.interval=3 默认的设置,注意单位是秒   

 

 看这个时间间隔貌似算是最小的了,3秒就会检测一次是否需要复制数据了,看来干活很给力。

 

我们先来看下run方法里的逻辑:

Java代码   收藏代码
  1. 1 :computeDatanodeWork();           //计算块复制情况(例如少了还是多了)   
  2. 2 :processPendingReplications();     //处理那些复制超时的块请求   

 我们首先看下computeDatanodeWork()这个方法的java doc

Java代码   收藏代码
  1. /**  
  2.    * Compute block replication and block invalidation work   
  3.    * that can be scheduled on data-nodes.  
  4.    * The datanode will be informed of this work at the next heartbeat.  
  5.    *   
  6.    * @return number of blocks scheduled for replication or removal  
  7.    */   

 其实这个返回值根本就没有用到,连个log也没打

在进行计算block复制数前需要验证是否处于安全模式下,针对这点注释里写的很明白(后面要把这个safeMonitor后台任务加入分析)

 

Java代码   收藏代码
  1. // blocks should not be replicated or removed if safe mode is on   

 因为检测的频率过高,为防止检测过多的节点和数据块,在检测时设置了一些阈值来限定当此检测的范围,例如

Java代码   收藏代码
  1. static   final   int  INVALIDATE_WORK_PCT_PER_ITERATION =  32 ;  
  2. static   final   float  REPLICATION_WORK_MULTIPLIER_PER_ITERATION =  2 ;  
  3.    
  4. blocksToProcess = (int )(heartbeats.size()   
  5.           * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);  
  6. nodesToProcess = (int )Math.ceil(( double )heartbeats.size()   
  7.           * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100 );  
  8. ArrayList<DatanodeDescriptor> heartbeats   

 针对上面的计算公式,例如我有100个datanode节点,则hearbeats.size ==100 ,那么在3秒的时间内处理的限制数就是blockToProcess = 200  nodeToProcess = 32

 

好进入正题,首先是选择那些需要复制的块,那些需要复制的块保存在下面这个数据结构中

Java代码   收藏代码
  1. // Store set of Blocks that need to be replicated 1 or more times   
  2. UnderReplicatedBlocks neededReplications  

 我们看下这个结构的内部实现

Java代码   收藏代码
  1. /* Class for keeping track of under replication blocks  
  2.  * Blocks have replication priority, with priority 0 indicating the highest  
  3.  * Blocks have only one replicas has the highest  
  4.  */   
  5. private  List<TreeSet<Block>> priorityQueues =  new  ArrayList<TreeSet<Block>>();  

   利用这个实现了一个优先级有序队列,优先级一共只有4级 (0,1,2,3),同时block实现了Comparable接口,依据blockId和生成时间来做比较。

 

那么是谁将复制请求传递过来的呢,追踪代码看有以下几种情况:

Java代码   收藏代码
  1. 1 :当lease被删除时,需要检测和这个租约关联的hdfs文件的block数是否和期望的一致,如果小于期望值则将这个块加入到需要复制队列中  
  2. 2 :当离开安全模式时需要校验块的复制情况,如果没达到复制因子的则加入到需要复制队列中  
  3. 3 :当datanode注册到namenode时需要校验这个datanode是否处于正准备退役阶段,如果是那需要检测该datanode节点上的所有block的复制数是否已经达到复制因子,如果没有则需要加入到需要复制队列中  
  4. 4 :当DecommissionManager的监控线程执行检测时,如果发现某个退役节点处于正准备退役阶段,则对该退役节点的所有块执行检测,查看是否达到复制因子,如果没有达到则将该block加入到需要复制队列中  

 好了有了来源我们就看看下面的逻辑, 从neededReplications中先选出blocksToProcess大小的block,然后经过一系列复杂的逻辑判断,看是否需要真正的复制,如果需要就将block加入到pendingReplications队列中。

同时还存在一个recentInvalidateSets 这个集合里存储了当时无效的block,这里会计算这里面的block还是否有效。

 

接下来看  processPendingReplications,这个的逻辑比较简单,就是将pendingReplicationMonitor中监控到的超时复制请求重新放入neededReplications中去。

 

好了我们已经看到block需要被复制的逻辑了,那么到底是谁执行复制的具体操作呢,追踪下发现是

DatanodeDescriptor.addBlockToBeReplicated(block, targets);该方法将需要复制的功能交给了datanode自己来做,datanode发送心跳信息时会接收到一个需要传递数据的commond,然后依据command里的参数来执行对应的传送数据的请求。

更多信息请查看 java进阶网 http://www.javady.com

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics