我们接下来分析下EurekaClientAutoConfiguration文件下的EurekaClient bean的注入,这个类从名字我们就知道了是一个Eureka客户端的生成类,至于传入的参数bean ApplicationInfoManager和EurekaClientConfig在前面已经介绍过了,我们直接进入EurekaClient的实现类CloudEurekaClient中,可以看到它继承了netflix的DiscoveryClient类
我们首先看下CloudEurekaClient类,它有如图所示的方法和字段
这里主要说明下cancelOverrideStatus()方法(主要是为了了解字段的数据来源)
public void cancelOverrideStatus(InstanceInfo info) {
getEurekaHttpClient().deleteStatusOverride(info.getAppName(), info.getId(), info);
}
进入到该方法,我们可以看到首先有一个getEurekaHttpClient()方法,我们看下这个方法
EurekaHttpClient getEurekaHttpClient() {
if (this.eurekaHttpClient.get() == null) {
try {
Object eurekaTransport = this.eurekaTransportField.get(this);
Field registrationClientField = ReflectionUtils
.findField(eurekaTransport.getClass(), "registrationClient");
ReflectionUtils.makeAccessible(registrationClientField);
this.eurekaHttpClient.compareAndSet(null,
(EurekaHttpClient) registrationClientField.get(eurekaTransport));
}
catch (IllegalAccessException e) {
log.error("error getting EurekaHttpClient", e);
}
}
return this.eurekaHttpClient.get();
}
我们首先得看下eurekaTransportField这个字段的实现,可以看到它是在构造函数中的实现是
this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
"eurekaTransport");
通过这段代码进入到DiscoveryClient类找到eurekaTransport字段,我们发现eurekaTransport字段是一个对应的类型为静态的私有final内部类,回到getEurekaHttpClient()方法,通过反射方法获取到的registrationClient字段正好是EurekaHttpClient接口类,这个接口类中有如图的方法
最终回到cancelOverrideStatus()方法,我们这里就可以明确了解到它调用了一个EurekaHttpClient接口的deleteStatusOverride()方法,从其实现类中了解到它是调用服务端的一个TYPE类型为DELETE的方法。
在这个地方说明下EurekaHttpClient的实现,默认情况下eureka提供的是Jersey来实现的restful调用,如果在不引用Jersey框架的情况,spring cloud提供了RestTemplate的方式,我们可以如图所示的方式来查看到这个引用结果
在RestTemplateTransportClientFactory中引用了RestTemplateEurekaHttpClient类,在RestTemplateTransportClientFactories类引用了RestTemplateTransportClientFactory类,在RestTemplateDiscoveryClientOptionalArgs类应用了RestTemplateTransportClientFactory类,最后在DiscoveryClientOptionalArgsConfiguration类定义了该类的bean的生效条件是
@ConditionalOnMissingClass("com.sun.jersey.api.client.filter.ClientFilter")
因此从后往前推就可以看到RestTemplateEurekaHttpClient生效的条件,当然在EurekaClientAutoConfiguration自动配置类中我们可以看到DiscoveryClientOptionalArgsConfiguration是通过@import注解注入的。
以上就主要说明了在CloudEurekaClient类中的实现,我们接下来看下在《spring cloud eureka源码分析(一)》提到的在父类DiscoveryClient类中如何实现向服务器注册客户端,如何让客户端与服务端保持联系等实现。通过
CloudEurekaClient构造函数进入到DiscoveryClient的构造函数中我们可以明确了解到在里面实现了很多值的初始化,我们先来看下这段代码
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
这段代码判断的是不需要向注册中心注册并且不需要向注册中心获取注册信息,把注册的实例对象赋值为当前对象就可以了,然后直接返回,后面不用做处理,这个也说明这段代码实现的有可能就是注册中心服务器的配置实现需要。
接下来我们看到在构造函数中实现了一个定时任务定义和心跳线程池及缓存刷新的线程池,
scheduleServerEndpointTask(eurekaTransport, args);
这段代码是为了在内部类
EurekaTransport中记录当前对象的属性值,而且该内部类只有一个方法那就是shutdown(),从名字就可以知道是一个关闭操作。
然后在看看这段代码
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
这段代码其实判断的是是否需要向服务器端注册同时是否强制在初始化的时候注册,通过shouldEnforceRegistrationAtInit这个字段的描述说明,我们可以知道在初始的时候使用有可能会出现异常,通过后面的代码也可以看到,看下register()方法
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
从上面的代码可以看出它直接通过调用API restful的方式进行注册了,我们在看下在构造函数中的initScheduledTasks()方法
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
在方法中我们启动了定时任务,我们首先看下cacheRefresh定时任务,在该定时任务中实现了CacheRefreshThread()线程类,进入到该线程类它调用了refreshRegistry()方法,进入该方法我们看下它调用的fetchRegistry()方法
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
......
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
在getAndStoreFullRegistry()方法中
private void getAndStoreFullRegistry() throws Throwable {
....
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
......
localRegionApps.set(this.filterAndShuffle(apps));
.....
}
通过方法看到我们去服务器端获取了所有的注册信息然后保存起来,在看下
getAndUpdateDelta()方法
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
.....
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
.....
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
.....
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
......
}
}
在上面方法中我们通过去获取服务器上面的增量实例,然后判断是否有新增的,没有则去调用getAndStoreFullRegistry()方法来全量获取信息,否则通过锁机制去更新本地的实例数据,经过上面的步骤我们就完成了缓存的更新。
我们在看下heartbeat的定时任务,在这个定时任务中实现了HeartbeatThread线程类,该线程类提供了一个renew()方法
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
......
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
REREGISTER_COUNTER.increment();
.....
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
}
.....
}
在方法中我们首先发送一个心跳信息到服务端,如果返回正常则成功,如果没有发现,则调用register()方法进行注册。
通过上面源码的分析我们就知道了如何向服务端注册以及保持连接,至于和ribbon的负载均衡就暂时不整理了,客户端的介绍就到这里,接下来我们一起分析服务端的源码实现。
- 大小: 16.5 KB
- 大小: 35.7 KB
- 大小: 46.5 KB
分享到:
相关推荐
Spring Cloud Eureka源码分析
为什么要看源码: 1、提升技术功底:学习源码里的优秀设计思想,比如一些疑难问题的解决思路,还有一些优秀的设计模式,整体提 升自己的技术功底 2、深度掌握技术框架:源码看多了,对于一个新技术或框架的掌握速度会有...
负载均衡器源码分析 33 负载均衡器重试机制 33 服务保护机制SpringCloud Hystrix 33 微服务高可用技术 33 服务降级、熔断、限流概念 34 服务学崩效应 34 服务降级 34 服务熔断 35 服务隔离 35 服务限流 36 ...
spring cloud组件源码分析,包括Eureka,feign,gateway,ribbon,Hystrix组件
本文将从SpringCloud源码角度出发,让大家能够了解到相关组件内部的运行机制,从而更好的回馈开发的流程和配置上,为用户提供更好的方案。SpringCloud提供了微服务架构中的众多组件,例如API网关、注册中心、负载...
资源名字:基于Springcloud+mysql的分布式架构网上商城设计与实现(源码+设计文档+部署说明+视频演示).zip 资源内容:项目全套源码+完整文档 源码说明: 全部项目源码都是经过测试校正后百分百成功运行。 基于...
基于SpingBoot+SpringCloud+Maven+Eureka+Vue的分布式架构网上商城系统源码+数据已获导师指导。 本项目是一套基于SpringCloud的分布式架构网上商城系统,主要针对计算机相关专业的正在做毕设的学生和需要项目实战...
面试必考之HashMap源码分析与实现 ,微服务架构之Spring Cloud Eureka 场景分析与实战,高性能必学之Mysql主从架构实践 ,架构师不得不知道的Spring事物不能回滚的深层次原因 ,分库分表之后分布式下如何保证ID全局...
视频讲解知识内容包括:HashMap源码分析与实现、JVM底层奥秘ClassLoader源码分析与案例讲解、大型网站数据库瓶颈之数据库分库分表方案实践、Spring Cloud Eureka场景分析与实战、分库分表之后分布式下如何保证ID全局...
前文 SpringCloud 简介 SpringCloud 版本选型 SpringCloud 工程构建 SpringCloud —— Eureka 注册中心 SpringCloud —— Eureka 集群 ...文章目录前文Ribbon 负载均衡原理Ribbon 源码分析RoundRo
Spring全家桶源码分析 Tomcat架构原理 Web请求处理原理 数据访问层框架原理 架构与设计思维模式 程序中的数学 数据分析 机器智能算法剖析与应用 云原生 自动化DevOps 流量治理 链路监控 弹性扩容 分布式存储Redis6.0...
应用Spring Cloud Eureka作为服务注册中心 应用Spring Cloud Zuul作为网关分发请求 应用MyBatis-Plus作为持久层框架 使用Ribbon实现了负载均衡技术,自定义均衡算法 拆分Spring Security成单独微服务作为权限验证...
java版商城源码下载 关于作者 前腾讯、前阿里员工,从事Java后台工作; 对Docker和Kubernetes充满热爱...Eureka源码分析专题 spring-cloud-alibaba实战 ; ; ; ; ; spring-cloud-kubernetes特辑 dubbo实战特辑 Docke
2017卧底面试题答案解析.txt...微服务架构之Spring Cloud Eureka 场景分析与实战 高性能必学之Mysql主从架构实践 架构师不得不知道的Spring事物不能回滚的深层次原因 大型公司面试必答之数据结构与算法精讲 ... 等
java版商城源码下载 关于作者 ...前腾讯、前阿里员工,从事Java后台工作; 对Docker和Kubernetes充满热爱;...Eureka源码分析专题 spring-cloud-alibaba实战 ; ; ; ; ; spring-cloud-kubernetes特辑 Docker 基础知
java版商城源码下载 关于作者 ...前腾讯、前阿里员工,从事Java后台工作; 对Docker和Kubernetes充满热爱;...Eureka源码分析专题 spring-cloud-alibaba实战 ; ; ; ; ; spring-cloud-kubernetes特
【原始笔记】专注于Java... SpringCloud(Eureka,Ribbon,Hystrix,Zuul,Config,Feign ...) 四郎 Tomcat 西塔 运河 联合会 卡夫卡 纳科斯 动物园管理员 我的猫 ...... 微信搜: 公众号:原始笔记 联系我:艰苦奋斗
10.微服务架构之Spring Cloud Eureka 场景分析与实战 11.高性能必学之Mysql主从架构实践 13.RPC底层通讯原理之Netty线程模型源码分析 14.分库分表之后分布式下如何保证ID全局唯一性 10道腾讯的Java面试题.txt Dubbo...
│ │ 13.RPC底层通讯原理之Netty线程模型源码分析.wmv │ │ │ ├─14.分库分表之后分布式下如何保证ID全局唯一性 │ │ 14.分库分表之后分布式下如何保证ID全局唯一性.mp4 │ │ │ └─15.大型公司面试必答之...