
Chapter 7: Xiao Zhu's Notes on Hadoop Source Code Analysis - HDFS Analysis: Datanode Heartbeat Analysis


Chapter 7: Xiao Zhu's Notes on Hadoop Source Code Analysis - HDFS Analysis

Section 5: Datanode Analysis

5.2 Datanode Heartbeat Analysis

(1) offerService analysis

In summary:
(a) Check whether the heartbeat interval has expired; if so, send a heartbeat to the namenode containing the DFS capacity, remaining space, number of DataXceiverServer threads, and so on, then call processCommand to handle the commands the namenode returns.
(b) Notify the namenode of newly received blocks.
(c) Check whether the block report interval has expired; if so, send a block report to the namenode, then call processCommand to handle the commands the namenode returns.
(d) If it is not yet time for the next heartbeat, sleep.

 

      /** 
       * Main loop for the DataNode.  Runs until shutdown, 
       * forever calling remote NameNode functions. 
       *  
       *   1. If the heartbeat interval has expired, send a heartbeat to the namenode 
       *      (DFS capacity, remaining space, DataXceiverServer count, etc.) and call 
       *      processCommand to handle the commands the namenode returns. 
       *   2. Notify the namenode of newly received blocks. 
       *   3. If the block report interval has expired, send a block report to the 
       *      namenode and call processCommand to handle the commands it returns. 
       *   4. If it is not yet time for the next heartbeat, sleep. 
       *  
       *   DNA_UNKNOWN = 0: unknown action 
       *   DNA_TRANSFER = 1: transfer blocks to another datanode; a DataTransfer is 
       *      created per block, the request type is OP_WRITE_BLOCK, and BlockSender 
       *      sends the block and its metadata file without verifying checksums 
       *   DNA_INVALIDATE = 2: the listed blocks are obsolete; delete them all 
       *   DNA_SHUTDOWN = 3: stop the datanode; stop the infoServer, DataXceiverServer, 
       *      DataBlockScanner and the service thread, and unlock the storage directories 
       *      (waiting for the DataBlockScanner to finish may take up to an hour) 
       *   DNA_REGISTER = 4: re-register with the namenode 
       *   DNA_FINALIZE = 5: finalize an upgrade by calling DataStorage.finalizeUpgrade 
       *   DNA_RECOVERBLOCK = 6: request block recovery; one thread is created per block, 
       *      and recoverBlock is called to recover each block's information 
       *  
       *   The loop uses the information kept in the receivedBlockList and delHints lists: 
       *   receivedBlockList holds the blocks newly created on this DataNode, and 
       *   delHints holds, for each such block, the node on which a replica may be deleted. 
       */  
      public void offerService() throws Exception {  
           
        LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" +   
           " Initial delay: " + initialBlockReportDelay + "msec");  
      
        //  
        // Now loop for a long time....  
        //  
      
        while (shouldRun) {  
          try {  
            long startTime = now();  
      
            //  
            // Every so often, send heartbeat or block-report  
            //  
              
            if (startTime - lastHeartbeat > heartBeatInterval) {  
              // time to send a periodic heartbeat  
              //  
              // All heartbeat messages include following info:  
              // -- Datanode name  
              // -- data transfer port  
              // -- Total capacity  
              // -- Bytes remaining  
              //  
              lastHeartbeat = startTime;  
              DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,  
                                                           data.getCapacity(),  
                                                           data.getDfsUsed(),  
                                                           data.getRemaining(),  
                                                           xmitsInProgress.get(),  
                                                           getXceiverCount());  
              myMetrics.addHeartBeat(now() - startTime);  
              //LOG.info("Just sent heartbeat, with name " + localName);  
              // handle the commands returned by the namenode  
              if (!processCommand(cmds))  
                continue;  
            }  
                  
            // check if there are newly received blocks  
            Block [] blockArray=null;  
            String [] delHintArray=null;  
            synchronized(receivedBlockList) {  
              synchronized(delHints) {  
                int numBlocks = receivedBlockList.size();  
                if (numBlocks > 0) {  
                  if(numBlocks!=delHints.size()) {  
                    LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );  
                  }  
                  //  
                  // Send newly-received blockids to namenode  
                  //  
                  blockArray = receivedBlockList.toArray(new Block[numBlocks]); // receivedBlockList holds the new blocks successfully created on this DataNode  
                  delHintArray = delHints.toArray(new String[numBlocks]); // delHints holds the node on which each block may be deleted; both lists are updated in DataNode.notifyNamenodeReceivedBlock  
                }  
              }  
            }  
            if (blockArray != null) {  
              if(delHintArray == null || delHintArray.length != blockArray.length ) {  
                LOG.warn("Panic: block array & delHintArray are not the same" );  
              }  
              namenode.blockReceived(dnRegistration, blockArray, delHintArray); // block state changes are reported to the namenode via blockReceived  
      
              synchronized (receivedBlockList) {  
                synchronized (delHints) {  
                  for(int i=0; i<blockArray.length; i++) {  
                    receivedBlockList.remove(blockArray[i]);  
                    delHints.remove(delHintArray[i]);  
                  }  
                }  
              }  
            }  
      
            // Send latest blockinfo report if timer has expired.  
            if (startTime - lastBlockReport > blockReportInterval) { // time to report the state of all blocks on this datanode to the namenode  
              if (data.isAsyncBlockReportReady()) {  
                // Create block report  
                long brCreateStartTime = now();  
                Block[] bReport = data.retrieveAsyncBlockReport();  
                  
                // Send block report  
                long brSendStartTime = now();  
                  
                // send the block report to the namenode    
                DatanodeCommand cmd = namenode.blockReport(dnRegistration,  
                        BlockListAsLongs.convertToArrayLongs(bReport));  
                  
                // Log the block report processing stats from Datanode perspective  
                long brSendCost = now() - brSendStartTime;  
                long brCreateCost = brSendStartTime - brCreateStartTime;  
                myMetrics.addBlockReport(brSendCost);  
                LOG.info("BlockReport of " + bReport.length  
                    + " blocks took " + brCreateCost + " msec to generate and "  
                    + brSendCost + " msecs for RPC and NN processing");  
      
                //  
                // If we have sent the first block report, then wait a random  
                // time before we start the periodic block reports.  
                //  
                if (resetBlockReportTime) {  
                  lastBlockReport = startTime -  
                      R.nextInt((int)(blockReportInterval));  
                  resetBlockReportTime = false;  
                } else {  
                  /* say the last block report was at 8:20:14. The current report  
                   * should have started around 9:20:14 (default 1 hour interval).  
                   * If current time is : 
                   *   1) normal like 9:20:18, next report should be at 10:20:14 
                   *   2) unexpected like 11:35:43, next report should be at 
                   *      12:20:14 
                   */  
                  lastBlockReport += (now() - lastBlockReport) /   
                                     blockReportInterval * blockReportInterval;  
                }  
                processCommand(cmd);  
              } else {  
                data.requestAsyncBlockReport();  
                if (lastBlockReport > 0) { // this isn't the first report  
                  long waitingFor =  
                      startTime - lastBlockReport - blockReportInterval;  
                  String msg = "Block report is due, and been waiting for it for " +  
                      (waitingFor/1000) + " seconds...";  
                  if (waitingFor > LATE_BLOCK_REPORT_WARN_THRESHOLD) {  
                    LOG.warn(msg);  
                  } else if (waitingFor > LATE_BLOCK_REPORT_INFO_THRESHOLD) {  
                    LOG.info(msg);  
                  } else if (LOG.isDebugEnabled()) {  
                    LOG.debug(msg);  
                  }  
                }  
              }  
            }  
      
            // start the blockScanner thread, which periodically scans and verifies blocks  
            if (blockScanner != null && blockScannerThread == null &&  
                upgradeManager.isUpgradeCompleted()) {  
              LOG.info("Starting Periodic block scanner.");  
              blockScannerThread = new Daemon(blockScanner);  
              blockScannerThread.start();  
            }  
                  
            //  
            // There is no work to do;  sleep until heartbeat timer elapses,   
            // or work arrives, and then iterate again.  
            //  
            long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);  
            synchronized(receivedBlockList) {  
              if (waitTime > 0 && receivedBlockList.size() == 0) {  
                try {  
                  receivedBlockList.wait(waitTime);  
                } catch (InterruptedException ie) {  
                }  
                delayBeforeBlockReceived();  
              }  
            } // synchronized  
          } catch(RemoteException re) {  
            String reClass = re.getClassName();  
            if (UnregisteredDatanodeException.class.getName().equals(reClass) ||  
                DisallowedDatanodeException.class.getName().equals(reClass) ||  
                IncorrectVersionException.class.getName().equals(reClass)) {  
              LOG.warn("DataNode is shutting down: " +   
                       StringUtils.stringifyException(re));  
              shutdown();  
              return;  
            }  
            LOG.warn(StringUtils.stringifyException(re));  
          } catch (IOException e) {  
            LOG.warn(StringUtils.stringifyException(e));  
          }  
        } // while (shouldRun)  
      } // offerService  
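
The receivedBlockList and delHints lists consumed in the loop above are populated elsewhere in DataNode; the comments point to notifyNamenodeReceivedBlock. As a reference, here is a minimal sketch of that method (modeled on the 1.x source, with argument checks and metrics simplified), showing how it also wakes up the offerService thread that waits on receivedBlockList between heartbeats:

    /**
     * Sketch: record a newly received block so that the next offerService
     * iteration reports it via namenode.blockReceived().
     */
    protected void notifyNamenodeReceivedBlock(Block block, String delHint) {
      if (block == null || delHint == null) {
        throw new IllegalArgumentException("Block is null or delHint is null");
      }
      synchronized (receivedBlockList) {
        synchronized (delHints) {
          receivedBlockList.add(block);  // block successfully created on this DataNode
          delHints.add(delHint);         // node on which a replica of this block may be deleted
          // wake up offerService(), which waits on receivedBlockList between heartbeats
          receivedBlockList.notifyAll();
        }
      }
    }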

 
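The lastBlockReport update in the block-report branch above relies on Java's integer division to snap the schedule back onto its original phase instead of drifting when a report is late. A small self-contained illustration with hypothetical timestamps (not part of the DataNode code):

    // Demonstrates the block-report rescheduling arithmetic with hypothetical values.
    public class BlockReportScheduleDemo {
      static long millis(int h, int m, int s) { return ((h * 60L + m) * 60 + s) * 1000; }

      public static void main(String[] args) {
        long blockReportInterval = 60L * 60 * 1000;   // 1 hour, in milliseconds
        long lastBlockReport = millis(8, 20, 14);     // last report sent at 08:20:14
        long now = millis(11, 35, 43);                // current time: the report is badly overdue

        // (now - lastBlockReport) / interval is 3 by integer division, so lastBlockReport
        // advances by exactly 3 intervals to 11:20:14 and the next report is due at
        // 12:20:14. In the normal case (now = 09:20:18) the quotient is 1, giving
        // 09:20:14 and a next report at 10:20:14, as the source comment describes.
        lastBlockReport += (now - lastBlockReport) / blockReportInterval * blockReportInterval;
        System.out.println("next block report due at " + (lastBlockReport + blockReportInterval) + " ms");
      }
    }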

 

(2) processCommand analysis

In summary, the DatanodeCommand actions are:
DNA_UNKNOWN = 0: unknown action
DNA_TRANSFER = 1: transfer blocks to another datanode; a DataTransfer is created per block, the request type is OP_WRITE_BLOCK, and BlockSender sends the block and its metadata file without verifying checksums (see the sketch after the processCommand code below)
DNA_INVALIDATE = 2: the listed blocks are obsolete; delete them all
DNA_SHUTDOWN = 3: stop the datanode; stop the infoServer, DataXceiverServer, DataBlockScanner and the service thread, and unlock the storage directories (waiting for the DataBlockScanner to finish may take up to an hour)
DNA_REGISTER = 4: re-register with the namenode
DNA_FINALIZE = 5: finalize an upgrade by calling DataStorage.finalizeUpgrade
DNA_RECOVERBLOCK = 6: request block recovery; one thread is created per block, and recoverBlock is called to recover each block's information

 

     /** 
      *   DNA_UNKNOWN = 0: unknown action 
      *   DNA_TRANSFER = 1: transfer blocks to another datanode; a DataTransfer is 
      *      created per block, the request type is OP_WRITE_BLOCK, and BlockSender 
      *      sends the block and its metadata file without verifying checksums 
      *   DNA_INVALIDATE = 2: the listed blocks are obsolete; delete them all 
      *   DNA_SHUTDOWN = 3: stop the datanode; stop the infoServer, DataXceiverServer, 
      *      DataBlockScanner and the service thread, and unlock the storage directories 
      *      (waiting for the DataBlockScanner to finish may take up to an hour) 
      *   DNA_REGISTER = 4: re-register with the namenode 
      *   DNA_FINALIZE = 5: finalize an upgrade by calling DataStorage.finalizeUpgrade 
      *   DNA_RECOVERBLOCK = 6: request block recovery; one thread is created per block, 
      *      and recoverBlock is called to recover each block's information 
      *  
      * @param cmd 
      * @return true if further processing may be required or false otherwise.  
      * @throws IOException 
      */  
     private boolean processCommand(DatanodeCommand cmd) throws IOException {  
       if (cmd == null)  
         return true;  
       final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;  
      
       switch(cmd.getAction()) {  
       // transfer blocks to another datanode: a DataTransfer is created per block, the request type is OP_WRITE_BLOCK, and BlockSender sends the block and its metadata file without verifying checksums  
       case DatanodeProtocol.DNA_TRANSFER:  
         // Send a copy of a block to another datanode  
         transferBlocks(bcmd.getBlocks(), bcmd.getTargets());  
         myMetrics.incrBlocksReplicated(bcmd.getBlocks().length);  
         break;  
       // the listed blocks are obsolete; delete them all  
       case DatanodeProtocol.DNA_INVALIDATE:  
         //  
         // Some local block(s) are obsolete and can be   
         // safely garbage-collected.  
         //  
         Block toDelete[] = bcmd.getBlocks();  
         try {  
           if (blockScanner != null) {  
             blockScanner.deleteBlocks(toDelete);  
           }  
           data.invalidate(toDelete);  
         } catch(IOException e) {  
           checkDiskError();  
           throw e;  
         }  
         myMetrics.incrBlocksRemoved(toDelete.length);  
         break;  
       // stop the datanode: stop the infoServer, DataXceiverServer, DataBlockScanner and the service thread, and unlock the storage directories (waiting for the DataBlockScanner may take up to an hour)  
       case DatanodeProtocol.DNA_SHUTDOWN:  
         // shut down the data node  
         this.shutdown();  
         return false;  
       // re-register with the namenode  
       case DatanodeProtocol.DNA_REGISTER:  
         // namenode requested a registration - at start or if NN lost contact  
         LOG.info("DatanodeCommand action: DNA_REGISTER");  
         if (shouldRun) {  
           register();  
         }  
         break;  
       // finalize an upgrade by calling DataStorage.finalizeUpgrade  
       case DatanodeProtocol.DNA_FINALIZE:  
         storage.finalizeUpgrade();  
         break;  
       case UpgradeCommand.UC_ACTION_START_UPGRADE:  
         // start distributed upgrade here  
         processDistributedUpgradeCommand((UpgradeCommand)cmd);  
         break;  
       // request block recovery: one thread is created per block, and recoverBlock is called to recover each block's information  
       case DatanodeProtocol.DNA_RECOVERBLOCK:  
         recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());  
         break;  
       case DatanodeProtocol.DNA_ACCESSKEYUPDATE:  
         LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");  
         if (isBlockTokenEnabled) {  
           blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());  
         }  
         break;  
       // update the balancer bandwidth limit  
       case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:  
         LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");  
         int vsn = ((BalancerBandwidthCommand) cmd).getBalancerBandwidthVersion();  
         if (vsn >= 1) {  
           long bandwidth =   
                      ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();  
           if (bandwidth > 0) {  
             DataXceiverServer dxcs =  
                          (DataXceiverServer) this.dataXceiverServer.getRunnable();  
             dxcs.balanceThrottler.setBandwidth(bandwidth);  
           }  
         }  
         break;  
       default:  
         LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());  
       }  
       return true;  
     }  
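
Note that offerService hands the entire DatanodeCommand[] returned by sendHeartbeat to processCommand, whereas the method above handles a single command. A minimal sketch of the array-level overload, assuming the 1.x structure (error handling may differ in your version):

    /**
     * Sketch: process all commands from a heartbeat response. Returns false as soon
     * as one command (e.g. DNA_SHUTDOWN) indicates that the current offerService
     * iteration should not continue.
     */
    private boolean processCommand(DatanodeCommand[] cmds) {
      if (cmds != null) {
        for (DatanodeCommand cmd : cmds) {
          try {
            if (!processCommand(cmd)) {
              return false;
            }
          } catch (IOException ioe) {
            LOG.warn("Error processing datanode Command", ioe);
          }
        }
      }
      return true;
    }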

 
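For DNA_TRANSFER, transferBlocks fans the work out to one DataTransfer daemon per block; each daemon connects to the targets, issues an OP_WRITE_BLOCK request, and streams the block and its metadata file with BlockSender, without re-verifying checksums. A heavily simplified sketch of that fan-out (modeled on the 1.x source; the real transferBlock also reports missing blocks back to the namenode, and the DataTransfer constructor signature may differ across versions):

    /** Sketch: start one DataTransfer daemon per block that must be replicated. */
    private void transferBlocks(Block[] blocks, DatanodeInfo[][] xferTargets) {
      for (int i = 0; i < blocks.length; i++) {
        try {
          transferBlock(blocks[i], xferTargets[i]);
        } catch (IOException ie) {
          LOG.warn("Failed to transfer block " + blocks[i], ie);
        }
      }
    }

    private void transferBlock(Block block, DatanodeInfo[] xferTargets) throws IOException {
      // Skip blocks this datanode no longer holds a valid replica of, or with no targets.
      if (!data.isValidBlock(block) || xferTargets.length == 0) {
        return;
      }
      // The DataTransfer runnable writes the block to the first target with an
      // OP_WRITE_BLOCK request via BlockSender; the remaining targets form the pipeline.
      new Daemon(new DataTransfer(xferTargets, block)).start();
    }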

 

 

 

 
