cruii
发布于 2021-10-29 / 805 阅读 / 0 评论 / 0 点赞

Spring Cloud Netflix 系列之 Eureka 简要分析

1. Eureka 服务注册中心

1.1 关于服务注册中心

服务注册中心本质上是为了解耦服务提供者和服务消费者。

对于任何一个微服务,原则上都应存在或者支持多个提供者,这是由微服务的分布式属性决定的。

为了支持弹性扩容缩特性,一个微服务的提供者的数量和分布往往是动态变化的,因此,原本单体应用阶段常用的静态LB就不再适用。需要引入额外的组件来管理微服务提供者的注册与发现,而这个组件就是服务注册中心。

1.1.1 服务注册中心的原理

a

分布式微服务架构中,服务注册中⼼⽤于存储服务提供者地址信息、服务发布相关
的属性信息,消费者通过主动查询和被动通知的⽅式获取服务提供者的地址信息,
⽽不再需要通过硬编码⽅式得到提供者的地址信息。消费者只需要知道当前系统发
布了那些服务,⽽不需要知道服务具体存在于什么位置,这就是透明化路由。

  1. 服务提供者启动
  2. 服务提供者将相关服务信息主动注册到注册中心
  3. 服务消费者获取服务注册信息
    • Pull模式:服务消费者可以主动拉取可⽤的服务提供者清单
    • Push模式:服务消费者订阅服务(当服务提供者有变化时,注册中心也会主动推送
      更新后的服务清单给消费者)
  4. 服务消费者直接调用服务提供者

另外,注册中⼼也需要完成服务提供者的健康监控,当发现服务提供者失效时需要
及时剔除

1.1.2 主流服务注册中心

  • Zookeeper - CP
  • Eureka - AP
  • Consul - CP
  • Nacos - 支持AP/CP切换

1.2 服务注册中心组件 Eureka

Eureka 基础架构

eureka基础架构

Eureka 交互流程及原理

Eureka交互流程及原理

  1. 图中us-east-1c、us-east-1d,us-east-1e代表不同的区也就是不同的机房
  2. 图中每⼀个Eureka Server都是⼀个集群。
  3. 图中Application Service作为服务提供者向Eureka Server中注册服务,
    Eureka Server接受到注册事件会在集群和分区中进⾏数据同步,Application
    Client作为消费端(服务消费者)可以从Eureka Server中获取到服务注册信
    息,进行服务调用。
  4. 微服务启动后,会周期性地向Eureka Server发送⼼跳(默认周期为30秒)
    以续约自己的信息。
  5. Eureka Server在⼀定时间内没有接收到某个微服务节点的心跳,Eureka
    Server将会注销该微服务节点(默认90秒)。
  6. 每个Eureka Server同时也是Eureka Client,多个Eureka Server之间通过复
    制的⽅式完成服务注册列表的同步。
  7. Eureka Client会缓存Eureka Server中的信息。即使所有的Eureka Server节
    点都宕掉,服务消费者依然可以使用缓存中的信息找到服务提供者。

Eureka通过心跳检测、健康检查和客户端缓存等机制,提高系统的灵活性、可
伸缩性和可用性。

1.3 Eureka 高可用搭建与应用

// TODO

1.4 Eureka 部分细节讲解

1.4.1 元数据

Eureka的元数据有两种:标准元数据和自定义元数据。

  • **标准元数据:**主机名、IP 地址、端口等信息都会发布在服务注册表中,用于服务之间调用
  • **自定义元数据:**可以使用eureka.instance.metadata-map配置,符合KEY/VALUE的
    存储格式。这些元数据可以在远程客户端中访问。
eureka:
  client:
    service-url:
      defaultZone: http://s1:10010/eureka/,http://s2:10011/eureka/
  instance:
    prefer-ip-address: true
    instance-id: ${spring.cloud.client.ip-address}:${spring.application.name}:${server.port}:@project.version@
    # 自定义元数据
    metadata-map:
      region: chengdu

获取自定义元数据信息

List<ServiceInstance> instances = discoveryClient.getInstances("service-name");
// 显示服务元数据
for (ServiceInstance instance : instances) {
	System.out.println(instance.getMetadata());
}

1.4.2 客户端

服务注册

当导入了 eureka-client 依赖坐标后, 配置 eureka 服务注册中心地址,,服务在启动时会向注册中心,发起注册请求, 携带服务元数据信息, eureka 注册中心回把服务的信息保存在 Map中。

服务续约(服务提供者)

服务每隔 30 秒会向注册中心续约(报告心跳)一次,如果没有续约,租约将在 90 秒后到期, 然后服务会被剔除。

获取服务列表(服务消费者)

每隔 30 秒会从注册中心拉取一份服务列表,该时间可通过eureka.client.registry-fetch-interval-seconds配置。

服务消费者启动时,从 EurekaServer 服务列表获取只读备份,缓存到本地。每隔 30 秒,会重新获取并更新数据。

1.4.3 服务端

服务下线

当服务正常关闭操作时,会发送服务下线的 REST 请求给 EurekaServer。服务中心接受请求后,将该服务下线。

失效剔除

Eureka Server会定时(间隔值是eureka.server.eviction-interval-timer-in-ms,默认60s)进行检查,如果发现实例在在一定时间(此值由客户端设置的eureka.instance.lease-expiration-duration-in-seconds定义,默认值为90s)内没有收到心跳,则会注销此实例。

自我保护

如果在15分钟内超过85%的客户端节点都没有正常的心跳,那么Eureka就认为客户端与注册中心出现了网络故障,Eureka Server自动进入自我保护机制。

为什么会有自我保护机制?

默认情况下,如果Eureka Server在一定时间内(默认90秒)没有接收到某个微服务实例的心跳,Eureka Server将会移除该实例。但是当网络分区故障发生时,微服务与Eureka Server之间无法正常通信,而微服务本身是正常运行的,此时不应该移除这个微服务,所以引入了自我保护机制。

当处于自我保护模式时

  1. 不会剔除任何服务实例(可能是服务提供者和EurekaServer之间网络问题)保证了大多数服务依然可用。
  2. Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其它节点上,保证当前节点依然可用,当网络稳定时,当前Eureka Server新的注册信息会被同步到其它节点中。
  3. 在Eureka Server工程中通过eureka.server.enable-self-preservation配置可用关停自我保护,默认值是打开。
eureka:
  server:
    enable-self-preservation: false # 关闭自我保护模式(缺省为打开)

建议生产环境打开自我保护机制

1.5 Eureka 核心源码分析

1.5.1 Eureka Server 启动过程

入口:

在spring-cloud-netflix-eureka-server jar 中 META-INF 文件夹下,查看 spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

Spring Boot 将自动装配 EurekaServerAutoConfiguration 类

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
	// ...
}

由于 @ConditionalOnBean 注解要实现该配置类的自动装配,首先需要在 IOC 容器中注入 Marker 对象。

在实现 Eureka Server 时,代码中只使用了 @EnableEurekaServer 注解。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

在该注解中又引入了 EurekaServerMarkerConfiguration 类,在该类中注入了 Marker 实例。

@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

	@Bean
	public Marker eurekaServerMarkerBean() {
		return new Marker();
	}

	class Marker {

	}

}

所以,只有在添加了 @EnableEurekaServer 注解后,才会执行后续逻辑。

回到EurekaServerAutoConfiguration类中

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {

	/*
	 ...
	 */
	
	/**
	 * 注入一个对外的接口。
	 * 该接口即 Eureka 的 dashboard 界面
	 * 若想关闭该接口,可在配置文件中配置eureka.dashboard.enabled=false
	 **/
	@Bean
	@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled",
			matchIfMissing = true)
	public EurekaController eurekaController() {
		return new EurekaController(this.applicationInfoManager);
	}

	/**
	 * 对等节点感知实例注册器
	 * 在集群模式中生效,Eureka Server 集群中的节点都是对等的,没有主从之分
	 **/
	@Bean
	public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
			ServerCodecs serverCodecs) {
		this.eurekaClient.getApplications(); // force initialization
		return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
				serverCodecs, this.eurekaClient,
				this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
				this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
	}

	/**
	 * 注入 PeerEurekaNodes
	 * 用来管理 eureka 集群节点,如创建、更新、获取节点信息
	 * eureka 集群节点具体的操作是在 PeerEurekaNode 类中
	 * 比如注册实例,取消实例,实例发送心跳,更新实例状态,删除实例等
	 **/
	@Bean
	@ConditionalOnMissingBean
	public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
			ServerCodecs serverCodecs,
			ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
		return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
				this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
				replicationClientAdditionalFilters);
	}
	
	/**
	 * 注入 Eureka Server 上下文
	 * EurekaServerContext 为其他中间件提供使用 Eureka 相关服务配置的入口
	 **/
	@Bean
	@ConditionalOnMissingBean
	public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
			PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
		return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
				registry, peerEurekaNodes, this.applicationInfoManager);
	}

	/**
	 * 注入 EurekaServerBootstrap
	 **/
	@Bean
	public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
			EurekaServerContext serverContext) {
		return new EurekaServerBootstrap(this.applicationInfoManager,
				this.eurekaClientConfig, this.eurekaServerConfig, registry,
				serverContext);
	}

	/**
	 * 注册 Jersey 过滤器,过滤掉含"/eureka"的请求。
	 *
	 * Register the Jersey filter.
	 * @param eurekaJerseyApp an {@link Application} for the filter to be registered
	 * @return a jersey {@link FilterRegistrationBean}
	 */
	@Bean
	public FilterRegistrationBean<?> jerseyFilterRegistration(
			javax.ws.rs.core.Application eurekaJerseyApp) {
		FilterRegistrationBean<Filter> bean = new FilterRegistrationBean<Filter>();
		bean.setFilter(new ServletContainer(eurekaJerseyApp));
		bean.setOrder(Ordered.LOWEST_PRECEDENCE);
		bean.setUrlPatterns(
				Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*"));

		return bean;
	}

	/*
	 ...
	 */
}

关注个别类中的主要实现

com.netflix.eureka.cluster.PeerEurekaNodes

在该类中主要关注 start 方法

public void start() {
        // 创建线程池
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
            // 启动更新节点信息
            updatePeerEurekaNodes(resolvePeerUrls());
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        // 子线程中动态更新节点信息
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            // 放入线程池中,根据配置文件中配置的间隔时间,定时执行
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL:  {}", node.getServiceUrl());
        }
    }

DefaultEurekaServerContext 类中,主要关注 initialize 方法。

该方法会在类实例构建完毕后立即执行。

@PostConstruct
@Override
public void initialize() {
    logger.info("Initializing ...");
    // 执行上面 PeerEurekaNodes 中的 start 方法。
    peerEurekaNodes.start();
    try {
        registry.init(peerEurekaNodes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}

EurekaServerBootstrap 类中,主要关注 contextInitialized 方法

public void contextInitialized(ServletContext context) {
		try {
			// 根据配置文件中的内容,初始化 Eureka 环境
			initEurekaEnvironment();
			// 初始化上下文 重点关注该方法
			initEurekaServerContext();

			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
}

protected void initEurekaEnvironment() throws Exception {
		log.info("Setting the eureka configuration..");

		String dataCenter = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_DATACENTER);
		if (dataCenter == null) {
			log.info(
					"Eureka data center value eureka.datacenter is not set, defaulting to default");
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT);
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter);
		}
		String environment = ConfigurationManager.getConfigInstance()
				.getString(EUREKA_ENVIRONMENT);
		if (environment == null) {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST);
			log.info(
					"Eureka environment value eureka.environment is not set, defaulting to test");
		}
		else {
			ConfigurationManager.getConfigInstance()
					.setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, environment);
		}
}

// 初始化上下文
protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);

		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
			this.awsBinder.start();
		}
		// 为非 IOC 容器提供获取 EurekaServerContext 的接口
		// 实现细节见下文
		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		// Copy registry from neighboring eureka node
		// 从集群中其他 EurekaServer 同步注册信息
		// 实现细节见下文
		int registryCount = this.registry.syncUp();
		// 更改实例状态为 UP,对外服务
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);

		// Register all monitoring statistics.
		EurekaMonitors.registerAllStats();
}

EurekaServerContextHolder

public class EurekaServerContextHolder {

    private final EurekaServerContext serverContext;

    private EurekaServerContextHolder(EurekaServerContext serverContext) {
        this.serverContext = serverContext;
    }

    public EurekaServerContext getServerContext() {
        return this.serverContext;
    }

    private static EurekaServerContextHolder holder;

    public static synchronized void initialize(EurekaServerContext serverContext) {
        holder = new EurekaServerContextHolder(serverContext);
    }

    public static EurekaServerContextHolder getInstance() {
        return holder;
    }
}

syncUp 方法的具体实现在 PeerAwareInstanceRegistryImpl 类中

@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

		@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        // 注册数量
        int count = 0;
        // 多次尝试从其他节点同步注册信息到自己的注册表中
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    // ① 当第一次没有拿到注册信息时,当前线程 sleep
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            // 从本地com.netflix.discovery.DiscoveryClient#localRegionApps 本地变量中获取
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            // ② 注册到本地的注册表中
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
}

上面代码①处,对当前线程进行了 sleep,那么,是在什么时候把节点信息保存到 DiscoveryClient 中的 localRegionApps 中呢?

答案就在 com.netflix.discovery.DiscoveryClient#getAndStoreFullRegistry

该方法用于全量同步注册表

private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                // 向节点发送GET请求到 /apps 接口 全量抓取注册表
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // 将 Applications 缓存到本地
            localRegionApps.set(this.filterAndShuffle(apps));
            // Applications 中返回了注册中心 apps 的 hash 值
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
}

代码 ② 处,把同步的注册信息注册到本地的注册表中。register 方法在类 AbstractInstanceRegistry

public abstract class AbstractInstanceRegistry implements InstanceRegistry {
	/*
	 ....
	 */
	
	// 注册表
	private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

	public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        read.lock();
        try {
            // 根据服务名称从本地注册表中获取实例信息
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                // 初次注册,以 AppName 为 key 保存到 Map 中
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // 获取租约信息
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // 已存在的实例最后更新时间
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                // 新注册的实例最后更新时间
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                // 如果存在的实例比新注册尽量的实例后更新,就直接把新注册的实例设置为已存在的实例
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // 新注册时,租约信息不存在
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        // 期望续约的客户端数量 + 1
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        // ① 更新每分钟续约请求次数的阈值
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // 创建新的租约
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            // 放入最近注册的队列
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            // 覆盖状态
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);

            // 更改实例状态
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
            // 实例状态为 UP 时,设置 Lease 的时间戳为当前时间戳
                lease.serviceUp();
            }
            // 设置动作是 ADDED
            registrant.setActionType(ActionType.ADDED);
            // 添加到最近变更的队列
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // 设置最后更新时间为当前时间
            registrant.setLastUpdatedTimestamp();
            // 失效缓存
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

}

上面代码 ① 处,更新每分钟续约的次数,具体细节如下

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

该阈值的计算公式即每分钟续约阈值 = 期望续约的客户端数量 * (60 / 续约间隔时间) * 续约百分比

若当前共注册了 10 个实例,则expectedNumberOfClientsSendingRenews为 10,续约间隔时间为 30 秒,续约百分比默认为 0.85,则每分钟阈值为 10 * (60.0 / 30) * 0.85 = 17 。

查看完 register 方法后,回到contextInitialized方法

int registryCount = this.registry.syncUp();
this.registry.openForTraffic(this.applicationInfoManager, registryCount);

分析完 syncUp 方法后,继续查看openForTraffic方法,集群模式下,该方法的实现在PeerAwareInstanceRegistryImpl类中

@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // 设置每分钟期待续约数
    this.expectedNumberOfClientsSendingRenews = count;
    // 更新每分钟续约请求次数的阈值
    updateRenewsPerMinThreshold();
    logger.info("Got {} instances from neighboring DS node", count);
    logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        logger.info("Priming AWS connections for all replicas..");
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 更改实例状态为 UP
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // 开启定时任务,默认每个 60 秒剔除已失效的服务
    super.postInit();
}

进入 postInit 方法

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    // EvictionTask 为实现逻辑
    evictionTaskRef.set(new EvictionTask());
    // 设置定时任务,在配置文件中修改默认间隔时间,默认为 60 秒
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}

至此 Eureka Server 启动过程大部分逻辑已分析完毕。

1.5.2 EurekaServer 服务接口暴露策略


评论