- 浏览: 108630 次
- 性别:
- 来自: 深圳
文章分类
最新评论
-
土豆蛋儿:
我想读取一个外部文件,以什么方式好了? 文件内容经常编辑
flume 自定义source -
土豆蛋儿:
大神,您好。
flume 自定义source
引用
原文:http://www.cnblogs.com/haippy/archive/2012/07/20/2600077.html
简介
Apache Zookeeper 是由 Apache Hadoop 的 Zookeeper 子项目发展而来,现在已经成为了 Apache 的顶级项目。Zookeeper 为分布式系统提供了高效可靠且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等。 Zookeeper 接口简单,开发人员不必过多地纠结在分布式系统编程难于处理的同步和一致性问题上,你可以使用 Zookeeper 提供的现成(off-the-shelf)服务来实现分布式系统的配置管理,组管理,Leader 选举等功能。
英文原文地址:
引用
http://zookeeper.apache.org/doc/current/javaExample.html
一个简单的 Zookeeper Watch 客户端
为了介绍 Zookeeper Java API 的基本用法,本文将带你如何一步一步实现一个功能简单的 Zookeeper 客户端。该 Zookeeper 客户端会监视一个你指定 Zookeeper 节点 Znode, 当被监视的节点发生变化时,客户端会启动或者停止某一程序。
基本要求
该客户端具备四个基本要求:
客户端所带参数:
Zookeeper 服务地址。
被监视的 Znode 节点名称。
可执行程序及其所带的参数
客户端会获取被监视 Znode 节点的数据并启动你所指定的可执行程序。
如果被监视的 Znode 节点发生改变,客户端重新获取其内容并再次启动你所指定的可执行程序。
如果被监视的 Znode 节点消失,客户端会杀死可执行程序。
程序设计
一般而言,Zookeeper 应用程序分为两部分,其中一部分维护与服务器端的连接,另外一部分监视 Znode 节点的数据。在本程序中,Executor 类负责维护 Zookeeper 连接,DataMonitor 类监视 Zookeeper 目录树中的数据, 同时,Executor 包含了主线程和程序主要的执行逻辑,它负责少量的用户交互,以及与可执行程序的交互,该可执行程序接受你向它传入的参数,并且会根据被监视的 Znode 节点的状态变化停止或重启。
Executor类
Executor 对象是本例程最基本的“容器”,它包括Zookeeper 对象和DataMonitor对象。
public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } } public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } }
回忆一下 Executor 的任务是根据 Zookeeper 中 Znode 节点状态改变所触发的事件来启动和停止你在命令行指定的可执行程序, 在上面的代码你可以看到,Executor 类在其构造函数中实例化 Zookeeper 对象时,将其自身的引用作为 Watch 参数传递给 Zookeeper 的构造函数,同时它也将其自身的引用作为 DataMonitorListener 参数传递给 DataMonitor 的构造函数。Executor 本身实现了以下接口:
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { ...
DataMonitorListener 接口本身不是Zookeeper API 的一部分,它完全是一个自定义的接口,可以说是专门为本程序设计的。DataMonitor 对象使用该接口和“容器”(即 Executor 类)进行通信,DataMonitorListener 接口如下:
public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); }
该接口在 DataMonitor 中定义,Executor 类实现该接口,当 Executor.exists() 被调用的时候,Executor 决定是否启动或停止事先指定的应用程序(回忆一下前文所说的,当 Znode 消失时 Zookeeper 客户端会杀死该可执行程序)。
当 Executor.closing() 被调用的时候,Executor 会根据 Zookeeper 连接永久性地消失来决定是否关闭自己。
你或许已经猜到,DataMonitor 对象根据 Zookeeper 状态变化来调用这些方法吧?
以下是 Executor 类中实现 DataMonitorListener.exists() 和 DataMonitorListener.closing()的代码:
public void exists( byte[] data ) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } public void closing(int rc) { synchronized (this) { notifyAll(); } }
DataMonitor 类
DataMonitor 类是本程序 Zookeeper 逻辑的核心, 它差不多是异步的,并由事件驱动的。DataMonitor 构造函数如下:
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); }
调用 ZooKeeper.exists() 检查指定的 Znode 是否存在,并设置监视,传递自身引用作为回调对象,在某种意义上,在 watch 触发时就会引起真实的处理流程。
当 ZooKeeper.exists() 操作在服务器端完成时,ZooKeeper API 会在客户端调用 completion callback:
public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } }
上述代码首先检查 Znode 是否存在,以及其他重大的不可恢复的错误。如果文件(或者Znode)存在,它将从 Znode 获取数据,如果状态发生变化再调用 Executor 的 exists() 回调函数。注意,getData 函数本省必须要做任何的异常处理,因为本身就有监视可以处理任何错误:如果节点在调用 ZooKeeper.getData() 之前被删除,ZooKeeper.exists() 就会触发回调函数,如果存在通信错误,在连接上的监视会在该连接重建之前触发相应的事件,同时引发相应的处理。
最后,DataMonitor 处理监视事件的代码如下:
public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } }
如果客户端 Zookeeper 程序在会话失效时(Expired event)重新建立了通信信道(SyncConnected event) ,所有的会话监视会自动和服务器进行重连, (Zookeeper 3.0.0以上版本会重置之前设置的监视). 更多编程指南请参见 ZooKeeper Watches 。 当 DataMonitor 获得了指定 Znode 的事件后,它将调用 ZooKeeper.exists() 来决定究竟发生了什么。
完整的程序:
Executor.java:
/** * A simple example program to use DataMonitor to start and * stop executables based on a znode. The program watches the * specified znode and saves the data that corresponds to the * znode in the filesystem. It also starts the specified program * with the specified arguments when the znode exists and kills * the program if the znode goes away. */ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener { String znode; DataMonitor dm; ZooKeeper zk; String filename; String exec[]; Process child; public Executor(String hostPort, String znode, String filename, String exec[]) throws KeeperException, IOException { this.filename = filename; this.exec = exec; zk = new ZooKeeper(hostPort, 3000, this); dm = new DataMonitor(zk, znode, null, this); } /** * @param args */ public static void main(String[] args) { if (args.length < 4) { System.err .println("USAGE: Executor hostPort znode filename program [args ...]"); System.exit(2); } String hostPort = args[0]; String znode = args[1]; String filename = args[2]; String exec[] = new String[args.length - 3]; System.arraycopy(args, 3, exec, 0, exec.length); try { new Executor(hostPort, znode, filename, exec).run(); } catch (Exception e) { e.printStackTrace(); } } /*************************************************************************** * We do process any events ourselves, we just need to forward them on. * * @see org.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) */ public void process(WatchedEvent event) { dm.process(event); } public void run() { try { synchronized (this) { while (!dm.dead) { wait(); } } } catch (InterruptedException e) { } } public void closing(int rc) { synchronized (this) { notifyAll(); } } static class StreamWriter extends Thread { OutputStream os; InputStream is; StreamWriter(InputStream is, OutputStream os) { this.is = is; this.os = os; start(); } public void run() { byte b[] = new byte[80]; int rc; try { while ((rc = is.read(b)) > 0) { os.write(b, 0, rc); } } catch (IOException e) { } } } public void exists(byte[] data) { if (data == null) { if (child != null) { System.out.println("Killing process"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { } } child = null; } else { if (child != null) { System.out.println("Stopping child"); child.destroy(); try { child.waitFor(); } catch (InterruptedException e) { e.printStackTrace(); } } try { FileOutputStream fos = new FileOutputStream(filename); fos.write(data); fos.close(); } catch (IOException e) { e.printStackTrace(); } try { System.out.println("Starting child"); child = Runtime.getRuntime().exec(exec); new StreamWriter(child.getInputStream(), System.out); new StreamWriter(child.getErrorStream(), System.err); } catch (IOException e) { e.printStackTrace(); } } } }
DataMonitor.java:
/** * A simple class that monitors the data and existence of a ZooKeeper * node. It uses asynchronous ZooKeeper APIs. */ import java.util.Arrays; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.AsyncCallback.StatCallback; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.data.Stat; public class DataMonitor implements Watcher, StatCallback { ZooKeeper zk; String znode; Watcher chainedWatcher; boolean dead; DataMonitorListener listener; byte prevData[]; public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher, DataMonitorListener listener) { this.zk = zk; this.znode = znode; this.chainedWatcher = chainedWatcher; this.listener = listener; // Get things started by checking if the node exists. We are going // to be completely event driven zk.exists(znode, true, this, null); } /** * Other classes use the DataMonitor by implementing this method */ public interface DataMonitorListener { /** * The existence status of the node has changed. */ void exists(byte data[]); /** * The ZooKeeper session is no longer valid. * * @param rc * the ZooKeeper reason code */ void closing(int rc); } public void process(WatchedEvent event) { String path = event.getPath(); if (event.getType() == Event.EventType.None) { // We are are being told that the state of the // connection has changed switch (event.getState()) { case SyncConnected: // In this particular example we don't need to do anything // here - watches are automatically re-registered with // server and any watches triggered while the client was // disconnected will be delivered (in order of course) break; case Expired: // It's all over dead = true; listener.closing(KeeperException.Code.SessionExpired); break; } } else { if (path != null && path.equals(znode)) { // Something has changed on the node, let's find out zk.exists(znode, true, this, null); } } if (chainedWatcher != null) { chainedWatcher.process(event); } } public void processResult(int rc, String path, Object ctx, Stat stat) { boolean exists; switch (rc) { case Code.Ok: exists = true; break; case Code.NoNode: exists = false; break; case Code.SessionExpired: case Code.NoAuth: dead = true; listener.closing(rc); return; default: // Retry errors zk.exists(znode, true, this, null); return; } byte b[] = null; if (exists) { try { b = zk.getData(znode, false, null); } catch (KeeperException e) { // We don't need to worry about recovering now. The watch // callbacks will kick off any exception handling e.printStackTrace(); } catch (InterruptedException e) { return; } } if ((b == null && b != prevData) || (b != null && !Arrays.equals(prevData, b))) { listener.exists(b); prevData = b; } } }
发表评论
-
ZooKeeper 分布式锁实现
2015-01-14 10:23 961场景描述 在分布式应 ... -
Zookeeper 分布式配置管理
2015-01-13 17:27 1109原创 配置中心代码: import java.io.IOE ... -
Zookeeper 进阶之——Zookeeper编程示例(如何使用Zookeeper实现屏障Barriers和队列Queues)
2015-01-13 09:36 468引用原文:http://www.cnblogs.com/hai ... -
ZooKeeper 典型的应用场景(二)
2015-01-12 17:26 386原文:http://www.cnblogs.com ... -
ZooKeeper 典型的应用场景
2015-01-12 17:23 491引用原文:http://www.cnblo ... -
zookeeper java-api
2015-01-12 17:14 535原文:http://www.cnblogs.com ... -
zookeeper的伪分布式搭建
2015-01-12 16:36 594引用原文:http://www.cnblo ...
相关推荐
Dubbo入门实例Demo
本文主要通过实例给大家详细分析了ZooKeeper用JAVA实现API编程的知识要点。
1)Zookeeper公司内部交流学习、培训PPT【ZooKeeper前言、是什么&为什么要用、ZAB、4大应用场景分析&JAVA实例Demo Code、3.4.9版本源代码Eclipse创建】 2)Eclipse 支持的 3.4.9版本源代码 JAVA Project
主要介绍了Apache Zookeeper使用方法的有关实例及详解,包含了其简介和程序设计的一些思想,内容丰富,需要的朋友可以参考下。
ZooKeeperStart系统环境jdk版本:jdk1.8zookeeper版本: 3.4.11操作系统版本:Ubuntu 5.4.0zookeeper常见知识权限create:创建子节点read:获取节点/子节点write:设置节点数据delete:删除子节点admin:设置权限打开...
主要介绍了在Java中操作Zookeeper的示例代码详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
关于dubbo+zookeeper的一个小实例,很适合刚接触dubbo的新手来进行学习
zookeeper的开发接口以前主要以java和c为主,随着python项目越来越多的使用zookeeper作为分布式集群实现,python的zookeeper接口也出现了很多,现在主流的纯python的zookeeper接口是kazoo。因此如何使用kazoo开发...
springboot-dubbo Springboot和dubbo整合的一个项目Demo
实现了对Zookeeper实例的统一管理。 前台使用了Bootstrap,基于gentelella主题实现。后台使用了Spring MVC,Mybatis,Curator等技术。Jar包管理通过Maven来实现。数据库选用了Mysql。 下载安装 下载 通过如下命令从...
【一线互联网大厂Java核心面试题库】Java基础、异常、集合、并发编程、JVM、Spring全家桶、MyBatis、Redis、数据库、中间件MQ、Dubbo、Linux、Tomcat、ZooKeeper、Netty等等..
Java客户端实现Kafka生产者与消费者实例 kafka的副本机制及选举原理剖析 基于kafka实现应用日志实时上报统计分析 RabbitMQ 初步认识RabbitMQ及高可用集群部署 详解RabbitMQ消息分发机制及主题消息分发 ...
修改它以指向zookeeper实例。 多个zk实例以逗号分隔。 例如:server1:2181,server2:2181。 第一台服务器应始终是领导者。 运行罐子。 (nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar和) 登录...
zookeeperDemo zookeeper节点操作 zookeeper节点操作 节点创建,删除,遍历 节点数据读写 节点类型 节点类型 持久节点 PERSISTENT 持久顺序节点 PERSISTENT_SEQUENTIAL 临时节点 EPHEMERAL 临时顺序节点 EPHEMERAL_...
主要介绍了为zookeeper配置相应的acl权限的相关实例,具有一定参考价值,需要的朋友可以了解下。
渠道架构模型-java,实例项目,用到dubbo+zookeeper分布式,稍后上传架构图
kafka_2.11-1.0.0 版本,java,spring连接实例程序,windows自动构建,无需下载安装包,一键完成jar包导入。
一个普通java工程, springboot、dubbo、zookeeper、redis、MQ、mybatis、druid、swagger 项目实例
这是一个参考了网上许多人的代码后,自己拼拼凑凑整合起来的一套dubbo应用实例,包含了生产者和消费者。用到了zookeeper、spring、springMVC、mybatis、dubbo等技术。详细的说明请查看同名文章。
自己看资料做了个dubbo实例测试,简述dubbo的流程,dubbo在zookeeper上注册