`
wyuxiao729
  • 浏览: 33666 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

osgi实战项目(osmp)一步一步玩转osgi之服务发现与服务路由(5)

    博客分类:
  • osgi
阅读更多

这一节里主要讲解osmp的服务发现与路由。osmp通过osmp-http组件对外发布了一个cxf的restful服务,所有的请求都通过这个restful这个接口接受请求并解析请求后再调用osgi的服务完成请求后返回到前端。

 

request->osmp-http的restful接口->解析请求->osgi服务发现->服务路由->调用服务->返回-->组装返回参数->返回

 

osmp通过osmp-service组件来对服务进行统一管理,主要功能包括服务的监听、bundle的监听、服务容器管理、服务注册到zookeeper等功能。

 

在osgi里可以通过ServiceTracker 服务跟踪器来跟踪某一类接口服务的新增、修改、删除等操作,通过BundlerContext.addBundleListener()注册 bundle监听器(BundleListener)来监听bundle各生命周期的事件,比如bundle的安装、卸载、更新、启动、停止等事件。

 

osmp为了便于对服务的统一管理,要求所有的服务都必须实现 osmp-intf-define里的定义的BaseDataService接口。这样我们就可以通过服务跟踪器跟踪到BaseDataService接口的服务新增、修改、删除等事件!

具体代码可以参照osmp-service里 ServiceWatcher.java类

 

/*   
 * Project: OSMP
 * FileName: ServiceWatcher.java
 * version: V1.0
 */
package com.osmp.service.watch;

import java.util.Date;
import java.util.UUID;

import org.osgi.framework.BundleContext;
import org.osgi.framework.BundleEvent;
import org.osgi.framework.BundleListener;
import org.osgi.framework.ServiceReference;
import org.osgi.util.tracker.ServiceTracker;
import org.osgi.util.tracker.ServiceTrackerCustomizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.osgi.context.BundleContextAware;
import org.springframework.util.Assert;

import com.osmp.intf.define.config.FrameConst;
import com.osmp.intf.define.interceptor.ServiceInterceptor;
import com.osmp.intf.define.service.BaseDataService;
import com.osmp.intf.define.service.ZookeeperService;
import com.osmp.service.bean.DataServiceInfo;
import com.osmp.service.bean.InterceptorInfo;
import com.osmp.service.factory.ServiceFactoryImpl;
import com.osmp.service.manager.ServiceStateManager;
import com.osmp.service.registration.ServiceContainer;
import com.osmp.service.util.ServiceUtil;
import com.osmp.utils.net.RequestInfoHelper;

/**
 * 
 * Description:服务注册、注销、监听
 * @author: wangkaiping
 * @date: 2016年8月9日 上午10:27:15上午10:51:30
 */
public class ServiceWatcher implements BundleContextAware, InitializingBean, DisposableBean {
	private Logger logger = LoggerFactory.getLogger(ServiceWatcher.class);

	private ServiceTracker dataServiceTracker;
	private ServiceTracker serviceInterceptorTracker;

	private ServiceStateManager serviceStateManager;
	private ServiceFactoryImpl serviceFactoryImpl;
	private BundleContext context;
	private ZookeeperService zookeeper;
	private final static String NODE_CHANGE = "/osmp/nodechange";

	@Override
	public void setBundleContext(BundleContext context) {
		this.context = context;
	}

	public void setServiceFactoryImpl(ServiceFactoryImpl serviceFactoryImpl) {
		this.serviceFactoryImpl = serviceFactoryImpl;
	}

	public void setServiceStateManager(ServiceStateManager serviceStateManager) {
		this.serviceStateManager = serviceStateManager;
	}

	public void setZookeeper(ZookeeperService zookeeper) {
		this.zookeeper = zookeeper;
	}

	@Override
	public void destroy() throws Exception {
		if (dataServiceTracker != null) {
			dataServiceTracker.close();
		}
		if (serviceInterceptorTracker != null) {
			serviceInterceptorTracker.close();
		}

		logger.info("服务监听结束");
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		Assert.notNull(context);
		Assert.notNull(serviceStateManager);
		Assert.notNull(serviceFactoryImpl);
		dataServiceTracker = new ServiceTracker(context, BaseDataService.class.getName(),
				new DataServiceTrackerCustomizer());
		serviceInterceptorTracker = new ServiceTracker(context, ServiceInterceptor.class.getName(),
				new ServiceInterceptorTrackerCustomizer());

		dataServiceTracker.open(true);
		serviceInterceptorTracker.open(true);

		context.addBundleListener(new BundleListener() {

			@Override
			public void bundleChanged(BundleEvent event) {
				if (event.getType() == BundleEvent.UNINSTALLED) {
					String name = event.getBundle().getSymbolicName();
					try {
						logger.info("uninstall bundle " + name);
						zookeeper.deleteNodeByBundle(name);
						ServiceWatcher.this.nodeUpdate();
					} catch (Exception e) {
						logger.error(
								"zookeeper delete service by bundle, bundle name : "
										+ name, e);
					}
				}
			}
		});

		logger.info("服务监听启动");
	}

	// dataService监听
	private class DataServiceTrackerCustomizer implements ServiceTrackerCustomizer {
		@Override
		public Object addingService(ServiceReference reference) {
			BaseDataService bsService = (BaseDataService) context.getService(reference);
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion().toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (name == null || "".equals(name)) {
				logger.error("组件" + bundleName + "(" + bundleVersion
						+ ") dataService服务name未设置");
				return bsService;
			}
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			ServiceContainer.getInstance().putDataService(bundleName, bundleVersion, name.toString(), bsService);
			DataServiceInfo info = new DataServiceInfo();
			info.setBundle(bundleName);
			info.setVersion(bundleVersion);
			info.setName(name.toString());
			info.setState(1);
			info.setUpdateTime(new Date());
			info.setMark(mark == null ? "" : mark.toString());
			serviceStateManager.updateDataService(info);
			String path = ZookeeperService.ROOT_PATH
					+ ZookeeperService.SERVICE + "/"
					+ RequestInfoHelper.getLocalIp() + "/";
			logger.debug("register service path: " + path + " bundle : " + bundleName + " to zookeeper ");
			ServiceWatcher.this.registerService(path, info);
			return bsService;
		}

		@Override
		public void modifiedService(ServiceReference reference, Object service) {
		}

		@Override
		public void removedService(ServiceReference reference, Object service) {
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion()
					.toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (name == null || "".equals(name)) {
				logger.error("组件" + bundleName + "(" + bundleVersion
						+ ") dataService服务name未设置");
				return;
			}
			ServiceContainer.getInstance().removeDataService(bundleName, bundleVersion, name.toString());
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			DataServiceInfo info = new DataServiceInfo();
			info.setBundle(bundleName);
			info.setVersion(bundleVersion);
			info.setName(name.toString());
			info.setState(0);
			info.setMark(mark == null ? "" : mark.toString());
			info.setUpdateTime(new Date());
			serviceStateManager.updateDataService(info);
			System.out.println("===============remove service bundleName:"
					+ bundleName + " name: " + name.toString() + " mark: "
					+ mark.toString());
			String path = ZookeeperService.ROOT_PATH
					+ ZookeeperService.SERVICE + "/"
					+ RequestInfoHelper.getLocalIp() + "/";
			ServiceWatcher.this.unRegisterService(path, info);
		}

	}

	// ServiceInterceptor监听
	private class ServiceInterceptorTrackerCustomizer implements ServiceTrackerCustomizer {
		@Override
		public Object addingService(ServiceReference reference) {
			ServiceInterceptor sicpt = (ServiceInterceptor) context.getService(reference);
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion().toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (name == null || "".equals(name)) {
				logger.error("组件" + bundleName + "(" + bundleVersion
						+ ") serviceInterceptor服务name未设置");
				return sicpt;
			}
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			ServiceContainer.getInstance().putInterceptor(
					ServiceUtil.generateServiceName(bundleName, bundleVersion, name.toString()), sicpt);
			InterceptorInfo info = new InterceptorInfo();
			info.setBundle(bundleName);
			info.setVersion(bundleVersion);
			info.setName(name.toString());
			info.setState(1);
			info.setUpdateTime(new Date());
			info.setMark(mark == null ? "" : mark.toString());
			serviceStateManager.updateServiceInterceptor(info);
			return sicpt;
		}

		@Override
		public void modifiedService(ServiceReference reference, Object service) {
		}

		@Override
		public void removedService(ServiceReference reference, Object service) {
			String bundleName = reference.getBundle().getSymbolicName();
			String bundleVersion = reference.getBundle().getVersion().toString();
			Object name = reference.getProperty(FrameConst.SERVICE_NAME);
			if (name == null || "".equals(name)) {
				logger.error("组件" + bundleName + "(" + bundleVersion
						+ ") serviceInterceptor服务name未设置");
				return;
			}
			Object mark = reference.getProperty(FrameConst.SERVICE_MARK);
			ServiceContainer.getInstance().removeInterceptor(
					ServiceUtil.generateServiceName(bundleName, bundleVersion, name.toString()));
			InterceptorInfo info = new InterceptorInfo();
			info.setBundle(bundleName);
			info.setVersion(bundleVersion);
			info.setName(name.toString());
			info.setState(0);
			info.setMark(mark == null ? "" : mark.toString());
			info.setUpdateTime(new Date());
			serviceStateManager.updateServiceInterceptor(info);
		}

	}

	/**
	 * 向zookeeper注册服务
	 * 
	 * @param path
	 * @param ds
	 */
	public void registerService(String path, DataServiceInfo ds) {
		String bundle = ds.getBundle();
		String cname = ds.getMark();
		String name = ds.getName();
		String version = ds.getVersion();
		String status = String.valueOf(ds.getState());
		try {
			if (!zookeeper.exists(path + name)) {
				zookeeper.createNode(path + name);
			}
			if (!zookeeper.exists(path + name + "/bundle")) {
				zookeeper.createNode(path + name + "/bundle", bundle);
			} else {
				zookeeper.setNodeData(path + name + "/bundle", bundle);
			}
			if (!zookeeper.exists(path + name + "/cname")) {
				zookeeper.createNode(path + name + "/cname", cname);
			} else {
				zookeeper.setNodeData(path + name + "/cname", cname);
			}
			if (!zookeeper.exists(path + name + "/version")) {
				zookeeper.createNode(path + name + "/version", version);
			} else {
				zookeeper.setNodeData(path + name + "/version", version);
			}
			if (!zookeeper.exists(path + name + "/status")) {
				zookeeper.createNode(path + name + "/status", status);
			} else {
				zookeeper.setNodeData(path + name + "/status", status);
			}
			this.nodeUpdate();
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("zookeeper register service fail, service name : "
					+ name, e);
		}
	}

	/**
	 * 卸载服务
	 * 
	 * @param path
	 * @param ds
	 */
	public void unRegisterService(String path, DataServiceInfo ds) {
		String name = ds.getName();
		try {
			if (zookeeper.exists(path + name + "/status")) {
				zookeeper.setNodeData(path + name + "/status",
						String.valueOf(ds.getState()));
				this.nodeUpdate();
			}
		} catch (Exception e) {
			e.printStackTrace();
			logger.error("zookeeper unRegister service fail, service name : "
					+ name, e);
		}
	}
	
	/**
	 * 更新节点状态变化
	 */
	public void nodeUpdate(){
		String data = UUID.randomUUID().toString();
		try {
			if(zookeeper.exists(NODE_CHANGE)){
				zookeeper.setNodeData(NODE_CHANGE, data);
			}else{
				zookeeper.createNode(NODE_CHANGE);
				zookeeper.setNodeData(NODE_CHANGE, data);
			}
		} catch (Exception e) {
			logger.error("更新节点变化状态错误", e);
		}
	}

}

 

  1. ServiceWatcher 实现 BundleContextAware接口,将BundleContext 设置进来。
  2. 在ServiceWatcher初始化的时候 实例化了两个服务跟踪器 (ServiceTracker)分别用来跟踪 BaseDataService、ServiceInterceptor 两类接口的服务。
  3. 在ServiceWatcher初始化的时候 给BundleContext新增Bunlde监听用来监听bundle的事件
  4. ServiceTracker需要传递一个ServiceTrackerCustomizer实例来具体执行监听的事件,在这里我们通过DataServiceTrackerCustomizer来具体执行BaseDataService接口服务跟踪操作。
  5. BaseDataService 服务被发布到osgi容器里的时候,会自动调用DataServiceTrackerCustomizer.addingService(ServiceReference reference) 方法,通过BundleContext.getService(ServiceReference reference),我们可以获取到当前被发布到osgi容器里的服务。
  6. 通过获取bundleName、bundleVersion、以前发布服务时定义的tag标签(name)组成唯一的key(bundleName+bundleVersion+name) 以获取到service为value。将此保存到服务容器(Map)里。

注:osmp-service里将监听到的服务同时保存到数据库里和注册到zookeeper请暂时忽略,稍后osmp注册中会作详细讲解。

 

服务发现和路由:

 

osmp-http接收到请求后解析服务名称,通过服务查询此服务是否绑定了拦截器,如果绑定了拦截器,则先执行拦截器链,拦截器如果执行失败则直接返回,如果拦截器执行成功则通过服务名称获取服务,获取服务成功后直接执行服务的execute方法,将并结果返回!

 

这里讲的简单点儿,具体osmp封装了一层代理实现,有兴趣的可以直接查看osmp-http源码。

 

至此osmp的服务发现和服务调用功能就讲到这里!

 

 

 

0
0
分享到:
评论

相关推荐

    《OSGi实战》完整中文版

    《 OSGi实战》是学习OSGi的全面指导,利用与架构和开发人员相关的示例清楚地讲解OSGi概念,同时探讨了很多实践场景和技术,阐述了开发人员有多需要OSGi,怎么将OSGi嵌入其他容器中,将遗留系统移入OSGi的最佳实践,...

    osgi实战(pdf超请版)

    文内难免有些错误,还请大家不吝指正,也希望本文能作为国内 OSGI 的抛砖之作,引出更多的关于 OSGI的 Opendoc,在我的 blog 上也会不断的编写关于自己在 OSGI、Equinox 上的实战的体会和心得,欢迎大家多多交流。

    OSGi实战(OSGi规范制定者亲自撰写,汇集Apache项目技术实战经验),完整扫描版

    为了弥补OSGi规范在应用指导方面的不足,四位活跃在OSGi开发第一线的技术专家联手打造了《OSGi实战》。《OSGi实战》面向OSGi规范的使用者,系统、全面、深入地阐述OSGi的重要特性及其使用方法。《OSGi实战》还介绍了...

    osgi,林昊写的osgi实战和进阶

    osgi,林昊写的osgi实战和进阶,学习osgi的好东西,入门的首选。

    OSGi实战

    资源名称:OSGi实战内容简介:为了弥补OSGi规范在应用指导方面的不足,四位活跃在OSGi开发第一线的技术专家联手打造了《OSGi实战》。《OSGi实战》面向OSGi规范的使用者,系统、全面、深入地阐述OSGi的重要特性及其...

    OSGI合集 OSGi原理与最佳实践

    网上收集的OSGI资料. 包括: OSGi原理与最佳实践(精选版).pdf OSGI实战和源码.rar osgi进阶.pdf Introduce.OSGi.ppt OSGi.in.action.ppt r4.cmpn.pdf r4.core.pdf r4.enterprise.pdf

    OSGi 入门+进阶+实战

    OSGI入门和整合Spring + OSGI进阶 + OSGI实战

    OSGI实战教程

    OSGI实战教程,从需求实现以及技术角度两方面来体验OSGI。

    OSGi原理与最佳实践pdf下载(完整版)

    OSGI原理与最佳实践的完整版,共12章 第1 章OSGi 简介 第2 章OSGi 框架简介 第3 章基于Spring-DM 实现Petstore 第4 章基于Apache CXF 实现分布式Petstore 第5 章构建OSGI Bundle Repositor'y 第6 章OSGi 规范解读 ...

    OSGI实战.docx

    java OSGI实战

    OSGI资料,OSGI进阶,OSGI实战,OSGI入门和整合Spring

    OSGI进阶.pdf,OSGI实战.pdf,OSGI入门和整合Spring.pdf

    OSGI实战和OSGI进阶打包提供

    BlueDavy的OSGI实战和OSGI进阶两个开源文档,学习OSGI不错的资料

    OSGI原理与最佳实践

    资源名称:OSGI原理与最佳实践内容简介:国内第一本OSGi图书OSGi国内推广者林昊多年经验的结晶涵盖OSGi从入门到深入的知识体系引领OSGi国内研究和普及本书基于作者多年使用OSGi的经验而编写,涵盖了OSGi从入门到深入...

    OSGI实战.pdf

    OSGI实战.pdfOSGI实战.pdfOSGI实战.pdfOSGI实战.pdfOSGI实战.pdf

    OSGI框架实战

    OSGI框架实战 一. 序..5 二. 体验OSGI..7 2.1. 需求实现..7 2.2. 技术角度..9 三. OSGI带来什么..11 四. OSGI案例..13 五. OSGI框架..15 5.1. Equinox..15 5.2. Oscar..15 5.3. Knopflerfish..15 六. 基于OSGI框架...

    OSGI实战及源码

    OSGI实战及源码

    OSGI实战中文版

    OSGI实战中文版。

    OSGI实战.zip

    OSGI实战.pdf

    基于osgi框架实战源码

    osgi实战源码 通过一个完整购物车实例来展示OSGI的实现.代码简单易懂,可扩展.

Global site tag (gtag.js) - Google Analytics