`
357029540
  • 浏览: 726037 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论
阅读更多

      在上篇文章中我们介绍了EurekaServerContext Bean的注入,我们接下来看下PeerAwareInstanceRegistry类,这个类主要是用作服务实例对象的注册,在EurekaServerAutoConfiguration类代码的定义中它首先通过客户端获取所有的application对象,这些对象是已经注册到服务端的实例对象,这里就不多做介绍了,可以自己去看实现类,这里我们主要了解下InstanceRegistry代码的实现,进入到InstanceRegistry类,可以看到有如下方法

   1.我们首先看下register()方法,它有2个不同参数的方法,从参数名称上理解分别是实例对象(InstanceInfo),有效时间(leaseDuration),是否需要复制到其他节点(isReplication),在这个方法中它都回去调用一个事件发布的方法进行消息事件的发布,我们直接通过2个参数的register()方法去查看父类的方法,该父类是PeerAwareInstanceRegistryImpl类

public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
     在该register()方法中它又调用了父类AbstractInstanceRegistry类的register()方法,以及replicateToPeers()方法,在调用父类的注册方法前,它提供了默认的leaseDuration(90s)或通过配置获取到的leaseDuration值,在AbstractInstanceRegistry类的register()方法中,总的来说就是将新增的InstanceInfo对象存入到一个ConcurrentHashMap对象的register变量中,当然还在一些自定义队列对象中添加了该InstanceInfo对象,接下来我们看看注册完后的replicateToPeers()方法
private void replicateToPeers(Action action, String appName, String id,
                              InstanceInfo info /* optional */,
                              InstanceStatus newStatus /* optional */, boolean isReplication) {
    Stopwatch tracer = action.getTimer().start();
    try {
        if (isReplication) {
            numberOfReplicationsLastMin.increment();
        }
        // If it is a replication already, do not replicate again as this will create a poison replication
        if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
            return;
        }

        for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
            // If the url represents this host, do not replicate to yourself.
            if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                continue;
            }
            replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
        }
    } finally {
        tracer.stop();
    }
}
   通过上面的代码知道如果没有服务端节点或是其他节点同步过来的服务节点数据就不进行后面的数据处理,直接返回,如果上面条件都不满足则循环向除了当前节点的其他节点进行注册,通过上面的代码我们就明白了eureka是怎么实现服务中心数据的同步以及防止实例注册的循环传播。

 

  2.下面我们看下cancel()方法

public boolean cancel(final String appName, final String id,
                      final boolean isReplication) {
    if (super.cancel(appName, id, isReplication)) {
        replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to cancel it, reduce the number of clients to send renews
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }
        return true;
    }
    return false;
}
     通过上面的代码了解到,它会去调用AbstractInstanceRegistry类的cancel()方法,在该方法中,它去完成了从ConcurrentHashMap对象的register变量中移除当前需要服务端实例,它也去调用了register()方法中提到的replicateToPeers()方法,只不过最终去调用了各个服务器实例的cancel方法实现。最后去定义了服务实例需要发送的个数以及每分钟刷新的实例个数。

 

  3.在看下renew()方法

public boolean renew(final String appName, final String id, final boolean isReplication) {
    if (super.renew(appName, id, isReplication)) {
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
    return false;
}
   从其实现的代码上看,知道它是一个心跳检测方法,首先它也去进行了服务器实例的判断,有该实例才进行心跳的检测,它也调用了replicateToPeers()方法,只不过最终去调用了各个服务器实例的heartbeat方法实现。

 

  4.看下这个internalCancel()方法,这个方法从实现来说只是删除了内部的服务器实例,并没有像cancel()方法那样最终调用远程的去删除,这种可能是在当前服务器下线的时候实现的,删除自己保存的其他服务实例即可。

 5.openForTraffic()方法从名字上看可以理解为正常运行什么之类的,其实只看这个还真不知道是做什么用的,我们还是去看看它的实现,它首先去调用了一个更新每分钟刷新个数的公式,然后判断了isAws进行对服务器节点的hearbeat操作,在将ApplicationInfoManager里面的实例对象状态更新为UP,最后在调用了父类AbstractInstanceRegistry类的postInit()方法,在该方法中应用了一个EvictionTask类,我们主要看下在这个类的run()方法中调用的evict()方法,它首先进行了如下判断

if (!isLeaseExpirationEnabled()) {
     ......
     return;
}

      这个判断是说是否开启了过期时间,如果开启过期时间则进行后面的判断,在isLeaseExpirationEnabled()方法中,我们可以看到

public boolean isLeaseExpirationEnabled() {
    if (!isSelfPreservationModeEnabled()) {
        // The self preservation mode is disabled, hence allowing the instances to expire.
        return true;
    }
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
}
     首先它判断是否开启了自我保护机制,就是在eureka管理页面上看到的那句红色的字,如果开启了自我保护机制,则不在进行对超时的客户端实例对象进行删除操作,如果没有开启自我保护机制则判断每分钟产生的特定实例数量大于0并且上一分钟产生的心跳数量每分钟产生的特定实例数量则没有过期

 

List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
    Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
    if (leaseMap != null) {
        for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
            Lease<InstanceInfo> lease = leaseEntry.getValue();
            if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                expiredLeases.add(lease);
            }
        }
    }
}

        它去遍历了在ConcurrentHashMap对象的register变量中的所有注册实例,对满足已经不存在或当前系统时间大于上次更新时间的实例对象添加到expeiredLease列表中
 

int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;

int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);
            
            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            ......
            internalCancel(appName, id, false);
    }
}

       然后判断服务器实例总数减去了服务器实例*更新阈值百分比与expeiredLease个数的最小值,如果最新值大于0,则随机的获取删除的实例,调用internalCancel()方法进行删除操作。

      通过上面的源码分析我们了解到了服务中心的数据同步以及防止数据的传播性,以及在是否开启自我保护机制的时候如何进行过期实例对象的删除

       对于剩下的EurekaServerBootstrap bean的实现中的contextInitialized()方法,在其中initEurekaEnvironment()方法主要是初始化一些默认值的添加,而在initEurekaServerContext()方法中,它基本上都是在调用上面介绍的PeerAwareInstanceRegistry类,实现openForTraffic()方法以及通过eurekaClient.getApplications()去其他服务器实例中获取已经注册的客户端对象,然后注册到当前的服务器实例中。

          在这里还要说明下在官方文档中提到了Why Is It so Slow to Register a Service?通过该文档介绍说是因为默认的一个实例向服务注册发送心跳的默认时间是30秒,只有当实例、服务器和客户端的本地缓存中都有相同的元数据时,客户端才能发送服务,因此可能需要3次心跳来验证,我们可以通过配置参数eureka.instance.leaseRenewalIntervalInSecond来改变心跳的发送时间间隔,但是在生产环境,最好还是不要改变默认的30秒时间间隔,因为在服务器中内部计算对租约时间进行了假设。

      到此我们就分析完了所有的eureka主要代码。

  • 大小: 31.6 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics