`
zhangwei_david
  • 浏览: 469474 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

ZooKeeper源码分析(一)

 
阅读更多

   为了更好地了解ZooKeeper客户端的工作原理,首先需要从客户端的会话创建过程学起。

  初始化阶段:

 

  •   初始化ZooKeeper对象

        通过调用ZooKeeper的构造方法实例化一个ZooKeeper对象,在初始化过程中会创建一个客户端Watcher管理器ClientWatcherManager

 

  • 设置会话默认Watcher

      如果构造方法中传入了一个Watcher对象,那么客户端将这个Watcher对象作为默认Watcher保存到ClientWatcherManager中

 

  • 构造ZK服务器地址列表管理器HostProvider

        对于构造器中传入的服务器端地址,客户端将其保存在服务器地址列表管理器HostProvider中    

 

  • 创建并初始化客户端网络连接

          ZooKeeper客户端首先会创建一个网络连接器ClientCnxn,用来管理客户端与服务器的网络交互。另外,客户端在创建 ClientCnxn的同时还会初始化客户端两个核心队列outGoingQueue和pendingQueue,分别作为客户端请求组发送队列和服务端 响应等待队列

 

  • 初始化SendThread和EventThread

        客户端会创建两个核心网络线程SendThread和EventThread,前者用于管理客户端和服务器端之间所有网络IO,后者用户进行客户端的事件处理。

 

   下面就从代码上去分析这个过程

 

 

 

 

 

/**
  这是ZooKeeper客户端库的主类。使用ZK服务,应用必须收银实例化ZooKeeper类。
  所有迭代都是通过调用ZooKeeper类的方法完成。
  这个类的方法是除了特殊说明以外都是线程安全的。
   一旦和服务器建立连接,客户端就分配到一个会话ID(session ID).客户端将周期性发送心跳信号到服务器来保持会话有效。
   只要客户端的会话ID有效,应用都可以通过客户端调用ZooKeeper的接口
   如果因为一些原因导致客户端长时间向服务器发送心跳信号失败(例如,超过会话的超时时间),服务器将会话过期,同时会话ID也失效。客户端对象将不再可用。为了调用ZooKeeper 接口,应用必须创建一个新的客户端。
   如果ZooKeeper服务器的客户端的当前连接失败或其他原因没有响应,客户端在会话ID过期前会自动连接其他服务器。如果成功连接其他服务器,应用可以继续使用客户端。
   ZooKeeper的接口方法是同步或异步的。同步方法一直挂起到服务器响应。异步方法只是将发送的请求排队并立即返回。它使用一个回调对象,无论请求执行成功与否都会执行回调方法返回一个结果码指示结果状态。
  一些ZooKeeper API 成功被调用可以在ZK服务器的数据节点上注监视器。其他ZooKeeper API 成功调用可以触发监哪些监视器。一旦一个监视器被触发,时间将被传递到第一步注册监视器的客户端。每个监视器只能够被触发一次
  因此,一旦一个时间被传递到客户端的监视器,它将被注销。
   客户端需要一个实现Watcher接口的对象去处理传递到客户端的事件。
  当客户端丢失当前连接后重新连接服务器,所有被认为是触发的监视器,但是没有送达的事件将丢失。
   为了模式这个场景,客户端将产生一个特殊的事件,去告诉事件处理器连接被删除。这个特殊的事件的类型是EventNone 状态是KeeperStateDiscounnected
***/
public class ZooKeeper {

    public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";

    protected final ClientCnxn cnxn;
    private static final Logger LOG;
    static {
        //Keep these two lines together to keep the initialization order explicit
        LOG = LoggerFactory.getLogger(ZooKeeper.class);
        Environment.logEnv("Client environment:", LOG);
    }

    public ZooKeeperSaslClient getSaslClient() {
        return cnxn.zooKeeperSaslClient;
    }
	
	// 初始化客户端Watcher管理器,在初始化ZooKeeper时就对其进行初始化
    private final ZKWatchManager watchManager = new ZKWatchManager();

	// 获取数据监视器列表
    List<String> getDataWatches() {
        synchronized(watchManager.dataWatches) {
            List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
            return rc;
        }
    }
	// 获取Exist监视器列表
    List<String> getExistWatches() {
        synchronized(watchManager.existWatches) {
            List<String> rc =  new ArrayList<String>(watchManager.existWatches.keySet());
            return rc;
        }
    }
	// 获取子节点监视器列表
    List<String> getChildWatches() {
        synchronized(watchManager.childWatches) {
            List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
            return rc;
        }
    }

    /**
	  管理监视器 & 处理由ClientCnxn对象生成的事件。这个实现作为ZooKeeper的一个嵌套类,这样可以避免公开方法被作为ZooKeeper 客户端API的一部分被公开。
     */
    private static class ZKWatchManager implements ClientWatchManager {
	    // 数据节点监视器
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
		// exist 监视器
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
		// 子节点监视器
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();
        // 默认监视器
        private volatile Watcher defaultWatcher;
	
		// 添加监视器
        final private void addTo(Set<Watcher> from, Set<Watcher> to) {
            if (from != null) {
                to.addAll(from);
            }
        }

        /* 
		 * 返回事件需要通知的监视器列表,管理器不能通知监视器,如果监视器已经被触发,管理器将更新它内部的结构。
		   这样做的目的是被调用者目前也可能在以后的某个时间是负责通知的事件的监视。
         * @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, 
         *                                                        Event.EventType, java.lang.String)
         */
        @Override
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
			// 需要通知的监视器
            Set<Watcher> result = new HashSet<Watcher>();
		   // 时间类型
            switch (type) {
			// 事件类型为None通知所有监视器
            case None:
				
                result.add(defaultWatcher);
				
                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                        state != Watcher.Event.KeeperState.SyncConnected;
				
                synchronized(dataWatches) {
					// 添加所有数据监视器
                    for(Set<Watcher> ws: dataWatches.values()) {
                        result.addAll(ws);
                    }
					// 如果需要清空则清空数据监视器
                    if (clear) {
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }
	
                return result;
            case NodeDataChanged:
			// 数据节点被创建
            case NodeCreated:
				// 通知数据节点监视器和exist监视器,同时将其删除
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
				//子节点列表变化事件,通知子节点变化监视器,同时删除
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
				// 节点删除事件
            case NodeDeleted:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn't be needed, but just in case
                synchronized (existWatches) {
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        addTo(existWatches.remove(clientPath), result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                    }
                }
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default:
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }

            return result;
        }
    }

    /**
     * 为一个特殊的路径注册监视器.
     */
    abstract class WatchRegistration {
	    // 监视器
        private Watcher watcher;
		// 客户端路径
        private String clientPath;
		// 构造方法
        public WatchRegistration(Watcher watcher, String clientPath)
        {
            this.watcher = watcher;
            this.clientPath = clientPath;
        }
        // 获取路径监视器的映射关系
        abstract protected Map<String, Set<Watcher>> getWatches(int rc);

        /**
		 *  在一个路径上注册监视器
         */
        public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }
        /**
		 *  基于结果码判断是否需要添加监视器 
         */
        protected boolean shouldAddWatch(int rc) {
            return rc == 0;
        }
    }

    /** Handle the special case of exists watches - they add a watcher
     * even in the case where NONODE result code is returned.
     */
    class ExistsWatchRegistration extends WatchRegistration {
        public ExistsWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return rc == 0 ?  watchManager.dataWatches : watchManager.existWatches;
        }

        @Override
        protected boolean shouldAddWatch(int rc) {
            return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
        }
    }

    class DataWatchRegistration extends WatchRegistration {
        public DataWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.dataWatches;
        }
    }

    class ChildWatchRegistration extends WatchRegistration {
        public ChildWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.childWatches;
        }
    }

    public enum States {
        CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
        CLOSED, AUTH_FAILED, NOT_CONNECTED;
		
	    // 判断连接是否存活,只要不是CLOSED也不是AUTH_FAILED都是存活的
        public boolean isAlive() {
            return this != CLOSED && this != AUTH_FAILED;
        }

        /**
         * 判断是否和服务器见有连接,如果状态是CONNECTED或者是CONNECTEDREADONLY都表示有连接
         * */
        public boolean isConnected() {
            return this == CONNECTED || this == CONNECTEDREADONLY;
        }
    }

   
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
        throws IOException
    {
        this(connectString, sessionTimeout, watcher, false);
    }

   
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
		// 2 设置默认监视器
        watchManager.defaultWatcher = watcher;
		//创建一个连接字符串的解析器
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
		// 3 使用解析器解析出的服务器地址构建一个HostProvider
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
		// 4 构建一个客户端连接对象		
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
		// 启动连接
        cnxn.start();
    }

   
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd)
        throws IOException
    {   
	 // 默认 canBeReadOnly=false
        this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);
    }

  
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout
                + " watcher=" + watcher
                + " sessionId=" + Long.toHexString(sessionId)
                + " sessionPasswd="
                + (sessionPasswd == null ? "<null>" : "<hidden>"));

        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
        cnxn.seenRwServerBefore = true; // since user has provided sessionId
        cnxn.start();
    }

 

 

 

  • 大小: 28.2 KB
  • 大小: 19.7 KB
5
1
分享到:
评论

相关推荐

    Zookeeper源码分析.epub

    Zookeeper源码分析.epub

    Zookeeper源码分析

    zookeeper源码分析(一)工作原理概述 zookeeper源码分析(二)FastLeader选举算法 Zookeeper源码分析之Paxos算法之旅

    zookeeper源码分析

    第2章 ZooKeeper之序列化组件源码解析【透视现象,直击本质】 第4章 持久化【高手过招必备】 第6章 服务器启动 【由浅入深,先学好单机版,才能掌握集群版】 第7章 会话管理 【无处不在的会话其实没那么难】 第8章 ...

    zookeeper源码阅读

    ZooKeeper源码阅读,庖丁解牛的带你进入zk的世界。ZooKeeper的类初始化 ZooKeeper在初始化的时候, 会调用类初始化方法, 初始化日志环境(使用SLF4J), 并且记录相关环境变量. 环境变量被存放在Environment的类中, 使用...

    08_尚硅谷技术之Zookeeper(源码解析)V3.3.pdf

    Zookeeper3.5.1(源码解析)

    Zookeeper源码剖析:深入理解Leader选举机制

    本文详细分析了Zookeeper的源码,特别是Leader选举过程的实现。首先,介绍了阅读源码的意义,包括技术提升、框架掌握、问题定位、面试准备、深入理解技术以及参与开源社区。接着,提供了一系列高效阅读源码的方法,...

    ZooKeeper-:ZooKeeper源码剖析

    ZooKeeper源码分析 优秀时间学习了一下ZooKeeper:分布式过程协调这本书的内容,对ZooKeeper实现的细节很好奇,所以顺便把ZooKeeper源码看了一遍。看完之后想写点内容做个笔记,确实发现不好开始。由于ZooKeeper一个...

    03-05-08-zookeeper源码之watcher原理分析1

    //注册监听//修改节点的值触发监ZooKeeper API 的初始化过程转载请注明《咕泡学院》,建议自己分析一遍在创建一个 ZooKeeper 客户端对象实例

    zookeeper第四节课原理源码分析资料1

    zookeeper第四节课原理源码分析资料1

    zookeeper分析流程.txt

    zookeeper源码执行流程。 Zookeeper是一个开源的分布式应用程序协调服务器,其为分布式系统提供一致性服务。其一致性是通过基于Paxos算法的ZAB协议完成的。其主要功能包括:配置维护、域名服务、分布式同步、集群...

    第四课:zookeeper ZAB协议实现源码分析1

    课程概要:启动流程源码分析快照与事物日志的存储结构一、启动流程知识点:工程结构介绍启动流程宏观图集群启动详细流程netty 服务工作机制1.工程结构介绍项目地址

    zookeeper-3.4.14.rar

    zookeeper-3.4.14...............................................................

    03-05-07-zookeeper原理之Leader选举源码分析1

    在第一节课,我们讲到了 zookeeper 的来源,是来自于 google chubby。为了解决在分布式环境下,如何从多个 server 中选举出 maste

    zookeeperdemo.zip

    由于zookeeper源代码在网上比较不好整理,这里给大家整理好了,方便调试和理解,分布式系统就像...将会从源码层次研究分析Zookeeper,通过源码帮助我们深入理解Zookeeper实现思路,并提高我们对分布式一致性问题的认识。

    Zookeeper集群架构全面实战

    非常强的一套Zookeeper集群实战,包含了全套的学习代码,学习笔记还有...内容从Zookeeper入门教学,本地安装,Zookeeper集群实战,项目需求,Zookeeper企业面试实战,Zookeeper算法实战,Zookeeper核心源码分析等内容。

    java高级软件工程师教程快速入门Zookeeper+dubbo视频教程

    本套课程中,第一阶段深入Zookeeper原理和源码,分析Zookeeper的核心实现,并通过多个应用场景说明,体现出其重要性和广泛的应用性。第二阶段深入Dubbo RPC、SPI等核心功能点,列举多个示例加以应用说明,期望能够...

    深入探索Zookeeper的ZAB协议:分布式系统的核心解析深入探索Zookeeper的ZAB协议:分布式系统的核心解析

    此外,文章还提供了Zookeeper源码中关于ZAB协议实现的细节分析,为理解分布式系统的底层逻辑提供了宝贵的视角。本文适合对分布式系统感兴趣的开发者和研究者,帮助他们深入理解Zookeeper的核心机制

    Hadoop源码分析完整版

    Apache上就出现了一个类似的解决方案,目前它们都属亍Apache的Hadoop项目,对应的分删是: Chubby--&gt;ZooKeeper GFS--&gt;HDFS BigTable--&gt;HBase MapReduce--&gt;Hadoop 目前,基亍类似思想的Open Source项目迓径多,如...

    zookeeper理论原理

    zookeeper理论原理 ZAB机制 客户端流程 客户端源码分析 fastelection选举

Global site tag (gtag.js) - Google Analytics