1.RPC简介
1.1 RPC (remote procedure call)远程过程调用,
指的是java进程,即一个java进程调用另一个java进程中对象的方法。
1.2 RPC至少有两个过程。调用方(client),被调用方(server)。
1.3 client主动发起请求,调用指定ip和port的server中的方法,把调用结果返回给client。
1.4 RPC是hadoop构建的基础。
1.5 RPC实际上就是socket通信,只要知道了对方地址和端口,即可实现通信,因此RPC可以实现多进程间交流,更适合于Hadoop集群不同地址(分布式)下的通信
2 模拟rpc机制,自定义代码写法:
2.1 需要一个接口和具体业务类,在接口中定义好需要被远程调用的业务类
(相当于公约,定义好公共规则-要调用的方法,然后客户端调用时直接传递规约中指定好的方法,最终调用到服务的这个方法上)
2.2 需要有RPC客户端和服务端类
2.3 代码如下:
1 接口: import org.apache.hadoop.ipc.VersionedProtocol; public interface MyBizAble extends VersionedProtocol{ public String sayHello(String name); } 2 业务实现类: public class MyBiz implements MyBizAble{ public String sayHello(String name){ return "hello " + name; } @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return 0; } } 3 服务端 public class MyServer { public static String bindAddress = "localhost"; public static int port = 1001; public static void main(String[] args) throws IOException { MyBiz myBiz = new MyBiz(); /** Construct an RPC server. * @param instance the instance whose methods will be called * @param conf the configuration to use * @param bindAddress the address to bind on to listen for connection * @param port the port to listen for connections on */ 将服务端实例myBiz 服务端地址 端口交给RPC管理监听,客户端调用服务端对应方法时,会通过RPC 关联到服务端对应方法,服务端处理好后将结果在通过RPC 网络流向RPC客户端 Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration()); server.start(); } } 4 客户端 public class MyClient { public static void main(String[] args) throws IOException { /** * 构建一个客户端代理对象 * 该代理对象实现了命名协议 * 代理对象会与指定地址的服务端通讯 */ MyBizAble proxy = (MyBizAble)RPC.waitForProxy(MyBizAble.class, 1001, new InetSocketAddress(MyServer.bindAddress, MyServer.port), new Configuration()); String result = proxy.sayHello("zm"); System.out.println(result); /* MyBizAble proxy = (MyBizAble)RPC.waitForProxy( MyBizAble.class, MyBizAble.VERSION, new InetSocketAddress(MyServer.ADDRESS, MyServer.PORT), new Configuration()); final String result = proxy.hello("world"); System.out.println("客户端结果:"+result); //关闭网络连接 RPC.stopProxy(proxy);*/ } }
3 hadoop中rpc结构图
4 查看namenode源码,确认是否为rpc通讯的一部分,是rpc的服务端
4.1 是否长时间运行,一直等待客户端请求(socket server端就是这种典型标志)
4.2 看是否含有服务端代表典型标志:
Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration());
4.3 源码跟踪和解释如下:
namenode rpc代码跟踪如下: public class NameNode implements ClientProtocol, DatanodeProtocol, NamenodeProtocol,.... { ....... /** RPC server */ private Server server; ...... public static void main(String argv[]) throws Exception { try { StringUtils.startupShutdownMessage(NameNode.class, argv, LOG); NameNode namenode = createNameNode(argv, null); // 点击进入 if (namenode != null) namenode.join(); } catch (Throwable e) { LOG.error(StringUtils.stringifyException(e)); System.exit(-1); } } 如下: public static NameNode createNameNode(String argv[], Configuration conf) throws IOException { ............... NameNode namenode = new NameNode(conf); // 点击进入 ............... } 如下: /* ................ upgrade and create a snapshot of the current file system state 在 ............... */ public NameNode(Configuration conf) throws IOException { try { initialize(conf);// 点击进入 } catch (IOException e) { this.stop(); throw e; } } 如下: private void initialize(Configuration conf) throws IOException { ............... this.server = RPC.getServer(this, socAddr.getHostName(), // this代表Namenode类 socAddr.getPort(), handlerCount, false, conf, namesystem .getDelegationTokenSecretManager()); ............... } } 源码参考完毕,解释如下: 一般的RPC server端代码在调用时: 第一个参数the instance whose methods will be called 表示要调用的业务类的方法, Server server = RPC.getServer(myBiz, bindAddress, port, new Configuration()); 上面Namenode中,第一个参数this表示我的业务类是Namonode,同时RPC.getServer又出现在了Namenode类,因此Namenode 即表示了业务实现类,又担当了RPC server的功能。 this.server = RPC.getServer(this, socAddr.getHostName(), // this代表Namenode类 socAddr.getPort(), handlerCount, false, conf, namesystem .getDelegationTokenSecretManager()); NameNode 作为业务实现类,需要实现公约,才能被客户端调用到, 看代码 public class NameNode implements ClientProtocol, DatanodeProtocol,NamenodeProtocol, 其中: ClientProtocol, DatanodeProtocol,NamenodeProtocol 就是公约,分别代表: ClientProtocol: 定义用户和namenode打交道的方法的接口公约 DatanodeProtocol:Protocol that a DFS datanode uses to communicate with the NameNode NamenodeProtocol: secondary NameNode uses to communicate with the NameNode
5 通过代码查看client是如何通过rpc机制来调用到namenode节点的
0 客户端上传文件到hdfs为例: private static void putData(FileSystem fileSystem) { try { System.out.println(fileSystem.getClass().getName());//运行后打印结果如下 org.apache.hadoop.hdfs.DistributedFileSystem FSDataOutputStream out = fileSystem.create(new Path(FILE)); // 点击进入 FileInputStream in = new FileInputStream("E:/seq100w.txt"); IOUtils.copyBytes(in, out, 1024, true); } catch (Exception e) { e.printStackTrace(); } } 如下: public FSDataOutputStream create(Path f) throws IOException { return create(f, true); // 点击进入 } 不断点击进入,直到如下: 如下: public abstract FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException; 去org.apache.hadoop.hdfs.DistributedFileSystem中找方法create: public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { statistics.incrementWriteOps(1); return new FSDataOutputStream (dfs.create(getPathName(f), permission, // 点击进入 dfs.create overwrite, true, replication, blockSize, progress, bufferSize), statistics); } // 返回类是一个流, 流就需要一个目的地, 如下: 进入到类DFSClient.java Create a new dfs file with the specified block replication * with write-progress reporting and return an output stream for writing * into the file. // 主要关注两件事: 1 在namenode上创建了一个dfs file 2 创建用于存储用户上传文件数据的文件流 public OutputStream create(String src, FsPermission permission, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize ) throws IOException { checkOpen(); if (permission == null) { permission = FsPermission.getDefault(); } FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf)); LOG.debug(src + ": masked=" + masked); final DFSOutputStream result = new DFSOutputStream(src, masked, // 点击进入 overwrite, createParent, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512)); beginFileLease(src, result); return result; } 如下: 在类DFSClient.java内,有内部类DFSOutputStream DFSClient.java { public final ClientProtocol namenode; /** * Create a new output stream to the given DataNode.//创建指向特定datanode的输出流 * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long) */ DFSOutputStream(String src, FsPermission masked, boolean overwrite, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, int bytesPerChecksum) throws IOException { this(src, blockSize, progress, bytesPerChecksum, replication); computePacketChunkSize(writePacketSize, bytesPerChecksum); try { // Make sure the regular create() is done through the old create(). // This is done to ensure that newer clients (post-1.0) can talk to // older clusters (pre-1.0). Older clusters lack the new create() // method accepting createParent as one of the arguments. if (createParent) { namenode.create( // 看这里,这里的namenode是ClientProtocol类型, // 此时才真正是 DFSClient.java通过ClientProtocol来和NamoNode搭上线 // 此时RCP客户端已经调用了公约ClientProtocol的create方法, src, masked, clientName, overwrite, replication, blockSize); } else { namenode.create( src, masked, clientName, overwrite, false, replication, blockSize); } } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileAlreadyExistsException.class, FileNotFoundException.class, NSQuotaExceededException.class, DSQuotaExceededException.class); } streamer.start(); } 而 ClientProtocol namenode的初始化操作在 DFSClient构造函数中, DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) { ...... 277行 this.namenode = createNamenode(this.rpcNamenode, conf); 进行了初始化操作 } 而DFSClient初始化在类DistributedFileSystem: public void initialize(URI uri, Configuration conf) throws IOException { ... this.dfs = new DFSClient(namenode, conf, statistics); // 100行 ... } }
流程图如下:
相关推荐
Hadoop RPC 详细分析 Hadoop RPC(Remote Procedure Call,远程过程调用)是 Hadoop 项目中的一个重要组件,用于实现分布式系统中的通信和数据交换。下面是对 Hadoop RPC 的详细分析。 RPCInterface Hadoop RPC ...
在这个实例中,我们将深入探讨Hadoop RPC的工作原理、客户端(`hadoop_rcp_client`)与服务器端(`hadoop_rpc_server`)的角色以及它们之间的交互过程。 ### Hadoop RPC概述 Hadoop RPC是Hadoop框架中用于进程间...
### Hadoop RPC 深入理解 #### 一、引言 随着大数据处理需求的日益增长,Apache Hadoop 作为一款流行的开源分布式计算框架,在处理海量数据方面展现出了极高的性能和灵活性。其中,Hadoop 的远程过程调用(RPC)...
在Java中模拟Hadoop的RPC通讯,主要是为了理解其连接和心跳机制,这是保证Hadoop集群稳定运行的关键部分。 RPC的核心思想是透明性,即客户端可以像调用本地方法一样调用远程服务,由RPC框架负责数据的序列化、网络...
2. **Zookeeper**: ZooKeeper是Apache Hadoop的一个子项目,它是一个分布式的,开放源码的协调服务,用于分布式应用,提供命名服务、配置管理、分布式同步、组服务等。QuLab_RPC可能依赖Zookeeper来实现分布式环境中...
【大数据与云计算培训学习资料 Hadoop集群 细细品味Hadoop_第11期_HBase简介及安装_V1.0 共21页.pdf】这篇文档主要介绍了HBase这一大数据处理的重要组件,以及其在Hadoop生态系统中的角色。HBase是一个基于列族的...
一、RPC简介 RPC是一种让程序能够调用运行在其他地址空间(通常在另一台机器上)的程序的方法。在Hadoop中,RPC被广泛用于NameNode、DataNode、TaskTracker等组件之间的通信,实现了服务端与客户端的透明交互。 二...
### HBase简介及安装知识点详解 #### 一、HBase概述 **HBase** 是一个构建在 **Hadoop** 分布式文件系统 (HDFS) 之上的分布式、可扩展的大规模数据存储系统。它是针对大数据量场景设计的,特别适用于需要实时读写...
Zookeeper是Apache Hadoop的一个子项目,它提供了一个分布式的,开放源码的协调服务,用于解决分布式应用中的命名、配置管理、组服务、分布式同步等问题。虽然在transmission_rpc的描述中没有直接提到Zookeeper,但...
Java操作Hadoop的RPC(Remote Procedure Call)是分布式计算领域中的关键技术,它允许在不同的进程或机器之间进行远程调用,如同本地调用一样。Hadoop作为一个开源的大数据处理框架,其RPC机制是实现各个组件如...
在IT行业中,分布式计算系统的重要性日益凸显,而Hadoop作为其中的佼佼者,其核心组件之一就是远程过程调用(RPC,Remote Procedure Call)。RPC允许一个程序在某个网络中的计算机上执行另一个计算机上的程序,而...
压测过程中,开启RPC日志以监控内存消耗和GC情况,调整合适的IO线程数。 - `hbase.hregion.max.filesize`:默认256MB,定义了Region的最大文件大小。较小的Region有利于快速split和compaction,但过于频繁的操作...
### Hadoop的RPC通信程序详解 #### 一、引言 在分布式系统中,远程过程调用(Remote Procedure Call, RPC)是一种重要的通信机制,它允许一台计算机上的程序调用另一台计算机上的子程序,而无需程序员了解底层网络...
本文将深入探讨Hadoop中的远程过程调用(RPC)协议,这是Hadoop组件间通信的关键技术,也是理解Hadoop生态系统运作的重要一环。 RPC(Remote Procedure Call)允许一个程序在不关心远程服务器细节的情况下,调用...
本ppt主要讲解yarn的基本架构,工作流程,基础库以及程序设计方法 容错等
在分布式计算领域,Hadoop RPC(Remote Procedure Call)框架是一个至关重要的组件,它允许不同的进程之间进行通信,尤其是在大规模数据处理的场景下。Hadoop RPC是Hadoop生态系统中的基础服务,使得不同模块如HDFS...
1.java接口操作Hadoop文件系统(文件上传下载删除创建......2.RPC远程过程调用的java代码实现,便于理解Hadoop的RPC协议,具体使用方法可参考我的博客https://blog.csdn.net/qq_34233510/article/details/88142507
Hadoop rpc源码是从Hadoop分离出的ipc,去掉了认证部分,附录使用文档.使用前请add lib包commons-logging-*.*.*.jar(我用的是1.0.4)和log4j-*.*.*.jar(我的1.2.13) 相关blog post: ...
在Hadoop中,远程过程调用(Remote Procedure Call, RPC)是一种重要的通信机制,它允许分布式系统中的组件之间进行高效且便捷的交互。Hadoop的RPC机制基于Java的客户端-服务器模型,允许客户端调用服务器上的方法,...