ClientCnxn是Zookeeper客户端和Zookeeper服务器端进行通信和事件通知处理的主要类,它内部包含两个类,1. SendThread 2. EventThread, SendThread负责客户端和服务器端的数据通信,也包括事件信息的传输,EventThread主要在客户端回调注册的Watchers进行通知处理
ClientCnxn构造方法
/**
* Creates a connection object. The actual network connect doesn't get
* established until needed. The start() instance method must be called
* subsequent to construction.
*
* @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
* @param hostProvider
* the list of ZooKeeper servers to connect to
* @param sessionTimeout
* the timeout for connections.
* @param zooKeeper
* the zookeeper object that this connection is related to.
* @param watcher watcher for this connection
* @param clientCnxnSocket
* the socket implementation used (e.g. NIO/Netty)
* @param sessionId session id if re-establishing session
* @param sessionPasswd session passwd if re-establishing session
* @param canBeReadOnly
* whether the connection is allowed to go to read-only
* mode in case of partitioning
* @throws IOException
*/
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
//如果zookeeper集群有1000台,那么会话超时时间岂不是要设置的很大?因此,zookeeper一般不会很大,3台或者5台足亦
connectTimeout = sessionTimeout / hostProvider.size();//链接超时时间是会话超时时间除以Zookeeper集群数
//读超时是会话超时的2/3
readTimeout = sessionTimeout * 2 / 3;
readOnly = canBeReadOnly;
//读线程,在ClientCnxnd的start方法中启动
sendThread = new SendThread(clientCnxnSocket);
//会话线程,在ClientCnxnd的start方法中启动
eventThread = new EventThread();
}
对于SendThread数据传输线程包含两方面的内容,1是基于TCP/IP的Socket的数据传输,2.传输的数据内容。首先关注传输的数据内容,在TCP/IP传输的数据都是字节,因此,在SendThread发送数据之前,需要将要传输的数据结构进行序列化成字节流,服务器端会反序列化成相应的数据结构。当客户端收到服务器返回的字节流时,客户端将其反序列化为相应的数据结构。
首先看看数据的序列化和反序列化,接口定义:
package org.apache.jute //该借口不是Zookeeper原生提供的,是Apache的jute提供的
import java.io.IOException;
/**
* Interface that is implemented by generated classes.
*
*/
public interface Record {
public void serialize(OutputArchive archive, String tag) //序列化,tag在XmlInputArchive序列化器中,用作xml元素标签
throws IOException;
public void deserialize(InputArchive archive, String tag)//反序列话
throws IOException;
}
OutputArchive接口是数据结构序列化为字节流的字节流写入器,InputArchive接口是字节流反序列化为数据结构的字节流读取器。OutputArchive和InputArchive接口有三个成对使用的实现类
BinaryOutputArchive和BinaryInputArchive底层使用DataOutput和DataInput作为字节容器
XmlOutputArchive和XmlInputArchive底层使用PrintStream,XmlInputArchive使用Xml解析的方式得到相应的数据结构
CsvOutputArchive和CsvInputArchive底层使用PrintStream作为自己容器
Zookeeper客户端向服务器端发送请求,包含请求头和请求正文两部分,每个请求的请求头的类型都是一样的,而请求正文根据请求的不同,分为多种类型。
Zookeeper服务器端向客户端返回响应数据,包含响应头和响应正文两部分,每个响应的响应头的类型都是一样的,而响应正文根据请求的不同,分为多种类型。
请求头,各种请求正文,响应头和响应正文因为要在Socket上进行数据传输,所以它们应该都是可序列化和反序列话的,因此它们都是可序列化的
请求头:
public class RequestHeader implements Record {
private int xid; //请求的事务id,具体的含义和功能接下来分析
private int type; //请求类型?
public RequestHeader() {
}
public RequestHeader(
int xid,
int type) {
this.xid=xid;
this.type=type;
}
public int getXid() {
return xid;
}
public void setXid(int m_) {
xid=m_;
}
public int getType() {
return type;
}
public void setType(int m_) {
type=m_;
}
//序列化操作,将xid和type序列化到OutputArchive中,
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag); //对于最常使用的BinaryOutputArchive,此方法空实现
a_.writeInt(xid,"xid");
a_.writeInt(type,"type");
a_.endRecord(this,tag);//对于最常使用的BinaryOutputArchive,此方法空实现
}
//序列化操作,将xid和type反序列化
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现
xid=a_.readInt("xid");
type=a_.readInt("type");
a_.endRecord(tag);//对于最常使用的BinaryInputArchive,此方法空实现
}
public String toString() {
try {
java.io.ByteArrayOutputStream s =
new java.io.ByteArrayOutputStream();
CsvOutputArchive a_ =
new CsvOutputArchive(s);
a_.startRecord(this,"");//对于CsvOutputArchive,startRecord方法
a_.writeInt(xid,"xid");
a_.writeInt(type,"type");
a_.endRecord(this,"");
return new String(s.toByteArray(), "UTF-8");
} catch (Throwable ex) {
ex.printStackTrace();
}
return "ERROR";
}
public void write(java.io.DataOutput out) throws java.io.IOException {
BinaryOutputArchive archive = new BinaryOutputArchive(out);
serialize(archive, "");
}
public void readFields(java.io.DataInput in) throws java.io.IOException {
BinaryInputArchive archive = new BinaryInputArchive(in);
deserialize(archive, "");
}
public int compareTo (Object peer_) throws ClassCastException {
if (!(peer_ instanceof RequestHeader)) {
throw new ClassCastException("Comparing different types of records.");
}
RequestHeader peer = (RequestHeader) peer_;
int ret = 0;
ret = (xid == peer.xid)? 0 :((xid<peer.xid)?-1:1);
if (ret != 0) return ret;
ret = (type == peer.type)? 0 :((type<peer.type)?-1:1);
if (ret != 0) return ret;
return ret;
}
public boolean equals(Object peer_) {
if (!(peer_ instanceof RequestHeader)) {
return false;
}
if (peer_ == this) {
return true;
}
RequestHeader peer = (RequestHeader) peer_;
boolean ret = false;
ret = (xid==peer.xid);
if (!ret) return ret;
ret = (type==peer.type);
if (!ret) return ret;
return ret;
}
public int hashCode() {
int result = 17;
int ret;
ret = (int)xid;
result = 37*result + ret;
ret = (int)type;
result = 37*result + ret;
return result;
}
public static String signature() {
return "LRequestHeader(ii)";
}
}
请求正文有很多,比如
- 链接请求ConnectRequest
- 创建znode请求CreateRequest
- 节点是否存在请求ExistsRequest
- 删除znode请求DeleteRequest
- 获取child znodes请求GetChildrenRequest
- 设置znode数据SetDataRequest
- 事件WatcherEvent
以CreateRequest为例进行分析
public class CreateRequest implements Record {
private String path; //创建znode节点的path
private byte[] data; //创建znode节点时的节点数据
private java.util.List<org.apache.zookeeper.data.ACL> acl; //创建znode节点时的ACL
private int flags;//这个参数干啥的?
public CreateRequest() {
}
public CreateRequest(
String path,
byte[] data,
java.util.List<org.apache.zookeeper.data.ACL> acl,
int flags) {
this.path=path;
this.data=data;
this.acl=acl;
this.flags=flags;
}
public String getPath() {
return path;
}
public void setPath(String m_) {
path=m_;
}
public byte[] getData() {
return data;
}
public void setData(byte[] m_) {
data=m_;
}
public java.util.List<org.apache.zookeeper.data.ACL> getAcl() {
return acl;
}
public void setAcl(java.util.List<org.apache.zookeeper.data.ACL> m_) {
acl=m_;
}
public int getFlags() {
return flags;
}
public void setFlags(int m_) {
flags=m_;
}
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this,tag);
a_.writeString(path,"path");//写入path
a_.writeBuffer(data,"data");//写入data字节数组
{
a_.startVector(acl,"acl");//写入acl,acl是List类型
if (acl!= null) {
int len1 = acl.size();
for(int vidx1 = 0; vidx1<len1; vidx1++) {
org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1);
a_.writeRecord(e1,"e1");//ACL也是一个Record
}
}
a_.endVector(acl,"acl");
}
a_.writeInt(flags,"flags");//写入flags
a_.endRecord(this,tag);
}
public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
path=a_.readString("path");
data=a_.readBuffer("data");
{
Index vidx1 = a_.startVector("acl");
if (vidx1!= null) { acl=new java.util.ArrayList<org.apache.zookeeper.data.ACL>();
for (; !vidx1.done(); vidx1.incr()) {
org.apache.zookeeper.data.ACL e1;
e1= new org.apache.zookeeper.data.ACL();
a_.readRecord(e1,"e1");
acl.add(e1);
}
}
a_.endVector("acl");
}
flags=a_.readInt("flags");
a_.endRecord(tag);
}
public String toString() {
try {
java.io.ByteArrayOutputStream s =
new java.io.ByteArrayOutputStream();
CsvOutputArchive a_ =
new CsvOutputArchive(s);
a_.startRecord(this,"");
a_.writeString(path,"path");
a_.writeBuffer(data,"data");
{
a_.startVector(acl,"acl");
if (acl!= null) { int len1 = acl.size();
for(int vidx1 = 0; vidx1<len1; vidx1++) {
org.apache.zookeeper.data.ACL e1 = (org.apache.zookeeper.data.ACL) acl.get(vidx1);
a_.writeRecord(e1,"e1");
}
}
a_.endVector(acl,"acl");
}
a_.writeInt(flags,"flags");
a_.endRecord(this,"");
return new String(s.toByteArray(), "UTF-8");
} catch (Throwable ex) {
ex.printStackTrace();
}
return "ERROR";
}
public void write(java.io.DataOutput out) throws java.io.IOException {
BinaryOutputArchive archive = new BinaryOutputArchive(out);
serialize(archive, "");
}
public void readFields(java.io.DataInput in) throws java.io.IOException {
BinaryInputArchive archive = new BinaryInputArchive(in);
deserialize(archive, "");
}
public int compareTo (Object peer_) throws ClassCastException {
throw new UnsupportedOperationException("comparing CreateRequest is unimplemented");
}
public boolean equals(Object peer_) {
if (!(peer_ instanceof CreateRequest)) {
return false;
}
if (peer_ == this) {
return true;
}
CreateRequest peer = (CreateRequest) peer_;
boolean ret = false;
ret = path.equals(peer.path);
if (!ret) return ret;
ret = org.apache.jute.Utils.bufEquals(data,peer.data);
if (!ret) return ret;
ret = acl.equals(peer.acl);
if (!ret) return ret;
ret = (flags==peer.flags);
if (!ret) return ret;
return ret;
}
public int hashCode() {
int result = 17;
int ret;
ret = path.hashCode();
result = 37*result + ret;
ret = java.util.Arrays.toString(data).hashCode();
result = 37*result + ret;
ret = acl.hashCode();
result = 37*result + ret;
ret = (int)flags;
result = 37*result + ret;
return result;
}
public static String signature() {
return "LCreateRequest(sB[LACL(iLId(ss))]i)";
}
}
ConnectRequest的请求数据:
private int protocolVersion;
private long lastZxidSeen; //客户端保存的Zxid最近时间,zxid有什么用呢?
private int timeOut;//会话超时时间
private long sessionId;
private byte[] passwd;
ClientCnxn的内部类Packet类封装了请求头,响应头,请求正文和响应征正文
static class Packet { RequestHeader requestHeader;//请求头 ReplyHeader replyHeader; //响应头 Record request;//请求正文 Record response; //响应正文 ByteBuffer bb;//上面四部分序列化的字节流 /** Client's view of the path (may differ due to chroot) **/ String clientPath; /** Servers's view of the path (may differ due to chroot) **/ String serverPath; boolean finished; AsyncCallback cb;//异步请求的响应Callback Object ctx; WatchRegistration watchRegistration; public boolean readOnly; /** Convenience ctor */ Packet(RequestHeader requestHeader, ReplyHeader replyHeader, Record request, Record response, WatchRegistration watchRegistration) { this(requestHeader, replyHeader, request, response, watchRegistration, false); } Packet(RequestHeader requestHeader, ReplyHeader replyHeader, Record request, Record response, WatchRegistration watchRegistration, boolean readOnly) { this.requestHeader = requestHeader; this.replyHeader = replyHeader; this.request = request; this.response = response; this.readOnly = readOnly; this.watchRegistration = watchRegistration; } public void createBB() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);//序列化字节流容器 boa.writeInt(-1, "len"); // We'll fill this in later if (requestHeader != null) { requestHeader.serialize(boa, "header"); } if (request instanceof ConnectRequest) { request.serialize(boa, "connect"); // append "am-I-allowed-to-be-readonly" flag boa.writeBool(readOnly, "readOnly"); } else if (request != null) { request.serialize(boa, "request"); } baos.close(); this.bb = ByteBuffer.wrap(baos.toByteArray());//将字节流容器中的字节流复制给bb this.bb.putInt(this.bb.capacity() - 4); this.bb.rewind(); } catch (IOException e) { LOG.warn("Ignoring unexpected exception", e); } } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("clientPath:" + clientPath); sb.append(" serverPath:" + serverPath); sb.append(" finished:" + finished); sb.append(" header:: " + requestHeader); sb.append(" replyHeader:: " + replyHeader); sb.append(" request:: " + request); sb.append(" response:: " + response); // jute toString is horrible, remove unnecessary newlines return sb.toString().replaceAll("\r*\n+", " "); } }
ClientCnxn类包含两个队列(LinkedList),队列中的元素都是Packet类型,pengdingQueue表示请求已经发送,等待响应结果;outgoingQueue表示等待发送请求的请求序列
/** * These are the packets that have been sent and are waiting for a response. */ private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>(); /** * These are the packets that need to be sent. */ private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
ClientCnxn的Socket的数据传输,将另外一篇进行单独分析
相关推荐
第2章 ZooKeeper之序列化组件源码解析【透视现象,直击本质】 第4章 持久化【高手过招必备】 第6章 服务器启动 【由浅入深,先学好单机版,才能掌握集群版】 第7章 会话管理 【无处不在的会话其实没那么难】 第8章 ...
zookeeper简单示例代码,包括对象、节点、通信协议、序列化、acl权限、curator应用、zkclient应用等。
Meteor-zookeeper-core 是一个对zookeeper操作的集成方案,其核心设计目的,对内高内聚、无入侵, 采用fastjson序列化与反序列化,内置了数据本地缓存,实现节点监听;对外开箱即用、易扩展、轻量级,提供丰富API方法,...
Redis缓存(ProtoStuff序列化) Redis Sentinel主从高可用方案 Druid(数据源配置 sql防注入 sql性能监控) Dubbo+Zookeeper分布式服务框架 合理的分布式服务划分(common+api+service+web) 资源调度和治理中心(SOA)...
json序列化和反序列化][二八例 protobuf序列化和反序列化][二九例 包管理工具 go vendor][三十例 包管理工具 go mod][三一例 zip压缩][三二例 交叉编译][三三例 线上环境部署][三四例 实现固定周期维护][三五例 聊天...
Spring:它是最强大的依赖注入框架,也...Protostuff:它基于 Protobuf 序列化框架,面向 POJO,无需编写 .proto 文件。 ZooKeeper:提供服务注册与发现功能,开发分布式系统的必备选择,同时它也具备天生的集群能力。
ZooKeeper本身可以以单机模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。 1、Zookeeper...
Spring + Netty + Protostuff + ZooKeeper 实现了一个轻量级 RPC 框架,使用 Spring 提供依赖注入与参数配置,使用 Netty 实现 NIO 方式的数据传输,使用 Protostuff 实现对象序列化,使用 ZooKeeper 实现服务注册与...
大数据与云计算教程课件 优质大数据课程 05.Hadoop入门数据分析实战(共57页).pptx 大数据与云计算教程课件 优质大数据课程 06.HDFS(共38页).pptx 大数据与云计算教程课件 优质大数据课程 07.HDFS Shell命令(共...
大数据与云计算教程课件 优质大数据课程 05.Hadoop入门数据分析实战(共57页).pptx 大数据与云计算教程课件 优质大数据课程 06.HDFS(共38页).pptx 大数据与云计算教程课件 优质大数据课程 07.HDFS Shell命令(共...
使用Hadoop的数据和I/O构件实现压缩、数据完整性、序列化(包括Avro)和持久化;了解常见的陷阱和高级特性,以编写实用的MapReduce程序;设计、构建和管理专用的Hadoop集群——或者在云中运行Hadoop;使用Sqoop从...
使用Hadoop的数据和I/O构件实现压缩、数据完整性、序列化(包括Avro)和持久化;了解常见的陷阱和高级特性,以编写实用的MapReduce程序;设计、构建和管理专用的Hadoop集群——或者在云中运行Hadoop;使用Sqoop从...
答:①ArrayList和LinkedList可想从名字分析,它们一个是Array(动态数组)的数据结构,一个是Link(链表)的数据结构,此外,它们两个都是对List接口的实现。 前者是数组队列,相当于动态数组;后者为双向链表结构,也...
05.Hadoop入门数据分析实战 06.HDFS 07.HDFS Shell命令 08.HDFS文件接口 09.MapReduce序列化 10.MapReduce MP过程进阶 11.MapReduce IO操作 12.序列化框架 13.深入MapReduce应用开发 14.Hadoop集群配置 15.Hive 16....
05.Hadoop入门数据分析实战 06.HDFS 07.HDFS Shell命令 08.HDFS文件接口 09.MapReduce序列化 10.MapReduce MP过程进阶 11.MapReduce IO操作 12.序列化框架 13.深入MapReduce应用开发 14.Hadoop集群配置 15.Hive 16....
05.Hadoop入门数据分析实战 06.HDFS 07.HDFS Shell命令 08.HDFS文件接口 09.MapReduce序列化 10.MapReduce MP过程进阶 11.MapReduce IO操作 12.序列化框架 13.深入MapReduce应用开发 14.Hadoop集群配置 15.Hive 16....
05.Hadoop入门数据分析实战 06.HDFS 07.HDFS Shell命令 08.HDFS文件接口 09.MapReduce序列化 10.MapReduce MP过程进阶 11.MapReduce IO操作 12.序列化框架 13.深入MapReduce应用开发 14.Hadoop集群配置 15.Hive 16....
·熟悉hadoop的数据和ilo构件,用于压缩、数据集成、序列化和持久处理 ·洞悉编~mapreduce实际应用时的常见陷阱和高级特性 ·设计、构建和管理一个专用的hadoop集群或在云上运行hadoop ·使用高级查询...
理解通信协议传输过程中的序列化和反序列化机制 基于框架的RPC通信技术 WebService/ApacheCXF RMI/Spring RMI Hession 传统RPC技术在大型分布式架构下面临的问题 分布式架构下的RPC解决方案 Zookeeper ...
2.2.1 数据模型的"旋风之旅" 2.2.2 实现 2.3 安装 2.3.1 测试驱动 2.4 客户机 2.4.1 Java 2.4.2 Avro,REST,以及Thrift 2.5 示例 2.5.1 模式 2.5.2 加载数据 2.5.3 Web查询 2.6 HBase和RDBMS的比较 2.6.1 成功的...