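1: Source-code analysis of Watcher registration on the zk client
ZooKeeper offers several methods that accept a Watcher object, such as exists and getData. Below we use the getData source to analyze how the zk client and server together implement the whole Watcher mechanism.
With the native ZooKeeper API, node data is read through the getData method, which can also take a Watcher object to watch that node: when the node's data changes, the client receives a notification event from the server saying that the node's state has changed. Let's follow getData() to see how zk handles event watching.
Before going further, a few concepts are worth knowing: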
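1.1 Packet
Packet is the smallest unit of communication in ZooKeeper, so every object that has to travel over the network is wrapped in a Packet.
1.2 SendThread
SendThread is the client thread that handles network I/O, including receiving event notifications: after the server responds to a client request, the response is handed to SendThread.
1.3 EventThread
EventThread is the thread ZooKeeper dedicates to processing event notifications: after SendThread receives a notification event, it passes the event to EventThread.
1.4 ServerCnxn
ServerCnxn represents one connection between a client and the server. When a client registers a Watcher with the server, the Watcher object itself is never sent to the server; the server only keeps the ServerCnxn object for the connection to that client.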
The getData() method is as follows:
    public byte[] getData(final String path, Watcher watcher, Stat stat)
            throws KeeperException, InterruptedException {
        .....................
        // the watch contains the un-chroot path
        WatchRegistration wcb = null;
        if (watcher != null) {
            wcb = new DataWatchRegistration(watcher, clientPath);
        }

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        ....................
        return response.getData();
    }
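(1) When the client passes in a watcher that is not null, the watcher is first wrapped temporarily in a WatchRegistration. WatchRegistration is an abstract class that holds the association between the watcher object and the node path.
(2) The request is flagged: if the watcher is not null, the request is marked as needing a watch. RequestHeader carries header information such as the operation type.
(3) ClientCnxn submits the request, wrapping it in a packet object: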
    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }
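The blocking handshake in submitRequest() can be tried out in isolation. The following is a minimal sketch using only the JDK; the `Packet` class, the string payloads, and the `serveOne()` method are hypothetical stand-ins rather than ZooKeeper's own types. It shows the calling thread waiting on the packet's monitor until another thread marks the packet finished and calls notifyAll().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified stand-ins for ClientCnxn's Packet and SendThread.
final class SubmitRequestSketch {

    static final class Packet {
        final String request;
        String response;   // filled in by the I/O thread
        boolean finished;  // guarded by the packet's own monitor

        Packet(String request) {
            this.request = request;
        }
    }

    private final BlockingQueue<Packet> outgoingQueue = new LinkedBlockingQueue<>();

    // Caller side: enqueue the packet, then block until it is marked finished.
    String submitRequest(String request) throws InterruptedException {
        Packet packet = new Packet(request);
        outgoingQueue.add(packet);
        synchronized (packet) {
            while (!packet.finished) {  // the loop guards against spurious wakeups
                packet.wait();
            }
        }
        return packet.response;
    }

    // I/O thread side: take one packet, pretend a reply arrived, wake the caller.
    void serveOne() throws InterruptedException {
        Packet packet = outgoingQueue.take();
        synchronized (packet) {
            packet.response = "reply-to-" + packet.request;
            packet.finished = true;
            packet.notifyAll();
        }
    }

    static String demo() {
        SubmitRequestSketch cnxn = new SubmitRequestSketch();
        Thread io = new Thread(() -> {
            try {
                cnxn.serveOne();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        io.start();
        try {
            String reply = cnxn.submitRequest("getData:/app");
            io.join();
            return reply;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the waiter re-checks `finished` inside the synchronized block, the handshake is safe even if the I/O thread completes the packet before the caller starts waiting.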
The queuePacket method deserves a closer look. Its source is as follows:
    Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration) {
        ..............
        synchronized (outgoingQueue) {
            packet = new Packet(h, r, request, response, watchRegistration);
            packet.cb = cb;
            packet.ctx = ctx;
            packet.clientPath = clientPath;
            packet.serverPath = serverPath;
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().wakeupCnxn();
        return packet;
    }
(4) SendThread.readResponse() receives the server's reply:
    class SendThread extends Thread {
        ............
        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");
            ..............
            try {
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(
                            KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "
                            + replyHdr.getXid() + " with err "
                            + replyHdr.getErr()
                            + " expected Xid "
                            + packet.requestHeader.getXid()
                            + " for a packet with details: "
                            + packet);
                }
                ................
            } finally {
                finishPacket(packet);
            }
        }
    }
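Returning to queuePacket(): it places the WatchRegistration into a Packet object and adds the packet to the outgoing queue. Note, however, that zk does not serialize the whole WatchRegistration into the bytes that are sent.
The packet is passed to the sendPacket method of ClientCnxnSocket, and createBB() writes only the requestHeader and the request. The source is as follows: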
    @Override
    void sendPacket(Packet p) throws IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        p.createBB();
        ByteBuffer pbb = p.bb;
        sock.write(pbb);
    }

    public void createBB() {
        try {
            ............
            if (requestHeader != null) {
                requestHeader.serialize(boa, "header");
            }
            if (request instanceof ConnectRequest) {
                request.serialize(boa, "connect");
                // append "am-I-allowed-to-be-readonly" flag
                boa.writeBool(readOnly, "readOnly");
            } else if (request != null) {
                request.serialize(boa, "request");
            }
            ........
        } catch (IOException e) {
            LOG.warn("Ignoring unexpected exception", e);
        }
    }
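To make the "only a flag goes over the wire" point concrete, here is a small sketch that uses only the JDK. The byte layout (length-prefixed path followed by one boolean) and the method names `serializeRequest` and `readWatchFlag` are assumptions made for illustration, not ZooKeeper's actual wire format. It shows that the receiving side can recover whether a watch was requested, but nothing about the watcher object itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: the byte layout is illustrative, not ZooKeeper's real format.
final class WatchFlagOnWireSketch {

    // Client side: the path and a single boolean are written; the watcher is not.
    static byte[] serializeRequest(String serverPath, Object watcher) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            byte[] path = serverPath.getBytes(StandardCharsets.UTF_8);
            out.writeInt(path.length);
            out.write(path);
            out.writeBoolean(watcher != null);  // mirrors request.setWatch(watcher != null)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Server side: all it can recover is whether a watch was requested.
    static boolean readWatchFlag(byte[] wire) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            byte[] path = new byte[in.readInt()];
            in.readFully(path);
            return in.readBoolean();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object watcher = new Object();  // stands in for a Watcher instance
        byte[] wire = serializeRequest("/app", watcher);
        System.out.println(wire.length + " bytes, watch=" + readWatchFlag(wire));
    }
}
```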
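The finishPacket() method, called in the finally block of readResponse(), takes the packet we built earlier and calls p.watchRegistration.register(), which stores the temporarily held Watcher object in ZKWatchManager. This also shows that zk never actually sends the Watcher object to the server: the Watcher is kept in the local ZKWatchManager, and when the server's notification arrives it is looked up there.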
    private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            p.watchRegistration.register(p.replyHeader.getErr());
        }
        ............
    }
In this example, getData corresponds to DataWatchRegistration:
    class DataWatchRegistration extends WatchRegistration {
        public DataWatchRegistration(Watcher watcher, String clientPath) {
            super(watcher, clientPath);
        }

        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.dataWatches;
        }
    }
The watcher is placed in dataWatches of the local ZKWatchManager; dataWatches is a Map that stores the mapping from data node paths to Watchers:
    private static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches =
                new HashMap<String, Set<Watcher>>();
        .............
    }
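The registration step can be sketched with plain collections. This is a minimal sketch, not the real implementation: the `Watcher` interface and the `register(rc, path, watcher)` signature are hypothetical, and it assumes that a data watch is stored only when the reply's error code is 0 (success), which is how register(p.replyHeader.getErr()) appears to be intended.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of client-side watch registration keyed by node path.
final class WatchRegistrationSketch {

    interface Watcher {
        void process(String event);
    }

    private final Map<String, Set<Watcher>> dataWatches = new HashMap<>();

    // Assumption: the watcher is stored only if the server replied without error.
    void register(int rc, String clientPath, Watcher watcher) {
        if (rc != 0) {
            return;
        }
        synchronized (dataWatches) {
            dataWatches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
        }
    }

    int watcherCount(String clientPath) {
        synchronized (dataWatches) {
            Set<Watcher> watchers = dataWatches.get(clientPath);
            return watchers == null ? 0 : watchers.size();
        }
    }

    static String demo() {
        WatchRegistrationSketch manager = new WatchRegistrationSketch();
        Watcher watcher = event -> { };
        manager.register(-101, "/missing", watcher);  // non-zero error code: not stored
        manager.register(0, "/app", watcher);
        manager.register(0, "/app", watcher);         // same watcher again: the Set dedupes
        return manager.watcherCount("/missing") + "," + manager.watcherCount("/app");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Using a Set per path means that registering the same watcher twice on one node still produces a single callback.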
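2: Watcher handling on the server
After the server receives the client's request, if the client flagged it as needing a watch, the server records the corresponding ServerCnxn as the watcher for that path. When the node later changes, the server triggers the matching event through triggerWatch():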
    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) {
                HashSet<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        for (Watcher w : watchers) {
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e);
        }
        return watchers;
    }
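Step 1: wrap the event type, the path, and the notification state into a WatchedEvent object.
Step 2: remove the watchers registered for the path from watchTable.
Step 3: call process on each watcher (on the server, the watcher is the ServerCnxn):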
    @Override
    synchronized public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                    "Deliver event " + event + " to 0x"
                    + Long.toHexString(this.sessionId)
                    + " through " + this);
        }

        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();

        sendResponse(h, e, "notification");
    }
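Inside the callback, the WatchedEvent is converted into a WatcherEvent, a form that can be serialized for network transmission, and the reply header is given an xid of -1, which marks the message as a notification.
Finally, sendResponse() is called to send the notification.
That completes the server-side handling of a Watcher. Notice that none of the Watcher's real business logic runs here, because that part executes on the client.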
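The two-map bookkeeping used by triggerWatch() can be reproduced in a few lines. The sketch below is a simplification under stated assumptions: the `Watcher` interface, the string event types, and `addWatch` are hypothetical, and a lambda that appends to a list stands in for a ServerCnxn sending a notification. It shows that the path entry is removed before any callback runs, so a second change to the same node notifies nobody.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a server-side watch table with one-shot triggering.
final class ServerWatchTableSketch {

    interface Watcher {
        void process(String type, String path);
    }

    private final Map<String, Set<Watcher>> watchTable = new HashMap<>();
    private final Map<Watcher, Set<String>> watch2Paths = new HashMap<>();

    synchronized void addWatch(String path, Watcher watcher) {
        watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
        watch2Paths.computeIfAbsent(watcher, k -> new HashSet<>()).add(path);
    }

    Set<Watcher> triggerWatch(String path, String type) {
        Set<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path);  // removal makes the watch one-shot
            if (watchers == null || watchers.isEmpty()) {
                return null;
            }
            for (Watcher w : watchers) {
                Set<String> paths = watch2Paths.get(w);
                if (paths != null) {
                    paths.remove(path);
                }
            }
        }
        // Callbacks run outside the lock, as in the excerpt above.
        for (Watcher w : watchers) {
            w.process(type, path);
        }
        return watchers;
    }

    static List<String> demo() {
        List<String> sent = new ArrayList<>();
        ServerWatchTableSketch manager = new ServerWatchTableSketch();
        Watcher connection = (type, path) -> sent.add(type + ":" + path);
        manager.addWatch("/app", connection);
        manager.triggerWatch("/app", "NodeDataChanged");
        manager.triggerWatch("/app", "NodeDataChanged");  // no watchers left
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```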
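3: Client callback
After the server triggers the watcher, it sends a WatcherEvent to the client over the TCP connection behind the ServerCnxn. On receiving it, the client does the following.
As described earlier, the client's SendThread receives event notifications: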
    class SendThread extends Thread {
        private long lastPingSentNs;
        private final ClientCnxnSocket clientCnxnSocket;
        private Random r = new Random(System.nanoTime());
        private boolean isFirstConnect = true;

        // Receives events from the server
        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket
                if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;
                    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                            Watcher.Event.KeeperState.AuthFailed, null));
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) {
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                // Deserialize the event
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path "
                                + chrootPath);
                    }
                }

                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }

                eventThread.queueEvent(we);
                return;
            }
        }
    }
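The chroot handling inside readResponse() is easy to overlook, so here it is pulled out as a standalone function. This is a sketch that mirrors the three branches of the excerpt above; the class and method names (`ChrootPathSketch`, `toClientPath`) are hypothetical, and the "too short" case simply returns the path unchanged where the original logs a warning.

```java
// Hypothetical helper mirroring the chroot branches in readResponse().
final class ChrootPathSketch {

    static String toClientPath(String serverPath, String chrootPath) {
        if (chrootPath == null) {
            return serverPath;                                  // no chroot configured
        }
        if (serverPath.equals(chrootPath)) {
            return "/";                                         // the chroot itself is the client's root
        }
        if (serverPath.length() > chrootPath.length()) {
            return serverPath.substring(chrootPath.length());   // strip the chroot prefix
        }
        return serverPath;                                      // too short: left unchanged
    }

    public static void main(String[] args) {
        System.out.println(toClientPath("/app/config", "/app"));
        System.out.println(toClientPath("/app", "/app"));
    }
}
```

This conversion is why the Watcher is registered under the client path: the path in the notification has to match the key used in the local watch map.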
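For an event whose reply header carries xid -1, the byte stream is first deserialized into a WatcherEvent, which is then converted back into a WatchedEvent, and finally the WatchedEvent is handed to EventThread for processing: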
    public void queueEvent(WatchedEvent event) {
        if (event.getType() == EventType.None
                && sessionState == event.getState()) {
            return;
        }
        sessionState = event.getState();

        // materialize the watchers based on the event
        WatcherSetEventPair pair = new WatcherSetEventPair(
                watcher.materialize(event.getState(), event.getType(),
                        event.getPath()),
                event);
        // queue the pair (watch set & event) for later processing
        waitingEvents.add(pair);
    }
EventThread's queueEvent() first takes the matching Watchers out of ZKWatchManager:
    public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                    Watcher.Event.EventType type,
                                    String clientPath) {
        Set<Watcher> result = new HashSet<Watcher>();

        switch (type) {
        case None:
            //......................
            synchronized (dataWatches) {
                for (Set<Watcher> ws : dataWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    dataWatches.clear();
                }
            }
            return result;
        case NodeDataChanged:
        //..............
        case NodeCreated:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        }
        return result;
    }
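The client callback code shows that once the client has determined the event type, it removes the Watchers from the Map that stores them. In other words, a Watcher is one-shot: to keep watching a node, it has to be registered again.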
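The one-shot behavior follows directly from the remove() call and can be demonstrated with a plain map. In this sketch, `Runnable` stands in for a Watcher and the class and method names are hypothetical. It shows a watcher firing on the first change, staying silent on the second, and firing again only after it is re-registered.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: Runnable stands in for a Watcher callback.
final class OneShotWatchSketch {

    private final Map<String, Set<Runnable>> dataWatches = new HashMap<>();

    void register(String clientPath, Runnable watcher) {
        synchronized (dataWatches) {
            dataWatches.computeIfAbsent(clientPath, k -> new HashSet<>()).add(watcher);
        }
    }

    // Removes and returns the watchers for the path, like the NodeDataChanged branch.
    Set<Runnable> materialize(String clientPath) {
        Set<Runnable> result = new HashSet<>();
        synchronized (dataWatches) {
            Set<Runnable> removed = dataWatches.remove(clientPath);
            if (removed != null) {
                result.addAll(removed);
            }
        }
        return result;
    }

    static String demo() {
        OneShotWatchSketch manager = new OneShotWatchSketch();
        AtomicInteger calls = new AtomicInteger();
        Runnable watcher = () -> calls.incrementAndGet();

        manager.register("/app", watcher);
        manager.materialize("/app").forEach(Runnable::run);  // first change: fires
        int afterFirst = calls.get();
        manager.materialize("/app").forEach(Runnable::run);  // second change: nothing registered
        int afterSecond = calls.get();
        manager.register("/app", watcher);                   // re-register to keep watching
        manager.materialize("/app").forEach(Runnable::run);
        int afterThird = calls.get();
        return afterFirst + "," + afterSecond + "," + afterThird;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```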
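Once the Watchers have been obtained, they are placed, together with the event, into the waitingEvents queue, a linked-list-based blocking queue (LinkedBlockingQueue). The run() method of the EventThread then keeps taking entries from the queue and processing them: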
    @Override
    public void run() {
        try {
            isRunning = true;
            while (true) {
                Object event = waitingEvents.take();
                if (event == eventOfDeath) {
                    wasKilled = true;
                } else {
                    processEvent(event);
                }
                if (wasKilled)
                    synchronized (waitingEvents) {
                        if (waitingEvents.isEmpty()) {
                            isRunning = false;
                            break;
                        }
                    }
            }
        } catch (InterruptedException e) {
            LOG.error("Event thread exiting due to interruption", e);
        }

        LOG.info("EventThread shut down");
    }
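Processing here is serial and synchronous, so a Watcher whose callback runs for a long time delays every other Watcher waiting in the client's callback queue. Keep the work done inside a Watcher callback short.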
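The effect of serial processing can be seen with a single worker thread draining a LinkedBlockingQueue. The sketch below is a simplified stand-in for the EventThread loop: `Runnable` replaces the watcher/event pair, `EVENT_OF_DEATH` is a hypothetical poison-pill object, and the 200 ms sleep is an arbitrary value chosen to simulate a slow callback. The fast callback cannot run until the slow one returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a single-threaded event loop with a poison pill.
final class EventThreadSketch {

    private static final Object EVENT_OF_DEATH = new Object();

    static String demo() {
        LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
        List<String> order = new ArrayList<>();  // written by the worker, read after join()

        Runnable slowWatcher = () -> {
            try {
                Thread.sleep(200);  // simulates a long-running callback
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            order.add("slow");
        };
        Runnable fastWatcher = () -> order.add("fast");

        Thread eventThread = new Thread(() -> {
            try {
                while (true) {
                    Object event = waitingEvents.take();
                    if (event == EVENT_OF_DEATH) {
                        break;
                    }
                    ((Runnable) event).run();  // callbacks run one at a time, in queue order
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        waitingEvents.add(slowWatcher);
        waitingEvents.add(fastWatcher);
        waitingEvents.add(EVENT_OF_DEATH);
        eventThread.start();
        try {
            eventThread.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return String.join(",", order);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If a callback really does need to do heavy work, one option is to hand that work off to a separate executor inside the callback so the event loop is freed quickly.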