`
liyonghui160com
  • 浏览: 761149 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

zookeeper api Zookeeper的数据发布与订阅模式

阅读更多

 

 

1.发布订阅的基本概念

 

     1.发布订阅模式可以看成一对多的关系:多个订阅者对象同时监听一个主题对象,这个主题对象在自身状态发生变化时,会通知所有的订阅者对象,使他们能够自动的更新自己的状态。
 
     2.发布订阅模式,可以让发布方和订阅方,独立封装,独立改变,当一个对象的改变,需要同时改变其他的对象,而且它不知道有多少个对象需要改变时,可以使用发布订阅模式

 
    3.发布订阅模式在分布式系统的典型应用有, 配置管理和服务发现。
   
        配置管理:是指如果集群中机器拥有某些相同的配置,并且这些配置信息需要动态的改变,我们可以使用发布订阅模式,对配置文件做统一的管理,让这些机器各       自订阅配置文件的改变,当配置文件发生改变的时候这些机器就会得到通知,把自己的配置文件更新为最新的配置
    

            服务发现:是指对集群中的服务上下线做统一的管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让模型机器作为订阅方,订阅工       作服务器的基本信息,当工作服务器的基本信息发生改变时如上下线,服务器的角色和服务范围变更,监控服务器就会得到通知,并响应这些变化。

 

 

package com.zk.subscribe;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;


public class SubscribeZkClient {
	    
	    //需要多少个workserver
	    private static final int  CLIENT_QTY = 5;

	    private static final String  ZOOKEEPER_SERVER = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181";
	    //节点的路径
	    private static final String  CONFIG_PATH = "/config";//配置节点
	    private static final String  COMMAND_PATH = "/command";//命令节点
	    private static final String  SERVERS_PATH = "/servers";//服务器列表节点
	       
	    public static void main(String[] args) throws Exception
	    {
	    	//用来存储所有的clients
	        List<ZkClient>  clients = new ArrayList<ZkClient>();
	        //用来存储所有的workservers
	        List<WorkServer>  workServers = new ArrayList<WorkServer>();
	        ManagerServer manageServer = null;

	        try
	        {
	        	ServerConfig initConfig = new ServerConfig();
	        	initConfig.setDbPwd("123456");
	        	initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
	        	initConfig.setDbUser("root");
	        	
	        	ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
	        	manageServer = new ManagerServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);
	        	manageServer.start();
	        	
	        	//根据定义的work服务个数,创建服务器后注册,然后启动
	            for ( int i = 0; i < CLIENT_QTY; ++i )
	            {
	                ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
	                clients.add(client);
	                ServerData serverData = new ServerData();
	                serverData.setId(i);
	                serverData.setName("WorkServer#"+i);
	                serverData.setAddress("192.168.1."+i);

	                WorkServer  workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
	                workServers.add(workServer);
	                workServer.start();	                
	                
	            }	            
	            System.out.println("敲回车键退出!\n");
	            new BufferedReader(new InputStreamReader(System.in)).readLine();
	            
	        }finally{
	        	//将workserver和client给关闭
	        	
	            System.out.println("Shutting down...");

	            for ( WorkServer workServer : workServers )
	            {
	            	try {
	            		workServer.stop();
					} catch (Exception e) {
						e.printStackTrace();
					}           	
	            }
	            for ( ZkClient client : clients )
	            {
	            	try {
	            		client.close();
					} catch (Exception e) {
						e.printStackTrace();
					}
	            	
	            }
	        }
	    }	

}

 

package com.zk.subscribe;

import java.util.List;

import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import com.alibaba.fastjson.JSON;

public class ManagerServer {
	
	private String serversPath;
	private String commandPath;
	private String configPath;
	private ZkClient zkClient;
	private ServerConfig config;
	//用于监听zookeeper中servers节点的子节点列表变化
	private IZkChildListener childListener;
	//用于监听zookeeper中command节点的数据变化
	private IZkDataListener dataListener;
	//工作服务器的列表
	private List<String> workServerList;

	/**
	 * 
	 * @param serversPath
	 * @param commandPath Zookeeper中存放命令的节点路径
	 * @param configPath
	 * @param zkClient
	 * @param config
	 */
	public ManagerServer(String serversPath, String commandPath,String configPath, ZkClient zkClient, ServerConfig config) {
		this.serversPath = serversPath;
		this.commandPath = commandPath;
		this.zkClient = zkClient;
		this.config = config;
		this.configPath = configPath;
		this.childListener = new IZkChildListener() {
			//用于监听zookeeper中servers节点的子节点列表变化
			public void handleChildChange(String parentPath,List<String> currentChilds) throws Exception {
				//更新服务器列表
				workServerList = currentChilds;
				
				System.out.println("work server list changed, new list is ");
				execList();

			}
		};
		
		//用于监听zookeeper中command节点的数据变化
		this.dataListener = new IZkDataListener() {

			public void handleDataDeleted(String dataPath) throws Exception {
			
			}

			public void handleDataChange(String dataPath, Object data)
					throws Exception {
			    
				String cmd = new String((byte[]) data);
				System.out.println("cmd:"+cmd);
				exeCmd(cmd);

			}
		};

	}
	
	public void start() {
		initRunning();
	}

	public void stop() {
		//取消订阅command节点数据变化和servers节点的列表变化
		zkClient.unsubscribeChildChanges(serversPath, childListener);
		zkClient.unsubscribeDataChanges(commandPath, dataListener);
	}
	
	/**
	 * 初始化
	 */
	private void initRunning() {
		//执行订阅command节点数据变化和servers节点的列表变化
		zkClient.subscribeDataChanges(commandPath, dataListener);
		zkClient.subscribeChildChanges(serversPath, childListener);
	
	}
	

    /*
	 * 执行控制命令的函数
	 * 1: list 2: create 3: modify
	 */
	private void exeCmd(String cmdType) {
		if ("list".equals(cmdType)) {
			execList();

		} else if ("create".equals(cmdType)) {
			execCreate();
		} else if ("modify".equals(cmdType)) {
			execModify();
		} else {
			System.out.println("error command!" + cmdType);
		}

	}

	
	private void execList() {
	
		System.out.println(workServerList.toString());
	}

	private void execCreate() {
		if (!zkClient.exists(configPath)) {
			try {
				
				zkClient.createPersistent(configPath, JSON.toJSONString(config).getBytes());
			
			} catch (ZkNodeExistsException e) {
				//节点已经存在异常,直接写入数据
				zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
			} catch (ZkNoNodeException e) {
				//表示其中的一个节点的父节点还没有被创建
				String parentDir = configPath.substring(0,configPath.lastIndexOf('/'));
				zkClient.createPersistent(parentDir, true);
				execCreate();
				
			}
		}
	}

	private void execModify() {
		config.setDbUser(config.getDbUser() + "_modify");

		try {
			//回写到zookeeper中
			zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
		} catch (ZkNoNodeException e) {
			execCreate();
		}
	}
	
	
}

 

package com.zk.subscribe;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

/**
 * 代表工作服务器
 * workServer服务器的信息
 *
 */
public class WorkServer{
	private String serversPath;
	private String configPath;
	private ZkClient zkClient;
	private ServerConfig config;
    private ServerData serverData;
    
	private IZkDataListener dataListener;//数据监听器
	
	
	/**
	 * 
	 * @param configPath 代表config节点的路径
	 * @param serversPath 代表servers节点的路径
	 * @param serverData   代表当前服务器的基本信息
	 * @param zkClient     底层与zookeeper集群通信的组件
	 * @param initconfig   当前服务器的初始配置
	 */
	public WorkServer(String configPath,String serversPath,ServerData serverData,ZkClient zkClient, ServerConfig initconfig){
		
		this.configPath = configPath;
		this.serversPath = serversPath;
	    this.serverData = serverData;
		this.zkClient = zkClient;
		this.config = initconfig;
		
		/**
		 * dataListener 用于监听config节点的数据改变
		 */
		this.dataListener = new IZkDataListener() {
			
			public void handleDataDeleted(String arg0) throws Exception {
				
			}
			
			/**
			 * 当数据的值改变时处理的
			 * Object data,这个data是将ServerConfig对象转成json字符串存入
			 * 可以通过参数中的Object data 拿到当前数据节点最新的配置信息
			 * 拿到这个data信息后将它反序列化成ServerConfig对象,然后更新到自己的serverconfig属性中
			 */
			public void handleDataChange(String dataPath, Object data) throws Exception {
				String retJson = new String((byte[])data);
			    ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class);
			    //更新配置
			    updateConfig(serverConfigLocal);
			    System.out.println("new work server config is:"+serverConfigLocal.toString());
			}
		};
		
	}
	
	/**
	 * 服务的启动
	 */
	public void start(){
		System.out.println("work server start...");
		initRunning();
	}
	
	/**
	 * 服务的停止
	 */
	public void stop(){
		System.out.println("work server stop...");
		//取消监听
		zkClient.unsubscribeDataChanges(configPath, dataListener);
	
	}
	
	/**
	 * 服务器的初始化
	 */
	private void initRunning(){
		registMeToZookeeper();
		//订阅config节点的改变
		zkClient.subscribeDataChanges(configPath, dataListener);
	}
	
	/**
	 * 启动时向zookeeper注册自己
	 */
	private void registMeToZookeeper(){
		//向zookeeper中注册自己的过程其实就是向servers节点下注册一个临时节点
		//构造临时节点
		String mePath = serversPath.concat("/").concat(serverData.getAddress());
	    try{
	    	//存入是将json序列化
			zkClient.createEphemeral(mePath, JSON.toJSONString(serverData).getBytes());	
	    } catch (ZkNoNodeException e) {
	    	//父节点不存在
			zkClient.createPersistent(serversPath, true);
			registMeToZookeeper();
		}
		
	}
	
	/**
	 * 当监听到zookeeper中config节点的配置信息改变时,要读取配置信息来更新自己的配置信息
	 */
	private void updateConfig(ServerConfig serverConfig){
		this.config = serverConfig;
	}
	
}

 

package com.zk.subscribe;

/**
 * 用于记录WorkServer(工作服务器)的配置信息
 */
public class ServerConfig {
	private String dbUrl;
	private String dbPwd;
	private String dbUser;
	public String getDbUrl() {
		return dbUrl;
	}
	public void setDbUrl(String dbUrl) {
		this.dbUrl = dbUrl;
	}
	public String getDbPwd() {
		return dbPwd;
	}
	public void setDbPwd(String dbPwd) {
		this.dbPwd = dbPwd;
	}
	public String getDbUser() {
		return dbUser;
	}
	public void setDbUser(String dbUser) {
		this.dbUser = dbUser;
	}
	@Override
	public String toString() {
		return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd + ", dbUser=" + dbUser + "]";
	}
	
	
}

 

package com.zk.subscribe;
/**
 * 用于记录WorkServer(工作服务器)的基本信息
 */
public class ServerData {
	private String address;
	private Integer id;
	private String name;
	public String getAddress() {
		return address;
	}
	public void setAddress(String address) {
		this.address = address;
	}
	public Integer getId() {
		return id;
	}
	public void setId(Integer id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	
	@Override
	public String toString() {
		return "ServerData [address=" + address + ", id=" + id + ", name=" + name + "]";
	}
	
	
}

 

 

 

 

 

 

分享到:
评论

相关推荐

    支持.Net Core的ZooKeeper异步客户端.zip

    订阅数据变化 await client.SubscribeDataChange("/year", (ct, args) =&gt; {  IEnumerable currentData = args.CurrentData;  string path = args.Path;  Watcher.Event.EventType eventType = ...

    分布式协调工具-ZooKeeper实现动态负载均衡

    发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用。 ...

    ZookeeperJava客户端zkclient.zip

    让Zookeeper API 使用起来更简单 非常方便订阅各种事件并自动重新绑定事件(会话建立、节点修改、节点删除、子节点变更等) session过期自动重连、机制 快速入门 下面部分将引导使用者快速入门。 快速指南: ...

    java面试难点讲解:hashmap,spring aop,classload,dubbo,zookeeper,session等。

    分布式框架Zookeeper之服务注册与订阅 互联网系统垂直架构之Session解决方案 分库分表之后分布式下如何保证ID全局唯一性 互联网企业必备高质量API网关接口设计 大型公司面试必答之数据结构与算法精讲 高性能网络编程...

    Flink 1.9 Table API -kafkaSource

    ​ 使用kafka作为flink的数据源对接flink Table,本次测试使用的是单节点的kafka以及flink,以下为一次简单的操作,包括kafka主题的创建、订阅、发布以及具体的小案例 kafka中主题的创建 [root@CentOSA kafka_2.11-...

    zkclient-v2.1-3-gc18569d

    简单、高效的Zookeeper Java客户端。 让Zookeeper API 使用起来更简单 非常方便订阅各种事件并自动重新绑定事件(会话建立、节点修改、节点删除、子节点变更等) session过期自动重连、机制

    Java思维导图xmind文件+导出图片

    应用发布与监控 应用容灾及机房规划 系统动态扩容 分布式架构策略-分而治之 从简到难,从网络通信探究分布式通信原理 基于消息方式的系统间通信 理解通信协议传输过程中的序列化和反序列化机制 基于框架的...

    各大数据组件介绍.pdf

    Kafka是⼀种⾼吞吐量的发布订阅消息系统,它可以处理消费者规模的⽹站中的所有动 作流数据。 这种动作(⽹页浏览,搜索和其他⽤户的⾏动)是在现代⽹络上的许多社会功能的⼀个关键因素。 这些数据通常是由于吞吐量 ...

    Fourinone分布式并行计算四合一框架

    也可以将domain视为订阅主题,将每个订阅者注册到domain的node上,发布者将消息逐一更新每个node,订阅者监控每个属于自己的node的变化事件获取订阅消息,收到后删除内容等待下一个消息。但是Fourinone不实现JMS的...

    fourinone-3.04.25

    也可以将domain视为订阅主题,将每个订阅者注册到domain的node上,发布者将消息逐一更新每个node,订阅者监控每个属于自己的node的变化事件获取订阅消息,收到后删除内容等待下一个消息。但是Fourinone不实现JMS的...

    dubbo技术介绍

    只订阅 只注册 静态服务 多协议 多注册中心 服务分组 多版本 分组聚合 参数验证 结果缓存 泛化引用 泛化实现 回声测试 上下文信息 隐式传参 异步调用 本地调用 参数回调 事件通知 本地存根 本地伪装 延迟暴露 并发...

    jcm:JCM是分布式名称服务(集群地图管理器)

    JCM是基于ZooKeeper的分布式名称服务实现。 特征 管理超过十万个节点(ip) 将集群名称映射到node(ip)列表 每个节点上的运行状况检查 持久存储群集列表进入Zookeeper 完全分布,读写每个JCM服务器 基于JSON的...

    基于微服务库的可插拔RPCgo-micro.zip

    json对请求进行编码/解码处理BalancerSelectorrandom服务节点过滤和池ServerServerrpc监听和服务器的RPC请求Pub/SubBrokerhttp发布和订阅事件TransportTransporthttp服务之间的通信机制示例服务项目描述...

    mconn:MConn是一个框架,可在Mesosphere的顶级马拉松比赛上构建自定义服务发现解决方案

    MConn是用编写的HA队列,与模块管理结合使用,充当Mesosphere背后的事件订阅者,以响应传入的任务事件并基于上/下缩放实现简单的后处理(用于Service-Discovery-Solutions) 。 我们的目标之一是框架设计,该框架...

    kafka-pixy:Kafka的gRPCREST代理

    但是, 尚未实现,因此需要直接与Zookeeper对话以管理消费者组成员身份。 警告:Kafka-Pixy不支持通配符订阅,因此不能与使用它们的客户端共存于消费者组中。 如果他们通过全名订阅主题,则应该有可能使用与kafka-...

    scorch:状态机即服务,用于管理分布式作业和任务

    烧焦该项目提供了一个管理API,该API公开了分布式作业和任务的有状态操作。 此应用程序中使用的代码是从项目样本扩展而来的。用法Scorch提供一致性保证和使用ZooKeeper顺序订购的全局事件事务日志。 使用全局唯一...

    springCloud

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 Github:https://github.com/rabbitmq 官网地址:http://www.rabbitmq.com 安装RabbitMQ 安装RabbitMQ 可以参考之前的文章 ...

    DDMQ消息队列-其他

    7、支持复杂的消息转换过滤功能:支持使用 Groovy 脚本在服务端进行消息内容的转化和过滤,能做大大地减少客户端和服务器的数据传输,同时减少客户端的处理消息的负载。 8、提供了一个易用性高的 Web 用户控制台,...

Global site tag (gtag.js) - Google Analytics