java ipc实例,仿照hadoop ipc写的实例
1.用接口规定ipc协议的方法
2.client端用动态代理作调用远程ipc接口方法
3.server端用反射,执行ipc接口方法,并返回给client端接口方法返回值
hadoop ipc的另一个特点是server端用三个角色,Listener,Handler,Responser。server聚合这三个角色
Listener:nio socket获取请求CALL对象,放入队列中
Handler:从队列中获取CALL对象,执行ipc接口方法
Responser:被Handler调用,用nio socket返回接口方法返回值
简单实例:(实例来自github)
1.定义协议
public interface Echo { public String who() throws IOException;; public void from(String name) throws IOException;; }
2.定义代理
/*Invocation封装方法名和参数*/ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean logDebug = LOG.isDebugEnabled(); long startTime = 0; if (logDebug) { startTime = System.currentTimeMillis(); } //调用Client call方法 Invocation value = client.call(new Invocation(iface, method, args), remoteId);//调用远程的当前方法,阻塞直到server返回值 if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + "() " + callTime); } return value.getResult(); }
public Invocation call(Invocation invoked, ConnectionId remoteId) throws InterruptedException, IOException { Call call = new Call(invoked); //将传入的数据封装成call对象 Serializable接口 //已经向服务器端 RPCHeader ConnectionHeader验证 Connection connection = getConnection(remoteId, call); //获得一个连接 connection.sendParam(call); // 向服务端发送Call对象 boolean interrupted = false; synchronized (call) { while (!call.done) { try { call.wait(); //等待结果的返回,在Call类的callComplete()方法里有notify()方法用于唤醒线程 } catch (InterruptedException ie) { // save the fact that we were interrupted interrupted = true; } } if (interrupted) { //因中断异常而终止,设置标志interrupted为true Thread.currentThread().interrupt(); } if (call.error != null) { if (call.error instanceof RemoteException) { call.error.fillInStackTrace(); throw call.error; } else { /* local exception use the connection because it will * reflect an ip change, unlike the remoteId */ throw wrapException(connection.getRemoteAddress(), call.error); } } else { return call.value; //返回结果数据 } } }
3.server端反射ipc接口方法
private class Handler extends Thread { public Handler(int instanceNumber) { this.setDaemon(true); this.setName("IPC Server handler " + instanceNumber + " on " + port); } @Override public void run() { LOG.info(getName() + ": starting"); SERVER.set(Server.this); //创建大小为10240个字节的响应缓冲区 ByteArrayOutputStream buf = new ByteArrayOutputStream( INITIAL_RESP_BUF_SIZE); while (running) { try { /** pop the queue; maybe blocked here */ //获取一个远程调用请求 Server.Call final Call call = callQueue.take(); //弹出call,可能会阻塞 if (LOG.isDebugEnabled()) LOG.debug(getName() + ": has #" + call.id + " from " + call.connection); String errorClass = null; String error = null; Serializable value = null; /** process the current call. */ //处理当前Call CurCall.set(call); try { //调用ipc.Server类中的call()方法,但该call()方法是抽象方法,具体实现在RPC.Server类中 value = call(call.connection.protocol, call.param, call.timestamp); //Invocation对象 } catch (Throwable e) { LOG.info(getName() + ", call " + call + ": error: " + e, e); errorClass = e.getClass().getName(); error = e.getMessage(); } CurCall.set(null); synchronized (call.connection.responseQueue) { /** * setupResponse() needs to be sync'ed together with * responder.doResponse() since setupResponse may use * SASL to encrypt response data and SASL enforces its * own message ordering. */ //将返回结果序列化到Call的成员变量response中 setupResponse( buf, call, (error == null) ? Status.SUCCESS : Status.ERROR, value, errorClass, error); /* Discard the large buf and reset it back to smaller size to freeup heap*/ //丢弃大的buf 重设到更小的容量 释放内存 if (buf.size() > maxRespSize) { LOG.warn("Large response size " + buf.size() + " for call " + call.toString()); buf = new ByteArrayOutputStream( INITIAL_RESP_BUF_SIZE); } //给客户端响应请求 responder.doRespond(call);//Responder在Server构造器初始化 } } catch (InterruptedException e) { if (running) { // unexpected -- log it LOG.info(getName() + " caught: " + e); } } catch (Exception e) { LOG.info(getName() + " caught: " + e); } } LOG.info(getName() + ": exiting"); } }
public Serializable call(Class<?> iface, Serializable param, long receivedTime) throws IOException { try { Invocation call = (Invocation) param; //调用参数 Invocationd对象包含方法名称 形式参数列表和实际参数列表 if (verbose) log("Call: " + call); //从实例缓存中按照接口寻找实例对象 Object instance = INSTANCE_CACHE.get(iface); if (instance == null) throw new IOException("interface `" + iface + "` not inscribe."); //通过Class对象获取Method对象 Method method = iface.getMethod(call.getMethodName(), call.getParameterClasses()); //取消Java语言访问权限检查 method.setAccessible(true); long startTime = System.currentTimeMillis(); //调用Method对象的invoke方法 Object value = method.invoke(instance, call.getParameters()); int processingTime = (int) (System.currentTimeMillis() - startTime); int qTime = (int) (startTime - receivedTime); if (LOG.isDebugEnabled()) { LOG.debug("Served: " + call.getMethodName() + " queueTime= " + qTime + " procesingTime= " + processingTime); } if (verbose) log("Return: " + value); call.setResult(value); //向Invocation对象设置结果 return call; } catch (InvocationTargetException e) { Throwable target = e.getTargetException(); if (target instanceof IOException) { throw (IOException) target; } else { IOException ioe = new IOException(target.toString()); ioe.setStackTrace(target.getStackTrace()); throw ioe; } } catch (Throwable e) { if (!(e instanceof IOException)) { LOG.error("Unexpected throwable object ", e); } IOException ioe = new IOException(e.toString()); ioe.setStackTrace(e.getStackTrace()); throw ioe; } } }
相关推荐
java生成海报实例源码INKMACS - Inkscape Emacs 融合 什么是 Inkmacs? Inkmacs 是 Inkscape 和 Emacs 的融合。 好吧,至少在遥远的未来。 但它现在也很酷。 Inkscape 是一款功能强大的免费软件矢量绘图工具。 Emacs...
Binder通信过程类似于TCP/IP服务连接过程binder四大架构Server(服务器),Client(客户端),ServiceManager(DNS)...book.java package com.example.android_binder_testservice; import android.os.Parcel; import andr
以不同的输入值运行检测应用程序的多个实例。 来自正在执行的 Java 应用程序的跟踪信息被发送到启动程序,在该程序中使用哈希表来跟踪程序执行不同点的变量、它们的绑定和值。 先决条件 JAVA SDK 11 或更高版本 ...
这是一个Android应用程序,用于演示如何使用AIDL进行进程间通信(IPC)。 该项目包括一个带有服务(“ SensorService”)的Android库,该服务在清单文件中声明为在单独的进程中运行。 该服务完成了一个简单的任务,...
3. 实现接口-AIDL编译器从AIDL接口文件中利用Java语言创建接口,该接口有一个继承的命名为Stub的内部抽象类(并且实现了一些IPC调用的附加方法),要做的就是创建一个继承YourInterface.Stub的类并且实现在.aidl文件...
本文实例讲述了Android开发之Parcel机制。分享给大家供大家参考。具体分析如下: 在java中,有序列化机制。但是在安卓设备上,由于内存有限,所以设计了新的序列化机制。 Container for a message (data and object ...
Android进程间通信(Inter-Process Communication, IPC)采用Binder通信机制,是一种client/server结构。 AIDL(Android Interface Define Language):Android接口定义语言,帮助开发者自动生成实现Binder通信机制...
� Google 提供了一套 Java 核心包 (J2SE 5,J2SE 6) 的有限子集,尚不承诺遵守 Java 任何 Java 规范 , 可能会造 成J ava 阵营的进一步分裂。 � 现有应用完善度不太够,需要的开发工作量较大。--------------------...
除了标准的Linux内核外,Android还增加了内核的驱动程序:Binder(IPC)驱动、显⽰驱动、输⼊设备驱 动、⾳频系统驱动、摄像头驱动、WiFi驱动、蓝⽛驱动、电源管理。 b) 程序库 (LIBRARIES) 程序库是指可供使⽤的各种...
7.SystemService 与 HAL 整合7.1 IPC、 Remote method call与Binder观念说明 7.2 AIDL 介绍与IInterface设计观念解析 7.3 Activity & ApplicationContext 7.4 ServiceManager 7.5 专题讨论:LedService设计与...
8.3.5 调用IPC方法 352 8.4 小结 353 第9章 访问网络数据和服务 354 9.1 HTTP协议简介 354 9.1.1 HTTP协议的主要特点 354 9.1.2 HTTP连接过程 355 9.1.3 HTTP消息格式 355 9.2 线程管理 358 9.2.1 匿名Thread 358 ...
补充材料在本书中用类似www.cdk4.net/ipc的链接表示。 5)为使用本书的课程建立的Web网站的链接:本书第3版的Web站点包括使用本书的15个课程的链接,这样可获得大量有用的课件、幻灯片、练习和实验室项目。我们...
本书全面系统地介绍了Red Hat Linux 6。全书共分为五个部分,包括35章和四个附录。第一部分为Red Hat Linux的介绍和安装;第二部分为服务配置;第三部分为系统管理;第四部分为Linux编程;第五部分为附录。...
本书所介绍的各个应用实例简明扼要且极具实用价值,它们覆盖了Android 1.0的所有基本功能和高级功能。 由于Android 1.0是一个正在兴起的全新手机操作系统,所以当前支持它的手机还不多。和任何其他产品早期的...