1 心跳检测
核心类：ClientBeatCheckTask，核心方法：run（负责节点的判定逻辑见 DistroMapper.responsible）
public void run() {try {//1 集群状态下心跳处理if (!getDistroMapper().responsible(service.getName())) {return;}if (!getSwitchDomain().isHealthCheckEnabled()) {return;}List<Instance> instances = service.allIPs(true);// first set health status of instances:for (Instance instance : instances) {if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {instance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// then remove obsolete instances:for (Instance instance : instances) {if (instance.isMarked()) {continue;}if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {// delete instanceLoggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));deleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}public boolean responsible(String serviceName) {final List<String> servers = healthyList;if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {return true;}if (CollectionUtils.isEmpty(servers)) {// means distro config is not ready yetreturn false;}int index = servers.indexOf(EnvUtil.getLocalAddress());int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());if (lastIndex < 0 || index < 0) {return true;}//通过对服务名hash进行取模运算获取目标服务器int target = distroHash(serviceName) % servers.size();return target >= index && target <= lastIndex;
}
总结:
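客户端注册服务后，服务端会通过定时任务对该服务的实例进行心跳检测。在集群模式下，并非每个节点都执行检测：节点先对服务名做 hash，再对健康节点列表的长度取模，只有结果落在本节点所在下标范围内的节点才负责该服务的检测（单机模式或未开启 distro 时，本节点总是负责）。检测分两步：第一步，最后一次心跳距今超过心跳超时时间的健康实例被标记为不健康，并推送服务变更、发布心跳超时事件；第二步，若开启了实例过期，最后一次心跳距今超过删除超时时间的实例会被自动删除。被标记（marked）的实例在两步中都会被跳过。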
2 集群机器状态同步
核心类：ServerStatusReporter，核心方法：run（实际发送由 synchronizer.send 完成）
public void run() {try {if (EnvUtil.getPort() <= 0) {return;}int weight = Runtime.getRuntime().availableProcessors() / 2;if (weight <= 0) {weight = 1;}long curTime = System.currentTimeMillis();String status = LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight+ "\r\n";List<Member> allServers = getServers();if (!contains(EnvUtil.getLocalAddress())) {Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}",EnvUtil.getLocalAddress(), allServers);return;}if (allServers.size() > 0 && !EnvUtil.getLocalAddress().contains(IPUtil.localHostIP())) {for (Member server : allServers) {if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {continue;}// This metadata information exists from 1.3.0 onwards "version"if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {Loggers.SRV_LOG.debug("[SERVER-STATUS] target {} has extend val {} = {}, use new api report status",server.getAddress(), MemberMetaDataConstants.VERSION,server.getExtendVal(MemberMetaDataConstants.VERSION));continue;}Message msg = new Message();msg.setData(status);//将自身状态发送给其他节点机器synchronizer.send(server.getAddress(), msg);}}} catch (Exception e) {Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);} finally {GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());}}public void send(final String serverIP, Message msg) {if (StringUtils.isEmpty(serverIP)) {return;}final Map<String, String> params = new HashMap<String, String>(2);params.put("serverStatus", msg.getData());//发送请求告知其他节点 本节点状态是健康的String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator/server/status";if (IPUtil.containsPort(serverIP)) {url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT+ "/operator/server/status";}try {HttpClient.asyncHttpGet(url, null, params, new Callback<String>() 
{@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}",serverIP);}}@Overridepublic void onError(Throwable throwable) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP, throwable);}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP, e);}
}
总结:
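Nacos 服务端启动时会开启一个定时任务，周期性地把本节点的状态发送给集群中的其他节点。状态格式为 site#ip#时间戳#权重，其中权重为 CPU 核数的一半，至少为 1。发送时会跳过本节点自身，也会跳过带有 version 元数据（1.3.0 及以上版本）的节点，这些节点改用新的接口上报状态。若本节点地址不在成员列表中，则只记录错误日志，不发送任何状态。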
3 集群间实例信息同步
核心类：ServiceReporter，核心方法：run（实际发送由 synchronizer.send 完成）
public void run() {try {Map<String, Set<String>> allServiceNames = getAllServiceNames();if (allServiceNames.size() <= 0) {//ignorereturn;}for (String namespaceId : allServiceNames.keySet()) {ServiceChecksum checksum = new ServiceChecksum(namespaceId);for (String serviceName : allServiceNames.get(namespaceId)) {//1 选择本机器执行心跳检测的所有服务if (!distroMapper.responsible(serviceName)) {continue;}Service service = getService(namespaceId, serviceName);if (service == null || service.isEmpty()) {continue;}service.recalculateChecksum();//2 存储服务状态checksum.addItem(serviceName, service.getChecksum());}Message msg = new Message();msg.setData(JacksonUtils.toJson(checksum));Collection<Member> sameSiteServers = memberManager.allMembers();if (sameSiteServers == null || sameSiteServers.size() <= 0) {return;}for (Member server : sameSiteServers) {if (server.getAddress().equals(NetUtils.localServer())) {continue;}//3 向其他节点发送实例最新状态synchronizer.send(server.getAddress(), msg);}}} catch (Exception e) {Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);} finally {GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),TimeUnit.MILLISECONDS);}
}public void send(final String serverIP, Message msg) {if (serverIP == null) {return;}Map<String, String> params = new HashMap<String, String>(10);params.put("statuses", msg.getData());params.put("clientIP", NetUtils.localServer());//同步健康状态String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()+ UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";if (IPUtil.containsPort(serverIP)) {url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT+ "/service/status";}try {HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}",serverIP);}}@Overridepublic void onError(Throwable throwable) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, throwable);}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);}}
总结:
每个节点只负责一部分服务的心跳检测，具体由 distroMapper.responsible 的 hash 规则决定。因此，节点会按命名空间汇总自己所负责服务的校验和（checksum），并发送给集群中的其他节点，由它们据此同步服务状态。