`
liudunxu2
  • 浏览: 31978 次
  • 性别: Icon_minigender_1
  • 来自: 青岛
文章分类
社区版块
存档分类
最新评论

zkClient的并发控制

 
阅读更多

1.ZkConnection实现IZkConnection,是org.apache.zookeeper.ZooKeeper的代理类,通过ReentrantLock实现connect和close的并发控制。因为connect方法和close方法对成员变量_zk进行了赋值和判空操作,所有要进行并发控制。代码如下:

  @Override
    public void connect(Watcher watcher) {
        _zookeeperLock.lock();
        try {
            if (_zk != null) {
                throw new IllegalStateException("zk client has already been started");
            }
            try {
                LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
                _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
            } catch (IOException e) {
                throw new ZkException("Unable to connect to " + _servers, e);
            }
        } finally {
            _zookeeperLock.unlock();
        }
    }

    public void close() throws InterruptedException {
        _zookeeperLock.lock();
        try {
            if (_zk != null) {
                LOG.debug("Closing ZooKeeper connected to " + _servers);
                _zk.close();
                _zk = null;
            }
        } finally {
            _zookeeperLock.unlock();
        }
    }

2.org.I0Itec.zkclient.ZkLock继承jdk自旋锁ReentrantLock,内部定义三个java.util.concurrent.locks.Condition的成员变量,分别实现节点数据改变,节点状态改变和节点改变三种condition,java的condition介绍可以参考
http://blog.sina.com.cn/s/blog_87fc744801018q7l.html,在ZkClient里,包含一个ZkLock的成员变量,用于实现zkClient的并发控制,比如zkClient的waitUntilExists方法:

 public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) throws ZkInterruptedException {
        Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
        LOG.debug("Waiting until znode '" + path + "' becomes available.");
        if (exists(path)) {
            return true;
        }
        acquireEventLock();
        try {
            while (!exists(path, true)) {
                boolean gotSignal = getEventLock().getZNodeEventCondition().awaitUntil(timeout);
                if (!gotSignal) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            throw new ZkInterruptedException(e);
        } finally {
            getEventLock().unlock();
        }
    }

如果path未创建成功,zkNodeEventCondition会wait一段时间,然后process方法负责释放condition,如下:

 public void process(WatchedEvent event) {
        LOG.debug("Received event: " + event);
        _zookeeperEventThread = Thread.currentThread();

        boolean stateChanged = event.getPath() == null;
        boolean znodeChanged = event.getPath() != null;
        boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
                || event.getType() == EventType.NodeChildrenChanged;

        getEventLock().lock();
        try {

            // We might have to install child change event listener if a new node was created
            if (getShutdownTrigger()) {
                LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
                return;
            }
            if (stateChanged) {
                processStateChanged(event);
            }
            if (dataChanged) {
                processDataOrChildChange(event);
            }
        } finally {
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();

                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                // TODO PVo write a test for this
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
            getEventLock().unlock();
            LOG.debug("Leaving process event");
        }
    }


分享到:
评论

相关推荐

    zkclient 1.0 源码包

    4. **并发控制**: zkclient 在多线程环境下的同步和并发控制策略,如使用 ReentrantReadWriteLock 和 Semaphore。 5. **异常处理**: zkclient 对 ZooKeeper 异常的捕获和处理,以及如何保证在异常情况下能优雅地...

    zkclient jar包

    9. **版本控制**:ZKClient可以处理ZooKeeper中的版本信息,允许对节点数据进行版本控制,防止数据冲突。 10. **集群选举**:在分布式系统中,ZKClient可以配合ZooKeeper进行集群选举,确定服务的主节点,实现负载...

    基于zookeeper实现的分布式读写锁

    总之,基于Zookeeper的分布式读写锁是解决分布式系统中并发控制的有效手段,结合Zkclient,我们可以更加便捷地实现这一功能。通过理解Zookeeper的工作原理和读写锁的实现机制,开发者能够在复杂环境中保证数据的一致...

    通过zookeeper实现分布式锁

    分布式锁是解决分布式系统中并发控制的关键组件。Zookeeper 实现分布式锁的核心思路如下: 1. **创建临时节点**:每个需要获取锁的客户端在 Zookeeper 的指定路径下创建一个临时节点。由于 Zookeeper 节点的唯一性...

    ZooKeeper 客户端的使用(二).

    ZooKeeper 支持 SASL 认证和 ACL(Access Control Lists)权限控制,可以对不同用户或角色设置不同的访问权限,确保数据的安全。 8. **ZooKeeper 集群** 在实际应用中,ZooKeeper 通常运行在多个服务器组成的集群...

    zookeeper安装包(内附客户端连接工具)

    - **分布式锁**:Zookeeper提供了两种类型的锁,一种是分布式互斥锁,用于控制分布式环境下的并发操作;另一种是顺序锁,用于控制资源的访问顺序。 - **集群管理**:Zookeeper可以监控集群中节点的状态,帮助实现...

    zookeeper第三节课1

    分布式锁可以帮助解决在分布式系统中多个节点同时执行同一操作的问题,确保同一时刻只有一个节点持有锁,从而避免并发冲突。 在服务注册与发现的场景中,Zookeeper可以作为一个服务中心,每个服务实例可以在特定的...

    第三课:zookeeper 典型使用场景实践1

    - **目的**:在分布式环境中,多节点可能会并发访问共享资源,分布式锁用于解决这类并发控制问题,确保同一时刻只有一个节点能持有锁。 - **实现原理**:Zookeeper提供了创建临时顺序节点的功能,通过比较节点顺序...

    zk第三节-解答1

    - 分布式事务的性能优化通常涉及事务的并发控制、两阶段提交等技术,而Zookeeper主要提供分布式一致性保障,不直接处理事务性能问题。 - 在某些场景下,Zookeeper可以辅助实现分布式事务的一致性,但解决性能问题...

    zookeeper python接口实例详解

    下面是一个简单的 zkpython 客户端示例(`zkclient.py`): ```python import zookeeper, time, threading from collections import namedtuple # 定义默认超时时间、权限和连接状态映射 DEFAULT_TIMEOUT = 30000 ...

    zookeeper的分布式全局锁纯代码解决方案

    分布式全局锁是分布式系统中一个重要的同步控制工具,它允许在多节点环境下,对共享资源进行独占式访问,防止并发问题。Zookeeper,作为Apache的一个高性能、高可用的分布式协调服务,常被用于实现这样的功能。在这...

    zookeeper的jar包

    1. **guava-20.0-hal.jar**:这是Google的Guava库,提供了许多Java集合框架的增强功能,如并发、缓存、原生类型支持、函数式编程工具等。在Zookeeper中,Guava用于提供高效的实用工具类和数据结构。 2. **zookeeper...

    1000道 互联网Java工程师面试题 485页

    - **第三方客户端**:如Curator、ZkClient等。 #### 24、chubby是什么,和zookeeper比你怎么看? - **chubby**:Google的分布式锁服务。 - **比较**:两者都提供了一致性服务,但chubby更专注于锁服务,而...

Global site tag (gtag.js) - Google Analytics