背景:
10月29号,系统切换的时候,发生了master信息的变更,有个监听master信息的系统未收到zookeeper的通知。
分析:
原代码如下所示, 监听系统中用以下代码来监听path中数据的变更。当数据发生变更的时候,会回调process方法,然后处理相应的业务。(我们使用的是curator的jar包)
client = CuratorFrameworkFactory.newClient("*:2181", new ExponentialBackoffRetry(1000, 3)); client.start(); try { client.getData().usingWatcher(this).inBackground().forPath("/id/master"); } catch (Exception e) { throw new RuntimeException(e); }
问题点:
以上代码再第一次发生变更的时候,可以接收到zookeeper的变更通知,但是在zookeeper server会将watcher删除,在以后的变更中将不会继续通知。
源码分析:
当调用forPath函数时将会调用GetDataBuilderImpl的forPath方法,这个方法中会异步处理操作
@Override public byte[] forPath(String path) throws Exception { path = client.fixForNamespace(path); byte[] responseData = null; if ( backgrounding.inBackground() ) { client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), null, backgrounding.getContext()), null); } else { responseData = pathInForeground(path); } return responseData; }
异步调用GetDataBuilderImpl的performBackgroundOperation方法。然后会调用appcahe原生zookeeper的jar包。
@Override public void performBackgroundOperation(final OperationAndData<String> operationAndData) throws Exception { if ( watching.isWatched() ) { client.getZooKeeper().getData(operationAndData.getData(), true, callback, backgrounding.getContext()); } else { client.getZooKeeper().getData(operationAndData.getData(), watching.getWatcher(), callback, backgrounding.getContext()); } }
然后会调用zookeeper.java中的getData函数,发送请求到zookeeper server. setWatch为true
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx) { final String serverPath = prependChroot(clientPath); RequestHeader h = new RequestHeader(); h.setType(ZooDefs.OpCode.getData); GetDataRequest request = new GetDataRequest(); request.setPath(serverPath); request.setWatch(watcher != null); GetDataResponse response = new GetDataResponse(); cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, clientPath, serverPath, ctx, wcb); }
zookeeper server接收到客户端的getData请求时,会调用FinalRequestProcessor类中的以下代码处理
case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); Stat stat = new Stat(); byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDataRequest.getWatch() ? cnxn : null); rsp = new GetDataResponse(b, stat); break; }
接下去会调用DataTree中的getData函数,这个函数中会将监听的节点加入到WatcherManager中,这样我们就添加了对某个路径的监听
public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException { DataNode n = nodes.get(path); synchronized (n) { n.copyStat(stat); if (watcher != null) { dataWatches.addWatch(path, watcher); } return n.data; } }
当另一个客户端调用setData方法时,Zookeeper客户端将会调用DataTree类中的setData方法。这个方法将会变更数据,然后调用triggerWatch方法,通知变更。
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException { Stat s = new Stat(); DataNode n = nodes.get(path); byte lastdata[] = null; synchronized (n) { lastdata = n.data; n.data = data; n.stat.setMtime(time); n.stat.setMzxid(zxid); n.stat.setVersion(version); n.copyStat(s); } // now update if the path is in a quota subtree. String lastPrefix; if((lastPrefix = getMaxPrefixWithQuota(path)) != null) { this.updateBytes(lastPrefix, (data == null ? 0 : data.length) - (lastdata == null ? 0 : lastdata.length)); } dataWatches.triggerWatch(path, EventType.NodeDataChanged); return s; }
然后调用WatcherManager中的triggerWatch方法,方法中将会删除监听。然后调用watcher的process方法
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) { WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); HashSet<Watcher> watchers; synchronized (this) { watchers = watchTable.remove(path); if (watchers == null || watchers.isEmpty()) { return null; } for (Watcher w : watchers) { HashSet<String> paths = watch2Paths.get(w); if (paths != null) { paths.remove(path); } } } for (Watcher w : watchers) { if (supress != null && supress.contains(w)) { continue; } w.process(e); } return watchers; }
调用NIOServerCnxn类中的process方法,发送通知到客户端。
synchronized public void process(WatchedEvent event) { ReplyHeader h = new ReplyHeader(-1, -1L, 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this); } // Convert WatchedEvent to a type that can be sent over the wire WatcherEvent e = event.getWrapper(); sendResponse(h, e, "notification"); }
客户端接收到notification时,会调用ClientCnxn类的下面代码处理。queueEvent方法中会调用watcher.materialize方法确定watcherEvent的处理类
if (replyHdr.getXid() == -1) { WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); WatchedEvent we = new WatchedEvent(event); eventThread.queueEvent( we ); return; } public void queueEvent(WatchedEvent event) { // materialize the watchers based on the event WatcherSetEventPair pair = new WatcherSetEventPair( watcher.materialize(event.getState(), event.getType(), event.getPath()), event); // queue the pair (watch set & event) for later processing waitingEvents.add(pair); }
调用zookeeper的materialize方法获取watcher对象,并且重watcher列表中删除。
@Override public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) { Set<Watcher> result = new HashSet<Watcher>(); switch (type) { case None: result.add(defaultWatcher); boolean clear = ClientCnxn.getDisableAutoResetWatch() && state != Watcher.Event.KeeperState.SyncConnected; synchronized(dataWatches) { for(Set<Watcher> ws: dataWatches.values()) { result.addAll(ws); } if (clear) { dataWatches.clear(); } } synchronized(existWatches) { for(Set<Watcher> ws: existWatches.values()) { result.addAll(ws); } if (clear) { existWatches.clear(); } } synchronized(childWatches) { for(Set<Watcher> ws: childWatches.values()) { result.addAll(ws); } if (clear) { childWatches.clear(); } } return result; }
总结:
从上面源码,我们可以看出,zookeeper的每次只能监听一次变更。
解决:
curator为我们封装了重复监听的类NodeCache.java. 它是怎么实现的呢?
NodeCache nc = new NodeCache(zkclient.getClient(), path); nc.start(true);
start方法会获取初始值,然后调用reset方法
public void start(boolean buildInitial) throws Exception { Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); ensurePath.ensure(client.getZookeeperClient()); client.getConnectionStateListenable().addListener(connectionStateListener); if ( buildInitial ) { internalRebuild(); } reset(); }
reset方法会调用zookeeper的forpath方法,并用包装的watcher类
private void reset() throws Exception { if ( (state.get() == State.STARTED) && isConnected.get() ) { client.checkExists().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); } }
当配置发生变化时,watcher类中会重新调用reset方法,再次监听
private final CuratorWatcher watcher = new CuratorWatcher() { @Override public void process(WatchedEvent event) throws Exception { reset(); } };
相关推荐
在Zookeeper中,znode是一个跟Unix文件系统路径相似的节点,可以往这个节点存储或获取数据。如果在创建znode时Flag设置为EPHEMERAL,那么当创建这个znode的节点和Zookeeper失去连接后,这个znode将不再存在在...
主要介绍了zookeeper 的概述,特点,作用,角色,安装,shell命令
使用总结 -- zookeeper使用
对于zookeeper 的机制原理有一个清晰翔实的梳理。
Zookeeper和Hbase安装总结手册.
apache-zookeeper-3.7.1 apache-zookeeper-3.7.1 apache-zookeeper-3.7.1 apache-zookeeper-3.7.1 apache-zookeeper-3.7.1 apache-zookeeper-3.7.1 apache-zookeeper-3.7.1 apache-zookeeper-3.7.1 apache-zookeeper...
#Zookeeper的日志可以用LogFormatter查看 ##命令方式如下 java -classpath .:slf4j-api-1.7.2.jar:zookeeper-3.4.6.jar org.apache.zookeeper.server.LogFormatter /var/lib/zookeeper/version-2/log.1 ##window...
个人学习zookeeper总结,不存在侵权, !
从PAXOS到ZOOKEEPER分布式一致性原理与实践全版高清 和zookeeper-3.4.6总结
zookeeper 3.6.3 源码
zookeeper linux 搭建流程,zookeeper linux 搭建流程zookeeper linux 搭建流程zookeeper linux 搭建流程。
zookeeper可视化工具
Zookeeper中常用概念及用法,master选举源码解析,Zookeeper应用场景,Base, Cap理论
【BAT必备】zookeeper面试题【BAT必备】zookeeper面试题【BAT必备】zookeeper面试题【BAT必备】zookeeper面试题【BAT必备】zookeeper面试题【BAT必备】zookeeper面试题【BAT必备】zookeeper面试题【BAT必备】...
史上最详细的zookeeper总结文档,涵盖了zookeeper的各个方面,案例代码,内容应有尽有,建议收藏下载。
dubbo2.6.0 + Zookeeper3.4.9 + Zookeeper3.8.0 + Zookeeper3.7.1
第三部分(第9~10章)主要介绍ZooKeeper内部原理及如何运行ZooKeeper.第9章介绍ZooKeeper的作者们在设计时所采用的方案。第10章介绍如何对ZooKeeper进行配置。 购买地址:https://item.jd.com/11880788.html
zookeeper复习面试个人总结题集
ZooKeeper 未授权访问【原理扫描】,zookeeper安全漏洞修复方法和操作步骤
zookeeper windows