`

第七章:小朱笔记hadoop之源码分析-hdfs分析 第四节:namenode分析-namenode启动过程分析

 
阅读更多

第七章:小朱笔记hadoop之源码分析-hdfs分析

第四节:namenode分析

4.1 namenode启动过程分析

       org.apache.hadoop.hdfs.server.namenode.main 方法是系统的入口,它会调用 createNameNode 创建 NameNode 实例。 createNameNode 分析命令行参数,如果是 FORMAT 戒 FINALIZE,调用对应的方法后退出,如果是其他的参数,将创建NameNode 对象。NameNode的构造函数会调initialize,初始化NameNode的成员发量,包括创建 RPC 服务器,初始化FSNamesystem,初始化RPC服务器和回收站线程。
   创建的服务如下:

  

服务                             类                            
server                   ipc.RPC.Server       
serviceRpcServer         ipc.RPC.Server  
HttpServer               http.HttpServer  
Trash Emptier            fs.Trash.Trash.Emptier  
hbthread                 hdfs.server.namenode.FSNamesystem.HeartbeatMonitor    
lmthread                 hdfs.server.namenode.LeaseManager.Monitor  
replthread               hdfs.server.namenode.FSNamesystem.ReplicationMonitor  
dnthread                 hdfs.server.namenode.DecommissionManager.Monitor

 初始化 name-node 入口:

 /** 
 * Initialize name-node. 
 *  
 * @param conf the configuration 
 */  
private void initialize(Configuration conf) throws IOException {  
  InetSocketAddress socAddr = NameNode.getAddress(conf);  
  //从配置conf中获取到Namenode服务器所使用的Socket地址 读取fs.default.name的值,获取hdfs集群的地址    
    
  UserGroupInformation.setConfiguration(conf);  
  //设置用户权限信息  
     
  //如果dfs.namenode.keytab.file存在,并且kerberos已经开启,则调用UserGroupInformation.loginUserFromKeytab进行登陆    
  //登陆使用dfs.namenode.kerberos.principal作为用户名,否则使用当前linux的user作为用户。    
  SecurityUtil.login(conf, DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY,   
      DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, socAddr.getHostName());  
    
  //获取namenode同时处理请求的数量配置,默认为10   服务器上处理器Handler线程的数量    
  int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);  
    
  // set service-level authorization security policy  
  //如果授权配置(hadoop.security.authorization)开启了,则刷新授权策略    
  if (serviceAuthEnabled = conf.getBoolean(ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {  
    //默认会重新加载hadoop-policy.xml中的acl配置,加载完成后acl配置保存在  
    ServiceAuthorizationManager.refresh(conf, new HDFSPolicyProvider());  
  }  
    
  //创建服务指标,用于观察namenode服务状态    
  myMetrics = NameNodeInstrumentation.create(conf);// 初始化NameNodeMetrics   
    
  //启动FSNamesystem,启动FSNamesystem时,会启动各种thread执行namenode职责    
  //1.init FSNamesystem  
  this.namesystem = new FSNamesystem(this, conf);  
  
  //如果安全机制开启    
  if (UserGroupInformation.isSecurityEnabled()) {    
    //启动守护线程每5秒执行一次ExpiredTokenRemover    
    namesystem.activateSecretManager();    
  }    
  
  //2.init namenode-datanode,namenode-snn RPC Server  
  //创建rpc服务器,如果dfs.namenode.servicerpc-address配置项存在,则用来作为服务器地址  
  InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);  
  if (dnSocketAddr != null) {  
    int serviceHandlerCount =  
      conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,  
                  DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);  
    this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),   
        dnSocketAddr.getPort(), serviceHandlerCount,  
        false, conf, namesystem.getDelegationTokenSecretManager());  
    this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();  
    setRpcServiceServerAddress(conf);  
  }  
    
  //3.init namenode-client RPC Server  
  this.server = RPC.getServer(this, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager());  
  
  // The rpc-server port can be ephemeral... ensure we have the correct info  
  this.serverAddress = this.server.getListenerAddress();   
    
  FileSystem.setDefaultUri(conf, getUri(serverAddress));  
  LOG.info("Namenode up at: " + this.serverAddress);  
  
    
  //4. start Http Server  
  startHttpServer(conf);  
    
  //5. start namenode-client RPC Server  
  this.server.start();     
    
  if (serviceRpcServer != null) {  
    serviceRpcServer.start();     
    //6. start namenode-datanode,namenode-snn RPC Server  
  
  }  
    
  //7.start TrashEmptier  
  //启动垃圾清理守护线程,读取fs.trash.interval的值作为两次清理的时间间隔。默认每60分钟清理一次    
  startTrashEmptier(conf);  
}  

 

(1)初始化FSNamesystem
启动FSNamesystem,启动FSNamesystem时,会启动各种thread执行namenode职责。FSNamesystem 的构造函数会调用 initialize 方法,去初始化上面我们分析过的一堆成员发量。几个重要的步骤包括加载 FSImage editlog 设置系统为安全模式,初始化各个工作线程和 HTTP 服务器。
FSNamesystem重要属性:
 // Default initial capacity and load factor of map  
  public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16;  
  public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;  
    
  private boolean isPermissionEnabled;//是否打开权限检查,可以通过配置项dfs.permissions来设置。  
    
  //本地文件的用户文件属主和文件组,可以通过hadoop.job.ugi设置,如果没有设置,那么将使用启动HDFS的用户(通过whoami获得)和该用户所在的组(通过groups获得)作为值  
  private UserGroupInformation fsOwner;  
    
  private String supergroup;//对应配置项dfs.permissions.supergroup,应用在defaultPermission中,是系统的超级组。  
    
  //缺省权限,缺省用户为fsOwner,缺省用户组为supergroup,缺省权限为0777,可以通过dfs.upgrade.permission修改。  
  private PermissionStatus defaultPermission;  
  // FSNamesystemMetrics counter variables  
  //系统总容量/已使用容量/剩余容量  
  private long capacityTotal = 0L, capacityUsed = 0L, capacityRemaining = 0L;  
    
  //系统总连接数,根据DataNode心跳信息跟新  
  private int totalLoad = 0;  
  boolean isAccessTokenEnabled;  
  BlockTokenSecretManager accessTokenHandler;  
  private long accessKeyUpdateInterval;  
  private long accessTokenLifetime;  
    
  // Scan interval is not configurable.  
  private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =  
    TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);  
  private DelegationTokenSecretManager dtSecretManager;  
  
  //分别是成员变量pendingReplications(正在复制的数据块),  
  //neededReplications(需要复制的数据块)的大小,  
  //scheduledReplicationBlocksCount是当前正在处理的复制工作数目  
    
  volatile long pendingReplicationBlocksCount = 0L;  
  volatile long corruptReplicaBlocksCount = 0L;  
  volatile long underReplicatedBlocksCount = 0L;  
  volatile long scheduledReplicationBlocksCount = 0L;  
  volatile long excessBlocksCount = 0L;  
  volatile long pendingDeletionBlocksCount = 0L;  
  //  
  // Stores the correct file name hierarchy  
  //指向系统使用的FSDirectory对象。  
  public FSDirectory dir;  
  
  //  
  // Mapping: Block -> { INode, datanodes, self ref }   
  // Updated only in response to client-sent information.  
  //  
  final BlocksMap blocksMap = new BlocksMap(DEFAULT_INITIAL_MAP_CAPACITY,   
                                            DEFAULT_MAP_LOAD_FACTOR);  
  
  //  
  // Store blocks-->datanodedescriptor(s) map of corrupt replicas  
  //  
  //保存损坏(如:校验没通过)的数据块到对应DataNode的关系,CorruptReplicasMap类图如下,类只有一个成员变量,  
  //保存Block到一个DatanodeDescriptor的集合的映射和这个映射上的一系列操作  
  //  
  public CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();  
      
  /** 
   * Stores the datanode -> block map.   
   * <p> 
   * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by  
   * storage id. In order to keep the storage map consistent it tracks  
   * all storages ever registered with the namenode. 
   * A descriptor corresponding to a specific storage id can be 
   * <ul>  
   * <li>added to the map if it is a new storage id;</li> 
   * <li>updated with a new datanode started as a replacement for the old one  
   * with the same storage id; and </li> 
   * <li>removed if and only if an existing datanode is restarted to serve a 
   * different storage id.</li> 
   * </ul> <br> 
   * The list of the {@link DatanodeDescriptor}s in the map is checkpointed 
   * in the namespace image file. Only the {@link DatanodeInfo} part is  
   * persistent, the list of blocks is restored from the datanode block 
   * reports.  
   * <p> 
   * Mapping: StorageID -> DatanodeDescriptor 
   *  
   * 保存了StorageID >> DatanodeDescriptor的映射,用于保证DataNode使用的Storage的一致性。 
   */  
  NavigableMap<String, DatanodeDescriptor> datanodeMap =  new TreeMap<String, DatanodeDescriptor>();  
  
  //  
  // Keeps a Collection for every named machine containing  
  // blocks that have recently been invalidated and are thought to live  
  // on the machine in question.  
  // Mapping: StorageID -> ArrayList<Block>  
  //  
  // 保存了每个DataNode上无效但还存在的数据块(StorageID >> ArrayList<Block>)。  
    
  // 保存了每个DataNode上有效,但需要删除的数据块(StorageID >> TreeSet<Block>),这种情况可能发生在一个DataNode故障后恢复后,  
  // 上面的数据块在系统中副本数太多,需要删除一些数据块。  
  private Map<String, Collection<Block>> recentInvalidateSets =  new TreeMap<String, Collection<Block>>();  
  
  //  
  // Keeps a TreeSet for every named node.  Each treeset contains  
  // a list of the blocks that are "extra" at that location.  We'll  
  // eventually remove these extras.  
  // Mapping: StorageID -> TreeSet<Block>  
  //保存Datanode上有效但需要删除的数据块(StorageID -> TreeSet<Block>)比如一个Datanode故障恢复后,上面的数据块在系统中副本数太多,需要删除一些数据块。  
  //  
  Map<String, Collection<Block>> excessReplicateMap = new TreeMap<String, Collection<Block>>();  
  
  Random r = new Random();  
  
  /** 
   * Stores a set of DatanodeDescriptor objects. 
   * This is a subset of {@link #datanodeMap}, containing nodes that are  
   * considered alive. 
   * The {@link HeartbeatMonitor} periodically checks for outdated entries, 
   * and removes them from the list. 
   * 所有目前活着的DataNode,线程HeartbeatMonitor会定期检查 
   */  
  ArrayList<DatanodeDescriptor> heartbeats = new ArrayList<DatanodeDescriptor>();  
  
/** 
 * Store set of Blocks that need to be replicated 1 or more times. 
 * Set of: Block 
 *  
 * 需要进行复制的数据块。UnderReplicatedBlocks的类图如下,它其实是一个数组,数组的下标是优先级(0的优先级最高,如果数据块只有一个副本,它的优先级是0), 
 * 数组的内容是一个Block集合。UnderReplicatedBlocks提供一些方法,对Block进行增加,修改,查找和删除。 
 *  
 */  
  private UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();  
  // We also store pending replication-orders.保存正在复制的数据块的相关信息  
  private PendingReplicationBlocks pendingReplications;  
  
  public LeaseManager leaseManager = new LeaseManager(this);   
  
  //  
  // Threaded object that checks to see if we have been  
  // getting heartbeats from all clients.   
  //  
  Daemon hbthread = null;   // HeartbeatMonitor thread 对应DataNode心跳检查  
  public Daemon lmthread = null;   // LeaseMonitor thread 租约检查  
  Daemon smmthread = null;  // SafeModeMonitor thread 安全模式检查  
  public Daemon replthread = null;  // Replication thread 数据块复制  
  private ReplicationMonitor replmon = null; // Replication metrics  
    
  private volatile boolean fsRunning = true; //系统运行标志  
  long systemStart = 0;//系统启动时间  
  
  //  The maximum number of replicates we should allow for a single block  
  private int maxReplication;  
  //  How many outgoing replication streams a given node should have at one time  
  private int maxReplicationStreams;  
  // MIN_REPLICATION is how many copies we need in place or else we disallow the write  
  private int minReplication;  
  // Default replication  
  private int defaultReplication;  
  // Variable to stall new replication checks for testing purposes  
  private volatile boolean stallReplicationWork = false;  
  // heartbeatRecheckInterval is how often namenode checks for expired datanodes  
  private long heartbeatRecheckInterval;  
  // heartbeatExpireInterval is how long namenode waits for datanode to report  
  // heartbeat  
  private long heartbeatExpireInterval;  
  //replicationRecheckInterval is how often namenode checks for new replication work  
  private long replicationRecheckInterval;  
  // default block size of a file  
  private long defaultBlockSize = 0;  
  // allow appending to hdfs files  
  private boolean supportAppends = true;  
  
  /** 
   * Last block index used for replication work. 
   */  
  private int replIndex = 0; ///和neededReplications配合,记录下一个进行复制的数据块位置。  
  private long missingBlocksInCurIter = 0;  
  private long missingBlocksInPrevIter = 0;   
  
  public static FSNamesystem fsNamesystemObject;  
  /** NameNode RPC address */  
  private InetSocketAddress nameNodeAddress = null; // TODO: name-node has this field, it should be removed here  
    
  //安全模式是这样一种状态,系统处于这个状态时,不接受任何对名字空间的修改,同时也不会对数据块进行复制或删除数据块。  
  //NameNode启动的时候会自动进入安全模式,同时也可以手工进入(不会自动离开)。系统启动以后,DataNode会报告目前它拥有的数据块的信息,  
  //当系统接收到的Block信息到达一定门槛,同时每个Block都有dfs.replication.min个副本后,系统等待一段时间后就离开安全模式。这个门槛定义的参数包括:  
  //dfs.safemode.threshold.pct:接受到的Block的比例,缺省为95%,就是说,必须DataNode报告的数据块数目占总数的95%,才到达门槛;  
  //dfs.replication.min:缺省为1,即每个副本都存在系统中;  
  //dfs.replication.min:等待时间,缺省为0,单位秒。  
  private SafeModeInfo safeMode;  // safe mode information  
    
  //保存了主机名(String)到DatanodeDescriptor数组的映射(Host2NodesMap唯一的成员变量为HashMap<String,DatanodeDescriptor[]> map,它的方法都是对这个map进行操作)。   
  private Host2NodesMap host2DataNodeMap = new Host2NodesMap();  
    
    
  // datanode networktoplogy  
  //  定义了HDFS的网络拓扑,网络拓扑对应选择数据块副本的位置很重要。如在一个层次型的网络中,接到同一个交换机的两个节点间的网络速度,  
  //  会比跨越多个交换机的两个节点间的速度快,但是,如果某交换机故障,那么它对接到它上面的两个节点会同时有影响,但跨越多个交换机的两个节点,这种影响会小得多  
  NetworkTopology clusterMap = new NetworkTopology();  
    
  private DNSToSwitchMapping dnsToSwitchMapping;  
    
 // for block replicas placement  
//用于为数据块备份选择目标,例如,用户写文件时,需要选择一些DataNode,作为数据块的存放位置,这时候就利用它来选择目标地址。  
//chooseTarget是ReplicationTargetChooser中最重要的方法,  
//它通过内部的一个NetworkTopology对象,计算出一个DatanodeDescriptor数组,该数组就是选定的DataNode,同时,顺序就是最佳的数据流顺序  
  ReplicationTargetChooser replicator;  
  
  //保存了系统中允许/不允许连接到NameNode的机器列表  
  private HostsFileReader hostsReader;   
    
  //  线程句柄,该线程用于检测DataNode上的Decommission进程。例如,某节点被列入到不允许连接到NameNode的机器列表中(HostsFileReader)  
  //  那么,该节点会进入Decommission状态,它上面的数据块会被复制到其它节点,复制结束后机器进入DatanodeInfo.AdminStates.DECOMMISSIONED,这台机器就可以从HDFS中撤掉。  
  private Daemon dnthread = null;  
  
  //系统能拥有的INode最大数(配置项dfs.max.objects,0为无限制)。  
  private long maxFsObjects = 0;          // maximum number of fs objects  
  
  /** 
   * The global generation stamp for this file system.  
   */  
  private final GenerationStamp generationStamp = new GenerationStamp();  
  
  // Ask Datanode only up to this many blocks to delete.  
  //  发送给DataNode删除数据块消息中,能包含的最大数据块数。比方说,如果某DataNode上有250个Block需要被删除,而这个参数是100,  
  //  那么一共会有3条删除数据块消息消息,前面两条包含了100个数据块,最后一条是50个。  
  int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;  
  
  // precision of access times.  
  //用于控制文件的access时间的精度,也就是说,小于这个精度的两次对文件访问,后面的那次就不做记录了。  
  private long accessTimePrecision = 0;  
  private String nameNodeHostName;  

 

   FSNamesystem初始化方法:
 
    private void initialize(NameNode nn, Configuration conf) throws IOException {  
          
        this.systemStart = now();  
          
        setConfigurationParameters(conf);  
        //读取配置文件    
          
        dtSecretManager = createDelegationTokenSecretManager(conf);  
        //读取dfs.namenode.delegation.token的相关配置    
          
          
        this.nameNodeAddress = nn.getNameNodeAddress();  
          
        this.registerMBean(conf); // register the MBean for the FSNamesystemStutus  
          
        this.dir = new FSDirectory(this, conf);  
          
        StartupOption startOpt = NameNode.getStartupOption(conf);  
          
        this.dir.loadFSImage(getNamespaceDirs(conf),  
                             getNamespaceEditsDirs(conf), startOpt);  
        //加载namenode持久化在硬盘的信息    
          
          
        long timeTakenToLoadFSImage = now() - systemStart;  
        LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");  
        NameNode.getNameNodeMetrics().setFsImageLoadTime(timeTakenToLoadFSImage);  
          
        //加载安全模式信息,进入安全模式    
        this.safeMode = new SafeModeInfo(conf);  
        //将block的总数设置给safemode  
        setBlockTotal();  
          
        pendingReplications = new PendingReplicationBlocks(  
                                conf.getInt("dfs.replication.pending.timeout.sec",   
                                            -1) * 1000L);  
        if (isAccessTokenEnabled) {  
          accessTokenHandler = new BlockTokenSecretManager(true,  
              accessKeyUpdateInterval, accessTokenLifetime);  
        }  
          
        //启动心跳监控的线程    
        this.hbthread = new Daemon(new HeartbeatMonitor());  
      
        //启动文件租约管理监控的线程    
        this.lmthread = new Daemon(leaseManager.new Monitor());  
          
        //启动副本监控的线程    
        this.replmon = new ReplicationMonitor();  
        this.replthread = new Daemon(replmon);  
          
        hbthread.start();  
        lmthread.start();  
        replthread.start();  
      
        //读取主机信息黑白名单  
        this.hostsReader = new HostsFileReader(conf.get("dfs.hosts",""),  
                                               conf.get("dfs.hosts.exclude",""));  
          
        //启动退役节点监控的线程    
        this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(  
            conf.getInt("dfs.namenode.decommission.interval", 30),  
            conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));  
        dnthread.start();  
      
        this.dnsToSwitchMapping = ReflectionUtils.newInstance(  
            conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,  
                DNSToSwitchMapping.class), conf);  
          
        /* If the dns to swith mapping supports cache, resolve network  
         * locations of those hosts in the include list,  
         * and store the mapping in the cache; so future calls to resolve 
         * will be fast. 
         */  
        if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {  
          dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));  
        }  
          
        InetSocketAddress socAddr = NameNode.getAddress(conf);  
        this.nameNodeHostName = socAddr.getHostName();  
          
        //将这个类注册到监控系统    
        registerWith(DefaultMetricsSystem.INSTANCE);  
      }  
 
       从代码中可以看到,FSNamesystem的initialize方法主要的工作是:通过读取配置文件设置成员变量;创建FSDirectory并 loadFSImage;设置系统安全模式;启动一系列的后台线程monitorDaemon。其中对 loadFSImage非常重要。 Namenode会将HDFS的文件和目录元数据存储在一个叫fsimage的二进制文 件中,每次保存fsimage之后到下次保存之间的所有hdfs操作,将会记录在editlog文件中,当editlog达到一定的大小(bytes,由 fs.checkpoint.size参数定义)或从上次保存过后一定时间段过后(sec,由fs.checkpoint.period参数定 义),namenode会重新将内存中对整个HDFS的目录树和文件元数据刷到fsimage文件中。Namenode就是通过这种方式来保证HDFS中 元数据信息的安全性。当namenode重启加载fsimage时,就是按照如下格式协议从文件流中加载元数据信息。
从fsimag的存储格式可以看出,fsimage保存有如下信息:
写道
1. image head,其中包含:
a) imgVersion(int):当前image的版本信息
b) namespaceID(int):用来确保别的HDFS instance中的datanode不会误连上当前NN。
c) numFiles(long):整个文件系统中包含有多少文件和目录
d) genStamp(long):生成该image时的时间戳信息。

2.文件或目录的源数据信息,如果是目录,则包含以下信息:
a)path(String):该目录的路径,如”/user/zhuhui/data”
b)replications(short):副本数(目录虽然没有副本,但这里记录的目录副本数也为3)
c)mtime(long):该目录的修改时间的时间戳信息
d)atime(long):该目录的访问时间的时间戳信息
e)blocksize(long):目录的blocksize都为0
f)numBlocks(int):实际有多少个文件块,目录的该值都为-1,表示该item为目录
g)nsQuota(long):namespace Quota值,若没加Quota限制则为-1
h)dsQuota(long):disk Quota值,若没加限制则也为-1
i)username(String):该目录的所属用户名
j)group(String):该目录的所属组
k)permission(short):该目录的permission信息,如644等,有一个short来记录。

3.如果是文件,则还会额外包含如下信息:
a)blockid(long):属于该文件的block的blockid,
b)numBytes(long):该block的大小
c)genStamp(long):该block的时间戳
 
       当该文件对应的numBlocks数不为1,而是大于1时,表示该文件对应有多个block信息,此时紧接在该fsimage之后的就会有多个 blockid,numBytes和genStamp信息。因此,在namenode启动时,就需要对fsimage按照如下格式进行顺序的加载,以将 fsimage中记录的HDFS元数据信息加载到内存中。
       namenode在加载fsimage过程其实非常简单,就是从fsimage中不停的顺序读取文件和目录的元数据信息,并在内存中构建整个 namespace,此时BlocksMap中每个block对应的datanodes 列表暂时为空。当fsimage加载完毕后,整个HDFS的目录结构在内存中就已经初始化完毕,所缺的就是每个文件对应的block对应的 datanode列表信息。这些信息需要从datanode的blockReport中获取,所以加载fsimage完毕后,namenode进程进入 rpc等待状态,等待所有的datanodes发送blockReports。
    HDFS将fsimage和edits文件内容读入内存,进行合并,填充与INode相关的元数据结构,并将新的元数据内存镜像导出,在磁盘上形成新的 fsimage和edits文件。HDFS同时还接收DataNode的心跳信息,填充与Block和Data Node相关的元数据结构。
    saveNameSpace将元数据写入到磁盘,具体操作步骤:首先将current目录重命名为lastcheckpoint.tmp;然后在创建新的 current目录,并保存文件;最后将lastcheckpoint.tmp重命名为privios.checkpoint

    在加载完成时,初始化了一下四个守护线程:

    //启动心跳监控的线程    
    this.hbthread = new Daemon(new HeartbeatMonitor());  
      
    //启动文件租约管理监控的线程    
    this.lmthread = new Daemon(leaseManager.new Monitor());  
      
    //启动副本监控的线程    
    this.replmon = new ReplicationMonitor();  
    this.replthread = new Daemon(replmon);  
      
    hbthread.start();  
    lmthread.start();  
    replthread.start();  
      
    //启动退役节点监控的线程    
    this.dnthread = new Daemon(new DecommissionManager(this).new Monitor(  
        conf.getInt("dfs.namenode.decommission.interval", 30),  
        conf.getInt("dfs.namenode.decommission.nodes.per.interval", 5)));  
    dnthread.start();  
 (2)初始化RPC服务器
 //init namenode-client RPC Server  
  this.server = RPC.getServer(this, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager());  
     
 public synchronized void start() {  
  responder.start();  
  listener.start();  
  handlers = new Handler[handlerCount];  
    
  for (int i = 0; i < handlerCount; i++) {  
    handlers[i] = new Handler(i);  
    handlers[i].start();  
  }  
}  
   server启动了三个(假设handlercount为1)线程,这三个线程分别为listener,responder,handler ,这三个线程之间是有职责关系的:Hadoop的Server采用了Java的NIO,这样的话就不需要为每一个socket连接建立一个线程,读取 socket上的数据。在Server中,只需要一个线程,就可以accept新的连接请求和读取socket上的数据,这个线程,就是 Listener。请求处理线程一般有多个,它们都是Server.Handle类的实例。它们的run方法循环地取出一个Server.Call,调用 Server.call方法,搜集结果并串行化,然后将结果放入Responder队列中。对于处理完的请求,需要将结果写回去,同样,利用NIO,只需 要一个线程,相关的逻辑在Responder里。
(3)启动Trash Emptier 线程 
    //启动垃圾清理守护线程,读取fs.trash.interval的值作为两次清理的时间间隔。默认每60分钟清理一次    
    startTrashEmptier(conf);  
      
    private void startTrashEmptier(Configuration conf) throws IOException {  
      this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");  
      this.emptier.setDaemon(true);  
      this.emptier.start();  
    }  
 
分享到:
评论

相关推荐

    Hadoop02---HDFS基础(HDFS原理 java客户端).md

    自己的笔记,仅供参考,包含HDFS的启动停止,HDFS基本原理(上传文件、连接校验,下载文件,数据存储位置,通信机制,namenode和DataNode职责,元数据工作机制),java端操作HDFS的基本方法

    hadoop-hdfs-fsimage-exporter:将Hadoop HDFS内容统计信息导出到Prometheus

    不会给HDFS NameNode增加额外的负担(没有NameNode查询,您可以在第二个NameNode上运行它) 缺点是 没有实时更新,仅在NameNode写入FSImage时每6小时更新一次。 但是对于大多数情况(长期趋势,检测HDF

    ansible-hdfs:用于安装 Cloudera HDFS 的 Ansible 角色

    角色变量hdfs_version - HDFS 版本hdfs_cloudera_distribution - Cloudera 发行版(默认: cdh5.4 ) hdfs_conf_dir - HDFS 的配置目录(默认: /etc/hadoop/conf ) hdfs_namenode - 确定节点是否为 HDFS NameNode ...

    大数据hadoop,centos7的安装

    格式化文件系统(仅第一次执行即可,不要重复执行):hdfs/hadoop namenode -format 启动hdfs: sbin/start-dfs.sh 验证是否启动成功: jps DataNode SecondaryNameNode NameNode 浏览器访问方式: ...

    Hadoop 2.X HDFS源码剖析

    《Hadoop 2.X HDFS源码剖析》一共有5章,其中第1章从总体上介绍了HDFS的组件、概念以及典型的流程,同时详细介绍了HDFS各个组件间RPC接口的定义。第2章介绍了Hadoop RPC框架的实现,Hadoop RPC是HDFS各个组件间通信...

    Hadoop3.2.2资源包+安装文档

    Hadoop 2.x - 对于数据平衡使用HDFS平衡器。 Hadoop 3.x - 对于数据平衡使用Intra-data节点平衡器,该平衡器通过HDFS磁盘平衡器CLI调用。 存储Scheme Hadoop 2.x - 使用3X副本Scheme Hadoop 3.x - 支持HDFS中的擦除...

    hadoop-2.7.7单机win7或win10搭建完整包

    6.在cmd输入hdfs namenode -format,看到successfully就说明format成功 7.在E:\apps\hadoop-2.7.7\sbin下按shiift右键打开cmd输入start-all,启动hadoop集群即可 8.出现启动失败,请删除E:\apps\hadoop-2.7.7\work\...

    hadoop-2.2.0-x64.tar.gz part3

    自己编译的64bithadoop-2.2.0版本 [INFO] Reactor Summary: ...This command was run using /home/hadoop/Desktop/hadoop-2.2.0-src/hadoop-dist/target/hadoop-2.2.0/share/hadoop/common/hadoop-common-2.2.0.jar

    hadoop-hdfs:Hadoop分布式文件系统hdfs代码分析

    hadoop-hdfs Hadoop分布式文件系统hdfs代码分析目录介绍Datanode-数据块基本结构主要介绍了HDFS中第二关系块结构,数据块到数据中断的映射关系。退役-中断退款主要介绍了数据异步下线取消机制。INode-文件目录结构...

    高可用性的HDFS:Hadoop分布式文件系统深度实践

    第4章 Hadoop的Backup Node方案 4.1 Backup Node概述 4.1.1 系统架构 4.1.2 使用原则 4.1.3 优缺点 4.2 运行机制分析 4.2.1 启动流程 4.2.2 元数据操作情景分析 4.2.3 日志池(journal spool)机制 4.2.4 故障切换...

    Hadoop大数据平台构建、HDFS配置、启动与验证教学课件.pptx

    HDFS配置、启动与验证 HDFS配置、启动与验证 序号 任务名称 任务一 Hadoop安装及JDK环境变量配置 任务二 HDFS组件参数配置 任务三 配置Hadoop环境变量 任务四 分发Hadoop文件 任务五 NameNode格式化 任务六 启动HDFS...

    零基础学习Hadoop3.0从入门到源码

    │ ├─视频-零基础学习Hadoop3.0-HDFS从入门到源码 │ │ │ 00--课程内容大纲和学习目标.mp4 │ │ │ 01--大数据课程导论--大数据概念.mp4 │ │ │ 02--大数据课程导论--大数据特点(5V特征).mp4 │...

    hadoop-oozie:具有Oozie的映像,该映像是为Hadoop 2.x构建的(带有2.7.0库)

    andlaz/hadoop-oozie su oozie -c 'oozie-setup.sh sharelib create -fs hdfs://namenode:8020' 启动Ooozie docker run -d --name oozie -p 0.0.0.0:11000 -p 0.0.0.0:11001:11001 \ andlaz/hadoop-oozie su oozie ...

    HDFS源码剖析带书签目录高清.zip

    《Hadoop 2.X HDFS源码剖析》一共有5章,其中第1章从总体上介绍了HDFS的组件、概念以及典型的流程,同时详细介绍了HDFS各个组件间RPC接口的定义。第2章介绍了Hadoop RPC框架的实现,Hadoop RPC是HDFS各个组件间通信...

    hadoop-cos:hadoop-cos为Apache Hadoop、Spark以及Tez等大数据计算框架集成提供支持,可以像访问HDFS一样读写存储在腾讯云COS上的数据。同时也支持作为Druid等查询与分析引擎的Deep Storage

    HADOOP-COS功能说明Hadoop-COS实现了以腾讯云 COS 作为底层文件系统运行上层计算任务的功能,支持使用Hadoop、Spark以及Tez等处理存储在腾讯云COS对象存储系统上的数据。使用限制只适用于 COS V5 版本使用环境系统...

    hadoop-2.2.0-x64.tar.gz part2

    hadoop-2.2.0 64bit下载,自己编译的 [INFO] Reactor Summary: ...This command was run using /home/hadoop/Desktop/hadoop-2.2.0-src/hadoop-dist/target/hadoop-2.2.0/share/hadoop/common/hadoop-common-2.2.0.jar

    07.HDFS工作机制--namenode元数据管理--checkpoint.mp4

    07.HDFS工作机制--namenode元数据管理--checkpoint.mp4

    HadoopHA集群部署、HDFSHA配置、启动与验证教学课件.pptx

    HDFS HA 配置、启动与验证;HDFS HA 配置、启动与验证;任务一 修改core-site.xml配置文件;任务二 修改hdfs-site.xml配置文件(一);任务二 修改hdfs-site.xml配置文件(二);任务二 修改hdfs-site.xml配置文件(三);...

    1-1-HDFS+and+YARN.pdf

    Hadoop集群具体来说包含两个集群:HDFS集群和YARN集群,两者逻辑上分离,但物理上常在一起。 (1)HDFS集群:负责海量数据的存储,集群中的角色主要有 NameNode / DataNode/SecondaryNameNode。 (2)YARN集群:负责...

    网络前沿技术 Hadoop

    实验1:Hadoop 安装与配置管理 4 1.1【实验目的】 4 1.2【实验环境】 4 1.3【实验过程】 4 Step1:基础设施. 5 Step2:各服务器安装JDK 7 Step3:各服务器的网络设置 8 Step4:在namenode安装Hadoop 9 Step5:修改Hadoop...

Global site tag (gtag.js) - Google Analytics