RPC(Remote Procedure Call) 在介绍分布是RPC前首先介绍一个下JAVA中简单的RPC实现
服务器端,通过SocketServer,持续接收客户端的请求,并将客户端的请求分发到指定的处理器出去处理。
阻塞通线模型,是server对每一个请求都开启一条线程去执行请求,此种方式的缺点是服务器端线程的数量和客户端并发访问请求树呈1:1的正比关系。
此处对此作出了一定优化,伪异步IO通信,将所有用户请求放到线程池中处理。
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private ServerSocket server; //线程池 @Autowired private Executor executorService; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { server = new ServerSocket(port); // 一直服务 for (;;) { try { // 获取socket final Socket socket = server.accept(); executorService.execute(new Runnable() { @Override public void run() { try { // 获取input ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream output = new ObjectOutputStream(socket .getOutputStream()); try { // 获取引用 String interfaceName = input.readUTF(); //获取 方法名 String methodName = input.readUTF(); // Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); try { Object service = handlerMap.get(interfaceName); Method method = service.getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(service, arguments); output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { } } }); } catch (Exception e) { } } } public void init() { } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { //发布服务 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { if (server != null) { try { server.close(); } catch (IOException e) { } } } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * Setter method for property <tt>executorService</tt>. * * @param executorService value to be assigned to property executorService */ public void setExecutorService(Executor executorService) { this.executorService = executorService; } }
/** * * @author zhangwei_david * @version $Id: SRPC.java, v 0.1 2015年8月8日 下午12:51:17 zhangwei_david Exp $ */ @Documented @Target({ ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Component public @interface SRPC { public Class<?> interf(); }
至此就实现了服务的自动发现自动注册,当然这个仅针对单机环境下的自动注册。
/** * * @author zhangwei_david * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ */ public class Client { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { output.writeUTF(interfaceClass.getName()); output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(arguments); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); } }
上面在没有使用第三方依赖包实现了简单的RPC,优化增加 request和reponse,定义RPC协议。
/** * * @author zhangwei_david * @version $Id: SrpcRequest.java, v 0.1 2015年8月8日 下午1:45:53 zhangwei_david Exp $ */ public class SrpcRequest implements Serializable { /** */ private static final long serialVersionUID = 6132853628325824727L; // 请求Id private String requestId; // 远程调用接口名称 private String interfaceName; //远程调用方法名称 private String methodName; // 参数类型 private Class<?>[] parameterTypes; // 参数值 private Object[] parameters; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>interfaceName</tt>. * * @return property value of interfaceName */ public String getInterfaceName() { return interfaceName; } /** * Setter method for property <tt>interfaceName</tt>. * * @param interfaceName value to be assigned to property interfaceName */ public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } /** * Getter method for property <tt>methodName</tt>. * * @return property value of methodName */ public String getMethodName() { return methodName; } /** * Setter method for property <tt>methodName</tt>. * * @param methodName value to be assigned to property methodName */ public void setMethodName(String methodName) { this.methodName = methodName; } /** * Getter method for property <tt>parameterTypes</tt>. * * @return property value of parameterTypes */ public Class<?>[] getParameterTypes() { return parameterTypes; } /** * Setter method for property <tt>parameterTypes</tt>. * * @param parameterTypes value to be assigned to property parameterTypes */ public void setParameterTypes(Class<?>[] parameterTypes) { this.parameterTypes = parameterTypes; } /** * Getter method for property <tt>parameters</tt>. * * @return property value of parameters */ public Object[] getParameters() { return parameters; } /** * Setter method for property <tt>parameters</tt>. * * @param parameters value to be assigned to property parameters */ public void setParameters(Object[] parameters) { this.parameters = parameters; } }
/** * * @author zhangwei_david * @version $Id: SrpcResponse.java, v 0.1 2015年8月8日 下午1:47:46 zhangwei_david Exp $ */ public class SrpcResponse implements Serializable { /** */ private static final long serialVersionUID = -5934073769679010930L; // 请求的Id private String requestId; // 异常 private Throwable error; // 响应 private Object result; /** * Getter method for property <tt>requestId</tt>. * * @return property value of requestId */ public String getRequestId() { return requestId; } /** * Setter method for property <tt>requestId</tt>. * * @param requestId value to be assigned to property requestId */ public void setRequestId(String requestId) { this.requestId = requestId; } /** * Getter method for property <tt>error</tt>. * * @return property value of error */ public Throwable getError() { return error; } /** * Setter method for property <tt>error</tt>. * * @param error value to be assigned to property error */ public void setError(Throwable error) { this.error = error; } /** * Getter method for property <tt>result</tt>. * * @return property value of result */ public Object getResult() { return result; } /** * Setter method for property <tt>result</tt>. * * @param result value to be assigned to property result */ public void setResult(Object result) { this.result = result; } }
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private ServerSocket server; //线程池 @Autowired private Executor executorService; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { server = new ServerSocket(port); // 一直服务 for (;;) { try { // 获取socket final Socket socket = server.accept(); executorService.execute(new Runnable() { @Override public void run() { try { // 获取input ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { // 获取RPC请求 SrpcRequest request = (SrpcRequest) input.readObject(); ObjectOutputStream output = new ObjectOutputStream(socket .getOutputStream()); try { SrpcResponse response = doHandle(request); output.writeObject(response); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { } } }); } catch (Exception e) { } } } private SrpcResponse doHandle(SrpcRequest request) { SrpcResponse response = new SrpcResponse(); response.setRequestId(request.getRequestId()); try { Object service = handlerMap.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); response.setResult(method.invoke(service, request.getParameters())); } catch (Exception e) { response.setError(e); } return response; } public void init() { } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { //发布 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { if (server != null) { try { server.close(); } catch (IOException e) { } } } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * Setter method for property <tt>executorService</tt>. * * @param executorService value to be assigned to property executorService */ public void setExecutorService(Executor executorService) { this.executorService = executorService; } }
/** * * @author zhangwei_david * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ */ public class Client { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { SrpcRequest request = new SrpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(arguments); output.writeObject(request); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { SrpcResponse response = (SrpcResponse) input.readObject(); if (response.getError() != null && response.getError() instanceof Throwable) { throw response.getError(); } return response.getResult(); } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); } }
相关推荐
rpc原理的简单实现
使用socket/反射/序列化等技术实现了一个基本的远程调用框架,可参考博客http://blog.csdn.net/u013177446/article/details/66473066
RPC的简单实现
RPC架构webService简单实现
java原生实现的RPC框架,使用技术:JDK动态代理、Socket通信(BIO方式)、反射、注解、Java序列化
netty的rpc协议的简单实现
利用JSON格式,作为数据体,进行程序间的RPC调用,代码简单、方便。
RPC简单实现-JAVA
Java rpc框架简易版,类似dubbo分布式实现 (纯socket实现).zip
简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC 简单实现一个RPC ...
可参考博客http://blog.csdn.net/u013177446/article/details/66473066 使用netty/反射/序列化反序列化等技术是一个一个简单的RPC框架
基于sun rpc rpcgen 实现简单的NFS功能 能够列取远端目录文件,进行创建文件,创建目录,列取目录,删除空目录,删除文件操作。
RPC简单实现
基于RPC的简单NFS系统实现 代码实现,基于sunrpc 实现 目录创建,列取,删除,文件创建,列取,删除。
使用netty作为底层的通讯工具实现了一个简单的rpc工具。
不依赖任何第三方类库,自己实现一个JAVA版的最简单RPC框架。对理解dubbo,thrift原理有所帮助
包含RPC原理、NIO操作、netty简单的api、自定义RPC框架
使用idea的maven工程创建,分为服务端和客户端,涉及到的技术有以下: 1、socket通信 2、通过注解的方式发布服务。...3、支持多版本的实现。 4、java动态代理。 5、Spring的启动。 6、线程池的使用
django-json-rpc, 面向 Django的JSON RPC实现 Django json-rpc django供电站点的基本 json rpc实现。功能:简单,Pythonic API对 Django 身份验证的支持支持所有官方 Django python/版本组合支持 json
在win7下运行的一个thrift的简单的RPC例子。客户端向服务端发送字符串,服务端加上前缀"result"后返回。