`
agapple
  • 浏览: 1583612 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

基于zookeeper的分布式lock实现

 
阅读更多

背景

 继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,那对应的分布式lock实现在所难免。

 

 这一周,在基于BooleanMutex的基础上,实现了zookeeper的分布式锁,用于控制多进程+多线程的lock控制

 

算法

可以预先看一下zookeeper的官方文档: 

 

lock操作过程:
  • 首先为一个lock场景,在zookeeper中指定对应的一个根节点,用于记录资源竞争的内容
  • 每个lock创建后,会lazy在zookeeper中创建一个node节点,表明对应的资源竞争标识。 (小技巧:node节点为EPHEMERAL_SEQUENTIAL,自增长的临时节点)
  • 进行lock操作时,获取对应lock根节点下的所有字节点,也即处于竞争中的资源标识
  • 按照Fair竞争的原则,按照对应的自增内容做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id是否就为owner id,如果是则返回,lock成功。
  • 如果自己非owner id,按照排序的结果找到序号比自己前一位的id,关注它锁释放的操作(也就是exist watcher),形成一个链式的触发过程。
unlock操作过程:
  • 将自己id对应的节点删除即可,对应的下一个排队的节点就可以收到Watcher事件,从而被唤醒得到锁后退出

其中的几个关键点:
  1. node节点选择为EPHEMERAL_SEQUENTIAL很重要。
    * 自增长的特性,可以方便构建一个基于Fair特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免"惊群效应"(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
    * 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
  2. 获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,正好使用上一篇文章中提到的BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)

注意:
  • 使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
  • 同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。
没有两全其美的做法,两者取其一,选择自己一个能接受的即可

 

代码

public class DistributedLock {

    private static final byte[]  data      = { 0x12, 0x34 };
    private ZooKeeperx           zookeeper = ZooKeeperClient.getInstance();
    private final String         root;                                     //根节点路径
    private String               id;
    private LockNode             idName;
    private String               ownerId;
    private String               lastChildId;
    private Throwable            other     = null;
    private KeeperException      exception = null;
    private InterruptedException interrupt = null;

    public DistributedLock(String root) {
        this.root = root;
        ensureExists(root);
    }

    /**
     * 尝试获取锁操作,阻塞式可被中断
     */
    public void lock() throws InterruptedException, KeeperException {
        // 可能初始化的时候就失败了
        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            throw interrupt;
        }

        if (other != null) {
            throw new NestableRuntimeException(other);
        }

        if (isOwner()) {//锁重入
            return;
        }

        BooleanMutex mutex = new BooleanMutex();
        acquireLock(mutex);
        // 避免zookeeper重启后导致watcher丢失,会出现死锁使用了超时进行重试
        try {
            mutex.get(DEFAULT_TIMEOUT_PERIOD, TimeUnit.MICROSECONDS);// 阻塞等待值为true
            // mutex.get();
        } catch (TimeoutException e) {
            if (!mutex.state()) {
                lock();
            }
        }

        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            throw interrupt;
        }

        if (other != null) {
            throw new NestableRuntimeException(other);
        }
    }

    /**
     * 尝试获取锁对象, 不会阻塞
     * 
     * @throws InterruptedException
     * @throws KeeperException
     */
    public boolean tryLock() throws KeeperException {
        // 可能初始化的时候就失败了
        if (exception != null) {
            throw exception;
        }

        if (isOwner()) {//锁重入
            return true;
        }

        acquireLock(null);

        if (exception != null) {
            throw exception;
        }

        if (interrupt != null) {
            Thread.currentThread().interrupt();
        }

        if (other != null) {
            throw new NestableRuntimeException(other);
        }

        return isOwner();
    }

    /**
     * 释放锁对象
     */
    public void unlock() throws KeeperException {
        if (id != null) {
            try {
                zookeeper.delete(root + "/" + id, -1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (KeeperException.NoNodeException e) {
                // do nothing
            } finally {
                id = null;
            }
        } else {
            //do nothing
        }
    }

    private void ensureExists(final String path) {
        try {
            Stat stat = zookeeper.exists(path, false);
            if (stat != null) {
                return;
            }

            zookeeper.create(path, data, CreateMode.PERSISTENT);
        } catch (KeeperException e) {
            exception = e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            interrupt = e;
        }
    }

    /**
     * 返回锁对象对应的path
     */
    public String getRoot() {
        return root;
    }

    /**
     * 判断当前是不是锁的owner
     */
    public boolean isOwner() {
        return id != null && ownerId != null && id.equals(ownerId);
    }

    /**
     * 返回当前的节点id
     */
    public String getId() {
        return this.id;
    }

    // ===================== helper method =============================

    /**
     * 执行lock操作,允许传递watch变量控制是否需要阻塞lock操作
     */
    private Boolean acquireLock(final BooleanMutex mutex) {
        try {
            do {
                if (id == null) {//构建当前lock的唯一标识
                    long sessionId = zookeeper.getDelegate().getSessionId();
                    String prefix = "x-" + sessionId + "-";
                    //如果第一次,则创建一个节点
                    String path = zookeeper.create(root + "/" + prefix, data,
                            CreateMode.EPHEMERAL_SEQUENTIAL);
                    int index = path.lastIndexOf("/");
                    id = StringUtils.substring(path, index + 1);
                    idName = new LockNode(id);
                }

                if (id != null) {
                    List<String> names = zookeeper.getChildren(root, false);
                    if (names.isEmpty()) {
                        id = null;//异常情况,重新创建一个
                    } else {
                        //对节点进行排序
                        SortedSet<LockNode> sortedNames = new TreeSet<LockNode>();
                        for (String name : names) {
                            sortedNames.add(new LockNode(name));
                        }

                        if (sortedNames.contains(idName) == false) {
                            id = null;//清空为null,重新创建一个
                            continue;
                        }

                        //将第一个节点做为ownerId
                        ownerId = sortedNames.first().getName();
                        if (mutex != null && isOwner()) {
                            mutex.set(true);//直接更新状态,返回
                            return true;
                        } else if (mutex == null) {
                            return isOwner();
                        }

                        SortedSet<LockNode> lessThanMe = sortedNames.headSet(idName);
                        if (!lessThanMe.isEmpty()) {
                            //关注一下排队在自己之前的最近的一个节点
                            LockNode lastChildName = lessThanMe.last();
                            lastChildId = lastChildName.getName();
                            //异步watcher处理
                            zookeeper.exists(root + "/" + lastChildId, new AsyncWatcher() {

                                public void asyncProcess(WatchedEvent event) {
                                    acquireLock(mutex);
                                }

                            });

                            if (stat == null) {
                                acquireLock(mutex);// 如果节点不存在,需要自己重新触发一下,watcher不会被挂上去
                            }
                        } else {
                            if (isOwner()) {
                                mutex.set(true);
                            } else {
                                id = null;// 可能自己的节点已超时挂了,所以id和ownerId不相同
                            }
                        }
                    }
                }
            } while (id == null);
        } catch (KeeperException e) {
            exception = e;
            if (mutex != null) {
                mutex.set(true);
            }
        } catch (InterruptedException e) {
            interrupt = e;
            if (mutex != null) {
                mutex.set(true);
            }
        } catch (Throwable e) {
            other = e;
            if (mutex != null) {
                mutex.set(true);
            }
        }

        if (isOwner() && mutex != null) {
            mutex.set(true);
        }
        return Boolean.FALSE;
    }
}

相关说明:


 

 

测试代码:

 

@Test
    public void test_lock() {
        ExecutorService exeucotr = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);
        final DistributedLock[] nodes = new DistributedLock[count];
        for (int i = 0; i < count; i++) {
            final DistributedLock node = new DistributedLock(dir);
            nodes[i] = node;
            exeucotr.submit(new Runnable() {

                public void run() {
                    try {
                        Thread.sleep(1000);
                        node.lock(); //获取锁
                        Thread.sleep(100 + RandomUtils.nextInt(100));

                        System.out.println("id: " + node.getId() + " is leader: " + node.isOwner());
                    } catch (InterruptedException e) {
                        want.fail();
                    } catch (KeeperException e) {
                        want.fail();
                    } finally {
                        latch.countDown();
                        try {
                            node.unlock();
                        } catch (KeeperException e) {
                            want.fail();
                        }
                    }

                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            want.fail();
        }

        exeucotr.shutdown();
    }

 

升级版

 实现了一个分布式lock后,可以解决多进程之间的同步问题,但设计多线程+多进程的lock控制需求,单jvm中每个线程都和zookeeper进行网络交互成本就有点高了,所以基于DistributedLock,实现了一个分布式二层锁。

 

大致原理就是ReentrantLock 和 DistributedLock的一个结合。

 

 

  •  单jvm的多线程竞争时,首先需要先拿到第一层的ReentrantLock的锁
  • 拿到锁之后这个线程再去和其他JVM的线程竞争锁,最后拿到之后锁之后就开始处理任务。
锁的释放过程是一个反方向的操作,先释放DistributedLock,再释放ReentrantLock。 可以思考一下,如果先释放ReentrantLock,假如这个JVM ReentrantLock竞争度比较高,一直其他JVM的锁竞争容易被饿死。

代码:
public class DistributedReentrantLock extends DistributedLock {

    private static final String ID_FORMAT     = "Thread[{0}] Distributed[{1}]";
    private ReentrantLock       reentrantLock = new ReentrantLock();

    public DistributedReentrantLock(String root) {
        super(root);
    }

    public void lock() throws InterruptedException, KeeperException {
        reentrantLock.lock();//多线程竞争时,先拿到第一层锁
        super.lock();
    }

    public boolean tryLock() throws KeeperException {
        //多线程竞争时,先拿到第一层锁
        return reentrantLock.tryLock() && super.tryLock();
    }

    public void unlock() throws KeeperException {
        super.unlock();
        reentrantLock.unlock();//多线程竞争时,释放最外层锁
    }

    @Override
    public String getId() {
        return MessageFormat.format(ID_FORMAT, Thread.currentThread().getId(), super.getId());
    }

    @Override
    public boolean isOwner() {
        return reentrantLock.isHeldByCurrentThread() && super.isOwner();
    }

}
 
测试代码:
@Test
    public void test_lock() {
        ExecutorService exeucotr = Executors.newCachedThreadPool();
        final int count = 50;
        final CountDownLatch latch = new CountDownLatch(count);

        final DistributedReentrantLock lock = new DistributedReentrantLock(dir); //单个锁
        for (int i = 0; i < count; i++) {
            exeucotr.submit(new Runnable() {

                public void run() {
                    try {
                        Thread.sleep(1000);
                        lock.lock();
                        Thread.sleep(100 + RandomUtils.nextInt(100));

                        System.out.println("id: " + lock.getId() + " is leader: " + lock.isOwner());
                    } catch (InterruptedException e) {
                        want.fail();
                    } catch (KeeperException e) {
                        want.fail();
                    } finally {
                        latch.countDown();
                        try {
                            lock.unlock();
                        } catch (KeeperException e) {
                            want.fail();
                        }
                    }

                }
            });
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            want.fail();
        }

        exeucotr.shutdown();
    }

最后

其实再可以发散一下,实现一个分布式的read/write lock,也差不多就是这个理了。项目结束后,有时间可以写一下

 

大致思路:

 

  1. 竞争资源标示:  read_自增id , write_自增id
  2. 首先按照自增id进行排序,如果队列的前边都是read标识,对应的所有read都获得锁。如果队列的前边是write标识,第一个write节点获取锁
  3. watcher监听: read监听距离自己最近的一个write节点的exist,write监听距离自己最近的一个节点(read或者write节点)

 

 

  • 大小: 74.4 KB
分享到:
评论
8 楼 he037 2018-07-20  
a417930422 写道
引用

使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。


有几个问题:
1
引用
竞争资源的下一个id就可以获取锁

如果id标示被销毁,只会通知此id后的watcher。
根据这段代码:ownerId = sortedNames.first().getName();
如果被销毁的id非最小的id,此id后的watcher如何可以获得锁?

2
引用
这时可能会有两个process同时拿到锁在跑任务

如果销毁了某个id,zookeeper只会通知比它大的client,只通知一个客户端,怎么会有两个process同时拿到锁?


设想如下的执行序列:

客户端1创建了znode节点 /lock ,获得了锁。

客户端1进入了长时间的GC pause。

客户端1连接到ZooKeeper的Session过期了。znode节点 /lock 被自动删除。

客户端2创建了znode节点 /lock ,从而获得了锁。

客户端1从GC pause中恢复过来,它仍然认为自己持有锁。

最后,客户端1和客户端2都认为自己持有了锁,冲突了
7 楼 a417930422 2016-02-18  
引用

使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。


有几个问题:
1
引用
竞争资源的下一个id就可以获取锁

如果id标示被销毁,只会通知此id后的watcher。
根据这段代码:ownerId = sortedNames.first().getName();
如果被销毁的id非最小的id,此id后的watcher如何可以获得锁?

2
引用
这时可能会有两个process同时拿到锁在跑任务

如果销毁了某个id,zookeeper只会通知比它大的client,只通知一个客户端,怎么会有两个process同时拿到锁?
6 楼 weiboxie 2015-12-06  
大哥,能提供一下LockNode这个对象结构吗
5 楼 zhaoshijie 2015-09-12  
建议楼主把源码弄出来
4 楼 kojavaee 2015-03-15  
大哥,你的分布锁有遇到zkclient节点宕机了,造成死锁的问题么?
可以提供一下ZooKeeperClient的代码么?
3 楼 agapple 2013-11-18  
accp_huangxin 写道
大哥,能提供一下LockNode这个对象结构吗

public class LockNode implements Comparable<LockNode> {

    private final String name;
    private String       prefix;
    private int          sequence = -1;

    public LockNode(String name){
        Assert.notNull(name, "id cannot be null");
        this.name = name;
        this.prefix = name;
        int idx = name.lastIndexOf('-');
        if (idx >= 0) {
            this.prefix = name.substring(0, idx);
            try {
                this.sequence = Integer.parseInt(name.substring(idx + 1));
            } catch (Exception e) {
                // ignore
            }
        }
    }

    public int compareTo(LockNode that) {
        int s1 = this.sequence;
        int s2 = that.sequence;
        if (s1 == -1 && s2 == -1) {
            return this.name.compareTo(that.name);
        }

        if (s1 == -1) {
            return -1;
        } else if (s2 == -1) {
            return 1;
        } else {
            return s1 - s2;
        }
    }

    public String getName() {
        return name;
    }

    public int getSequence() {
        return sequence;
    }

    public String getPrefix() {
        return prefix;
    }

    public String toString() {
        return name.toString();
    }

    // ==================== hashcode & equals方法=======================

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        LockNode other = (LockNode) obj;
        if (name == null) {
            if (other.name != null) {
                return false;
            }
        } else if (!name.equals(other.name)) {
            return false;
        }
        return true;
    }

}
2 楼 accp_huangxin 2013-11-15  
大哥,能提供一下LockNode这个对象结构吗
1 楼 ykdsg 2013-01-08  
能提供源码不

相关推荐

    Zookeeper 分布式重入排它锁实现

    最新版本SpringBoot集成zookeeper实现分布式可重入排他锁实现,支持拓展其他种类锁

    zookeeper分布式锁实例源码

    基于zookeeper的不可重入锁Shared Lock 举例,可重入锁Shared Reentrant Lock 举例,可重入读写锁Shared Reentrant ReadWriteLock 举例

    springboot redis zookeeperlock rabbit实现的分布式锁

    springboot redis zookeeperlock rabbit实现的分布式锁 代码

    zookeeper:基于Zookeeper的分布式锁

    使用zookeeper来实现分布式锁 原理 监听zookeeper的临时有序节点,监听到NodeDeleted事件,就会让线程重新获取锁 测试方法 public class ZookeeperLockTest { public static void main(String[] args) throws ...

    zookeeper实现分布式锁

    在java中对于同一个jvm而言,jdk已经提供了lock和同步等。但是在分布式情况下,往往存在多个进程对一些资源产生竞争关系,而这些进程往往在不同的机器上,这个时候jdk中提供的已经不能满足。分布式锁顾明思议就是...

    springboot redis zookeeperlock rabbit实现的分布式锁.zip

    springboot redis zookeeperlock rabbit实现的分布式锁.zip

    ZooKeeper 实现分布式锁的方法示例

    ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、分布式协调/通知、集群管理、Master 选举、分布式锁等功能。 节点 在介绍 ZooKeeper 分布式...

    zkClient:实现zookeeper客户端,实现客户端连接,重连,基本数据操作,监听等功能

    client&lt;/artifactId&gt; &lt;groupId&gt;com.danyy.zk&lt;/groupId&gt; &lt;version&gt;1.0.0-SNAPSHOT&lt;/version&gt;&lt;/dependency&gt;后期完善zookeeper分布式锁实现思路(即SimpleLock思路)1. 首先指定一个作为锁的znode,通常用它来描述被...

    ZkLocker:基于Zookeeper的分布式储物柜

    ZkLocker TODO..c 使用 Zookeeper C API 实现特点:全分布式。 没有延迟。 'try lock' 或 'blocking lock' 与指定的密钥。 锁获取和锁释放回调。

    分布式锁三种实现方式及对比

    3. 基于Zookeeper实现分布式锁; 一, 基于数据库实现分布式锁 1. 悲观锁 利用select … where … for update 排他锁 注意: 其他附加功能与实现一基本一致,这里需要注意的是“where name=lock ”,name字段必须要走...

    distributed_lock:分布式锁

    使用mysql和zookeeper实现mysql_lock.py:Mysql 分布式锁zk_lock.py:zookeeper 分布式锁(需要 python 模块 'kazoo') test_mysql_lock.py:测试Mysql分布式锁test_zookeeper_lock.py:测试zookeeper分布式锁

    zookeeper应用场景实现demo及ppt资料

    1、master选举:mastersel 2、数据的发布和订阅:subscribe 3、负载均衡:balance 4、分布式锁:lock 5、分布式队列:queue 6、命名服务:nameservice 资料来自极客学院

    rivers-lock:Rivers Lock,是一款轻量级的分布式锁软件,不依赖Zookeeper,redis等第三方

    Rivers Lock,是一款轻量级的分布式锁软件,不依赖Zookeeper,redis等第三方,支持集群局域网自动识别或者通过IP识别组成集群。 Usage Example $ Java&gt;=1.8 river.setHosts("10.200.241.18,210.200.242.181");//可选...

    一款基于 Spring Boot Starter 机制的分布式锁框架

    一款基于 Spring Boot Starter 机制的分布式锁框架,实现了redis和zookeeper两种模式的分布式锁功能,以注解的方式(@RLock和@ZLock)对方法进行加锁操作,零代码实现业务加锁能力,涵盖各种加锁方式,并支持redis和...

    zookeeper-application:zookeeper应用

    ZooKeeper是一个分布式的开源协调服务,用于分布式应用程序。它公开了一组简单的原子操作,分布式应用程序可以构建这些原子操作,以实现更高级别的服务,以实现同步,配置维护以及组和命名。 它的设计易于编程,并...

    java7源码-distributed-lock:分布式锁

    感兴趣可以去Redisson官网看看如何在项目中引入Redisson的依赖,然后基于Redis实现分布式锁的加锁与释放锁。 一段简单的使用代码片段,先直观的感受一下: 是不是感觉简单的不行!此外,还支持Redis单实例、Redis...

    zookeeper常见应用场景简单实现及ppt

    1、master选举:mastersel 2、数据的发布和订阅:subscribe 3、负载均衡:balance 4、分布式锁:lock 5、分布式队列:queue 6、命名服务:nameservice 演示代码下载(代码来自极客学院演示demo):

    .net 分布式锁

    包括redis锁和zookeeper锁的.net实现。 分布式锁在解决分布式环境下的业务一致性是非常有用的。 开源地址:http://git.oschina.net/chejiangyi/XXF.BaseService.DistributedLock

    redis-distribute-lock:Redis分布式锁的try-with-resources实现

    基于zookeeper实现的 今天我们重点说一下基于redis的分布式锁,redis分布式锁的实现我们可以参照。 实现Redis分布式锁的最简单的方法就是在Redis中创建一个key,这个key有一个失效时间(TTL),以保证锁最终会被自动...

    资源

    ares5k-generator java-web代码生成器二进制地址GitHub: : 编码云: : ares5k-zookeeper-distribute-lock是基于curator实现的zookeeper分布式锁Aop注解组件GitHub: : 编码云: : ares5k延迟的承诺消息是兔子MQ...

Global site tag (gtag.js) - Google Analytics