Eureka服务注册源码

server/2024/12/27 21:00:24/

spring-cloud-starter-netflix-eureka-client 版本是3.0.3
核心装备类:
EurekaClientAutoConfiguration
EurekaDiscoveryClientConfiguration
核心类,以及引用的关系如下

EurekaRegistration - EurekaInstanceConfigBean 实例配置bean- ApplicationInfoManager 应用信息管理器- CloudEurekaClient 客户端- EurekaHealthCheckHandler 健康检查处理器EurekaAutoServiceRegistration- EurekaServiceRegistry 服务注册- EurekaRegistration 

EurekaAutoServiceRegistration 实现了SmartLifecycle,在容器启动会调用它的start方法
start方法里面调用了 EurekaServiceRegistry.register(EurekaRegistration reg)
stop方法里面调用了 EurekaServiceRegistry.deregister(EurekaRegistration reg)

public void start() {// only set the port if the nonSecurePort or securePort is 0 and this.port != 0if (this.port.get() != 0) {if (this.registration.getNonSecurePort() == 0) {this.registration.setNonSecurePort(this.port.get());}if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {this.registration.setSecurePort(this.port.get());}}// only initialize if nonSecurePort is greater than 0 and it isn't already running// because of containerPortInitializer belowif (!this.running.get() && this.registration.getNonSecurePort() > 0) {// 服务注册this.serviceRegistry.register(this.registration);this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));this.running.set(true);}
}public void stop() {// 下线 this.serviceRegistry.deregister(this.registration);this.running.set(false);
}

EurekaServiceRegistry.register(EurekaRegistration reg)注册方法里面有两块逻辑

// 修改状态,会发送StatusChangeEvent事件,通知相关listener
reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
// 注册健康检查处理器
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg.getEurekaClient().registerHealthCheck(healthCheckHandler));

这里setInstanceStatus方法,假如状态变更时,会通知监听器,执行notify方法

public synchronized void setInstanceStatus(InstanceStatus status) {InstanceStatus next = instanceStatusMapper.map(status);if (next == null) {return;}InstanceStatus prev = instanceInfo.setStatus(next);if (prev != null) {for (StatusChangeListener listener : listeners.values()) {try {listener.notify(new StatusChangeEvent(prev, next));} catch (Exception e) {logger.warn("failed to notify listener: {}", listener.getId(), e);}}}
}

继续追踪listener是如何放入ApplicationInfoManager的
1.ApplicationInfoManager.registerStatusChangeListener方法注册监听器
2.registerStatusChangeListener方法又是在initScheduledTasks方法里面执行的
3.initScheduledTasks方法在DiscoveryClient构造方法中执行的
4.CloudEurekaClient又是DiscoveryClient的子类,所以这块是在自动装配DiscoveryClient执行的
从DiscoveryClient构造器开始分析,由于代码太多,只保留核心逻辑

DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {......try {// 调度线程池scheduler = Executors.newScheduledThreadPool(2,new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-%d").setDaemon(true).build());// 心跳线程池heartbeatExecutor = new ThreadPoolExecutor(1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build());  // use direct handoff// 缓存刷新线程池cacheRefreshExecutor = new ThreadPoolExecutor(1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),new ThreadFactoryBuilder().setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build());  // use direct handoff// eureka跟服务器交互的客户端eurekaTransport = new EurekaTransport();scheduleServerEndpointTask(eurekaTransport, args);AzToRegionMapper azToRegionMapper;if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);} else {azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);}if (null != remoteRegionsToFetch.get()) {azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));}instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());} catch (Throwable e) {throw new RuntimeException("Failed to initialize DiscoveryClient!", e);}if (clientConfig.shouldFetchRegistry()) {try {// 拉取服务信息boolean primaryFetchRegistryResult = fetchRegistry(false);if (!primaryFetchRegistryResult) {logger.info("Initial registry fetch from primary servers failed");}boolean backupFetchRegistryResult = true;if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {backupFetchRegistryResult = false;logger.info("Initial registry fetch from backup servers failed");}if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");}} catch (Throwable th) {logger.error("Fetch registry error at startup: {}", th.getMessage());throw new IllegalStateException(th);}}// call and execute the pre registration handler before all background tasks (inc registration) is startedif (this.preRegistrationHandler != null) {this.preRegistrationHandler.beforeRegistration();}...// finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetchinitScheduledTasks();...
}

着重看下initScheduledTasks方法

  1. 开启缓存刷新定时调度
  2. 开启心跳续约定时调度
  3. 创建instanceInfoReplicator实例信息复制任务
  4. 创建匿名内部实现类ApplicationInfoManager$StatusChangeListener,notify方法调用了instanceInfoReplicator.onDemandUpdate();
  5. 将statusChangeListener实例注册到applicationInfoManager
  6. 调用instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());开启instanceInfoReplicator定时调度
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();cacheRefreshTask = new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread());scheduler.schedule(cacheRefreshTask,registryFetchIntervalSeconds, TimeUnit.SECONDS);}if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerheartbeatTask = new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread());scheduler.schedule(heartbeatTask,renewalIntervalInSecs, TimeUnit.SECONDS);// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSizestatusChangeListener = new ApplicationInfoManager.StatusChangeListener() {@Overridepublic String getId() {return "statusChangeListener";}@Overridepublic void notify(StatusChangeEvent statusChangeEvent) {logger.info("Saw local status change event {}", statusChangeEvent);// 状态变更,如果满足条件则执行注册服务实例信息instanceInfoReplicator.onDemandUpdate();}};if (clientConfig.shouldOnDemandUpdateStatusChange()) {applicationInfoManager.registerStatusChangeListener(statusChangeListener);}// 开启定时调度推送最新服务实例信息instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}
}

所以当状态变更时,applicationInfoManager会执行listener的notify方法,也就是执行了instanceInfoReplicator.onDemandUpdate()方法
通过调度器提交了一个任务,任务里面执行了当前InstanceInfoReplicator任务的run方法,最终完成服务注册。

public void run() {try {// 刷新实例信息 discoveryClient.refreshInstanceInfo();Long dirtyTimestamp = instanceInfo.isDirtyWithTime();if (dirtyTimestamp != null) {// 服务注册discoveryClient.register();instanceInfo.unsetIsDirty(dirtyTimestamp);}} catch (Throwable t) {logger.warn("There was a problem with the instance info replicator", t);} finally {// 固定间隔时间继续执行当前任务Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);scheduledPeriodicRef.set(next);}
}

所以eureka是基于ScheduledExecutorService来定时刷新服务缓存,心跳续约,以及服务重新注册(如果服务实例信息和eureka服务器不一致的时候)


http://www.ppmy.cn/server/152973.html

相关文章

Apache Jmeter Liunx环境部署与接口压测

Jmeter有什么作用 JMeter是一种开源工具,主要用于测试软件应用程序的性能。它是Apache软件基金会的一部分,主要针对负载和性能测试。通过模拟多用户并发访问,以测试应用程序在极限状态下的稳定性和错误处理能力。 安装前准备 Java Development Kit (JDK),一般为jdk8及以上…

告别卡顿:CasaOS轻NAS设备安装Gopeed打造高效下载环境

文章目录 前言1. 更新应用中心2.Gopeed安装与配置3. 本地下载测试4. 安装内网穿透工具5. 配置公网地址6. 配置固定公网地址 前言 无论你是需要大量文件传输的专业人士&#xff0c;还是只是想快速下载电影或音乐的普通用户&#xff0c;都会使用到下载工具。如果你对现有的下载工…

20241230 基础数学-线性代数-(1)求解特征值(numpy, scipy)

所有代码实现&#xff0c;基于教程中的理论通过python实现出来的。效率不高&#xff0c;但有代码可以看。 由于scipy/sckitlearn/sparkx 底层的实现都被封装了&#xff08;小白兔水平有限&#xff0c;fortran代码实在没看懂&#xff09;这里的实现至少可以和理论公式对应的上。…

cursor保存更改操作技巧

1. 当我们在agent模式时&#xff0c;要求cursor更改代码时&#xff0c;cursor回答后&#xff0c;就已经更改了代码了&#xff0c;这时候就可以对程序进行编译和测试&#xff0c; 不一定先要点” accept“, 先测试如果没有问题再点“accept”&#xff0c;这样composer就会多一条…

EMQX构建简易的云服务

基本思路&#xff1a; 使用EMQX作为Mqtt brokermqtt-receive-server服务&#xff0c;用于接收设备上报的数据mqtt-sender-service服务&#xff0c;用于下发数据给设备KafKa实现数据解耦&#xff0c;mqtt-receive-server服务接收的数据简单处理下直接扔到Kafka中云服务各业务系…

【Apache Doris】周FAQ集锦:第 26 期

SQL问题 Q1 doris 3.0存算分离模式下&#xff0c;建表的时是否需要指定表的副本数 不需要&#xff0c;指定了也会忽略&#xff1b;存算分离模式下&#xff0c;数据副本由远端存储去管控。 Q2 doris 通过dbeaver查询时报错&#xff1a;[SXXXX]… doris的错误码通常都是EXXXX&…

Nacos的下载和启动(如何快速稳定下载在github中)

目录 Nacos的下载 下载加速器 在githup中找到Nacos 启动Nacos 访问Nacos Nacos的下载 下载加速器 首先&#xff0c;我们需要进入githup中&#xff0c;我们直接访问&#xff0c;肯定是访问不到的。 这里我们经常玩游戏的同学肯定知道steam&#xff0c;这个加速器。直接进入…

Ubuntu下通过Docker部署Caddy服务器

Docker和Caddy简介 Docker是一个强大的容器化平台&#xff0c;而Caddy是一个现代化的Web服务器&#xff0c;支持自动HTTPS和简单配置。这两款软件在现代IT领域扮演着重要的角色。 步骤一&#xff1a;安装Docker 首先&#xff0c;安装Docker。执行以下命令&#xff1a; sudo…