`
woodding2008
  • 浏览: 285398 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Zookeeper Session机制

 
阅读更多

1、zookeeper session是什么

    客户端与服务端的任何交换操作都与会话相关,包括:临时节点的生命周期,客户端的请求顺序执行以及Watcher通知机制。

 2、客户端操作

2.1、Zookeeper客户端一次创建会话的过程。

 

2.2、构造会话请求

long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,sessionTimeout, sessId, sessionPasswd);
outgoingQueue.addFirst(new Packet(null, null, conReq,null, null, readOnly));

 

2.3、处理Response

ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(),this.sessionId,conRsp.getPasswd(), isRO);

2.3.1、更新客户端Timeout等信息

void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            if (negotiatedSessionTimeout <= 0) {
                state = States.CLOSED;

                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.Expired, null));
                eventThread.queueEventOfDeath();
                throw new SessionExpiredException(
                        "Unable to reconnect to ZooKeeper service, session 0x"
                                + Long.toHexString(sessionId) + " has expired");
            }
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;
            seenRwServerBefore |= !isRO;
            LOG.info("Session establishment complete on server "
                    + clientCnxnSocket.getRemoteSocketAddress()
                    + ", sessionid = 0x" + Long.toHexString(sessionId)
                    + ", negotiated timeout = " + negotiatedSessionTimeout
                    + (isRO ? " (READ-ONLY mode)" : ""));
            KeeperState eventState = (isRO) ?
                    KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    eventState, null));
        }

 

2.4、发送Ping请求

    客户端当sessionTimeout/3(readTimeout=sessionTimeout * 2 /3 )之内都没有发送通信请求后,会主动发起一个ping请求,以维持会话状态.

if (state.isConnected()) {
    int timeToNextPing = readTimeout / 2
                          - clientCnxnSocket.getIdleSend();
   if (timeToNextPing <= 0) {
             sendPing();
            clientCnxnSocket.updateLastSend();
    } else {
             if (timeToNextPing < to) {
                to = timeToNextPing;
              }
     }
}

 

 3、服务端操作 

3.1、服务端会话处理流程示意图

    

 

3.2、协商sessionTimeout

    2Ticktime <= sessionTimeout <= 20Ticktime(tickTime默认等于2s)

int sessionTimeout = connReq.getTimeOut();
 byte passwd[] = connReq.getPasswd();
 int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout) {
      sessionTimeout = minSessionTimeout;
}
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout) {
     sessionTimeout = maxSessionTimeout;
 }
cnxn.setSessionTimeout(sessionTimeout);

 

3.3、判断是否需要创建新会话

long sessionId = connReq.getSessionId();
 if (sessionId != 0) {
       long clientSessionId = connReq.getSessionId();
       LOG.info("Client attempting to renew session 0x"+ Long.toHexString(clientSessionId) + " at " + cnxn.getRemoteSocketAddress());
       serverCnxnFactory.closeSession(sessionId);
       cnxn.setSessionId(sessionId);
       reopenSession(cnxn, sessionId, passwd, sessionTimeout);
   } else {
        LOG.info("Client attempting to establish new session at "
                    + cnxn.getRemoteSocketAddress());
        createSession(cnxn, passwd, sessionTimeout);
  }

 

3.4、创建sessionId

     每次客户端创建新会话时,zookeeper都会为其分配一个全局唯一的sessionID。

public static long initializeNextSession(long id) {
        long nextSid = 0;
        nextSid = (System.currentTimeMillis() << 24) >> 8;
        nextSid =  nextSid | (id <<56);
        return nextSid;
}
synchronized public long createSession(int sessionTimeout) {
     addSession(nextSessionId, sessionTimeout);
     return nextSessionId++;
}

 

 

 3.5、注册会话

 

SessionTracker中维护两个数据结构sessionsWithTimeout和sessionsById,前者根据sessionID保存了所有会话超时时间,后者则根据sessionID保存了会话实体。

 

  HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
  ConcurrentHashMap<Long, Integer> sessionsWithTimeout; 

   synchronized public void addSession(long id, int sessionTimeout) {
        sessionsWithTimeout.put(id, sessionTimeout);
        if (sessionsById.get(id) == null) {
            SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
            sessionsById.put(id, s);
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                        "SessionTrackerImpl --- Adding session 0x"
                        + Long.toHexString(id) + " " + sessionTimeout);
            }
        } else {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                        "SessionTrackerImpl --- Existing session 0x"
                        + Long.toHexString(id) + " " + sessionTimeout);
            }
        }
        touchSession(id, sessionTimeout);
    }

 

 3.6、激活会话

 

  3.6.1、分桶策略

   

将类似的会话放在同一区块中进行管理,以便于不同区块的隔离,相同区块的统一处理。

   分配的原则:会话的下一次超时的时间点(ExpirationTime, ExpirationInterval=tickTime 单位:毫秒)。

   ExpirationTime = CurrentTime + SessionTimeout

   ExpirationTime =  (ExpirationTime / ExpirationInterval +1) x ExpirationInterval

   以ExpirationTime为key存入Map中,实现了分桶。

 

 3.6.2、激活

 

   客户端会向服务端发送ping请求来保持会话的有效,服务端收到心跳会不断的更新对应的客户端会话。

  

HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();

private long roundToInterval(long time) {
        // We give a one interval grace period
        return (time / expirationInterval + 1) * expirationInterval;
}
 synchronized public boolean touchSession(long sessionId, int timeout) {
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,
                                     ZooTrace.CLIENT_PING_TRACE_MASK,
                                     "SessionTrackerImpl --- Touch session: 0x"
                    + Long.toHexString(sessionId) + " with timeout " + timeout);
        }
        SessionImpl s = sessionsById.get(sessionId);
        // Return false, if the session doesn't exists or marked as closing
        if (s == null || s.isClosing()) {
            return false;
        }
        long expireTime = roundToInterval(System.currentTimeMillis() + timeout);
        if (s.tickTime >= expireTime) {
            // Nothing needs to be done
            return true;
        }
        SessionSet set = sessionSets.get(s.tickTime);
        if (set != null) {
            set.sessions.remove(s);
        }
        s.tickTime = expireTime;
        set = sessionSets.get(s.tickTime);
        if (set == null) {
            set = new SessionSet();
            sessionSets.put(expireTime, set);
        }
        set.sessions.add(s);
        return true;
    }

 

3.7、会话清理

 

3.7.1、会话超时检测

       分桶策略的数据迁移是被动触发,没有做迁移的sessionId会一直保存在之前的桶中,SessionTracker线程会检测到过期(没有迁移的)的会话。

 

3.7.2、清理过程

1、标记会话为“已关闭”

2、发起“会话关闭”请求

3、收集要清理的临时节点

4、添加“节点删除”事务

5、删除临时节点

6、移除会话

7、关闭NioServerCnxn

 

    synchronized public void run() {
        try {
            while (running) {
                currentTime = System.currentTimeMillis();
                if (nextExpirationTime > currentTime) {
                    this.wait(nextExpirationTime - currentTime);
                    continue;
                }
                SessionSet set;
                set = sessionSets.remove(nextExpirationTime);
                if (set != null) {
                    for (SessionImpl s : set.sessions) {
                        setSessionClosing(s.sessionId);
                        expirer.expire(s);
                    }
                }
                nextExpirationTime += expirationInterval;
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected interruption", e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

    synchronized public void setSessionClosing(long sessionId) {
        if (LOG.isTraceEnabled()) {
            LOG.info("Session closing: 0x" + Long.toHexString(sessionId));
        }
        SessionImpl s = sessionsById.get(sessionId);
        if (s == null) {
            return;
        }
        s.isClosing = true;
    }

    public void expire(Session session) {
        long sessionId = session.getSessionId();
        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
                + ", timeout of " + session.getTimeout() + "ms exceeded");
        close(sessionId);
    }

    private void close(long sessionId) {
        submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
    }

    

   

PrepRequestProcessor.pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
case OpCode.closeSession:
                // We don't want to do this check since the session expiration thread
                // queues up this operation without being the session owner.
                // this request is the last of the session so it should be ok
                //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                HashSet<String> es = zks.getZKDatabase()
                        .getEphemerals(request.sessionId);
                synchronized (zks.outstandingChanges) {
                    for (ChangeRecord c : zks.outstandingChanges) {
                        if (c.stat == null) {
                            // Doing a delete
                            es.remove(c.path);
                        } else if (c.stat.getEphemeralOwner() == request.sessionId) {
                            es.add(c.path);
                        }
                    }
                    for (String path2Delete : es) {
                        addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                                path2Delete, null, 0, null));
                    }

                    zks.sessionTracker.setSessionClosing(request.sessionId);
                }

                LOG.info("Processed session termination for sessionid: 0x"
                        + Long.toHexString(request.sessionId));
                break;

 

 

 

FinalRequestProcessor.processRequest(Request request) 
       if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
            ServerCnxnFactory scxn = zks.getServerCnxnFactory();
            // this might be possible since
            // we might just be playing diffs from the leader
            if (scxn != null && request.cnxn == null) {
                // calling this if we have the cnxn results in the client's
                // close session response being lost - we've already closed
                // the session/socket here before we can send the closeSession
                // in the switch block below
                scxn.closeSession(request.sessionId);
                return;
            }
        }
 

 

  • 大小: 342 KB
  • 大小: 333.3 KB
分享到:
评论

相关推荐

    Zookeeper篇.pdf

    1.0 zookeeper 是什么? 1.1 zookeeper 提供了什么? 1.2 zookeeper 文件系统 1.3 zookeeper 的四种类型的 znode 1.4 zookeeper 通知机制 1.5 zookeeper 有哪些应用场景?...2.8 zk 的 session 机制

    大数据运维技术第8章 ZooKeeper组件安装配置课件.pptx

    ZooKeeper相关知识; 会话(Session):Session指的是...由sessionID是ZooKeeper会话的一个重要标识,许多与会话相关的运行机制都是基于这个sessionID的,因此,无论是哪台服务器为客户端分配的sessionID,都务必保证

    深入浅出Zookeeper(二)基于Zookeeper的分布式锁与领导选举

    如上文《Zookeeper架构及FastLeaderElection机制》所述,Zookeeper提供了一个类似于Linux文件系统的树形结构。该树形结构中每个节点被称为znode,可按如下两个维度分类Persistvs.EphemeralPersist节点,一旦被创建,...

    分布式协调工具-ZooKeeper实现动态负载均衡

    ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode...

    ZookeeperJava客户端zkclient.zip

    简单、高效的Zookeeper Java客户端。... session过期自动重连、机制 快速入门 下面部分将引导使用者快速入门。 快速指南: 简单快速入门使用,满足大部分需求 FAQ: 常见的问题 标签:zkclient

    java面试资料

    精选JavaWeb面试题,包括线程池,jvm,spring框架,mybaits框架,hibernate框架,各种锁机制,zookeeper,redis,session机制等

    Java思维导图xmind文件+导出图片

    基于Zookeeper Watcher 核心机制深入源码分析 Zookeeper集群升级、迁移 基于Zookeeper实现分布式服务器动态上下线感知 深入分析Zookeeper Zab协议及选举机制源码解读 Dubbo 使用Dubbo对单一应用服务化改造 ...

    zkclient-v2.1-3-gc18569d

    简单、高效的Zookeeper Java客户端。 让Zookeeper API 使用起来更简单 非常方便订阅各种事件并自动重新绑定事件(会话建立、节点修改、节点删除、子节点变更等) session过期自动重连、机制

    Java面试宝典PLUS.pdf

    包括JavaSE基础(多态、异常处理、常用API、数据类型、IO操作、集合、多线程和并发库、内部类)、JavaSE高级(反射、动态代理、设计模式&回收机制、加载器、JVM基础、GC基础)、JavaWeb基础(JDBC技术、HTTP协议、...

    java面试笔试资料包括JAVA基础核心知识点深度学习Spring面试题等资料合集.zip

    多台web服务器之间共享session.docx 消息中间件Kafka与RabbitMQ.doc 电商项目描述注意点.doc 秒杀业务的流量削峰场景如何解决.doc 面试题:Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点.doc

    Java常见面试题208道.docx

    163.说一下 zookeeper 的通知机制? 十七、MySql 164.数据库的三范式是什么? 165.一张自增表里面总共有 7 条数据,删除了最后 2 条数据,重启 mysql 数据库,又插入了一条数据,此时 id 是几? 166.如何获取当前...

    java面试题,180多页,绝对良心制作,欢迎点评,涵盖各种知识点,排版优美,阅读舒心

    180多页面试题,前前后后不间断的更新了两年,准备换工作时,总是拿来看看,...【Zookeeper】zookeeper介绍 178 1、简介 178 2、基本概念 179 3. ZooKeeper典型应用场景 183 4、ZooKeeper在大型分布式系统中的应用 189

    蚂蚁课堂(每特学院)第一期-Java高端培训视频教程

    0022--SpringCloud.zip ├─0023--Zookeeper实战分布式锁.zip ├─0024--Dubbo.zip ├─0025--分布式定时job-xxljob.zip ├─0026--深入理解Java虚拟机.zip ├─0027--垃圾回收机制算法.zip ├─0028--MySQL优化入门...

    大型分布式网站架构与实践

     2.1.3 分布式session 69  2.2 持久化存储 71  2.2.1 MySQL扩展 72  2.2.2 HBase 80  2.2.3 Redis 91  2.3 消息系统 95  2.3.1 ActiveMQ & JMS 96  2.4 垂直化搜索引擎 104  2.4.1 Lucene简介 105  2.4.2 ...

    2021互联网大厂Java架构师面试题突击视频教程

    41_说说zookeeper一般都有哪些使用场景? 42_分布式锁是啥?对比下redis和zk两种分布式锁的优劣? 43_说说你们的分布式session方案是啥?怎么做的? 44_了解分布式事务方案吗?你们都咋做的?有啥坑? 45_说说一般...

    一个适合新手学习的电商项目

    可以将这次主要的登录映射到其他应用中用于同一个用户的登录的机制。 它是目前比较流行的企业业务整合的解决方案之一。 用户登录: 1、接收用户名和密码 2、校验用户名密码 3、生成token,可以使用UUID 4、把...

    最新Java面试题视频网盘,Java面试题84集、java面试专属及面试必问课程

    │ Java面试题79:redis数据淘汰机制.mp4 │ Java面试题80:java访问redis级redis集群?.mp4 │ Java面试题81:微信公众号分类和微信开发原理.mp4 │ Java面试题82:怎么把微信和业务平台进行绑定.mp4 │ Java面试题...

    Hyperf 协程框架 v2.1.10

    #3195 为 JsonRpcPoolTransporter 新增了重试机制, 当连接、发包、收包失败时,默认重试 2 次,收包超时不进行重试。 优化 #3169 优化了 ErrorExceptionHandler 中与 set_error_handler 相关的入参代码, 解决静态...

Global site tag (gtag.js) - Google Analytics