`

Zookeeper故障总结

 
阅读更多

背景:

10月29号,系统切换的时候,发生了master信息的变更,有个监听master信息的系统未收到zookeeper的通知。

 

分析:

原代码如下所示, 监听系统中用以下代码来监听path中数据的变更。当数据发生变更的时候,会回调process方法,然后处理相应的业务。(我们使用的是curator的jar包)

		client = CuratorFrameworkFactory.newClient("*:2181", new ExponentialBackoffRetry(1000, 3));
		client.start();
		try {
			client.getData().usingWatcher(this).inBackground().forPath("/id/master");
		} catch (Exception e) {
			throw new RuntimeException(e);
		}

 问题点: 

    以上代码再第一次发生变更的时候,可以接收到zookeeper的变更通知,但是在zookeeper server会将watcher删除,在以后的变更中将不会继续通知。

 

源码分析:

   当调用forPath函数时将会调用GetDataBuilderImpl的forPath方法,这个方法中会异步处理操作

  

    @Override
    public byte[] forPath(String path) throws Exception
    {
        path = client.fixForNamespace(path);

        byte[]      responseData = null;
        if ( backgrounding.inBackground() )
        {
            client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null);
        }
        else
        {
            responseData = pathInForeground(path);
        }
        return responseData;
    }

    

    异步调用GetDataBuilderImpl的performBackgroundOperation方法。然后会调用appcahe原生zookeeper的jar包。

    @Override
    public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception
    {
        if ( watching.isWatched() )
        {
            client.getZooKeeper().getData(operationAndData.getData(), true, callback, backgrounding.getContext());
        }
        else
        {
            client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext());
        }
    }

    然后会调用zookeeper.java中的getData函数,发送请求到zookeeper server. setWatch为true

 public void getData(final String path, Watcher watcher,
            DataCallback cb, Object ctx)
    {
        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        cnxn.queuePacket(h, new ReplyHeader(), request, response, cb,
                clientPath, serverPath, ctx, wcb);
    }

    zookeeper server接收到客户端的getData请求时,会调用FinalRequestProcessor类中的以下代码处理

case OpCode.getData: {
                lastOp = "GETD";
                GetDataRequest getDataRequest = new GetDataRequest();
                ByteBufferInputStream.byteBuffer2Record(request.request,
                        getDataRequest); 
                Stat stat = new Stat();
                byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                        getDataRequest.getWatch() ? cnxn : null);
                rsp = new GetDataResponse(b, stat);
                break;
            }

    接下去会调用DataTree中的getData函数,这个函数中会将监听的节点加入到WatcherManager中,这样我们就添加了对某个路径的监听

    

public byte[] getData(String path, Stat stat, Watcher watcher)
            throws KeeperException.NoNodeException {
        DataNode n = nodes.get(path); 
        synchronized (n) {
            n.copyStat(stat);
            if (watcher != null) {
                dataWatches.addWatch(path, watcher);
            }
            return n.data;
        }
    }

    当另一个客户端调用setData方法时,Zookeeper客户端将会调用DataTree类中的setData方法。这个方法将会变更数据,然后调用triggerWatch方法,通知变更。

 public Stat setData(String path, byte data[], int version, long zxid,
            long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path); 
        byte lastdata[] = null;
        synchronized (n) {
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
          this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
              - (lastdata == null ? 0 : lastdata.length));
        }
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }

    然后调用WatcherManager中的triggerWatch方法,方法中将会删除监听。然后调用watcher的process方法

 public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type,
                KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {                 
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }

    调用NIOServerCnxn类中的process方法,发送通知到客户端。

    

 synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                     "Deliver event " + event + " to 0x"
                                     + Long.toHexString(this.sessionId)
                                     + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();

        sendResponse(h, e, "notification");
    }

   

     客户端接收到notification时,会调用ClientCnxn类的下面代码处理。queueEvent方法中会调用watcher.materialize方法确定watcherEvent的处理类

     

            if (replyHdr.getXid() == -1) {                 
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response"); 
                WatchedEvent we = new WatchedEvent(event); 
                eventThread.queueEvent( we );
                return;
            }

          public void queueEvent(WatchedEvent event) {
            // materialize the watchers based on the event
            WatcherSetEventPair pair = new WatcherSetEventPair(
                    watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
                            event);
            // queue the pair (watch set & event) for later processing
            waitingEvents.add(pair);
        }

     调用zookeeper的materialize方法获取watcher对象,并且重watcher列表中删除。

       @Override
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) {
            case None:
                result.add(defaultWatcher);
                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                        state != Watcher.Event.KeeperState.SyncConnected;

                synchronized(dataWatches) {
                    for(Set<Watcher> ws: dataWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }

                return result;
        }

     总结:

     从上面源码,我们可以看出,zookeeper的每次只能监听一次变更。

 

      解决: 

      curator为我们封装了重复监听的类NodeCache.java. 它是怎么实现的呢?

NodeCache nc = new NodeCache(zkclient.getClient(), path);
nc.start(true);

     start方法会获取初始值,然后调用reset方法

public void     start(boolean buildInitial) throws Exception
    {
        Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

        ensurePath.ensure(client.getZookeeperClient());

        client.getConnectionStateListenable().addListener(connectionStateListener);

        if ( buildInitial )
        {
            internalRebuild();
        }
        reset();
    }

    reset方法会调用zookeeper的forpath方法,并用包装的watcher类

private void     reset() throws Exception
    {
        if ( (state.get() == State.STARTED) && isConnected.get() )
        {
            client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
        }
    }

    当配置发生变化时,watcher类中会重新调用reset方法,再次监听

    private final CuratorWatcher watcher = new CuratorWatcher()
    {
        @Override
        public void process(WatchedEvent event) throws Exception
        {
            reset();
        }
    };

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics