`
xiaoyu966
  • 浏览: 255250 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ZooKeeper之分布式锁(Python版)

阅读更多

============================================================================

原创作品,允许转载。转载时请务必以超链接形式标明原始出处、以及本声明。

请注明转自:http://yunjianfei.iteye.com/blog/

============================================================================


前言

在做分布式系统开发的时候,分布式锁可以说是必需的一个组件。最近做了一些调研和尝试,经过对比,基于ZooKeeper的分布式锁还是很不错的。

 

参照了IBM的一个帖子:https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper/

其中有一段话描述了ZooKeeper的共享锁(即分布式锁)实现,如下:
共享锁在同一个进程中很容易实现,但是在跨进程或者在不同 Server 之间就不好实现了。Zookeeper 却很容易实现这个功能,实现方式也是需要获得锁的 Server 创建一个 EPHEMERAL_SEQUENTIAL 目录节点,然后调用 getChildren方法获取当前的目录节点列表中最小的目录节点是不是就是自己创建的目录节点,如果正是自己创建的,那么它就获得了这个锁,如果不是那么它就调用 exists(String path, boolean watch) 方法并监控 Zookeeper 上目录节点列表的变化,一直到自己创建的节点是列表中最小编号的目录节点,从而获得锁,释放锁很简单,只要删除前面它自己所创建的目录节点就行了。

 通过这段话,大概可以明白其原理。下面我主要写一下基于Python的分布式锁实现。

 

实现

Google了一下,有个叫Kazoo的python开源包很好的实现了对ZooKeeper的支持。

Kazoo is a Python library designed to make working with Zookeeper a more hassle-free experience that is less prone to errors.

 

链接如下:https://kazoo.readthedocs.org/en/latest/

GitHub地址: https://github.com/python-zk/kazoo

 

 首先,我们去GitHub,下载其源码包。解压缩之后,进行安装

python setup.py install

 

OK,准备工作完成,一切尽在代码中:

 

    文件名:zk_lock.py

#!/usr/bin/env python2.7
# -*- coding:utf-8 -*-
#
#   Author  :   yunjianfei
#   E-mail  :   yunjianfei1987@gmail.com
#   Date    :   2014/12/09
#   Desc    :
#

import logging, os, time
from kazoo.client import KazooClient
from kazoo.client import KazooState
from kazoo.recipe.lock import Lock

class ZooKeeperLock():
    def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):
        self.hosts = hosts
        self.id_str = id_str
        self.zk_client = None
        self.timeout = timeout
        self.logger = logger
        self.name = lock_name
        self.lock_handle = None

        self.create_lock()

    def create_lock(self):
        try:
            self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)
            self.zk_client.start(timeout=self.timeout)
        except Exception, ex:
            self.init_ret = False
            self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)
            logging.error(self.err_str)
            return

        try:
            lock_path = os.path.join("/", "locks", self.name)
            self.lock_handle = Lock(self.zk_client, lock_path)
        except Exception, ex:
            self.init_ret = False
            self.err_str = "Create lock failed! Exception: %s" % str(ex)
            logging.error(self.err_str)
            return

    def destroy_lock(self):
        #self.release()

        if self.zk_client != None:
            self.zk_client.stop()
            self.zk_client = None

    def acquire(self, blocking=True, timeout=None):
        if self.lock_handle == None:
            return None

        try:
            return self.lock_handle.acquire(blocking=blocking, timeout=timeout)
        except Exception, ex:
            self.err_str = "Acquire lock failed! Exception: %s" % str(ex)
            logging.error(self.err_str)
            return None

    def release(self):
        if self.lock_handle == None:
            return None
        return self.lock_handle.release()


    def __del__(self):
        self.destroy_lock()


def main():
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    sh = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
    sh.setFormatter(formatter)
    logger.addHandler(sh)

    zookeeper_hosts = "192.168.10.2:2181, 192.168.10.3:2181, 192.168.10.4:2181"
    lock_name = "test"

    lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger)
    ret = lock.acquire()
    if not ret:
        logging.info("Can't get lock! Ret: %s", ret)
        return

    logging.info("Get lock! Do something! Sleep 10 secs!")
    for i in range(1, 11):
        time.sleep(1)
        print str(i)

    lock.release()

if __name__ == "__main__":
    try:
        main()
    except Exception, ex:
        print "Ocurred Exception: %s" % str(ex)
        quit()

 

测试的时候,只需要改一下“zookeeper_hosts ”这个参数,改为你自己的ZooKeeper的server地址即可.

 

将该测试文件copy到多个服务器,同时运行,就可以看到分布式锁的效果了。

 

0
0
分享到:
评论

相关推荐

    基于zookeeper的分布式锁简单实现

    基于zookeeper的分布式锁简单实现,包含测试代码,实用工具类

    zookeeper做分布式锁

    zookeeper做分布式锁

    distributed_lock:分布式锁

    使用mysql和zookeeper实现mysql_lock.py:Mysql 分布式锁zk_lock.py:zookeeper 分布式锁(需要 python 模块 'kazoo') test_mysql_lock.py:测试Mysql分布式锁test_zookeeper_lock.py:测试zookeeper分布式锁

    计划任务的分布式锁-Python开发

    ShedLock ShedLock确保您...请注意,如果一个任务已经在一个节点上执行,则在其他节点上的执行不会等待,这仅仅是ShedLock使用外部存储(例如Mongo,JDBC数据库,Redis,Hazelcast,ZooKeeper或其他存储库)进行协调。

    浅谈分布式锁的几种使用方式(redis、zookeeper、数据库)

    分布式锁 我们需要怎么样的分布式锁? 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。 这把锁要是一把可重入锁(避免死锁) 这把锁最好是一把阻塞锁(根据业务需求...

    基于zookeeper的多活、主备模式控制

    基于zookeeper的临时节点实现多活,基于分布式锁和临时节点实现主备切换,此为python实例代码

    pyetcdlock:基于etcd的互斥网络锁

    pyetcdlock基于etcd的分布式锁,简单说就是利用etcd.test_and_set函数来判断lock key是否被占用,存在那就说明有人占用。在创建key的时候加入了ttl,防止因为进程异常退出而没有释放锁。更新:支持指定renew时间renew...

    Hadoop权威指南 第二版(中文版)

     Python版本  Hadoop Pipes  编译运行 第3章 Hadoop分布式文件系统  HDFS的设计  HDFS的概念  数据块  namenode和datanode  命令行接口  基本文件系统操作  Hadoop文件系统  接口  Java接口  从Hadoop ...

    hadoop大数据就业面试题

    9. ZooKeeper:提供分布式锁和配置管理服务。 10. ZKFC:提供 ZooKeeper Failover Controller 服务,监控 ZooKeeper 的状态。 三、Hadoop 的 shell 命令 1. 杀死一个 Job:hadoop job –list 获取 Job 的 ID,然后...

    Hadoop权威指南(中文版)2015上传.rar

    Python版本 Hadoop Pipes 编译运行 第3章 Hadoop分布式文件系统 HDFS的设计 HDFS的概念 数据块 namenode和datanode 命令行接口 基本文件系统操作 Hadoop文件系统 接口 Java接口 从Hadoop URL中读取数据 通过...

    xmljava系统源码-jstarcraft-tutorial:一套涵盖核心编程,人工智能,数字图像处理,自然语言处理,推荐与搜索,云服务领域

    xml java系统源码 JStarCraft Framework 一套涵盖核心编程,人工智能,数字...transaction模块:支持7种分布式锁(Cassandra/ElasticSearch/Hazelcast/Hibernate/Mongo/Redis/ZooKeeper) 2020.05.11 common模块instant包:

    springCloud

    1) 服务的注册与发现 Spring Cloud是一个基于Spring Boot实现的云应用开发工具,它为基于JVM的云应用开发中涉及的配置管理、服务发现、断路器、智能路由、微代理、控制总线、全局锁、决策竞选、分布式会话和集群...

Global site tag (gtag.js) - Google Analytics