`
乡里伢崽
  • 浏览: 108653 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

Zookeeper 进阶之——Zookeeper编程示例(如何使用Zookeeper实现屏障Barriers和队列Queues)

 
阅读更多
引用
原文:http://www.cnblogs.com/haippy/archive/2012/07/26/2609769.html

引言

本文将告诉你如何使用 Zookeeper 实现两种常用的分布式数据结构,屏障(barriers) 和队列(queues),我们为此还分别实现了两个类:Barrier and Queue. 本文中的例子假设你已经成功运行了Zookeeper服务器。

上述两种最基本的原语都使用了下面的常见编码规范:

static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

以上两个类都扩展了 SyncPrimitive. In this way, we execute steps that are common to all primitives in the constructor of SyncPrimitive. To keep the examples simple, we create a ZooKeeper object the first time we instantiate either a barrier object or a queue object, and we declare a static variable that is a reference to this object. The subsequent instances of Barrier and Queue check whether a ZooKeeper object exists. Alternatively, we could have the application creating a ZooKeeper object and passing it to the constructor of Barrier and Queue.

We use the process() method to process notifications triggered due to watches. In the following discussion, we present code that sets watches. A watch is internal structure that enables ZooKeeper to notify a client of a change to a node. For example, if a client is waiting for other clients to leave a barrier, then it can set a watch and wait for modifications to a particular node, which can indicate that it is the end of the wait. This point becomes clear once we go over the examples.

Barriers

A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node "/b1". Each process "p" then creates a node "/b1/p". Once enough processes have created their corresponding nodes, joined processes can start the computation.

In this example, each process instantiates a Barrier object, and its constructor takes as parameters:

the address of a ZooKeeper server (e.g., "zoo1.foo.com:2181")

the path of the barrier node on ZooKeeper (e.g., "/b1")

the size of the group of processes

The constructor of Barrier passes the address of the Zookeeper server to the constructor of the parent class. The parent class creates a ZooKeeper instance if one does not exist. The constructor of Barrier then creates a barrier node on ZooKeeper, which is the parent node of all process nodes, and we call root (Note: This is not the ZooKeeper root "/").


/**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;

            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }

            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }

        }


To enter the barrier, a process calls enter(). The process creates a node under the root to represent it, using its host name to form the node name. It then wait until enough processes have entered the barrier. A process does it by checking the number of children the root node has with "getChildren()", and waiting for notifications in the case it does not have enough. To receive a notification when there is a change to the root node, a process has to set a watch, and does it through the call to "getChildren()". In the code, we have that "getChildren()" has two parameters. The first one states the node to read from, and the second is a boolean flag that enables the process to set a watch. In the code the flag is true.

/**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);

                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }



Note that enter() throws both KeeperException and InterruptedException, so it is the reponsability of the application to catch and handle such exceptions.

Once the computation is finished, a process calls leave() to leave the barrier. First it deletes its corresponding node, and then it gets the children of the root node. If there is at least one child, then it waits for a notification (obs: note that the second parameter of the call to getChildren() is true, meaning that ZooKeeper has to set a watch on the the root node). Upon reception of a notification, it checks once more whether the root node has any child.

/**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                        if (list.size() > 0) {
                            mutex.wait();
                        } else {
                            return true;
                        }
                    }
                }
        }
    }



Producer-Consumer Queues

A producer-consumer queue is a distributed data estructure thata group of processes use to generate and consume items. Producer processes create new elements and add them to the queue. Consumer processes remove elements from the list, and process them. In this implementation, the elements are simple integers. The queue is represented by a root node, and to add an element to the queue, a producer process creates a new node, a child of the root node.

The following excerpt of code corresponds to the constructor of the object. As with Barrier objects, it first calls the constructor of the parent class, SyncPrimitive, that creates a ZooKeeper object if one doesn't exist. It then verifies if the root node of the queue exists, and creates if it doesn't.

/**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }


A producer process calls "produce()" to add an element to the queue, and passes an integer as an argument. To add an element to the queue, the method creates a new node using "create()", and uses the SEQUENCE flag to instruct ZooKeeper to append the value of the sequencer counter associated to the root node. In this way, we impose a total order on the elements of the queue, thus guaranteeing that the oldest element of the queue is the next one consumed.

/**
         * Add element to the queue.
         *
         * @param i
         * @return
         */

        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;

            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);

            return true;
        }



To consume an element, a consumer process obtains the children of the root node, reads the node with smallest counter value, and returns the element. Note that if there is a conflict, then one of the two contending processes won't be able to delete the node and the delete operation will throw an exception.

A call to getChildren() returns the list of children in lexicographic order. As lexicographic order does not necessary follow the numerical order of the counter values, we need to decide which element is the smallest. To decide which one has the smallest counter value, we traverse the list, and remove the prefix "element" from each one.


/**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;

            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        System.out.println("Temporary value: " + root + "/element" + min);
                        byte[] b = zk.getData(root + "/element" + min,
                                    false, stat);
                        zk.delete(root + "/element" + min, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();

                        return retvalue;
                    }
                }
            }
        }
    }


完整例子

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class SyncPrimitive implements Watcher {

    static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
        //else mutex = new Integer(-1);
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            //System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    /**
     * Barrier
     */
    static public class Barrier extends SyncPrimitive {
        int size;
        String name;

        /**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;

            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }

            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }

        }

        /**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);

                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }

        /**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                        if (list.size() > 0) {
                            mutex.wait();
                        } else {
                            return true;
                        }
                    }
                }
        }
    }

    /**
     * Producer-Consumer queue
     */
    static public class Queue extends SyncPrimitive {

        /**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }

        /**
         * Add element to the queue.
         *
         * @param i
         * @return
         */

        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;

            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);

            return true;
        }


        /**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;

            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        System.out.println("Temporary value: " + root + "/element" + min);
                        byte[] b = zk.getData(root + "/element" + min,
                                    false, stat);
                        zk.delete(root + "/element" + min, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();

                        return retvalue;
                    }
                }
            }
        }
    }

    public static void main(String args[]) {
        if (args[0].equals("qTest"))
            queueTest(args);
        else
            barrierTest(args);

    }

    public static void queueTest(String args[]) {
        Queue q = new Queue(args[1], "/app1");

        System.out.println("Input: " + args[1]);
        int i;
        Integer max = new Integer(args[2]);

        if (args[3].equals("p")) {
            System.out.println("Producer");
            for (i = 0; i < max; i++)
                try{
                    q.produce(10 + i);
                } catch (KeeperException e){

                } catch (InterruptedException e){

                }
        } else {
            System.out.println("Consumer");

            for (i = 0; i < max; i++) {
                try{
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException e){
                    i--;
                } catch (InterruptedException e){

                }
            }
        }
    }

    public static void barrierTest(String args[]) {
        Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
        try{
            boolean flag = b.enter();
            System.out.println("Entered barrier: " + args[2]);
            if(!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }

        // Generate random integer
        Random rand = new Random();
        int r = rand.nextInt(100);
        // Loop for rand iterations
        for (int i = 0; i < r; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {

            }
        }
        try{
            b.leave();
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }
        System.out.println("Left barrier");
    }
}
分享到:
评论

相关推荐

    Zookeeper 进阶之——典型应用场景(一)1

    Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 get

    Zookeeper 进阶之——典型应用场景(二)1

    该进程(节点)创建第 x 个节点——即最后的进程(节点),它将会看到 x 个节点,并唤醒其他进程(节点),注意,所有的等待进程(节点)只是在退出的时候被唤醒,所

    zookeeper 配置中心——利用ZkConfig实现分布式配置文件实时同步

    NULL 博文链接:https://quanzhong.iteye.com/blog/2203256

    zookeeper 经典应用设计 示例代码

    zookeeper 经典应用设计 锁、同步和队列分析

    zookeeper基础进阶&分布式集群部署

    zookeeper基础进阶&分布式集群部署,xmind文件,包含zk基础知识,linux环境下分布式集群安装部署,以及进阶内容

    1、zookeeper3.7.1安装与验证

    zookeeper集群需要奇数台机器,本示例使用3台机器,机器名称分别为server1、server2、server3。 本集群是centos6版本。 本示例是在已经安装完jdk8的环境中进行操作的,如果没有安装jdk,则需要提前安装jdk

    使用ZooKeeper实现软负载均衡示例

    使用zookeeper实现软负载均衡的一个示例,代码说明见http://blog.csdn.net/autfish/article/details/51576695

    《Paxos到Zookeeper——分布式一致性原理与实践》高清完整版

    《Paxos到Zookeeper——分布式一致性原理与实践》,学习zookeeper必备。该资源仅供大家了解书的内容,如果真有兴趣深入学习,建议购买正版书籍。

    zookeeper集群的配置文件示例

    zookeeper集群的配置文件示例,zoo.cfg,里面已经配置好了,一看就懂。只要把该文件放在zookeeper目录下的conf文件夹下即可

    从Paxos到Zookeeper

    同时,本书深入介绍了分布式一致性问题的工业解决方案——ZooKeeper,并着重向读者展示这一分布式协调框架的使用方法、内部实现及运维技巧,旨在帮助读者全面了解ZooKeeper,并更好地使用和运维ZooKeeper。...

    zookeeper示例代码。

    zookeeper简单示例代码,包括对象、节点、通信协议、序列化、acl权限、curator应用、zkclient应用等。

    C#使用Zookeeper示例

    1. 使用前先Nuget搜索ZooKeeper.Net安装 2. 安装ZooKeeper,百度自行搜索“Windows下安装ZooKeeper” 3. 下载路径:https://mirrors.cnnic.cn/apache/zookeeper/

    zookeeper-3.4.11.tar.gz

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、...

    最简单的Dubbo案例之二:SpringBoot + dubbo 无zookeeper方式点对点直连

    本项目只适合dubbo入门学习者,高手请不要浪费金钱; 本项目技术栈 springboot, dubbo ,无 zookeeper 本项目旨在提供最单纯的 dubbo 服务提供者 和消费者的点对点直连,而摒弃任何多余技术对dubbo直连的理解

    zookeeper日志查看工具

    #Zookeeper的日志可以用LogFormatter查看 ##命令方式如下 java -classpath .:slf4j-api-1.7.2.jar:zookeeper-3.4.6.jar org.apache.zookeeper.server.LogFormatter /var/lib/zookeeper/version-2/log.1 ##window...

    springboot-dubbo-zookeeper完整示例

    springboot-dubbo-zookeeper完整示例: 服务端、消费端,包含zookeeper安装包,可直接运行查看

    实现zookeeper集群节点管理及Master选举完整示例java项目下载(含依赖jar包).txt

    该项目通过zookeeper三个节点Node服务客户端代码,实现zookeeper集群管理与Master选举功能示例,项目结构如下图所示,其中依赖包包含:log4j-1.2.14.jar、slf4j-api-1.7.2.jar、slf4j-log4j12-1.7.2.jar、zookeeper-...

    ZooKeeper3.7.0(ZooKeeper-client)编译的头文件和库

    linux下编译zookeeper3.7.0出的头文件和库: proto.h recordio.h zookeeper.h zookeeper.jute.h zookeeper_log.h zookeeper_version.h libzookeeper_mt.a libzookeeper_mt.la libzookeeper_mt.so libzookeeper_mt....

    ZooKeeper典型使用场景

    基于对Paxos 算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得zookeeper 能够应用于很多场景。网上对zk的使用场景也有不少介绍,本文将结合作者身边的项目例子,系统的对zk 的...

    ZooKeeper入门简介及配置使用

    可以建⽴立在同步、配置管理、分组和命名等服务的更⾼高级别的实现的基础之上。 ZooKeeper 意欲设计⼀一个易于编程的环境,它的⽂文件系统使⽤用我们所熟悉的⽬目录树结构。 ZooKeeper 使⽤用 Java 所编写,但是⽀...

Global site tag (gtag.js) - Google Analytics