1.发布订阅的基本概念
1.发布订阅模式可以看成一对多的关系:多个订阅者对象同时监听一个主题对象,这个主题对象在自身状态发生变化时,会通知所有的订阅者对象,使他们能够自动的更新自己的状态。
2.发布订阅模式,可以让发布方和订阅方,独立封装,独立改变,当一个对象的改变,需要同时改变其他的对象,而且它不知道有多少个对象需要改变时,可以使用发布订阅模式
3.发布订阅模式在分布式系统的典型应用有, 配置管理和服务发现。
配置管理:是指如果集群中机器拥有某些相同的配置,并且这些配置信息需要动态的改变,我们可以使用发布订阅模式,对配置文件做统一的管理,让这些机器各 自订阅配置文件的改变,当配置文件发生改变的时候这些机器就会得到通知,把自己的配置文件更新为最新的配置
服务发现:是指对集群中的服务上下线做统一的管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让模型机器作为订阅方,订阅工 作服务器的基本信息,当工作服务器的基本信息发生改变时如上下线,服务器的角色和服务范围变更,监控服务器就会得到通知,并响应这些变化。
package com.zk.subscribe; import java.io.BufferedReader; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; public class SubscribeZkClient { //需要多少个workserver private static final int CLIENT_QTY = 5; private static final String ZOOKEEPER_SERVER = "192.168.30.164:2181,192.168.30.165:2181,192.168.30.166:2181"; //节点的路径 private static final String CONFIG_PATH = "/config";//配置节点 private static final String COMMAND_PATH = "/command";//命令节点 private static final String SERVERS_PATH = "/servers";//服务器列表节点 public static void main(String[] args) throws Exception { //用来存储所有的clients List<ZkClient> clients = new ArrayList<ZkClient>(); //用来存储所有的workservers List<WorkServer> workServers = new ArrayList<WorkServer>(); ManagerServer manageServer = null; try { ServerConfig initConfig = new ServerConfig(); initConfig.setDbPwd("123456"); initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb"); initConfig.setDbUser("root"); ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer()); manageServer = new ManagerServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig); manageServer.start(); //根据定义的work服务个数,创建服务器后注册,然后启动 for ( int i = 0; i < CLIENT_QTY; ++i ) { ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer()); clients.add(client); ServerData serverData = new ServerData(); serverData.setId(i); serverData.setName("WorkServer#"+i); serverData.setAddress("192.168.1."+i); WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig); workServers.add(workServer); workServer.start(); } System.out.println("敲回车键退出!\n"); new BufferedReader(new InputStreamReader(System.in)).readLine(); }finally{ //将workserver和client给关闭 System.out.println("Shutting down..."); for ( WorkServer workServer : workServers ) { try { workServer.stop(); } catch (Exception e) { e.printStackTrace(); } } for ( ZkClient client : clients ) { try { client.close(); } catch (Exception e) { e.printStackTrace(); } } } } }
package com.zk.subscribe; import java.util.List; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import com.alibaba.fastjson.JSON; public class ManagerServer { private String serversPath; private String commandPath; private String configPath; private ZkClient zkClient; private ServerConfig config; //用于监听zookeeper中servers节点的子节点列表变化 private IZkChildListener childListener; //用于监听zookeeper中command节点的数据变化 private IZkDataListener dataListener; //工作服务器的列表 private List<String> workServerList; /** * * @param serversPath * @param commandPath Zookeeper中存放命令的节点路径 * @param configPath * @param zkClient * @param config */ public ManagerServer(String serversPath, String commandPath,String configPath, ZkClient zkClient, ServerConfig config) { this.serversPath = serversPath; this.commandPath = commandPath; this.zkClient = zkClient; this.config = config; this.configPath = configPath; this.childListener = new IZkChildListener() { //用于监听zookeeper中servers节点的子节点列表变化 public void handleChildChange(String parentPath,List<String> currentChilds) throws Exception { //更新服务器列表 workServerList = currentChilds; System.out.println("work server list changed, new list is "); execList(); } }; //用于监听zookeeper中command节点的数据变化 this.dataListener = new IZkDataListener() { public void handleDataDeleted(String dataPath) throws Exception { } public void handleDataChange(String dataPath, Object data) throws Exception { String cmd = new String((byte[]) data); System.out.println("cmd:"+cmd); exeCmd(cmd); } }; } public void start() { initRunning(); } public void stop() { //取消订阅command节点数据变化和servers节点的列表变化 zkClient.unsubscribeChildChanges(serversPath, childListener); zkClient.unsubscribeDataChanges(commandPath, dataListener); } /** * 初始化 */ private void initRunning() { //执行订阅command节点数据变化和servers节点的列表变化 zkClient.subscribeDataChanges(commandPath, dataListener); zkClient.subscribeChildChanges(serversPath, childListener); } /* * 执行控制命令的函数 * 1: list 2: create 3: modify */ private void exeCmd(String cmdType) { if ("list".equals(cmdType)) { execList(); } else if ("create".equals(cmdType)) { execCreate(); } else if ("modify".equals(cmdType)) { execModify(); } else { System.out.println("error command!" + cmdType); } } private void execList() { System.out.println(workServerList.toString()); } private void execCreate() { if (!zkClient.exists(configPath)) { try { zkClient.createPersistent(configPath, JSON.toJSONString(config).getBytes()); } catch (ZkNodeExistsException e) { //节点已经存在异常,直接写入数据 zkClient.writeData(configPath, JSON.toJSONString(config).getBytes()); } catch (ZkNoNodeException e) { //表示其中的一个节点的父节点还没有被创建 String parentDir = configPath.substring(0,configPath.lastIndexOf('/')); zkClient.createPersistent(parentDir, true); execCreate(); } } } private void execModify() { config.setDbUser(config.getDbUser() + "_modify"); try { //回写到zookeeper中 zkClient.writeData(configPath, JSON.toJSONString(config).getBytes()); } catch (ZkNoNodeException e) { execCreate(); } } }
package com.zk.subscribe; import org.I0Itec.zkclient.IZkChildListener; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; /** * 代表工作服务器 * workServer服务器的信息 * */ public class WorkServer{ private String serversPath; private String configPath; private ZkClient zkClient; private ServerConfig config; private ServerData serverData; private IZkDataListener dataListener;//数据监听器 /** * * @param configPath 代表config节点的路径 * @param serversPath 代表servers节点的路径 * @param serverData 代表当前服务器的基本信息 * @param zkClient 底层与zookeeper集群通信的组件 * @param initconfig 当前服务器的初始配置 */ public WorkServer(String configPath,String serversPath,ServerData serverData,ZkClient zkClient, ServerConfig initconfig){ this.configPath = configPath; this.serversPath = serversPath; this.serverData = serverData; this.zkClient = zkClient; this.config = initconfig; /** * dataListener 用于监听config节点的数据改变 */ this.dataListener = new IZkDataListener() { public void handleDataDeleted(String arg0) throws Exception { } /** * 当数据的值改变时处理的 * Object data,这个data是将ServerConfig对象转成json字符串存入 * 可以通过参数中的Object data 拿到当前数据节点最新的配置信息 * 拿到这个data信息后将它反序列化成ServerConfig对象,然后更新到自己的serverconfig属性中 */ public void handleDataChange(String dataPath, Object data) throws Exception { String retJson = new String((byte[])data); ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class); //更新配置 updateConfig(serverConfigLocal); System.out.println("new work server config is:"+serverConfigLocal.toString()); } }; } /** * 服务的启动 */ public void start(){ System.out.println("work server start..."); initRunning(); } /** * 服务的停止 */ public void stop(){ System.out.println("work server stop..."); //取消监听 zkClient.unsubscribeDataChanges(configPath, dataListener); } /** * 服务器的初始化 */ private void initRunning(){ registMeToZookeeper(); //订阅config节点的改变 zkClient.subscribeDataChanges(configPath, dataListener); } /** * 启动时向zookeeper注册自己 */ private void registMeToZookeeper(){ //向zookeeper中注册自己的过程其实就是向servers节点下注册一个临时节点 //构造临时节点 String mePath = serversPath.concat("/").concat(serverData.getAddress()); try{ //存入是将json序列化 zkClient.createEphemeral(mePath, JSON.toJSONString(serverData).getBytes()); } catch (ZkNoNodeException e) { //父节点不存在 zkClient.createPersistent(serversPath, true); registMeToZookeeper(); } } /** * 当监听到zookeeper中config节点的配置信息改变时,要读取配置信息来更新自己的配置信息 */ private void updateConfig(ServerConfig serverConfig){ this.config = serverConfig; } }
package com.zk.subscribe; /** * 用于记录WorkServer(工作服务器)的配置信息 */ public class ServerConfig { private String dbUrl; private String dbPwd; private String dbUser; public String getDbUrl() { return dbUrl; } public void setDbUrl(String dbUrl) { this.dbUrl = dbUrl; } public String getDbPwd() { return dbPwd; } public void setDbPwd(String dbPwd) { this.dbPwd = dbPwd; } public String getDbUser() { return dbUser; } public void setDbUser(String dbUser) { this.dbUser = dbUser; } @Override public String toString() { return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd + ", dbUser=" + dbUser + "]"; } }
package com.zk.subscribe; /** * 用于记录WorkServer(工作服务器)的基本信息 */ public class ServerData { private String address; private Integer id; private String name; public String getAddress() { return address; } public void setAddress(String address) { this.address = address; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "ServerData [address=" + address + ", id=" + id + ", name=" + name + "]"; } }
相关推荐
订阅数据变化 await client.SubscribeDataChange("/year", (ct, args) => { IEnumerable currentData = args.CurrentData; string path = args.Path; Watcher.Event.EventType eventType = ...
发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用。 ...
让Zookeeper API 使用起来更简单 非常方便订阅各种事件并自动重新绑定事件(会话建立、节点修改、节点删除、子节点变更等) session过期自动重连、机制 快速入门 下面部分将引导使用者快速入门。 快速指南: ...
分布式框架Zookeeper之服务注册与订阅 互联网系统垂直架构之Session解决方案 分库分表之后分布式下如何保证ID全局唯一性 互联网企业必备高质量API网关接口设计 大型公司面试必答之数据结构与算法精讲 高性能网络编程...
使用kafka作为flink的数据源对接flink Table,本次测试使用的是单节点的kafka以及flink,以下为一次简单的操作,包括kafka主题的创建、订阅、发布以及具体的小案例 kafka中主题的创建 [root@CentOSA kafka_2.11-...
简单、高效的Zookeeper Java客户端。 让Zookeeper API 使用起来更简单 非常方便订阅各种事件并自动重新绑定事件(会话建立、节点修改、节点删除、子节点变更等) session过期自动重连、机制
应用发布与监控 应用容灾及机房规划 系统动态扩容 分布式架构策略-分而治之 从简到难,从网络通信探究分布式通信原理 基于消息方式的系统间通信 理解通信协议传输过程中的序列化和反序列化机制 基于框架的...
Kafka是⼀种⾼吞吐量的发布订阅消息系统,它可以处理消费者规模的⽹站中的所有动 作流数据。 这种动作(⽹页浏览,搜索和其他⽤户的⾏动)是在现代⽹络上的许多社会功能的⼀个关键因素。 这些数据通常是由于吞吐量 ...
也可以将domain视为订阅主题,将每个订阅者注册到domain的node上,发布者将消息逐一更新每个node,订阅者监控每个属于自己的node的变化事件获取订阅消息,收到后删除内容等待下一个消息。但是Fourinone不实现JMS的...
也可以将domain视为订阅主题,将每个订阅者注册到domain的node上,发布者将消息逐一更新每个node,订阅者监控每个属于自己的node的变化事件获取订阅消息,收到后删除内容等待下一个消息。但是Fourinone不实现JMS的...
只订阅 只注册 静态服务 多协议 多注册中心 服务分组 多版本 分组聚合 参数验证 结果缓存 泛化引用 泛化实现 回声测试 上下文信息 隐式传参 异步调用 本地调用 参数回调 事件通知 本地存根 本地伪装 延迟暴露 并发...
JCM是基于ZooKeeper的分布式名称服务实现。 特征 管理超过十万个节点(ip) 将集群名称映射到node(ip)列表 每个节点上的运行状况检查 持久存储群集列表进入Zookeeper 完全分布,读写每个JCM服务器 基于JSON的...
json对请求进行编码/解码处理BalancerSelectorrandom服务节点过滤和池ServerServerrpc监听和服务器的RPC请求Pub/SubBrokerhttp发布和订阅事件TransportTransporthttp服务之间的通信机制示例服务项目描述...
MConn是用编写的HA队列,与模块管理结合使用,充当Mesosphere背后的事件订阅者,以响应传入的任务事件并基于上/下缩放实现简单的后处理(用于Service-Discovery-Solutions) 。 我们的目标之一是框架设计,该框架...
但是, 尚未实现,因此需要直接与Zookeeper对话以管理消费者组成员身份。 警告:Kafka-Pixy不支持通配符订阅,因此不能与使用它们的客户端共存于消费者组中。 如果他们通过全名订阅主题,则应该有可能使用与kafka-...
烧焦该项目提供了一个管理API,该API公开了分布式作业和任务的有状态操作。 此应用程序中使用的代码是从项目样本扩展而来的。用法Scorch提供一致性保证和使用ZooKeeper顺序订购的全局事件事务日志。 使用全局唯一...
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 Github:https://github.com/rabbitmq 官网地址:http://www.rabbitmq.com 安装RabbitMQ 安装RabbitMQ 可以参考之前的文章 ...
7、支持复杂的消息转换过滤功能:支持使用 Groovy 脚本在服务端进行消息内容的转化和过滤,能做大大地减少客户端和服务器的数据传输,同时减少客户端的处理消息的负载。 8、提供了一个易用性高的 Web 用户控制台,...