`
357029540
  • 浏览: 726188 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论
阅读更多

      我们接下来分析下EurekaClientAutoConfiguration文件下的EurekaClient bean的注入,这个类从名字我们就知道了是一个Eureka客户端的生成类,至于传入的参数bean ApplicationInfoManager和EurekaClientConfig在前面已经介绍过了,我们直接进入EurekaClient的实现类CloudEurekaClient中,可以看到它继承了netflix的DiscoveryClient类


 
 我们首先看下CloudEurekaClient类,它有如图所示的方法和字段

这里主要说明下cancelOverrideStatus()方法(主要是为了了解字段的数据来源)

public void cancelOverrideStatus(InstanceInfo info) {
  getEurekaHttpClient().deleteStatusOverride(info.getAppName(), info.getId(), info);
}

 

 进入到该方法,我们可以看到首先有一个getEurekaHttpClient()方法,我们看下这个方法

EurekaHttpClient getEurekaHttpClient() {
	if (this.eurekaHttpClient.get() == null) {
		try {
			Object eurekaTransport = this.eurekaTransportField.get(this);
			Field registrationClientField = ReflectionUtils
					.findField(eurekaTransport.getClass(), "registrationClient");
			ReflectionUtils.makeAccessible(registrationClientField);
			this.eurekaHttpClient.compareAndSet(null,
					(EurekaHttpClient) registrationClientField.get(eurekaTransport));
		}
		catch (IllegalAccessException e) {
			log.error("error getting EurekaHttpClient", e);
		}
	}
	return this.eurekaHttpClient.get();
}

 我们首先得看下eurekaTransportField这个字段的实现,可以看到它是在构造函数中的实现是

this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
				"eurekaTransport");
 通过这段代码进入到DiscoveryClient类找到eurekaTransport字段,我们发现eurekaTransport字段是一个对应的类型为静态的私有final内部类,回到getEurekaHttpClient()方法,通过反射方法获取到的registrationClient字段正好是EurekaHttpClient接口类,这个接口类中有如图的方法

 

 

  最终回到cancelOverrideStatus()方法,我们这里就可以明确了解到它调用了一个EurekaHttpClient接口的deleteStatusOverride()方法,从其实现类中了解到它是调用服务端的一个TYPE类型为DELETE的方法。

  在这个地方说明下EurekaHttpClient的实现,默认情况下eureka提供的是Jersey来实现的restful调用,如果在不引用Jersey框架的情况,spring cloud提供了RestTemplate的方式,我们可以如图所示的方式来查看到这个引用结果

 在RestTemplateTransportClientFactory中引用了RestTemplateEurekaHttpClient类,在RestTemplateTransportClientFactories类引用了RestTemplateTransportClientFactory类,在RestTemplateDiscoveryClientOptionalArgs类应用了RestTemplateTransportClientFactory类,最后在DiscoveryClientOptionalArgsConfiguration类定义了该类的bean的生效条件是

@ConditionalOnMissingClass("com.sun.jersey.api.client.filter.ClientFilter")
   因此从后往前推就可以看到RestTemplateEurekaHttpClient生效的条件,当然在EurekaClientAutoConfiguration自动配置类中我们可以看到DiscoveryClientOptionalArgsConfiguration是通过@import注解注入的。
  以上就主要说明了在CloudEurekaClient类中的实现,我们接下来看下在《spring cloud eureka源码分析(一)》提到的在父类DiscoveryClient类中如何实现向服务器注册客户端,如何让客户端与服务端保持联系等实现。通过CloudEurekaClient构造函数进入到DiscoveryClient的构造函数中我们可以明确了解到在里面实现了很多值的初始化,我们先来看下这段代码
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
	logger.info("Client configured to neither register nor query for data.");
	scheduler = null;
	heartbeatExecutor = null;
	cacheRefreshExecutor = null;
	eurekaTransport = null;
	instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

	// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
	// to work with DI'd DiscoveryClient
	DiscoveryManager.getInstance().setDiscoveryClient(this);
	DiscoveryManager.getInstance().setEurekaClientConfig(config);

	initTimestampMs = System.currentTimeMillis();
	logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
	initTimestampMs, this.getApplications().size());

	return;  // no need to setup up an network tasks and we are done
}
     这段代码判断的是不需要向注册中心注册并且不需要向注册中心获取注册信息,把注册的实例对象赋值为当前对象就可以了,然后直接返回,后面不用做处理,这个也说明这段代码实现的有可能就是注册中心服务器的配置实现需要。
     接下来我们看到在构造函数中实现了一个定时任务定义和心跳线程池及缓存刷新的线程池,
scheduleServerEndpointTask(eurekaTransport, args);
 这段代码是为了在内部类EurekaTransport中记录当前对象的属性值,而且该内部类只有一个方法那就是shutdown(),从名字就可以知道是一个关闭操作。
  然后在看看这段代码
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
	try {
	if (!register() ) {
	throw new IllegalStateException("Registration error at startup. Invalid server response.");
	}
	} catch (Throwable th) {
	logger.error("Registration error at startup: {}", th.getMessage());
	throw new IllegalStateException(th);
	}
}
 这段代码其实判断的是是否需要向服务器端注册同时是否强制在初始化的时候注册,通过shouldEnforceRegistrationAtInit这个字段的描述说明,我们可以知道在初始的时候使用有可能会出现异常,通过后面的代码也可以看到,看下register()方法
boolean register() throws Throwable {
	logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
	EurekaHttpResponse<Void> httpResponse;
	try {
		httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
	} catch (Exception e) {
		logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
		throw e;
	}
	if (logger.isInfoEnabled()) {
		logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
	}
	return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
 从上面的代码可以看出它直接通过调用API restful的方式进行注册了,我们在看下在构造函数中的initScheduledTasks()方法
private void initScheduledTasks() {
	if (clientConfig.shouldFetchRegistry()) {
		// registry cache refresh timer
		int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
		int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
		scheduler.schedule(
				new TimedSupervisorTask(
						"cacheRefresh",
						scheduler,
						cacheRefreshExecutor,
						registryFetchIntervalSeconds,
						TimeUnit.SECONDS,
						expBackOffBound,
						new CacheRefreshThread()
				),
				registryFetchIntervalSeconds, TimeUnit.SECONDS);
	}

	if (clientConfig.shouldRegisterWithEureka()) {
		int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
		int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
		logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

		// Heartbeat timer
		scheduler.schedule(
				new TimedSupervisorTask(
						"heartbeat",
						scheduler,
						heartbeatExecutor,
						renewalIntervalInSecs,
						TimeUnit.SECONDS,
						expBackOffBound,
						new HeartbeatThread()
				),
				renewalIntervalInSecs, TimeUnit.SECONDS);

		// InstanceInfo replicator
		instanceInfoReplicator = new InstanceInfoReplicator(
				this,
				instanceInfo,
				clientConfig.getInstanceInfoReplicationIntervalSeconds(),
				2); // burstSize

		statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
			@Override
			public String getId() {
				return "statusChangeListener";
			}

			@Override
			public void notify(StatusChangeEvent statusChangeEvent) {
				if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
						InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
					// log at warn level if DOWN was involved
					logger.warn("Saw local status change event {}", statusChangeEvent);
				} else {
					logger.info("Saw local status change event {}", statusChangeEvent);
				}
				instanceInfoReplicator.onDemandUpdate();
			}
		};

		if (clientConfig.shouldOnDemandUpdateStatusChange()) {
			applicationInfoManager.registerStatusChangeListener(statusChangeListener);
		}

		instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
	} else {
		logger.info("Not registering with Eureka server per configuration");
	}
}
 在方法中我们启动了定时任务,我们首先看下cacheRefresh定时任务,在该定时任务中实现了CacheRefreshThread()线程类,进入到该线程类它调用了refreshRegistry()方法,进入该方法我们看下它调用的fetchRegistry()方法
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
	Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

	try {
		// If the delta is disabled or if it is the first time, get all
		// applications
		Applications applications = getApplications();

		if (clientConfig.shouldDisableDelta()
				|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
				|| forceFullRegistryFetch
				|| (applications == null)
				|| (applications.getRegisteredApplications().size() == 0)
				|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
		{
			......
			getAndStoreFullRegistry();
		} else {
			getAndUpdateDelta(applications);
		}
		applications.setAppsHashCode(applications.getReconcileHashCode());
		logTotalInstances();
	} catch (Throwable e) {
		logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
		return false;
	} finally {
		if (tracer != null) {
			tracer.stop();
		}
	}

	// Notify about cache refresh before updating the instance remote status
	onCacheRefreshed();

	// Update remote status based on refreshed data held in the cache
	updateInstanceRemoteStatus();

	// registry was fetched successfully, so return true
	return true;
}
 在getAndStoreFullRegistry()方法中
private void getAndStoreFullRegistry() throws Throwable {
	....

	Applications apps = null;
	EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
			? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
			: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
	if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
		apps = httpResponse.getEntity();
	}
	......
        localRegionApps.set(this.filterAndShuffle(apps));
        .....
}
 通过方法看到我们去服务器端获取了所有的注册信息然后保存起来,在看下getAndUpdateDelta()方法
private void getAndUpdateDelta(Applications applications) throws Throwable {
	long currentUpdateGeneration = fetchRegistryGeneration.get();

	Applications delta = null;
	EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
	if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
		delta = httpResponse.getEntity();
	}

	if (delta == null) {
		.....
		getAndStoreFullRegistry();
	} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
		.....
		String reconcileHashCode = "";
		if (fetchRegistryUpdateLock.tryLock()) {
			try {
				updateDelta(delta);
				reconcileHashCode = getReconcileHashCode(applications);
			} finally {
				fetchRegistryUpdateLock.unlock();
			}
		} else {
			.....
		}
		// There is a diff in number of instances for some reason
		if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
			reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
		}
	} else {
		......
	}
}
   在上面方法中我们通过去获取服务器上面的增量实例,然后判断是否有新增的,没有则去调用getAndStoreFullRegistry()方法来全量获取信息,否则通过锁机制去更新本地的实例数据,经过上面的步骤我们就完成了缓存的更新。
   我们在看下heartbeat的定时任务,在这个定时任务中实现了HeartbeatThread线程类,该线程类提供了一个renew()方法
boolean renew() {
	EurekaHttpResponse<InstanceInfo> httpResponse;
	try {
		httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
		......
		if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
			REREGISTER_COUNTER.increment();
			.....
			long timestamp = instanceInfo.setIsDirtyWithTime();
			boolean success = register();
			if (success) {
				instanceInfo.unsetIsDirty(timestamp);
			}
			return success;
		}
		return httpResponse.getStatusCode() == Status.OK.getStatusCode();
	} 
	.....
}
 在方法中我们首先发送一个心跳信息到服务端,如果返回正常则成功,如果没有发现,则调用register()方法进行注册。
 通过上面源码的分析我们就知道了如何向服务端注册以及保持连接,至于和ribbon的负载均衡就暂时不整理了,客户端的介绍就到这里,接下来我们一起分析服务端的源码实现。
 
  • 大小: 16.5 KB
  • 大小: 35.7 KB
  • 大小: 46.5 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics