1. Eureka 服务注册中心
1.1 关于服务注册中心
服务注册中心本质上是为了解耦服务提供者和服务消费者。
对于任何一个微服务,原则上都应存在或者支持多个提供者,这是由微服务的分布式属性决定的。
为了支持弹性扩容缩特性,一个微服务的提供者的数量和分布往往是动态变化的,因此,原本单体应用阶段常用的静态LB就不再适用。需要引入额外的组件来管理微服务提供者的注册与发现,而这个组件就是服务注册中心。
1.1.1 服务注册中心的原理
分布式微服务架构中,服务注册中⼼⽤于存储服务提供者地址信息、服务发布相关
的属性信息,消费者通过主动查询和被动通知的⽅式获取服务提供者的地址信息,
⽽不再需要通过硬编码⽅式得到提供者的地址信息。消费者只需要知道当前系统发
布了那些服务,⽽不需要知道服务具体存在于什么位置,这就是透明化路由。
- 服务提供者启动
- 服务提供者将相关服务信息主动注册到注册中心
- 服务消费者获取服务注册信息
- Pull模式:服务消费者可以主动拉取可⽤的服务提供者清单
- Push模式:服务消费者订阅服务(当服务提供者有变化时,注册中心也会主动推送
更新后的服务清单给消费者)
- 服务消费者直接调用服务提供者
另外,注册中⼼也需要完成服务提供者的健康监控,当发现服务提供者失效时需要
及时剔除
1.1.2 主流服务注册中心
- Zookeeper - CP
- Eureka - AP
- Consul - CP
- Nacos - 支持AP/CP切换
1.2 服务注册中心组件 Eureka
Eureka 基础架构
Eureka 交互流程及原理
- 图中us-east-1c、us-east-1d,us-east-1e代表不同的区也就是不同的机房
- 图中每⼀个Eureka Server都是⼀个集群。
- 图中Application Service作为服务提供者向Eureka Server中注册服务,
Eureka Server接受到注册事件会在集群和分区中进⾏数据同步,Application
Client作为消费端(服务消费者)可以从Eureka Server中获取到服务注册信
息,进行服务调用。 - 微服务启动后,会周期性地向Eureka Server发送⼼跳(默认周期为30秒)
以续约自己的信息。 - Eureka Server在⼀定时间内没有接收到某个微服务节点的心跳,Eureka
Server将会注销该微服务节点(默认90秒)。 - 每个Eureka Server同时也是Eureka Client,多个Eureka Server之间通过复
制的⽅式完成服务注册列表的同步。 - Eureka Client会缓存Eureka Server中的信息。即使所有的Eureka Server节
点都宕掉,服务消费者依然可以使用缓存中的信息找到服务提供者。
Eureka通过心跳检测、健康检查和客户端缓存等机制,提高系统的灵活性、可
伸缩性和可用性。
1.3 Eureka 高可用搭建与应用
// TODO
1.4 Eureka 部分细节讲解
1.4.1 元数据
Eureka的元数据有两种:标准元数据和自定义元数据。
- **标准元数据:**主机名、IP 地址、端口等信息都会发布在服务注册表中,用于服务之间调用
- **自定义元数据:**可以使用eureka.instance.metadata-map配置,符合KEY/VALUE的
存储格式。这些元数据可以在远程客户端中访问。
eureka:
client:
service-url:
defaultZone: http://s1:10010/eureka/,http://s2:10011/eureka/
instance:
prefer-ip-address: true
instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}:@project.version@
# 自定义元数据
metadata-map:
region: chengdu
获取自定义元数据信息
List<ServiceInstance> instances = discoveryClient.getInstances("service-name");
// 显示服务元数据
for (ServiceInstance instance : instances) {
System.out.println(instance.getMetadata());
}
1.4.2 客户端
服务注册
当导入了 eureka-client 依赖坐标后, 配置 eureka 服务注册中心地址,,服务在启动时会向注册中心,发起注册请求, 携带服务元数据信息, eureka 注册中心回把服务的信息保存在 Map中。
服务续约(服务提供者)
服务每隔 30 秒会向注册中心续约(报告心跳)一次,如果没有续约,租约将在 90 秒后到期, 然后服务会被剔除。
获取服务列表(服务消费者)
每隔 30 秒会从注册中心拉取一份服务列表,该时间可通过eureka.client.registry-fetch-interval-seconds配置。
服务消费者启动时,从 EurekaServer 服务列表获取只读备份,缓存到本地。每隔 30 秒,会重新获取并更新数据。
1.4.3 服务端
服务下线
当服务正常关闭操作时,会发送服务下线的 REST 请求给 EurekaServer。服务中心接受请求后,将该服务下线。
失效剔除
Eureka Server会定时(间隔值是eureka.server.eviction-interval-timer-in-ms,默认60s)进行检查,如果发现实例在在一定时间(此值由客户端设置的eureka.instance.lease-expiration-duration-in-seconds定义,默认值为90s)内没有收到心跳,则会注销此实例。
自我保护
如果在15分钟内超过85%的客户端节点都没有正常的心跳,那么Eureka就认为客户端与注册中心出现了网络故障,Eureka Server自动进入自我保护机制。
为什么会有自我保护机制?
默认情况下,如果Eureka Server在一定时间内(默认90秒)没有接收到某个微服务实例的心跳,Eureka Server将会移除该实例。但是当网络分区故障发生时,微服务与Eureka Server之间无法正常通信,而微服务本身是正常运行的,此时不应该移除这个微服务,所以引入了自我保护机制。
当处于自我保护模式时
- 不会剔除任何服务实例(可能是服务提供者和EurekaServer之间网络问题)保证了大多数服务依然可用。
- Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其它节点上,保证当前节点依然可用,当网络稳定时,当前Eureka Server新的注册信息会被同步到其它节点中。
- 在Eureka Server工程中通过eureka.server.enable-self-preservation配置可用关停自我保护,默认值是打开。
eureka:
server:
enable-self-preservation: false # 关闭自我保护模式(缺省为打开)
建议生产环境打开自我保护机制
1.5 Eureka 核心源码分析
1.5.1 Eureka Server 启动过程
入口:
在spring-cloud-netflix-eureka-server jar 中 META-INF 文件夹下,查看 spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
Spring Boot 将自动装配 EurekaServerAutoConfiguration 类
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
// ...
}
由于 @ConditionalOnBean 注解要实现该配置类的自动装配,首先需要在 IOC 容器中注入 Marker 对象。
在实现 Eureka Server 时,代码中只使用了 @EnableEurekaServer 注解。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
在该注解中又引入了 EurekaServerMarkerConfiguration 类,在该类中注入了 Marker 实例。
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
@Bean
public Marker eurekaServerMarkerBean() {
return new Marker();
}
class Marker {
}
}
所以,只有在添加了 @EnableEurekaServer 注解后,才会执行后续逻辑。
回到EurekaServerAutoConfiguration类中
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
/*
...
*/
/**
* 注入一个对外的接口。
* 该接口即 Eureka 的 dashboard 界面
* 若想关闭该接口,可在配置文件中配置eureka.dashboard.enabled=false
**/
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
matchIfMissing = true)
public EurekaController eurekaController() {
return new EurekaController(this.applicationInfoManager);
}
/**
* 对等节点感知实例注册器
* 在集群模式中生效,Eureka Server 集群中的节点都是对等的,没有主从之分
**/
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
ServerCodecs serverCodecs) {
this.eurekaClient.getApplications(); // force initialization
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
serverCodecs, this.eurekaClient,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
/**
* 注入 PeerEurekaNodes
* 用来管理 eureka 集群节点,如创建、更新、获取节点信息
* eureka 集群节点具体的操作是在 PeerEurekaNode 类中
* 比如注册实例,取消实例,实例发送心跳,更新实例状态,删除实例等
**/
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
ServerCodecs serverCodecs,
ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
replicationClientAdditionalFilters);
}
/**
* 注入 Eureka Server 上下文
* EurekaServerContext 为其他中间件提供使用 Eureka 相关服务配置的入口
**/
@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
registry, peerEurekaNodes, this.applicationInfoManager);
}
/**
* 注入 EurekaServerBootstrap
**/
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager,
this.eurekaClientConfig, this.eurekaServerConfig, registry,
serverContext);
}
/**
* 注册 Jersey 过滤器,过滤掉含"/eureka"的请求。
*
* Register the Jersey filter.
* @param eurekaJerseyApp an {@link Application} for the filter to be registered
* @return a jersey {@link FilterRegistrationBean}
*/
@Bean
public FilterRegistrationBean<?> jerseyFilterRegistration(
javax.ws.rs.core.Application eurekaJerseyApp) {
FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
bean.setFilter(new ServletContainer(eurekaJerseyApp));
bean.setOrder(Ordered.LOWEST_PRECEDENCE);
bean.setUrlPatterns(
Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));
return bean;
}
/*
...
*/
}
关注个别类中的主要实现
com.netflix.eureka.cluster.PeerEurekaNodes
在该类中主要关注 start 方法
public void start() {
// 创建线程池
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 启动更新节点信息
updatePeerEurekaNodes(resolvePeerUrls());
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
// 子线程中动态更新节点信息
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// 放入线程池中,根据配置文件中配置的间隔时间,定时执行
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
在 DefaultEurekaServerContext 类中,主要关注 initialize 方法。
该方法会在类实例构建完毕后立即执行。
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
// 执行上面 PeerEurekaNodes 中的 start 方法。
peerEurekaNodes.start();
try {
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}
在 EurekaServerBootstrap 类中,主要关注 contextInitialized 方法
public void contextInitialized(ServletContext context) {
try {
// 根据配置文件中的内容,初始化 Eureka 环境
initEurekaEnvironment();
// 初始化上下文 重点关注该方法
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
log.error("Cannot bootstrap eureka server :", e);
throw new RuntimeException("Cannot bootstrap eureka server :", e);
}
}
protected void initEurekaEnvironment() throws Exception {
log.info("Setting the eureka configuration..");
String dataCenter = ConfigurationManager.getConfigInstance()
.getString(EUREKA_DATACENTER);
if (dataCenter == null) {
log.info(
"Eureka data center value eureka.datacenter is not set, defaulting to default");
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
}
String environment = ConfigurationManager.getConfigInstance()
.getString(EUREKA_ENVIRONMENT);
if (environment == null) {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
log.info(
"Eureka environment value eureka.environment is not set, defaulting to test");
}
else {
ConfigurationManager.getConfigInstance()
.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
}
}
// 初始化上下文
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
this.eurekaClientConfig, this.registry, this.applicationInfoManager);
this.awsBinder.start();
}
// 为非 IOC 容器提供获取 EurekaServerContext 的接口
// 实现细节见下文
EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
// Copy registry from neighboring eureka node
// 从集群中其他 EurekaServer 同步注册信息
// 实现细节见下文
int registryCount = this.registry.syncUp();
// 更改实例状态为 UP,对外服务
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}
EurekaServerContextHolder 类
public class EurekaServerContextHolder {
private final EurekaServerContext serverContext;
private EurekaServerContextHolder(EurekaServerContext serverContext) {
this.serverContext = serverContext;
}
public EurekaServerContext getServerContext() {
return this.serverContext;
}
private static EurekaServerContextHolder holder;
public static synchronized void initialize(EurekaServerContext serverContext) {
holder = new EurekaServerContextHolder(serverContext);
}
public static EurekaServerContextHolder getInstance() {
return holder;
}
}
syncUp
方法的具体实现在 PeerAwareInstanceRegistryImpl
类中
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {
@Override
public int syncUp() {
// Copy entire entry from neighboring DS node
// 注册数量
int count = 0;
// 多次尝试从其他节点同步注册信息到自己的注册表中
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
// ① 当第一次没有拿到注册信息时,当前线程 sleep
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
// 从本地com.netflix.discovery.DiscoveryClient#localRegionApps 本地变量中获取
Applications apps = eurekaClient.getApplications();
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// ② 注册到本地的注册表中
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
}
上面代码①处,对当前线程进行了 sleep,那么,是在什么时候把节点信息保存到 DiscoveryClient 中的 localRegionApps 中呢?
答案就在 com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry
该方法用于全量同步注册表
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
logger.info("Getting all instance registry info from the eureka server");
Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
// 向节点发送GET请求到 /apps 接口 全量抓取注册表
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
// 将 Applications 缓存到本地
localRegionApps.set(this.filterAndShuffle(apps));
// Applications 中返回了注册中心 apps 的 hash 值
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
代码 ② 处,把同步的注册信息注册到本地的注册表中。register
方法在类 AbstractInstanceRegistry
中
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
/*
....
*/
// 注册表
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
read.lock();
try {
// 根据服务名称从本地注册表中获取实例信息
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
// 初次注册,以 AppName 为 key 保存到 Map 中
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 获取租约信息
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
// 已存在的实例最后更新时间
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
// 新注册的实例最后更新时间
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
// 如果存在的实例比新注册尽量的实例后更新,就直接把新注册的实例设置为已存在的实例
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// 新注册时,租约信息不存在
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
// 期望续约的客户端数量 + 1
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
// ① 更新每分钟续约请求次数的阈值
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 创建新的租约
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
// 放入最近注册的队列
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// This is where the initial state transfer of overridden status happens
// 覆盖状态
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
// 更改实例状态
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
// 实例状态为 UP 时,设置 Lease 的时间戳为当前时间戳
lease.serviceUp();
}
// 设置动作是 ADDED
registrant.setActionType(ActionType.ADDED);
// 添加到最近变更的队列
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 设置最后更新时间为当前时间
registrant.setLastUpdatedTimestamp();
// 失效缓存
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
}
上面代码 ① 处,更新每分钟续约的次数,具体细节如下
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
该阈值的计算公式即每分钟续约阈值 = 期望续约的客户端数量 * (60 / 续约间隔时间) * 续约百分比
若当前共注册了 10 个实例,则
expectedNumberOfClientsSendingRenews
为 10,续约间隔时间为 30 秒,续约百分比默认为 0.85,则每分钟阈值为 10 * (60.0 / 30) * 0.85 = 17 。
查看完 register
方法后,回到contextInitialized
方法
int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
分析完 syncUp
方法后,继续查看openForTraffic
方法,集群模式下,该方法的实现在PeerAwareInstanceRegistryImpl
类中
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
// 设置每分钟期待续约数
this.expectedNumberOfClientsSendingRenews = count;
// 更新每分钟续约请求次数的阈值
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
// 更改实例状态为 UP
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// 开启定时任务,默认每个 60 秒剔除已失效的服务
super.postInit();
}
进入 postInit
方法
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
// EvictionTask 为实现逻辑
evictionTaskRef.set(new EvictionTask());
// 设置定时任务,在配置文件中修改默认间隔时间,默认为 60 秒
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
至此 Eureka Server 启动过程大部分逻辑已分析完毕。