文章讲解如何使用Thrift和Curator实现自己的服务注册与发现功能,服务注册中心使用Zookeeper框架。
技术栈
zookeeper-3.4.13
thrift-0.11.0
curator-2.13.0
特性
1、服务集群部署:同一个服务可以部署到多台服务器上,注册中心维护一个服务的多份payload信息
2、客户端软负载均衡:暂支持随机和轮询两种方式
3、服务提供者和服务消费者可选择是否要注册到注册中心
4、业务服务实现类自动加载和注册
5、一个端口监听多个业务服务
最新源码: https://github.com/chenjuwen/thrift-microservice
功能主要包含以下关键实现点:
扫描和加载业务服务实现类
发布业务服务到thrift处理器
注册业务服务到注册中心
创建thrift服务端
从注册中心查找业务服务
调用业务服务接口
1、扫描和加载业务服务实现类
业务服务接口通过Thrift的IDL进行描述,并使用Thrift提供的工具编译生成接口文件。
Common.thrift namespace java com.seasy.microservice.api struct Message { 1: i32 type; 2: binary data; } struct Response { 1: i32 code; 2: string message; } Hello.thrift namespace java com.seasy.microservice.api include "Common.thrift" service Hello{ string helloString(1:string param) Common.Response sendMessage(1:Common.Message message) }
开发业务服务接口的实现类,所有实现类统一用自定义注解类ServiceAnnotation加以标注。
@ServiceAnnotation(serviceClass=Hello.class, version="1.0.0") public class HelloServiceImpl implements Hello.Iface{ @Override public String helloString(String param) throws TException { return param; } @Override public Response sendMessage(Message message) throws TException { System.out.println(message.getType() + ", " + new String(message.getData())); Response response = new Response(0, "success"); return response; } }
根据自定义注解类扫描和加载业务服务实现类。
private ConcurrentHashMap<String, ServiceInformation> loadService(String packagePath) throws Exception { ConcurrentHashMap<String, ServiceInformation> serviceInformationMap = new ConcurrentHashMap<String, ServiceInformation>(); Reflections reflections = new Reflections(packagePath); //查找有指定注解类的服务类 Set<Class<?>> serviceImplementClassSet = reflections.getTypesAnnotatedWith(ServiceAnnotation.class); for(Class<?> serviceImplementClass : serviceImplementClassSet){ ServiceAnnotation serviceAnnotation = serviceImplementClass.getAnnotation(ServiceAnnotation.class); //服务相关信息封装在ServiceInformation类中 ServiceInformation serviceInformation = new ServiceInformation(); serviceInformation.setId(serviceInformation.getId()); serviceInformation.setServiceClass(serviceAnnotation.serviceClass()); serviceInformation.setVersion(serviceAnnotation.version()); serviceInformation.setTimeout(serviceAnnotation.timeout()); serviceInformation.setServiceImplementClassInstance(serviceImplementClass.newInstance()); String key = serviceAnnotation.serviceClass().getName(); serviceInformationMap.put(key, serviceInformation); logger.debug("Class [" + key + "] loaded!"); } return serviceInformationMap; }
2、发布业务服务到thrift处理器
使用多路复用处理器TMultiplexedProcessor注册多个业务服务,这样通过监听一个端口即可提供多种服务。
private void buildMultiplexedProcessor(){ multiplexedProcessor = new TMultiplexedProcessor(); for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){ String serviceClassFullname = it.next(); logger.debug("serviceClassFullname=" + serviceClassFullname); //服务名 String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1); logger.debug("serviceName=" + serviceName); Object serviceImplementClassInstance = serviceInformationMap.get(serviceClassFullname).getServiceImplementClassInstance(); TProcessor serviceProcessor = createServiceProcessor(serviceClassFullname, serviceImplementClassInstance); if(serviceProcessor != null){ //以接口主类的SimpleName作为服务名 multiplexedProcessor.registerProcessor(serviceName, serviceProcessor); serviceInformationMap.get(serviceClassFullname).setProcessorRegistered(true); logger.info("Processor [" + serviceName + "] published!"); } } } private TProcessor createServiceProcessor(String serviceClassFullname, Object serviceImplementClassInstance){ try{ String processorClassName = serviceClassFullname + "$Processor"; String ifaceClassName = serviceClassFullname + "$Iface"; Class<?> processorClass = Class.forName(processorClassName); Class<?> ifaceClass = Class.forName(ifaceClassName); //一个服务实现类对应一个Processor Constructor<?> constructor = processorClass.getDeclaredConstructor(new Class[]{ifaceClass}); TProcessor processor = (TProcessor) constructor.newInstance(new Object[]{serviceImplementClassInstance}); return processor; }catch(Exception ex){ logger.error("create service[" + serviceClassFullname + "] processor error", ex); } return null; }
3、注册业务服务到注册中心
功能采用Zookeeper作为服务注册中心,使用Curator与Zookeeper进行通信。服务注册需要用到Curator扩展包curator-x-discovery.jar
构造CuratorFramework实例
curator = CuratorFrameworkFactory.builder() .connectString(connectString) .namespace("thrift-microservice") .build(); curator.start();
构造ServiceDiscovery实例
serviceDiscovery = ServiceDiscoveryBuilder.builder(ThriftServicePayload.class) .client(curator) .serializer(new JsonInstanceSerializer<>(ThriftServicePayload.class)) .basePath(basePath) .build(); serviceDiscovery.start();
将业务服务注册到Zookeeper注册中心
private void registerBusinessService(){ for(Iterator<String> it=serviceInformationMap.keySet().iterator(); it.hasNext(); ){ String serviceClassFullname = it.next(); String serviceName = serviceClassFullname.substring(serviceClassFullname.lastIndexOf(".")+1); ServiceInformation serviceInformation = serviceInformationMap.get(serviceClassFullname); if(serviceInformation.isProcessorRegistered()){ try{ //构造ServiceInstance对象,该对象表示一个业务服务,用于存储业务服务相关的参数数据 ServiceInstance<ThriftServicePayload> serviceInstance = ServiceInstance.<ThriftServicePayload>builder() .name(serviceName) .id(StringUtil.isEmpty(serviceInformation.getId()) ? serviceName : serviceInformation.getId()) .address(EnvUtil.getLocalIp()) .port(getPort()) .payload(new ThriftServicePayload(serviceInformation.getVersion(), serviceInformation.getServiceClass().getName())) .registrationTimeUTC(System.currentTimeMillis()) .serviceType(ServiceType.DYNAMIC) .build(); serviceRegistry.registerService(serviceInstance); serviceInformation.setServiceRegistered(true); logger.info("Service [" + serviceName + "] registered!"); }catch(Exception ex){ logger.error("register service[" + serviceName + "] error", ex); } } } }
4、创建thrift服务端
private void startServer()throws Exception{ serverSocket = new TNonblockingServerSocket(getPort()); TNonblockingServer.Args tArgs = new TNonblockingServer.Args(serverSocket); tArgs.processor(multiplexedProcessor); tArgs.transportFactory(new TFramedTransport.Factory()); tArgs.protocolFactory(new TCompactProtocol.Factory()); tserver = new TNonblockingServer(tArgs); tserver.setServerEventHandler(new DefaultServerEventHandler()); tserver.serve(); }
5、从注册中心查找业务服务
根据业务服务的Client类从注册中心查找对应的服务配置信息,并根据服务配置信息实例化服务Client类的对象。
public <T> T getServiceClient(Class<T> serviceClientClass){ try{ String serviceClientClassName = serviceClientClass.getName(); if(!serviceClientClassName.endsWith("$Client")){ throw new IllegalArgumentException("serviceClientClass must be $Client class"); } String serviceName = serviceClientClassName.replace("$Client", ""); serviceName = serviceName.substring(serviceName.lastIndexOf(".")+1); Object object = getServiceClient(serviceName); if(object != null){ return (T)object; } }catch(Exception ex){ logger.error("Failed to get ServiceClient", ex); } return null; } public Object getServiceClient(String serviceName) { try{ ServiceInstance<ThriftServicePayload> serviceInstance = queryForInstance(serviceName); if(serviceInstance != null){ //服务所在机器的IP地址 String host = serviceInstance.getAddress(); //服务所在机器的监听端口 int port = serviceInstance.getPort(); ServiceClientFactory factory = null; ServiceClientWrapper wrapper = null; String key = host + ":" + port; if(serviceClientFactoryMap.containsKey(key)){ factory = serviceClientFactoryMap.get(key); wrapper = factory.getServiceClientWrapper(serviceName); }else{ logger.info("create ServiceClientFactory..."); factory = ServiceClientFactory.getInstance(); factory.setHost(host); factory.setPort(port); factory.open(); serviceClientFactoryMap.put(key, factory); } if(wrapper == null){ Class<?> serviceClientClass = Class.forName(serviceInstance.getPayload().getInterfaceName() + "$Client"); wrapper = new ServiceClientWrapper(serviceInstance, serviceClientClass, serviceName); factory.addServiceClientWrapper(wrapper); return factory.getServiceClientWrapper(serviceName).getServiceClientInstanceObject(); }else{ return wrapper.getServiceClientInstanceObject(); } }else{ //服务不存在 logger.error("service not found: " + serviceName); } }catch(Exception ex){ logger.error("Failed to get ServiceClient", ex); } return null; }
一个ServiceClientFactory对象代表一个服务提供者,一个服务提供者可以提供多个业务服务,每个业务服务对应的客户端对象用ServiceClientWrapper实例表示。
public void open(){ try{ if(StringUtil.isNotEmpty(this.host) && this.port != 0){ if(transport == null){ transport = new TFramedTransport(new TSocket(this.host, this.port)); transport.open(); logger.debug("Transport opened --> " + this.host + ":" + this.port); }else if(!transport.isOpen()){ transport.open(); } }else{ throw new IllegalArgumentException("parameter host or port is invalid!"); } }catch(Exception ex){ close(); throw new RuntimeException("Failed to open service Socket", ex); } } public ServiceClientWrapper getServiceClientWrapper(String serviceName){ return serviceClientWrapperMap.get(serviceName); } public void addServiceClientWrapper(ServiceClientWrapper wrapper){ if(!serviceClientWrapperMap.containsKey(wrapper.getServiceName())){ wrapper = instanceServiceClient(wrapper); serviceClientWrapperMap.put(wrapper.getServiceName(), wrapper); } } private ServiceClientWrapper instanceServiceClient(ServiceClientWrapper wrapper){ try{ logger.debug("instance ServiceClient: " + wrapper.getServiceClientClass().getName()); TProtocol protocol = new TCompactProtocol(transport); TMultiplexedProtocol multiplexedProtocol = new TMultiplexedProtocol(protocol, wrapper.getServiceName()); Class[] classes = new Class[]{TProtocol.class}; Object serviceClientInstanceObject = wrapper.getServiceClientClass().getConstructor(classes).newInstance(multiplexedProtocol); wrapper.setServiceClientInstanceObject(serviceClientInstanceObject); return wrapper; }catch(Exception ex){ logger.error("instance ServiceClient error", ex); } return wrapper; }
public class ServiceClientWrapper { private ServiceInstance<ThriftServicePayload> serviceInstance; private Class<?> serviceClientClass; private String serviceName; private Object serviceClientInstanceObject; //服务客户端类的实例对象 public ServiceClientWrapper(ServiceInstance<ThriftServicePayload> serviceInstance, Class<?> serviceClientClass, String serviceName){ this.serviceInstance = serviceInstance; this.serviceClientClass = serviceClientClass; this.serviceName = serviceName; } public ServiceInstance<ThriftServicePayload> getServiceInstance() { return serviceInstance; } public Class<?> getServiceClientClass() { return serviceClientClass; } public String getServiceName() { return serviceName; } public Object getServiceClientInstanceObject() { return serviceClientInstanceObject; } public void setServiceClientInstanceObject(Object serviceClientInstanceObject) { this.serviceClientInstanceObject = serviceClientInstanceObject; } }
6、调用业务服务接口
客户端获取服务Client实例对象的方式:
Hello.Client client = getServiceClient(Hello.Client.class) 或者 Object object = clientBootstrap.getServiceClient("Hello"); if(object != null){ Hello.Client client = (Hello.Client)object; }
访问接口方法:
client.helloString("hello string") ByteBuffer data = ByteBuffer.wrap("hello world".getBytes("UTF-8")); Response response = client.sendMessage(new Message(1, data)); System.out.println(response.getCode() + ", " + response.getMessage());
7、注册中心管理(截图)
相关推荐
Thrift+SpringBoot+RocketMQ+Elasticsearch+C#; 1.通过thrift的RPC实现C#与java通信, 2.阿里巴巴RocketMQ实现高并发数据消息队列 3.Elasticsearch实现对数据全文检索
基于thrift+scribe实现分布式日志收集,并能够基于log4j进行集成
服务治理框架,一般存在与RPC的上一层,用来在大量RPC服务至上,协调客户端和服务器的调用工作。这个示例工程和我的博客《架构设计:系统间通信(13)——RPC实例Apache Thrift 下篇》...
最佳环境 CentOS 5.4 Scribe真正可用rpm安装包 apache-thrift-0.7.0-1.x86_64.rpm,fb303-0.7.0-1.x86_64.rpm,scribe-2.2-3.x86_64.rpm。无需编译,一命令安装。简单快捷,方便部署。...scribe+thrift+fb303.7z
主要是对thrift0.9.0 TSimpleServer、TThreadPoolServer 、TNonblockingServer、THsHaServer等服务模型实例和AsynClient 异步客户端实例代码的演示
包含了所有的依赖,windows7下亲测能够使用,不再需要其他依赖组件 thrift thrift源码 libthrift.jar slf4j-api.jar slf4j-simple.jar
netty+thrift高并发高性能
thrift源码+DEMO+简单教程
NULL 博文链接:https://zhuwx.iteye.com/blog/2377030
thrift官方代码+与dubbo集成支持原生thrift协议
基于Thrift、zookeeper的rpc实现 基于注解配置ThriftService、ThriftReference 基于权重的简单负载均衡 使用TMultiplexedProcessor发布多个服务
Thrift RPC客户端的服务化框架代码,
Thrift中实现Java与Python的RPC互相调用示例代码;Thrift中实现Java与Python的RPC互相调用示例代码;Thrift中实现Java与Python的RPC互相调用示例代码
泉边引擎 基于Spring Boot的嵌入式后端引擎。 它支持Netty + Thrift,还支持Jetty...服务注册,负载均衡和路由规则 重试和短路 指标监控器 分布式跟踪日志 除了提供Java智能客户端之外,它还将提供代理来支持其他语言。
Thrift初探:简单实现C#通讯服务程序
thrift入门教程+代码
Golang通过Thrift框架完美实现跨语言调用
thrift特性、不支持的特性、对各个语言的支持情况、语法参考、Thrift 架构、协议、传输层、服务端类型、各种thriftServer实现的比较、Thrift对多接口服务的支持
NULL 博文链接:https://shift-alt-ctrl.iteye.com/blog/1990026