`
zqhxuyuan
  • 浏览: 31608 次
  • 性别: Icon_minigender_1
  • 来自: 福建
社区版块
存档分类
最新评论

Hadoop源码分析-JAVA RPC

阅读更多

JAVA RPC: http://jbm3072.iteye.com/blog/1088102

 

目标:让客户端调用远程机器(不同JVM上)的方法.

技术:RPC(Remote Process Call远程过程调用)

优点:使用RPC,可以像使用本地的程序(本地JVM)一样使用远程服务器上的程序。使用RPC的好处是简化了远程服务访问。提高了开发效率。

做法:在分发代码时,只需要将接口分发给客户端使用,在客户端看来只有接口,没有具体类实现。这样保证了代码的可扩展性和安全性。

基础:Java反射机制,动态代理,Java IO/NIO/Socket

 

Server接口

 

public interface Server {
	public void stop();
	public void start();
	public void register(Class interfaceDefiner,Class impl);
	public void call(Invocation invo);
	public boolean isRunning();
	public int getPort();
}

 

启动服务器

	Server server = new RPC.RPCServer();
	server.register(Echo.class, RemoteEcho.class);
	server.start(); // 1.向服务器注册接口和实现类并启动服务器

RPC

public class RPC {
	public static <T> T getProxy(final Class<T> clazz,String host,int port) {
		final Client client = new Client(host,port);
		InvocationHandler handler = new InvocationHandler() {
			// 5. 当客户端调用生成的代理对象的方法,实际上调用的是该回调方法
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				Invocation invo = new Invocation(); // 封装Invocation对象,要调用的接口,接口的方法,参数
				invo.setInterfaces(clazz);
				invo.setMethod(new com.xuyuan.j2ee.rpc.protocal.Method(method.getName(),method.getParameterTypes()));
				invo.setParams(args);
				client.invoke(invo); // 6. 客户端向服务器发送Invocation对象
				return invo.getResult(); // 12. 回调方法invoke()结束,返回远程方法的执行结果
			}
		};
		// 3. 生成接口的代理对象,传入回调对象InvocationHandler.在调用接口的方法时,会调用回调方法
		T t = (T) Proxy.newProxyInstance(RPC.class.getClassLoader(), new Class[] {clazz}, handler);
		return t;
	}
	
	public static class RPCServer implements Server{
		private Listener listener; 
		private Map<String ,Object> serviceEngine = new HashMap<String, Object>();

		public void call(Invocation invo) {
			Object obj = serviceEngine.get(invo.getInterfaces().getName()); //根据接口名,找到对应的处理类(实现类)			
			Method m = obj.getClass().getMethod(invo.getMethod().getMethodName(), invo.getMethod().getParams());
			Object result = m.invoke(obj, invo.getParams()); // 9. 利用反射,调用方法,返回值设置到Invocation对象中
			invo.setResult(result);
		}
		public void register(Class interfaceDefiner, Class impl) {
			this.serviceEngine.put(interfaceDefiner.getName(), impl.newInstance());
		}
		public void start() {
			listener = new Listener(this);
			this.isRuning = true;
			listener.start(); // 1.启动服务器,监听器是个线程类,会调用run()
		}
	}
}

监听器

public class Listener extends Thread {
	private ServerSocket socket;
	private Server server;

	public void run() {
		socket = new ServerSocket(server.getPort()); // 2. 创建ServerSocket,接受客户端的连接
 		while (server.isRunning()) {
			Socket client = socket.accept();

 			// 8. 接收客户端传递的Invocation对象,里面包含了客户端想要调用的接口,方法,参数
			ObjectInputStream ois = new ObjectInputStream(client.getInputStream());
			Invocation invo = (Invocation) ois.readObject();
 			// 9. 让服务器调用真正的目标方法
			server.call(invo); 

			// 10. 往客户端写回数据,同样给客户端发送Invocation对象
			ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());
			oos.writeObject(invo);
		}
	}
}

客户端

public class Client {
	private Socket socket;
	private ObjectOutputStream oos;
	private ObjectInputStream ois;

	public void invoke(Invocation invo) throws UnknownHostException, IOException, ClassNotFoundException {
 		socket = new Socket(host, port);
  	oos = new ObjectOutputStream(socket.getOutputStream());
		// 7. 客户端向服务器写数据。因为客户端不能直接调用接口方法(在不同JVM上),
 		// 可以通过传递带有接口,方法,参数的Invocation对象给服务器,让服务器解析出对象并真正调用方法
		oos.writeObject(invo);

   	// 11. 接收服务器返回的数据Invocation对象,对象里也含有方法的执行结果
		ois = new ObjectInputStream(socket.getInputStream());
		Invocation result = (Invocation) ois.readObject();
		invo.setResult(result.getResult());
	}
}

启动客户端,调用接口的方法

	Echo echo = RPC.getProxy(Echo.class, "127.0.0.1", 20382); // 3. 生成接口Echo的代理实现类
	String res = echo.echo("hello,hello"); // 4. 像使用本地的程序一样来调用Echo中的echo方法

 

总结

因为客户端和服务器位于不同的JVM上(不同的机器),要让客户端能够调用到服务器上的某个类的某个方法,需要让客户端和服务器能够进行远程通信。

远程通信需要协议。 这里的协议就是Echo接口。 暴露给客户端的是Echo接口的某个方法。在服务器上有Echo接口的具体实现。

所以我们的目的是客户端能够调用到位于服务器上的Echo接口的实现类的某个方法。

 

Invocation

负责客户端和服务器的数据交互,即客户端想要调用哪个接口,哪个方法,方法带有什么参数。以及服务器返回给客户端方法的执行结果。

需要实现序列化,因为要在网络环境下传输。

InvocationHandler

回调对象,客户端调用接口的方法,实际上会调用回调方法invoke()

在回调方法里,封装了客户端要调用的接口,方法,参数。封装成Invocation对象

回调方法结束,客户端调用接口的方法的过程也就结束了。具体是怎么调用接口的方法:

Client: 

将上面封装好的Invocation对象,通过Socket或者NIO编程发送给服务器。

而服务器的监听器一直在监听客户端的数据

将封装了接口,方法,参数的Invocation发送给服务器的目的是让服务器调用位于服务器上的接口的实现类的方法。

Listener

负责监听客户端的写入数据, ==》监听到客户端发送的Invocation对象,读取出来

委托给Server调用客户端想要调用的方法, ==》Invocation对象包含了客户端想要调用的接口,方法,参数

调用方法结束后,向客户端回写数据 ==》调用方法的返回值也一并封装到Invocation对象传输给客户端

Server

取得客户端发送的Invocation对象后,解析出接口,方法,参数。这样服务器就知道了客户端想要调用的接口和方法

这里还有一个过程,就是在服务器启动的时候,要先注册接口和接口的实现类的关系。

这样当客户端传递含有接口名字(也只能传递接口)的Invocation对象时,服务器就能知道该接口对应的实现类。

利用反射机制,真正调用到服务器上的接口的实现类的方法,并传入参数

Client

服务器调用接口实现类的方法结束后,还会返回Invocation对象给客户端。

客户端同样能解析出Invocation对象,取得返回值。这样客户端仅仅和接口打交道,隐藏了数据交互的过程。 

 

服务器启动时序图

客户端连接服务器时序图

 

HadoopRPC采用客户机/服务器模式。请求程序就是一个客户机, 而服务提供程序就是一个服务器。当我们讨论HDFS通信可能发生在:

情景

服务器

Client-NameNode

NameNode

Client-DataNode

DataNode

DataNode-NameNode

NameNode

DataNode-DateNode

某一个DateNode是服务器, 另一个是客户端

 

 如果我们考虑HadoopMap/Reduce以后, 这些系统间的通信就更复杂了。为了解决这些客户机/服务器之间的通信, Hadoop引入了一个RPC框架。该RPC框架利用的Java的反射能力, 避免了某些RPC解决方案中需要根据某种接口语言(如CORBAIDL)生成存根和框架的问题。

 

IPC

实现RPC的一种方法,具有快速、简单的特点。 它不像Sun公司提供的标准RPC包,基于Java序列化。

IPC无需创建网络stubsskeletons

IPC中的方法调用要求参数和返回值的数据类型必须是Java的基本类型,StringWritable接口的实现类,以及元素为以上类型的数组。

接口方法应该只抛出IOException异常。

 

使用模型 采用客户/服务器模型

Server:它把Java接口暴露给客户端。指定好监听端口和接受远程调用的对象实例后,通过RPC.getServer()可以得到Server实例。

Client:连接Server,调用它所暴露的方法。Client必须指定远程机器的地址,端口和Java接口类,通过RPC.getClient()可以得到Client实例。

Server不可以向Client发出调用,但在Hadoop中,有双向调用的需求。 比如在DFSNameNodeDataNode需要互相了解状态。

 

  • 大小: 11.2 KB
  • 大小: 38.6 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics