`
liudunxu2
  • 浏览: 30636 次
  • 性别: Icon_minigender_1
  • 来自: 青岛
文章分类
社区版块
存档分类
最新评论

zkClient的并发控制

 
阅读更多

1.ZkConnection实现IZkConnection,是org.apache.zookeeper.ZooKeeper的代理类,通过ReentrantLock实现connect和close的并发控制。因为connect方法和close方法对成员变量_zk进行了赋值和判空操作,所有要进行并发控制。代码如下:

  @Override
    public void connect(Watcher watcher) {
        _zookeeperLock.lock();
        try {
            if (_zk != null) {
                throw new IllegalStateException("zk client has already been started");
            }
            try {
                LOG.debug("Creating new ZookKeeper instance to connect to " + _servers + ".");
                _zk = new ZooKeeper(_servers, _sessionTimeOut, watcher);
            } catch (IOException e) {
                throw new ZkException("Unable to connect to " + _servers, e);
            }
        } finally {
            _zookeeperLock.unlock();
        }
    }

    public void close() throws InterruptedException {
        _zookeeperLock.lock();
        try {
            if (_zk != null) {
                LOG.debug("Closing ZooKeeper connected to " + _servers);
                _zk.close();
                _zk = null;
            }
        } finally {
            _zookeeperLock.unlock();
        }
    }

2.org.I0Itec.zkclient.ZkLock继承jdk自旋锁ReentrantLock,内部定义三个java.util.concurrent.locks.Condition的成员变量,分别实现节点数据改变,节点状态改变和节点改变三种condition,java的condition介绍可以参考
http://blog.sina.com.cn/s/blog_87fc744801018q7l.html,在ZkClient里,包含一个ZkLock的成员变量,用于实现zkClient的并发控制,比如zkClient的waitUntilExists方法:

 public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) throws ZkInterruptedException {
        Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
        LOG.debug("Waiting until znode '" + path + "' becomes available.");
        if (exists(path)) {
            return true;
        }
        acquireEventLock();
        try {
            while (!exists(path, true)) {
                boolean gotSignal = getEventLock().getZNodeEventCondition().awaitUntil(timeout);
                if (!gotSignal) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            throw new ZkInterruptedException(e);
        } finally {
            getEventLock().unlock();
        }
    }

如果path未创建成功,zkNodeEventCondition会wait一段时间,然后process方法负责释放condition,如下:

 public void process(WatchedEvent event) {
        LOG.debug("Received event: " + event);
        _zookeeperEventThread = Thread.currentThread();

        boolean stateChanged = event.getPath() == null;
        boolean znodeChanged = event.getPath() != null;
        boolean dataChanged = event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted || event.getType() == EventType.NodeCreated
                || event.getType() == EventType.NodeChildrenChanged;

        getEventLock().lock();
        try {

            // We might have to install child change event listener if a new node was created
            if (getShutdownTrigger()) {
                LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath() + "}' since shutdown triggered");
                return;
            }
            if (stateChanged) {
                processStateChanged(event);
            }
            if (dataChanged) {
                processDataOrChildChange(event);
            }
        } finally {
            if (stateChanged) {
                getEventLock().getStateChangedCondition().signalAll();

                // If the session expired we have to signal all conditions, because watches might have been removed and
                // there is no guarantee that those
                // conditions will be signaled at all after an Expired event
                // TODO PVo write a test for this
                if (event.getState() == KeeperState.Expired) {
                    getEventLock().getZNodeEventCondition().signalAll();
                    getEventLock().getDataChangedCondition().signalAll();
                    // We also have to notify all listeners that something might have changed
                    fireAllEvents();
                }
            }
            if (znodeChanged) {
                getEventLock().getZNodeEventCondition().signalAll();
            }
            if (dataChanged) {
                getEventLock().getDataChangedCondition().signalAll();
            }
            getEventLock().unlock();
            LOG.debug("Leaving process event");
        }
    }


分享到:
评论

相关推荐

    数据库的并发控制 ppt 课件

    数据库的并发控制  数据库并发控制的含义  事务  并发控制的必要性 基于封锁的并发控制技术  基于时间戳的并发控制技术

    数据库并发控制PPT

    数据库并发控制PPT 并发控制的机制和方法

    数据库并发控制及SQL

    数据库并发控制及SQL 包含内容包括:数据库并发控制及SQL Server的并发控制机制 事务及并发控制的基本概念 封锁机制 SQL Server的并发控制机制 等等

    分布式数据库并发控制

    分布式数据库并发控制分布式数据库并发控制

    Hibernate事务和并发控制

    Hibernate事务和并发控制Hibernate事务和并发控制Hibernate事务和并发控制

    数据库思维导图——并发控制

    数据库思维导图——并发控制 并发控制 多事务执行方式 (1)事务串行执行 每个时刻只有一个事务运行,其他事务必须等到这个事务结束以后方能运行 不能充分利用系统资源,发挥数据库共享资源的特点 (2)交叉并发方式...

    数据库并发处理控制 pdf

    8.1 并发控制概述 8.2 封锁 8.3 封锁协议 8.4 活锁和死锁 8.5 并发调度的可串行性 8.6 两段锁协议 8.7 封锁的粒度 8.8 Oracle的并发控制 8.9 小结

    数据库并发控制sqlserver2000

    关于数据库并发控制的一些介绍 数据库并发控制 sql server

    数据库并发控制的基本方法

    讲述了数据库中的并发控制,包括事务处理,加锁机制,以及恢复,日志记录

    论文研究-一种面向多租户中间件的应用级并发控制方法.pdf

    针对在多租户中间件上存在的租户隔离、资源侵占的问题, 提出一种面向多租户中间件的应用级并发控制方法。首先分析了现有并发控制方法的局限性, 然后介绍了一种基于工作管理器的多租户请求处理模型的原理及设计, ...

    事务并发控制

    并发控制指的是当多个用户同时更新运行时,用于保护数据库完整性的各种技术。并发机制不正确可能导致脏读、幻读和不可重复读等此类问题。并发控制的目的是保证一个用户的工作不会对另一个用户的工作产生不合理的影响...

    数据库PPT关于并发控制

    关于数据库并发控制,该PPT主要以高等教育数据库系统概论为基础,介绍了锁的使用

    增加并发控制后的globalmem程序

    增加并发控制后的globalmem程序

    Oracle中使用锁进行并发控制

    Oracle中使用锁进行并发控制

    sql并发控制

    并发控制机制调度并发事务操作是否正确的判别准则是可串行性 并发操作的正确性则通常由两段锁协议来保证。 两段锁协议是可串行化调度的充分条件,但不是必要条件

    并发控制—悲观锁和乐观锁

    并发控制—悲观锁和乐观锁

    数据库恢复和并发控制

    掌握事务的概念及性质 掌握数据库恢复的基本技术和策略 ...2、掌握并发控制的技术:封锁机制、三级封锁协议、 活锁的避免、死锁的预防、诊断及解除 3、掌握并发调度的正确性标准和技术(可串行性、 两段锁协议)

    资源竞争与并发控制

    非常浅显的讲述《资源竞争与并发控制》。 1.应用层并发控制?不是重点 2.Lost Update?乐观锁与悲观锁 3.隔离级别:脏读Dirty Read+幻读Phantom Read 4.InnoDB锁与多版本控制? 5.意向锁? 6.事务传播行为?

    分布式数据库 第八章 分布式并发控制

    分布式数据库 分布式数据库系统 第八章 分布式并发控制 东北大学

    分布式事务的并发控制

    这是一节关于研究生研一分布式设计课程中的课件,里面的内容是关于分布式事务的并发控制。

Global site tag (gtag.js) - Google Analytics