`
zengshaotao
  • 浏览: 752664 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

分布式

 
阅读更多

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/62284352

今天,我们一起来实现一个轻量级的RPC框架。

RPC,即 Remote Procedure Call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。
RPC 可基于 HTTP 或 TCP 协议,Web Service 就是基于 HTTP 协议的 RPC,它具有良好的跨平台性,但其性能却不如基于 TCP 协议的 RPC。会两方面会直接影响 RPC 的性能,一是传输方式,二是序列化。
众所周知,TCP 是传输层协议,HTTP 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它们可以取代 Java 默认的序列化,从而提供更高效的性能。
为了支持高并发,传统的阻塞式 IO 显然不太合适,因此我们需要异步的 IO,即 NIO。Java 提供了 NIO 的解决方案,Java 7 也提供了更优秀的 NIO.2 支持,用 Java 实现 NIO 并不是遥不可及的事情,只是需要我们熟悉 NIO 的技术细节。
我们需要将服务部署在分布式环境下的不同节点上,通过服务注册的方式,让客户端来自动发现当前可用的服务,并调用这些服务。这需要一种服务注册表(Service Registry)的组件,让它来注册分布式环境下所有的服务地址(包括:主机名与端口号)。

每台 Server 上可发布多个 Service,这些 Service 共用一个 host 与 port,在分布式环境下会提供 Server 共同对外提供 Service。此外,为防止 Service Registry 出现单点故障,因此需要将其搭建为集群环境。

本文将为您揭晓开发轻量级分布式 RPC 框架的具体过程,该框架基于 TCP 协议,提供了 NIO 特性,提供高效的序列化方式,同时也具备服务注册与发现的能力。根据以上技术需求,我们可使用如下技术选型:
        spring:它是最强大的依赖注入框架,也是业界的权威标准。
        Netty:它使 NIO 编程更加容易,屏蔽了 Java 底层的 NIO 细节。
        Protostuff:它基于 Protobuf 序列化框架,面向 POJO,无需编写 .proto 文件。
        ZooKeeper:提供服务注册与发现功能,开发分布式系统的必备选择,同时它也具备天生的集群能力。

1 第一步:编写服务接口

 

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. /** 
  4.  * 定义服务接口 
  5.  *  @author liuyazhuang 
  6.  */  
  7. public interface HelloService {  
  8.    
  9.     String hello(String name);  
  10. }  

将该接口放在独立的客户端 jar 包中,以供应用使用

2 第二步:编写服务接口的实现类

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. /** 
  4.  * 实现服务接口 
  5.  * @author liuyazhuang 
  6.  */  
  7. @RpcService(HelloService.class// 指定远程接口  
  8. public class HelloServiceImpl implements HelloService {  
  9.    
  10.     @Override  
  11.     public String hello(String name) {  
  12.         return "Hello! " + name;  
  13.     }  
  14. }  

使用RpcService注解定义在服务接口的实现类上,需要对该实现类指定远程接口,因为实现类可能会实现多个接口,一定要告诉框架哪个才是远程接口。
RpcService代码如下:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2. import org.springframework.stereotype.Component;  
  3.   
  4. import java.lang.annotation.ElementType;  
  5. import java.lang.annotation.Retention;  
  6. import java.lang.annotation.RetentionPolicy;  
  7. import java.lang.annotation.Target;  
  8.    
  9. /** 
  10.  * RPC接口注解 
  11.  * @author liuyazhuang 
  12.  */  
  13. @Target({ElementType.TYPE})  
  14. @Retention(RetentionPolicy.RUNTIME)  
  15. @Component // 标明可被 Spring 扫描  
  16. public @interface RpcService {  
  17.    
  18.     Class<?> value();  
  19. }  

该注解具备 Spring 的Component注解的特性,可被 Spring 扫描。
该实现类放在服务端 jar 包中,该 jar 包还提供了一些服务端的配置文件与启动服务的引导程序。

3 第三步:配置服务端

服务端 Spring 配置文件名为spring-zk-rpc-server.xml,内容如下:

 

 

[html] view plain copy
 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.        xmlns:context="http://www.springframework.org/schema/context"  
  5.        xsi:schemaLocation="http://www.springframework.org/schema/beans  
  6.         http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  7.         http://www.springframework.org/schema/context  
  8.         http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
  9.     <!-- 配置自动扫包 -->  
  10.     <context:component-scan base-package="com.lyz.zkrpc"/>  
  11.     <context:property-placeholder location="classpath:rpc-server-config.properties"/>  
  12.     <!-- 配置服务注册组件 -->  
  13.     <bean id="serviceRegistry" class="com.lyz.zkrpc.ServiceRegistry">  
  14.         <constructor-arg name="registryAddress" value="${registry.address}"/>  
  15.     </bean>  
  16.    
  17.     <!-- 配置 RPC 服务器 -->  
  18.     <bean id="rpcServer" class="com.lyz.zkrpc.RpcServer">  
  19.         <constructor-arg name="serverAddress" value="${server.address}"/>  
  20.         <constructor-arg name="serviceRegistry" ref="serviceRegistry"/>  
  21.     </bean>  
  22. </beans>  

具体的配置参数在rpc-server-config.properties文件中,内容如下:

[plain] view plain copy
 
  1. <!-- lang: java -->  
  2. # ZooKeeper 服务器  
  3. registry.address=127.0.0.1:2181  
  4.    
  5. # RPC 服务器  
  6. server.address=127.0.0.1:8000  

以上配置表明:连接本地的 ZooKeeper 服务器,并在 8000 端口上发布 RPC 服务。

4 第四步:启动服务器并发布服务

为了加载 Spring 配置文件来发布服务,只需编写一个引导程序即可:

 

 

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import org.springframework.context.support.ClassPathXmlApplicationContext;  
  4.    
  5. /** 
  6.  * RPC服务启动入口 
  7.  * @author liuyazhuang 
  8.  */  
  9. public class RpcBootstrap {  
  10.    
  11.     public static void main(String[] args) {  
  12.         new ClassPathXmlApplicationContext("spring-zk-rpc-server.xml");  
  13.     }  
  14. }  

运行RpcBootstrap类的main方法即可启动服务端,但还有两个重要的组件尚未实现,它们分别是:ServiceRegistry与RpcServer,下文会给出具体实现细节。

5 第五步:实现服务注册

使用 ZooKeeper 客户端可轻松实现服务注册功能,ServiceRegistry代码如下:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import org.apache.zookeeper.*;  
  4. import org.slf4j.Logger;  
  5. import org.slf4j.LoggerFactory;  
  6.    
  7. import java.io.IOException;  
  8. import java.util.concurrent.CountDownLatch;  
  9.    
  10. /** 
  11.  * 连接ZK注册中心,创建服务注册目录 
  12.  * @author liuyazhuang 
  13.  */  
  14. public class ServiceRegistry {  
  15.    
  16.     private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);  
  17.    
  18.     private CountDownLatch latch = new CountDownLatch(1);  
  19.    
  20.     private String registryAddress;  
  21.    
  22.     public ServiceRegistry(String registryAddress) {  
  23.         this.registryAddress = registryAddress;  
  24.     }  
  25.    
  26.     public void register(String data) {  
  27.         if (data != null) {  
  28.             ZooKeeper zk = connectServer();  
  29.             if (zk != null) {  
  30.                 createNode(zk, data);  
  31.             }  
  32.         }  
  33.     }  
  34.    
  35.     private ZooKeeper connectServer() {  
  36.         ZooKeeper zk = null;  
  37.         try {  
  38.             zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {  
  39.                 @Override  
  40.                 public void process(WatchedEvent event) {  
  41.                     // 判断是否已连接ZK,连接后计数器递减.  
  42.                     if (event.getState() == Event.KeeperState.SyncConnected) {  
  43.                         latch.countDown();  
  44.                     }  
  45.                 }  
  46.             });  
  47.    
  48.             // 若计数器不为0,则等待.  
  49.             latch.await();  
  50.         } catch (IOException | InterruptedException e) {  
  51.             LOGGER.error("", e);  
  52.         }  
  53.         return zk;  
  54.     }  
  55.    
  56.     private void createNode(ZooKeeper zk, String data) {  
  57.         try {  
  58.             byte[] bytes = data.getBytes();  
  59.             String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);  
  60.             LOGGER.debug("create zookeeper node ({} => {})", path, data);  
  61.         } catch (KeeperException | InterruptedException e) {  
  62.             LOGGER.error("", e);  
  63.         }  
  64.     }  
  65. }  

其中,通过Constant配置了所有的常量:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. /** 
  4.  * ZK相关常量 
  5.  * @author liuyazhuang 
  6.  */  
  7. public interface Constant {  
  8.    
  9.     int ZK_SESSION_TIMEOUT = 5000;  
  10.    
  11.     String ZK_REGISTRY_PATH = "/registry";  
  12.     String ZK_DATA_PATH = ZK_REGISTRY_PATH + "/data";  
  13. }  

注意:首先需要使用 ZooKeeper 客户端命令行创建/registry永久节点,用于存放所有的服务临时节点。

6 第六步:实现 RPC 服务器

使用 Netty 可实现一个支持 NIO 的 RPC 服务器,需要使用ServiceRegistry注册服务地址,RpcServer代码如下:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import io.netty.bootstrap.ServerBootstrap;  
  4. import io.netty.channel.ChannelFuture;  
  5. import io.netty.channel.ChannelInitializer;  
  6. import io.netty.channel.ChannelOption;  
  7. import io.netty.channel.EventLoopGroup;  
  8. import io.netty.channel.nio.NioEventLoopGroup;  
  9. import io.netty.channel.socket.SocketChannel;  
  10. import io.netty.channel.socket.nio.NioServerSocketChannel;  
  11. import org.apache.commons.collections4.MapUtils;  
  12. import org.slf4j.Logger;  
  13. import org.slf4j.LoggerFactory;  
  14. import org.springframework.beans.BeansException;  
  15. import org.springframework.beans.factory.InitializingBean;  
  16. import org.springframework.context.ApplicationContext;  
  17. import org.springframework.context.ApplicationContextAware;  
  18.    
  19. import java.util.HashMap;  
  20. import java.util.Map;  
  21.    
  22. /** 
  23.  * 启动并注册服务 
  24.  * @author liuyazhuang 
  25.  */  
  26. public class RpcServer implements ApplicationContextAware, InitializingBean {  
  27.    
  28.     private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);  
  29.    
  30.     private String serverAddress;  
  31.     private ServiceRegistry serviceRegistry;  
  32.    
  33.     private Map<String, Object> handlerMap = new HashMap<String, Object>(); // 存放接口名与服务对象之间的映射关系  
  34.    
  35.     public RpcServer(String serverAddress) {  
  36.         this.serverAddress = serverAddress;  
  37.     }  
  38.    
  39.     public RpcServer(String serverAddress, ServiceRegistry serviceRegistry) {  
  40.         this.serverAddress = serverAddress;  
  41.         this.serviceRegistry = serviceRegistry;  
  42.     }  
  43.    
  44.     @Override  
  45.     public void setApplicationContext(ApplicationContext ctx) throws BeansException {  
  46.         Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class); // 获取所有带有 RpcService 注解的 Spring Bean  
  47.         if (MapUtils.isNotEmpty(serviceBeanMap)) {  
  48.             for (Object serviceBean : serviceBeanMap.values()) {  
  49.                 String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();  
  50.                 handlerMap.put(interfaceName, serviceBean);  
  51.             }  
  52.         }  
  53.     }  
  54.    
  55.     @Override  
  56.     public void afterPropertiesSet() throws Exception {  
  57.         EventLoopGroup bossGroup = new NioEventLoopGroup();  
  58.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
  59.         try {  
  60.             ServerBootstrap bootstrap = new ServerBootstrap();  
  61.             bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)  
  62.                     .childHandler(new ChannelInitializer<SocketChannel>() {  
  63.                         @Override  
  64.                         public void initChannel(SocketChannel channel) throws Exception {  
  65.                             channel.pipeline()  
  66.                                     .addLast(new RpcDecoder(RpcRequest.class)) // 将 RPC 请求进行解码(为了处理请求)  
  67.                                     .addLast(new RpcEncoder(RpcResponse.class)) // 将 RPC 响应进行编码(为了返回响应)  
  68.                                     .addLast(new RpcHandler(handlerMap)); // 处理 RPC 请求  
  69.                         }  
  70.                     })  
  71.                     .option(ChannelOption.SO_BACKLOG, 128)  
  72.                     .childOption(ChannelOption.SO_KEEPALIVE, true);  
  73.    
  74.             String[] array = serverAddress.split(":");  
  75.             String host = array[0];  
  76.             int port = Integer.parseInt(array[1]);  
  77.    
  78.             ChannelFuture future = bootstrap.bind(host, port).sync();  
  79.             LOGGER.debug("server started on port {}", port);  
  80.    
  81.             if (serviceRegistry != null) {  
  82.                 serviceRegistry.register(serverAddress); // 注册服务地址  
  83.             }  
  84.    
  85.             future.channel().closeFuture().sync();  
  86.         } finally {  
  87.             workerGroup.shutdownGracefully();  
  88.             bossGroup.shutdownGracefully();  
  89.         }  
  90.     }  
  91. }  

以上代码中,有两个重要的 POJO 需要描述一下,它们分别是RpcRequest与RpcResponse。
使用RpcRequest封装 RPC 请求,代码如下:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. /** 
  4.  * RPC请求 
  5.  * @author liuyazhuang 
  6.  */  
  7. public class RpcRequest {  
  8.    
  9.     private String requestId;  
  10.    
  11.     private String className;  
  12.    
  13.     private String methodName;  
  14.    
  15.     private Class<?>[] parameterTypes;  
  16.    
  17.     private Object[] parameters;  
  18.    
  19.     public String getRequestId() {  
  20.         return requestId;  
  21.     }  
  22.    
  23.     public void setRequestId(String requestId) {  
  24.         this.requestId = requestId;  
  25.     }  
  26.    
  27.     public String getClassName() {  
  28.         return className;  
  29.     }  
  30.    
  31.     public void setClassName(String className) {  
  32.         this.className = className;  
  33.     }  
  34.    
  35.     public String getMethodName() {  
  36.         return methodName;  
  37.     }  
  38.    
  39.     public void setMethodName(String methodName) {  
  40.         this.methodName = methodName;  
  41.     }  
  42.    
  43.     public Class<?>[] getParameterTypes() {  
  44.         return parameterTypes;  
  45.     }  
  46.    
  47.     public void setParameterTypes(Class<?>[] parameterTypes) {  
  48.         this.parameterTypes = parameterTypes;  
  49.     }  
  50.    
  51.     public Object[] getParameters() {  
  52.         return parameters;  
  53.     }  
  54.    
  55.     public void setParameters(Object[] parameters) {  
  56.         this.parameters = parameters;  
  57.     }  
  58. }  

使用RpcResponse封装 RPC 响应,代码如下:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import io.netty.buffer.ByteBuf;  
  4. import io.netty.channel.ChannelHandlerContext;  
  5. import io.netty.handler.codec.ByteToMessageDecoder;  
  6.    
  7. import java.util.List;  
  8.    
  9. /** 
  10.  * RPC解码 
  11.  * @author liuyazhuang 
  12.  */  
  13. public class RpcDecoder extends ByteToMessageDecoder {  
  14.    
  15.     private Class<?> genericClass;  
  16.    
  17.     public RpcDecoder(Class<?> genericClass) {  
  18.         this.genericClass = genericClass;  
  19.     }  
  20.    
  21.     @Override  
  22.     public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {  
  23.         if (in.readableBytes() < 4) {  
  24.             return;  
  25.         }  
  26.         in.markReaderIndex();  
  27.         int dataLength = in.readInt();  
  28.         if (dataLength < 0) {  
  29.             ctx.close();  
  30.         }  
  31.         if (in.readableBytes() < dataLength) {  
  32.             in.resetReaderIndex();  
  33.             return;  
  34.         }  
  35.         byte[] data = new byte[dataLength];  
  36.         in.readBytes(data);  
  37.    
  38.         Object obj = SerializationUtil.deserialize(data, genericClass);  
  39.         out.add(obj);  
  40.     }  
  41. }  

使用RpcEncoder提供 RPC 编码,只需扩展 Netty 的MessageToByteEncoder抽象类的encode方法即可,代码如下:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import io.netty.buffer.ByteBuf;  
  4. import io.netty.channel.ChannelHandlerContext;  
  5. import io.netty.handler.codec.MessageToByteEncoder;  
  6.    
  7. /** 
  8.  * RPC编码 
  9.  * @author liuyazhuang 
  10.  */  
  11. public class RpcEncoder extends MessageToByteEncoder {  
  12.    
  13.     private Class<?> genericClass;  
  14.    
  15.     public RpcEncoder(Class<?> genericClass) {  
  16.         this.genericClass = genericClass;  
  17.     }  
  18.    
  19.     @Override  
  20.     public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {  
  21.         if (genericClass.isInstance(in)) {  
  22.             byte[] data = SerializationUtil.serialize(in);  
  23.             out.writeInt(data.length);  
  24.             out.writeBytes(data);  
  25.         }  
  26.     }  
  27. }  

编写一个SerializationUtil工具类,使用Protostuff实现序列化:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import com.dyuproject.protostuff.LinkedBuffer;  
  4. import com.dyuproject.protostuff.ProtostuffIOUtil;  
  5. import com.dyuproject.protostuff.Schema;  
  6. import com.dyuproject.protostuff.runtime.RuntimeSchema;  
  7. import org.objenesis.Objenesis;  
  8. import org.objenesis.ObjenesisStd;  
  9.    
  10. import java.util.Map;  
  11. import java.util.concurrent.ConcurrentHashMap;  
  12.    
  13. /** 
  14.  * Protostuff序列化与反序列化工具 
  15.  * @author liuyazhuang 
  16.  */  
  17. public class SerializationUtil {  
  18.    
  19.     private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();  
  20.    
  21.     private static Objenesis objenesis = new ObjenesisStd(true);  
  22.    
  23.     private SerializationUtil() {  
  24.     }  
  25.    
  26.     @SuppressWarnings("unchecked")  
  27.     private static <T> Schema<T> getSchema(Class<T> cls) {  
  28.         Schema<T> schema = (Schema<T>) cachedSchema.get(cls);  
  29.         if (schema == null) {  
  30.             schema = RuntimeSchema.createFrom(cls);  
  31.             if (schema != null) {  
  32.                 cachedSchema.put(cls, schema);  
  33.             }  
  34.         }  
  35.         return schema;  
  36.     }  
  37.    
  38.     @SuppressWarnings("unchecked")  
  39.     public static <T> byte[] serialize(T obj) {  
  40.         Class<T> cls = (Class<T>) obj.getClass();  
  41.         LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);  
  42.         try {  
  43.             Schema<T> schema = getSchema(cls);  
  44.             return ProtostuffIOUtil.toByteArray(obj, schema, buffer);  
  45.         } catch (Exception e) {  
  46.             throw new IllegalStateException(e.getMessage(), e);  
  47.         } finally {  
  48.             buffer.clear();  
  49.         }  
  50.     }  
  51.    
  52.     public static <T> T deserialize(byte[] data, Class<T> cls) {  
  53.         try {  
  54.             T message = (T) objenesis.newInstance(cls);  
  55.             Schema<T> schema = getSchema(cls);  
  56.             ProtostuffIOUtil.mergeFrom(data, message, schema);  
  57.             return message;  
  58.         } catch (Exception e) {  
  59.             throw new IllegalStateException(e.getMessage(), e);  
  60.         }  
  61.     }  
  62. }  

以上了使用 Objenesis 来实例化对象,它是比 Java 反射更加强大。
注意:如需要替换其它序列化框架,只需修改SerializationUtil即可。当然,更好的实现方式是提供配置项来决定使用哪种序列化方式。
使用RpcHandler中处理 RPC 请求,只需扩展 Netty 的SimpleChannelInboundHandler抽象类即可,代码如下:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import io.netty.channel.ChannelFutureListener;  
  4. import io.netty.channel.ChannelHandlerContext;  
  5. import io.netty.channel.SimpleChannelInboundHandler;  
  6. import net.sf.cglib.reflect.FastClass;  
  7. import net.sf.cglib.reflect.FastMethod;  
  8. import org.slf4j.Logger;  
  9. import org.slf4j.LoggerFactory;  
  10.    
  11. import java.util.Map;  
  12.    
  13. /** 
  14.  * RPC服务端:请求处理过程 
  15.  * @author liuyazhuang 
  16.  */  
  17. public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {  
  18.    
  19.     private static final Logger LOGGER = LoggerFactory.getLogger(RpcHandler.class);  
  20.    
  21.     private final Map<String, Object> handlerMap;  
  22.    
  23.     public RpcHandler(Map<String, Object> handlerMap) {  
  24.         this.handlerMap = handlerMap;  
  25.     }  
  26.    
  27.     @Override  
  28.     public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {  
  29.         RpcResponse response = new RpcResponse();  
  30.         response.setRequestId(request.getRequestId());  
  31.         try {  
  32.             Object result = handle(request);  
  33.             response.setResult(result);  
  34.         } catch (Throwable t) {  
  35.             response.setError(t);  
  36.         }  
  37.         ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  
  38.     }  
  39.    
  40.     private Object handle(RpcRequest request) throws Throwable {  
  41.         String className = request.getClassName();  
  42.         Object serviceBean = handlerMap.get(className);  
  43.    
  44.         Class<?> serviceClass = serviceBean.getClass();  
  45.         String methodName = request.getMethodName();  
  46.         Class<?>[] parameterTypes = request.getParameterTypes();  
  47.         Object[] parameters = request.getParameters();  
  48.    
  49.         // Method method = serviceClass.getMethod(methodName, parameterTypes);  
  50.         // method.setAccessible(true);  
  51.         // return method.invoke(serviceBean, parameters);  
  52.    
  53.         FastClass serviceFastClass = FastClass.create(serviceClass);  
  54.         FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);  
  55.         return serviceFastMethod.invoke(serviceBean, parameters);  
  56.     }  
  57.    
  58.     @Override  
  59.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
  60.         LOGGER.error("server caught exception", cause);  
  61.         ctx.close();  
  62.     }  
  63. }  

为了避免使用 Java 反射带来的性能问题,我们可以使用 CGLib 提供的反射 API,如上面用到的FastClass与FastMethod。

7 第七步:配置客户端

同样使用 Spring 配置文件来配置 RPC 客户端,spring-zk-rpc-client.xml代码如下:

[html] view plain copy
 
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.        xmlns:context="http://www.springframework.org/schema/context"  
  5.        xsi:schemaLocation="http://www.springframework.org/schema/beans  
  6.     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  7.     http://www.springframework.org/schema/context  
  8.     http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
  9.     <context:component-scan base-package="com.lyz.zkrpc"/>  
  10.     <context:property-placeholder location="classpath:rpc-client-config.properties"/>  
  11.     <!-- 配置服务发现组件 -->  
  12.     <bean id="serviceDiscovery" class="com.lyz.zkrpc.ServiceDiscovery">  
  13.         <constructor-arg name="registryAddress" value="${registry.address}"/>  
  14.     </bean>  
  15.     <!-- 配置 RPC 代理 -->  
  16.     <bean id="rpcProxy" class="com.lyz.zkrpc.RpcProxy">  
  17.         <constructor-arg name="serviceDiscovery" ref="serviceDiscovery"/>  
  18.     </bean>  
  19. </beans>  

其中rpc-client-config.properties提供了具体的配置:

[plain] view plain copy
 
  1. <!-- lang: java -->  
  2. # ZooKeeper 服务器  
  3. registry.address=127.0.0.1:2181  

8 第八步:实现服务发现

同样使用 ZooKeeper 实现服务发现功能,见如下代码:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import org.apache.zookeeper.KeeperException;  
  4. import org.apache.zookeeper.WatchedEvent;  
  5. import org.apache.zookeeper.Watcher;  
  6. import org.apache.zookeeper.ZooKeeper;  
  7. import org.slf4j.Logger;  
  8. import org.slf4j.LoggerFactory;  
  9.    
  10. import java.io.IOException;  
  11. import java.util.ArrayList;  
  12. import java.util.List;  
  13. import java.util.concurrent.CountDownLatch;  
  14. import java.util.concurrent.ThreadLocalRandom;  
  15.    
  16. /** 
  17.  * 服务发现:连接ZK,添加watch事件 
  18.  * @author liuyazhuang 
  19.  */  
  20. public class ServiceDiscovery {  
  21.    
  22.     private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class);  
  23.    
  24.     private CountDownLatch latch = new CountDownLatch(1);  
  25.    
  26.     private volatile List<String> dataList = new ArrayList<>();  
  27.    
  28.     private String registryAddress;  
  29.    
  30.     public ServiceDiscovery(String registryAddress) {  
  31.         this.registryAddress = registryAddress;  
  32.    
  33.         ZooKeeper zk = connectServer();  
  34.         if (zk != null) {  
  35.             watchNode(zk);  
  36.         }  
  37.     }  
  38.    
  39.     public String discover() {  
  40.         String data = null;  
  41.         int size = dataList.size();  
  42.         if (size > 0) {  
  43.             if (size == 1) {  
  44.                 data = dataList.get(0);  
  45.                 LOGGER.debug("using only data: {}", data);  
  46.             } else {  
  47.                 data = dataList.get(ThreadLocalRandom.current().nextInt(size));  
  48.                 LOGGER.debug("using random data: {}", data);  
  49.             }  
  50.         }  
  51.         return data;  
  52.     }  
  53.    
  54.     private ZooKeeper connectServer() {  
  55.         ZooKeeper zk = null;  
  56.         try {  
  57.             zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {  
  58.                 @Override  
  59.                 public void process(WatchedEvent event) {  
  60.                     if (event.getState() == Event.KeeperState.SyncConnected) {  
  61.                         latch.countDown();  
  62.                     }  
  63.                 }  
  64.             });  
  65.             latch.await();  
  66.         } catch (IOException | InterruptedException e) {  
  67.             LOGGER.error("", e);  
  68.         }  
  69.         return zk;  
  70.     }  
  71.    
  72.     private void watchNode(final ZooKeeper zk) {  
  73.         try {  
  74.             List<String> nodeList = zk.getChildren(Constant.ZK_REGISTRY_PATH, new Watcher() {  
  75.                 @Override  
  76.                 public void process(WatchedEvent event) {  
  77.                     if (event.getType() == Event.EventType.NodeChildrenChanged) {  
  78.                         watchNode(zk);  
  79.                     }  
  80.                 }  
  81.             });  
  82.             List<String> dataList = new ArrayList<>();  
  83.             for (String node : nodeList) {  
  84.                 byte[] bytes = zk.getData(Constant.ZK_REGISTRY_PATH + "/" + node, falsenull);  
  85.                 dataList.add(new String(bytes));  
  86.             }  
  87.             LOGGER.debug("node data: {}", dataList);  
  88.             this.dataList = dataList;  
  89.         } catch (KeeperException | InterruptedException e) {  
  90.             LOGGER.error("", e);  
  91.         }  
  92.     }  
  93. }  

9 第九步:实现 RPC 代理

这里使用 Java 提供的动态代理技术实现 RPC 代理(当然也可以使用 CGLib 来实现),具体代码如下:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import net.sf.cglib.proxy.InvocationHandler;  
  4. import net.sf.cglib.proxy.Proxy;  
  5.    
  6. import java.lang.reflect.Method;  
  7. import java.util.UUID;  
  8.    
  9. /** 
  10.  * 客户端RPC调用代理 
  11.  * @author liuyazhuang 
  12.  */  
  13. public class RpcProxy {  
  14.    
  15.     private String serverAddress;  
  16.     private ServiceDiscovery serviceDiscovery;  
  17.    
  18.     public RpcProxy(String serverAddress) {  
  19.         this.serverAddress = serverAddress;  
  20.     }  
  21.    
  22.     public RpcProxy(ServiceDiscovery serviceDiscovery) {  
  23.         this.serviceDiscovery = serviceDiscovery;  
  24.     }  
  25.    
  26.     @SuppressWarnings("unchecked")  
  27.     public <T> T create(Class<?> interfaceClass) {  
  28.         return (T) Proxy.newProxyInstance(  
  29.             interfaceClass.getClassLoader(),  
  30.             new Class<?>[]{interfaceClass},  
  31.             new InvocationHandler() {  
  32.                 @Override  
  33.                 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
  34.                     RpcRequest request = new RpcRequest(); // 创建并初始化 RPC 请求  
  35.                     request.setRequestId(UUID.randomUUID().toString());  
  36.                     request.setClassName(method.getDeclaringClass().getName());  
  37.                     request.setMethodName(method.getName());  
  38.                     request.setParameterTypes(method.getParameterTypes());  
  39.                     request.setParameters(args);  
  40.    
  41.                     if (serviceDiscovery != null) {  
  42.                         serverAddress = serviceDiscovery.discover(); // 发现服务  
  43.                     }  
  44.    
  45.                     String[] array = serverAddress.split(":");  
  46.                     String host = array[0];  
  47.                     int port = Integer.parseInt(array[1]);  
  48.    
  49.                     RpcClient client = new RpcClient(host, port); // 初始化 RPC 客户端  
  50.                     RpcResponse response = client.send(request); // 通过 RPC 客户端发送 RPC 请求并获取 RPC 响应  
  51.    
  52.                     if (response.getError() != null) {  
  53.                         throw response.getError();  
  54.                     } else {  
  55.                         return response.getResult();  
  56.                     }  
  57.                 }  
  58.             }  
  59.         );  
  60.     }  
  61. }  

使用RpcClient类实现 RPC 客户端,只需扩展 Netty 提供的SimpleChannelInboundHandler抽象类即可,代码如下:

[java] view plain copy
 
  1. package com.lyz.zkrpc;  
  2.    
  3. import io.netty.bootstrap.Bootstrap;  
  4. import io.netty.channel.*;  
  5. import io.netty.channel.nio.NioEventLoopGroup;  
  6. import io.netty.channel.socket.SocketChannel;  
  7. import io.netty.channel.socket.nio.NioSocketChannel;  
  8. import org.slf4j.Logger;  
  9. import org.slf4j.LoggerFactory;  
  10.    
  11. /** 
  12.  * RPC真正调用客户端 
  13.  * @author liuyazhuang 
  14.  */  
  15. public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {  
  16.    
  17.     private static final Logger LOGGER = LoggerFactory.getLogger(RpcClient.class);  
  18.    
  19.     private String host;  
  20.     private int port;  
  21.    
  22.     private RpcResponse response;  
  23.    
  24.     private final Object obj = new Object();  
  25.    
  26.     public RpcClient(String host, int port) {  
  27.         this.host = host;  
  28.         this.port = port;  
  29.     }  
  30.    
  31.     @Override  
  32.     public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {  
  33.         this.response = response;  
  34.    
  35.         synchronized (obj) {  
  36.             obj.notifyAll(); // 收到响应,唤醒线程  
  37.         }  
  38.     }  
  39.    
  40.     @Override  
  41.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
  42.         LOGGER.error("client caught exception", cause);  
  43.         ctx.close();  
  44.     }  
  45.    
  46.     public RpcResponse send(RpcRequest request) throws Exception {  
  47.         EventLoopGroup group = new NioEventLoopGroup();  
  48.         try {  
  49.             Bootstrap bootstrap = new Bootstrap();  
  50.             bootstrap.group(group).channel(NioSocketChannel.class)  
  51.                 .handler(new ChannelInitializer<SocketChannel>() {  
  52.                     @Override  
  53.                     public void initChannel(SocketChannel channel) throws Exception {  
  54.                         channel.pipeline()  
  55.                             .addLast(new RpcEncoder(RpcRequest.class)) // 将 RPC 请求进行编码(为了发送请求)  
  56.                             .addLast(new RpcDecoder(RpcResponse.class)) // 将 RPC 响应进行解码(为了处理响应)  
  57.                             .addLast(RpcClient.this); // 使用 RpcClient 发送 RPC 请求  
  58.                     }  
  59.                 })  
  60.                 .option(ChannelOption.SO_KEEPALIVE, true);  
  61.    
  62.             ChannelFuture future = bootstrap.connect(host, port).sync();  
  63.             future.channel().writeAndFlush(request).sync();  
  64.    
  65.             synchronized (obj) {  
  66.                 obj.wait(); // 未收到响应,使线程等待  
  67.             }  
  68.    
  69.             if (response != null) {  
  70.                 future.channel().closeFuture().sync();  
  71.             }  
  72.             return response;  
  73.         } finally {  
  74.             group.shutdownGracefully();  
  75.         }  
  76.     }  
  77. }  

10 第十步:发送 RPC 请求

使用 JUnit 结合 Spring 编写一个单元测试,代码如下:

[java] view plain copy
 
  1. <!-- lang: java -->  
  2. @RunWith(SpringJUnit4ClassRunner.class)  
  3. @ContextConfiguration(locations = "classpath:spring.xml")  
  4. /** 
  5.  * 测试RPC服务 
  6.  * @author liuyazhuang 
  7.  */  
  8. public class HelloServiceTest {  
  9.    
  10.     @Autowired  
  11.     private RpcProxy rpcProxy;  
  12.    
  13.     @Test  
  14.     public void helloTest() {  
  15.         HelloService helloService = rpcProxy.create(HelloService.class);  
  16.         String result = helloService.hello("World");  
  17.         Assert.assertEquals("Hello! World", result);  
  18.     }  
  19. }  

11 最后,总结

本文通过 Spring + Netty + Protostuff + ZooKeeper 实现了一个轻量级 RPC 框架,使用 Spring 提供依赖注入与参数配置,使用 Netty 实现 NIO 方式的数据传输,使用 Protostuff 实现对象序列化,使用 ZooKeeper 实现服务注册与发现。使用该框架,可将服务部署到分布式环境中的任意节点上,客户端通过远程接口来调用服务端的具体实现,让服务端与客户端的开发完全分离,为实现大规模分布式应用提供了基础支持。

12 附录:Maven 依赖

[html] view plain copy
 
  1. <!-- lang: xml -->  
  2. <!-- JUnit -->  
  3. <dependency>  
  4.     <groupId>junit</groupId>  
  5.     <artifactId>junit</artifactId>  
  6.     <version>4.11</version>  
  7.     <scope>test</scope>  
  8. </dependency>  
  9.    
  10. <!-- SLF4J -->  
  11. <dependency>  
  12.     <groupId>org.slf4j</groupId>  
  13.     <artifactId>slf4j-log4j12</artifactId>  
  14.     <version>1.7.7</version>  
  15. </dependency>  
  16.    
  17. <!-- Spring -->  
  18. <dependency>  
  19.     <groupId>org.springframework</groupId>  
  20.     <artifactId>spring-context</artifactId>  
  21.     <version>3.2.12.RELEASE</version>  
  22. </dependency>  
  23. <dependency>  
  24.     <groupId>org.springframework</groupId>  
  25.     <artifactId>spring-test</artifactId>  
  26.     <version>3.2.12.RELEASE</version>  
  27.     <scope>test</scope>  
  28. </dependency>  
  29.    
  30. <!-- Netty -->  
  31. <dependency>  
  32.     <groupId>io.netty</groupId>  
  33.     <artifactId>netty-all</artifactId>  
  34.     <version>4.0.24.Final</version>  
  35. </dependency>  
  36.    
  37. <!-- Protostuff -->  
  38. <dependency>  
  39.     <groupId>com.dyuproject.protostuff</groupId>  
  40.     <artifactId>protostuff-core</artifactId>  
  41.     <version>1.0.8</version>  
  42. </dependency>  
  43. <dependency>  
  44.     <groupId>com.dyuproject.protostuff</groupId>  
  45.     <artifactId>protostuff-runtime</artifactId>  
  46.     <version>1.0.8</version>  
  47. </dependency>  
  48.    
  49. <!-- ZooKeeper -->  
  50. <dependency>  
  51.     <groupId>org.apache.zookeeper</groupId>  
  52.     <artifactId>zookeeper</artifactId>  
  53.     <version>3.4.6</version>  
  54. </dependency>  
  55.    
  56. <!-- Apache Commons Collections -->  
  57. <dependency>  
  58.     <groupId>org.apache.commons</groupId>  
  59.     <artifactId>commons-collections4</artifactId>  
  60.     <version>4.0</version>  
  61. </dependency>  
  62.    
  63. <!-- Objenesis -->  
  64. <dependency>  
  65.     <groupId>org.objenesis</groupId>  
  66.     <artifactId>objenesis</artifactId>  
  67.     <version>2.1</version>  
  68. </dependency>  
  69.    
  70. <!-- CGLib -->  
  71. <dependency>  
  72.     <groupId>cglib</groupId>  
  73.     <artifactId>cglib</artifactId>  
  74.     <version>3.1</version>  
  75. </dependency>  

13 分布式RPC流程图


分享到:
评论

相关推荐

    分布式系统中文版PPT-南理工复习可用

    南理工 魏松杰 对应他的英文版本PPT复习可用,同时是学习分布式系统的很好的PPT,分布式系统ppt对应分布式系统第五版英文版ppt,复习,自学可用,了解分布式系统,共10章,01-概述,02-系统模型,03-进程间通信,04-...

    分布式系统原理介绍 - 刘杰 - 百度.pdf

    分布式系统理论体系非常庞大,涉及知识面也非常广博,本文精心选择了部分在工程实践中应用广泛、简单有效的分布式理论、算法、协议加以介绍。全文分为两大部分,第一部分介绍了分布式系统的一些基本概念并框定了本文...

    《分布式系统常用技术及案例分析》PDF

    本书分为三大部分,即分布式系统基础理论、分布式系统常用技术以及经典的分布式系统案例分析。第一部分主要介绍分布式系统基础理论知识,总结一些在设计分布式系统时需要考虑的范式、知识点以及可能会面临的问题,...

    大规模分布式系统架构与设计实战.完整版

    《大规模分布式系统架构与设计实战》写到,分布式并行计算的基本原理解剖;分布式协调的实现,包括如何实现公共配置管理,如何实现分布式锁,如何实现集群管理等;分布式缓存的实现,包括如何提供完整的分布式缓存来...

    分布式存储基础、Ceph、cinder及华为软件定义的存储方案.pdf

    分布式存储基础、Ceph、cinder及华为软件定义的存储方案.pdf分布式存储基础、Ceph、cinder及华为软件定义的存储方案.pdf分布式存储基础、Ceph、cinder及华为软件定义的存储方案.pdf分布式存储基础、Ceph、cinder及...

    大型分布式网站架构设计与实践.带目录书签.完整版 陈康贤

    《大型分布式网站架构设计与实践》主要介绍了大型分布式网站架构所涉及的一些技术细节,包括SOA架构的实现、互联网安全架构、构建分布式网站所依赖的基础设施、系统稳定性保障和海量数据分析等内容;深入地讲述了...

    SpringCloud分布式微服务项目Common通用依赖模块抽离示例代码

    SpringCloud分布式微服务项目Common通用依赖模块抽离示例代码 SpringCloud分布式微服务项目Common通用依赖模块抽离示例代码 SpringCloud分布式微服务项目Common通用依赖模块抽离示例代码 SpringCloud分布式微服务...

    分布式数据库参考文献

    分布式 数据库 参考文献 分布式数据库分布式 数据库 参考文献 分布式数据库分布式 数据库 参考文献 分布式数据库分布式 数据库 参考文献 分布式数据库

    分布式架构

    分布式架构体系描述 分布式架构体系描述 分布式架构体系描述 分布式架构体系描述 分布式架构体系描述 分布式架构体系描述 分布式架构体系描述 分布式架构体系描述 分布式架构体系描述 分布式架构体系描述 分布式架构...

    《分布式系统》教学大纲

    容错技术,分布式数据管理,分布式文件系统的设计问题与实现方法,分布式调度,分布式共享存储器技术以及基于对象的分布式系统,以及相关的前沿主题,包括web服务、网格、移动系统和无处不在系统等。通过这门课程的...

    分布式系统设计.pdf

    有关分布式计算的主题是多种多样的,许多研究人员正在研究关于分布式硬 件结构和分布式软件设计的各方面问题以开发利用其潜在的并行性和容错性。在这一章里,我 们将考虑一些基本概念以及与分布式计算相关的一些问题...

    分布式系统原理与泛型

    本书是Tanenbaum先生对所著的《分布式操作系统》的升级更新,是分布式系统的权威教材。全书分为两部分:原理和范型。第一部分详细讨论了分布式系统的原理、概念和技术,其中包括通信、进程、命名、同步、一致性和...

    33节点含分布式电源配电网的程序.zip_33节点_33节点 潮流_分布式配电_含分布式潮流_含分布式电源

    33节点符合分布式电源电力系统配电网的潮流计算

    【BAT必备】分布式相关面试题大全面试题

    【BAT必备】分布式相关面试题大全面试题【BAT必备】分布式相关面试题大全面试题【BAT必备】分布式相关面试题大全面试题【BAT必备】分布式相关面试题大全面试题【BAT必备】分布式相关面试题大全面试题【BAT必备】...

    分布式开发的技术关键

    分布式开发的技术关键分布式开发的技术关键分布式开发的技术关键分布式开发的技术关键

    SpringBoot面向线上支付平台的分布式微服务架构研究

    SpringBoot面向线上支付平台的分布式微服务架构研究 SpringBoot面向线上支付平台的分布式微服务架构研究 SpringBoot面向线上支付平台的分布式微服务架构研究 SpringBoot面向线上支付平台的分布式微服务架构研究 ...

    大规模分布式存储系统

    《大规模分布式存储系统:原理解析与架构实战》是分布式系统领域的经典著作,由阿里巴巴高级技术专家“阿里日照”(OceanBase核心开发人员)撰写,阳振坤、章文嵩、杨卫华、汪源、余锋(褚霸)、赖春波等来自阿里、...

    WEB Session 分布式处理方案

    购物车分布式Session处理方案,一个用户的分布式的购物车在集群分布式的情况下怎么处理解决Session共享的问题

    分布式存储技术及应用

    分布式存储技术及应用,分布式存储技术及应用,分布式存储技术及应用

    分布式系统设计(经典书籍)

    本书较为全面地介绍了分布式系统领域的一些基本概念,提出了分布式系统的各种问题,如互斥问题、死锁的预防和检测、处理机间的通信机制、可靠性问题、负载分配问题、数据管理问题及其可能的解决方案,并讨论了分布式...

Global site tag (gtag.js) - Google Analytics