`

hadoop_rpc简介

 
阅读更多

 

1.RPC简介
 1.1 RPC (remote procedure call)远程过程调用,

       指的是java进程,即一个java进程调用另一个java进程中对象的方法。

 1.2 RPC至少有两个过程。调用方(client),被调用方(server)。
 1.3 client主动发起请求,调用指定ip和port的server中的方法,把调用结果返回给client。
 1.4 RPC是hadoop构建的基础。

 1.5 RPC实际上就是socket通信,只要知道了对方地址和端口,即可实现通信,因此RPC可以实现多进程间交流,更适合于Hadoop集群不同地址(分布式)下的通信


2 模拟rpc机制,自定义代码写法:

  2.1 需要一个接口和具体业务类,在接口中定义好需要被远程调用的业务类

(相当于公约,定义好公共规则-要调用的方法,然后客户端调用时直接传递规约中指定好的方法,最终调用到服务的这个方法上)

  2.2 需要有RPC客户端和服务端类

  2.3 代码如下:

1 接口:

import org.apache.hadoop.ipc.VersionedProtocol;
public interface MyBizAble extends VersionedProtocol{

	public String sayHello(String name);
}

2 业务实现类:

public class MyBiz implements MyBizAble{

	
	public String sayHello(String name){
		
		return "hello " + name;
	}

	@Override
	public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
		return 0;
	}
}


3 服务端

public class MyServer {

	public static String bindAddress = "localhost";
	public static int port = 1001;
	 
	public static void main(String[] args) throws IOException {
		
		MyBiz myBiz = new MyBiz();
	    
		 /** Construct an RPC server.
	     * @param instance the instance whose methods will be called
	     * @param conf the configuration to use
	     * @param bindAddress the address to bind on to listen for connection
	     * @param port the port to listen for connections on
	     */
将服务端实例myBiz 服务端地址 端口交给RPC管理监听,客户端调用服务端对应方法时,会通过RPC
关联到服务端对应方法,服务端处理好后将结果在通过RPC 网络流向RPC客户端
	    Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());
	    server.start();
	}
	
	
}


4 客户端

public class MyClient {

	public static void main(String[] args) throws IOException {
		/**
		 * 构建一个客户端代理对象
		 * 该代理对象实现了命名协议
		 * 代理对象会与指定地址的服务端通讯
		 */
		MyBizAble proxy = (MyBizAble)RPC.waitForProxy(MyBizAble.class, 1001, 
				new InetSocketAddress(MyServer.bindAddress, MyServer.port), 
				new Configuration());
		String result = proxy.sayHello("zm");
		System.out.println(result);
		
	/*	MyBizAble proxy = (MyBizAble)RPC.waitForProxy(
				MyBizAble.class,
				MyBizAble.VERSION,
				new InetSocketAddress(MyServer.ADDRESS, MyServer.PORT),
				new Configuration());
	final String result = proxy.hello("world");
	System.out.println("客户端结果:"+result);
	//关闭网络连接
	RPC.stopProxy(proxy);*/
	
		
	}
}

 

 

 

3 hadoop中rpc结构图




 

 
 

 

4 查看namenode源码,确认是否为rpc通讯的一部分,是rpc的服务端

 

4.1 是否长时间运行,一直等待客户端请求(socket server端就是这种典型标志)

4.2 看是否含有服务端代表典型标志:

 

 Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());

 

 

4.3 源码跟踪和解释如下:

namenode rpc代码跟踪如下:

public class NameNode implements ClientProtocol, DatanodeProtocol,
                                 NamenodeProtocol,.... {
 .......
 /** RPC server */
  private Server server;
 ......

 public static void main(String argv[]) throws Exception {
    try {
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      NameNode namenode = createNameNode(argv, null); // 点击进入
      if (namenode != null)
        namenode.join();
    } catch (Throwable e) {
      LOG.error(StringUtils.stringifyException(e));
      System.exit(-1);
    }
  }	

 如下:
  public static NameNode createNameNode(String argv[], 
                                 Configuration conf) throws IOException {
	...............
     NameNode namenode = new NameNode(conf); 	 // 点击进入
	...............							 
	}

如下:
  /*  ................
    upgrade and create a snapshot of the current file system state  在
	  ...............
  */
  public NameNode(Configuration conf) throws IOException {
    try {
      initialize(conf);// 点击进入
    } catch (IOException e) {
      this.stop();
      throw e;
    }
  }


 如下:
  private void initialize(Configuration conf) throws IOException {
  ...............
  this.server = RPC.getServer(this, socAddr.getHostName(), // this代表Namenode类 
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());
  ...............
  }

}	  


源码参考完毕,解释如下:


一般的RPC server端代码在调用时:
第一个参数the instance whose methods will be called 表示要调用的业务类的方法,
Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());

上面Namenode中,第一个参数this表示我的业务类是Namonode,同时RPC.getServer又出现在了Namenode类,因此Namenode
即表示了业务实现类,又担当了RPC server的功能。
this.server = RPC.getServer(this, socAddr.getHostName(), // this代表Namenode类 
        socAddr.getPort(), handlerCount, false, conf, namesystem
        .getDelegationTokenSecretManager());
	


NameNode 作为业务实现类,需要实现公约,才能被客户端调用到,
看代码 
public class NameNode implements ClientProtocol, DatanodeProtocol,NamenodeProtocol,
其中:
ClientProtocol, DatanodeProtocol,NamenodeProtocol 就是公约,分别代表:
ClientProtocol: 定义用户和namenode打交道的方法的接口公约
DatanodeProtocol:Protocol that a DFS datanode uses to communicate with the NameNode
NamenodeProtocol: secondary NameNode uses to communicate with the NameNode


 

 

5 通过代码查看client是如何通过rpc机制来调用到namenode节点的

 

0 客户端上传文件到hdfs为例:
	private static void putData(FileSystem fileSystem) {
		try {
			System.out.println(fileSystem.getClass().getName());//运行后打印结果如下 org.apache.hadoop.hdfs.DistributedFileSystem
			
			FSDataOutputStream out = fileSystem.create(new Path(FILE)); // 点击进入
			FileInputStream in = new FileInputStream("E:/seq100w.txt"); 
			IOUtils.copyBytes(in, out, 1024, true);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	如下:
	 public FSDataOutputStream create(Path f) throws IOException {
    return create(f, true); // 点击进入
  }
   不断点击进入,直到如下:
  
  如下:
    public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;

	  
   去org.apache.hadoop.hdfs.DistributedFileSystem中找方法create:
     public FSDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite,
    int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {

    statistics.incrementWriteOps(1);
    return new FSDataOutputStream
       (dfs.create(getPathName(f), permission,  // 点击进入 dfs.create
                   overwrite, true, replication, blockSize, progress, bufferSize),
        statistics);
  } //  返回类是一个流, 流就需要一个目的地,
  
  如下: 进入到类DFSClient.java 
  
  
  Create a new dfs file with the specified block replication 
   * with write-progress reporting and return an output stream for writing
   * into the file.  
   // 主要关注两件事: 1 在namenode上创建了一个dfs file  2 创建用于存储用户上传文件数据的文件流
  public OutputStream create(String src, 
                             FsPermission permission,
                             boolean overwrite, 
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize
                             ) throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getDefault();
    }
    FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
    LOG.debug(src + ": masked=" + masked);
    final DFSOutputStream result = new DFSOutputStream(src, masked,  // 点击进入
        overwrite, createParent, replication, blockSize, progress, buffersize,
        conf.getInt("io.bytes.per.checksum", 512));
    beginFileLease(src, result);
    return result;
  }
   
  
      如下: 在类DFSClient.java内,有内部类DFSOutputStream
	  DFSClient.java {
	  
	  public final ClientProtocol namenode;
	  
      /**
     * Create a new output stream to the given DataNode.//创建指向特定datanode的输出流
     * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
     */
    DFSOutputStream(String src, FsPermission masked, boolean overwrite,
        boolean createParent, short replication, long blockSize, Progressable progress,
        int buffersize, int bytesPerChecksum) throws IOException {
      this(src, blockSize, progress, bytesPerChecksum, replication);

      computePacketChunkSize(writePacketSize, bytesPerChecksum);

      try {
        // Make sure the regular create() is done through the old create().
        // This is done to ensure that newer clients (post-1.0) can talk to
        // older clusters (pre-1.0). Older clusters lack the new  create()
        // method accepting createParent as one of the arguments.
        if (createParent) {
          namenode.create( // 看这里,这里的namenode是ClientProtocol类型, 
		                   // 此时才真正是 DFSClient.java通过ClientProtocol来和NamoNode搭上线
						   // 此时RCP客户端已经调用了公约ClientProtocol的create方法,
            src, masked, clientName, overwrite, replication, blockSize);
        } else {
          namenode.create(
            src, masked, clientName, overwrite, false, replication, blockSize);
        }
      } catch(RemoteException re) {
        throw re.unwrapRemoteException(AccessControlException.class,
                                       FileAlreadyExistsException.class,
                                       FileNotFoundException.class,
                                       NSQuotaExceededException.class,
                                       DSQuotaExceededException.class);
      }
      streamer.start();
    }
	
	
	而 ClientProtocol namenode的初始化操作在 DFSClient构造函数中,
	DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,
      Configuration conf, FileSystem.Statistics stats) {
	   ......
	   277行 this.namenode = createNamenode(this.rpcNamenode, conf); 进行了初始化操作
	  }

	  
	而DFSClient初始化在类DistributedFileSystem:
	 public void initialize(URI uri, Configuration conf) throws IOException {
	 ...
	this.dfs = new DFSClient(namenode, conf, statistics);  // 100行
	...
	}
  }
  

 

流程图如下:



 

 

 

6  通过代码查看DataNode与NameNode是如何RPC通信的(涉及心跳机制简介)

 

 

DataNode:{

public DatanodeProtocol namenode = null;

开始:
public static void main(String args[]) {
    secureMain(args, null);
  }
  
  
  
 一路跟踪到:
 
 void startDataNode(Configuration conf, 
                     AbstractList<File> dataDirs, SecureResources resources
                     ) throws IOException {
	...
	 // connect to name node     370行  得到DatanodeProtocol的客户端实例
    this.namenode = (DatanodeProtocol)   
      RPC.waitForProxy(DatanodeProtocol.class,
                       DatanodeProtocol.versionID,
                       nameNodeAddr, 
                       conf);
	...
	
}	

通过搜索  namenode.  来看RPC客户端Datanode都调用了DatanodeProtocol公约的哪些方法.
eg: namenode.sendHeartbeat(),  


而NameNode通过实现规约DatanodeProtocol的方法 来提供RPC服务端。
这就是DataNode和NameNode交流的过程。





心跳机制:

查看DataNode sendHeartbeat(...)方法 如下:

DataNode :

public void offerService() throws Exception {
     
    LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" + 
       " Initial delay: " + initialBlockReportDelay + "msec");

    //
    // Now loop for a long time....
    //

    while (shouldRun) {
      try {
        long startTime = now();

        //
        // Every so often, send heartbeat or block-report      在间隔大于3S下 执行相关操作
        //
        
        if (startTime - lastHeartbeat > heartBeatInterval) {
		
		  ....
		  }
	

}

 

 

 下面是hadoop-mapreduce-rpc 大概图:

 



 

 

 

 map-reduce 交互详细图:



 

 

 

  • 大小: 42.2 KB
  • 大小: 23 KB
  • 大小: 25.2 KB
  • 大小: 235.3 KB
分享到:
评论

相关推荐

    Hadoop_RPC详细分析.doc

    Hadoop RPC 详细分析 Hadoop RPC(Remote Procedure Call,远程过程调用)是 Hadoop 项目中的一个重要组件,用于实现分布式系统中的通信和数据交换。下面是对 Hadoop RPC 的详细分析。 RPCInterface Hadoop RPC ...

    hadoop rpc实例

    在这个实例中,我们将深入探讨Hadoop RPC的工作原理、客户端(`hadoop_rcp_client`)与服务器端(`hadoop_rpc_server`)的角色以及它们之间的交互过程。 ### Hadoop RPC概述 Hadoop RPC是Hadoop框架中用于进程间...

    学习hadoop_源代码,RPC_部分

    ### Hadoop RPC 深入理解 #### 一、引言 随着大数据处理需求的日益增长,Apache Hadoop 作为一款流行的开源分布式计算框架,在处理海量数据方面展现出了极高的性能和灵活性。其中,Hadoop 的远程过程调用(RPC)...

    java_RPC_hadoop.zip

    在Java中模拟Hadoop的RPC通讯,主要是为了理解其连接和心跳机制,这是保证Hadoop集群稳定运行的关键部分。 RPC的核心思想是透明性,即客户端可以像调用本地方法一样调用远程服务,由RPC框架负责数据的序列化、网络...

    PyPI 官网下载 | QuLab_RPC-1.3.3.tar.gz

    2. **Zookeeper**: ZooKeeper是Apache Hadoop的一个子项目,它是一个分布式的,开放源码的协调服务,用于分布式应用,提供命名服务、配置管理、分布式同步、组服务等。QuLab_RPC可能依赖Zookeeper来实现分布式环境中...

    大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第11期_HBase简介及安装_V1.0 共21页.pdf

    【大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第11期_HBase简介及安装_V1.0 共21页.pdf】这篇文档主要介绍了HBase这一大数据处理的重要组件,以及其在Hadoop生态系统中的角色。HBase是一个基于列族的...

    Hadoop RPC机制分析

    一、RPC简介 RPC是一种让程序能够调用运行在其他地址空间(通常在另一台机器上)的程序的方法。在Hadoop中,RPC被广泛用于NameNode、DataNode、TaskTracker等组件之间的通信,实现了服务端与客户端的透明交互。 二...

    细细品味Hadoop_Hadoop集群(第11期)_HBase简介及安装.pdf

    ### HBase简介及安装知识点详解 #### 一、HBase概述 **HBase** 是一个构建在 **Hadoop** 分布式文件系统 (HDFS) 之上的分布式、可扩展的大规模数据存储系统。它是针对大数据量场景设计的,特别适用于需要实时读写...

    PyPI 官网下载 | transmission_rpc-0.0.8.tar.gz

    Zookeeper是Apache Hadoop的一个子项目,它提供了一个分布式的,开放源码的协调服务,用于解决分布式应用中的命名、配置管理、组服务、分布式同步等问题。虽然在transmission_rpc的描述中没有直接提到Zookeeper,但...

    java操作hadoop的RPC,源码

    Java操作Hadoop的RPC(Remote Procedure Call)是分布式计算领域中的关键技术,它允许在不同的进程或机器之间进行远程调用,如同本地调用一样。Hadoop作为一个开源的大数据处理框架,其RPC机制是实现各个组件如...

    hadoop中RPC协议的小测试例子(吴超老师)

    在IT行业中,分布式计算系统的重要性日益凸显,而Hadoop作为其中的佼佼者,其核心组件之一就是远程过程调用(RPC,Remote Procedure Call)。RPC允许一个程序在某个网络中的计算机上执行另一个计算机上的程序,而...

    大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第12期副刊_HBase性能优化_V1.0 共26页.pdf

    压测过程中,开启RPC日志以监控内存消耗和GC情况,调整合适的IO线程数。 - `hbase.hregion.max.filesize`:默认256MB,定义了Region的最大文件大小。较小的Region有利于快速split和compaction,但过于频繁的操作...

    Hadoop的RPC通信程序

    ### Hadoop的RPC通信程序详解 #### 一、引言 在分布式系统中,远程过程调用(Remote Procedure Call, RPC)是一种重要的通信机制,它允许一台计算机上的程序调用另一台计算机上的子程序,而无需程序员了解底层网络...

    Hadoop_RPCDemo:Hadoop原始解析之RPC协议

    本文将深入探讨Hadoop中的远程过程调用(RPC)协议,这是Hadoop组件间通信的关键技术,也是理解Hadoop生态系统运作的重要一环。 RPC(Remote Procedure Call)允许一个程序在不关心远程服务器细节的情况下,调用...

    hadoop_yarn讲解ppt

    本ppt主要讲解yarn的基本架构,工作流程,基础库以及程序设计方法 容错等

    Hadoop自己的Rpc框架使用Demo

    在分布式计算领域,Hadoop RPC(Remote Procedure Call)框架是一个至关重要的组件,它允许不同的进程之间进行通信,尤其是在大规模数据处理的场景下。Hadoop RPC是Hadoop生态系统中的基础服务,使得不同模块如HDFS...

    Hadoop Java接口+RPC代码实现

    1.java接口操作Hadoop文件系统(文件上传下载删除创建......2.RPC远程过程调用的java代码实现,便于理解Hadoop的RPC协议,具体使用方法可参考我的博客https://blog.csdn.net/qq_34233510/article/details/88142507

    Hadoop rpc源码

    Hadoop rpc源码是从Hadoop分离出的ipc,去掉了认证部分,附录使用文档.使用前请add lib包commons-logging-*.*.*.jar(我用的是1.0.4)和log4j-*.*.*.jar(我的1.2.13) 相关blog post: ...

    Hadoop里的RPC机制过程

    在Hadoop中,远程过程调用(Remote Procedure Call, RPC)是一种重要的通信机制,它允许分布式系统中的组件之间进行高效且便捷的交互。Hadoop的RPC机制基于Java的客户端-服务器模型,允许客户端调用服务器上的方法,...

Global site tag (gtag.js) - Google Analytics