Nacos Cluster AP Architecture: Source Code Analysis

2024/11/14 3:02:14

1 Heartbeat Checking

Core class: ClientBeatCheckTask    Core method: run


```java
public void run() {
    try {
        // 1. In cluster mode, only the node responsible for this service runs the heartbeat check
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) {
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        Loggers.EVT_LOG.info(
                                "{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                                instance.getIp(), instance.getPort(), instance.getClusterName(),
                                service.getName(), UtilsAndCommons.LOCALHOST_SITE,
                                instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());
                        getPushService().serviceChanged(service);
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),
                        JacksonUtils.toJson(instance));
                deleteIp(instance);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}

// DistroMapper#responsible
public boolean responsible(String serviceName) {
    final List<String> servers = healthyList;
    if (!switchDomain.isDistroEnabled() || EnvUtil.getStandaloneMode()) {
        return true;
    }
    if (CollectionUtils.isEmpty(servers)) {
        // means distro config is not ready yet
        return false;
    }
    int index = servers.indexOf(EnvUtil.getLocalAddress());
    int lastIndex = servers.lastIndexOf(EnvUtil.getLocalAddress());
    if (lastIndex < 0 || index < 0) {
        return true;
    }
    // Hash the service name modulo the server count to pick the target server,
    // then check whether the target index falls within this node's range
    int target = distroHash(serviceName) % servers.size();
    return target >= index && target <= lastIndex;
}
```
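The two loops in run() implement a two-threshold expiry. An instance that has been silent longer than its heartbeat timeout is first marked unhealthy. Only one that stays silent past the IP delete timeout is removed, and only when instance expiration is enabled. The following is a minimal, self-contained model of that decision under a simplified instance type. BeatCheckSketch, SketchInstance and its fields are hypothetical stand-ins, and the push and event publishing of the real code are left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of the two loops in ClientBeatCheckTask#run (hypothetical types). */
class BeatCheckSketch {

    /** Hypothetical stand-in for Nacos' Instance. */
    static final class SketchInstance {
        final String ip;
        final long lastBeat;    // timestamp (ms) of the last heartbeat received
        boolean healthy = true;
        boolean marked = false; // marked instances are never touched by the beat check

        SketchInstance(String ip, long lastBeat) {
            this.ip = ip;
            this.lastBeat = lastBeat;
        }
    }

    /**
     * Phase 1: silent longer than heartbeatTimeoutMs -> healthy = false.
     * Phase 2: silent longer than deleteTimeoutMs -> removed (only if expireEnabled).
     * Returns the instances that remain registered.
     */
    static List<SketchInstance> check(List<SketchInstance> instances, long now,
                                      long heartbeatTimeoutMs, long deleteTimeoutMs,
                                      boolean expireEnabled) {
        for (SketchInstance inst : instances) {
            if (!inst.marked && inst.healthy && now - inst.lastBeat > heartbeatTimeoutMs) {
                inst.healthy = false; // the real code also pushes a change event here
            }
        }
        List<SketchInstance> survivors = new ArrayList<>();
        for (SketchInstance inst : instances) {
            boolean expired = expireEnabled && !inst.marked && now - inst.lastBeat > deleteTimeoutMs;
            if (!expired) {
                survivors.add(inst);
            }
        }
        return survivors;
    }
}
```

With example timeouts of 15 s and 30 s, an instance silent for 20 s stays registered but unhealthy, while one silent for 40 s is removed.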

Summary:

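When a client registers a service, the server starts a heartbeat check for it. In a cluster, the check for each service is assigned to a single node: the service name is hashed and taken modulo the number of healthy servers (distroHash(serviceName) % servers.size()), and only the node whose position matches the result runs the scheduled task. That task marks an instance unhealthy once its last heartbeat is older than the heartbeat timeout and, if instance expiration is enabled, deletes it once its last heartbeat is older than the IP delete timeout.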
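The ownership rule can be tried out in isolation. The sketch below reproduces the index-range check from responsible() over a plain list of healthy server addresses. DistroSketch is a hypothetical name, and distroHash is assumed here to be Math.abs(serviceName.hashCode() % Integer.MAX_VALUE), which may differ from the Nacos version you are reading. Standalone mode and the distro switch are not modeled.

```java
import java.util.List;

/** Sketch of the responsibility rule in DistroMapper#responsible (hypothetical class). */
class DistroSketch {

    /** ASSUMED hash; the real distroHash may be implemented differently. */
    static int distroHash(String serviceName) {
        return Math.abs(serviceName.hashCode() % Integer.MAX_VALUE);
    }

    /** True when the target index for serviceName falls within this node's index range. */
    static boolean responsible(String serviceName, List<String> healthyServers, String localAddress) {
        if (healthyServers.isEmpty()) {
            return false; // distro config not ready yet, same as the original
        }
        int index = healthyServers.indexOf(localAddress);
        int lastIndex = healthyServers.lastIndexOf(localAddress);
        if (index < 0) {
            return true; // same fallback as the original: an unlisted node checks everything
        }
        int target = distroHash(serviceName) % healthyServers.size();
        return target >= index && target <= lastIndex;
    }
}
```

As long as all nodes see the same healthy-server list, each service name maps to exactly one owner, so the heartbeat check for that service runs on only one node. When the list changes (for example, a node goes down), ownership of some services moves to other nodes.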

2 Cluster Node Status Synchronization

Core class: ServerStatusReporter    Core method: run


```java
public void run() {
    try {
        if (EnvUtil.getPort() <= 0) {
            return;
        }
        int weight = Runtime.getRuntime().availableProcessors() / 2;
        if (weight <= 0) {
            weight = 1;
        }
        long curTime = System.currentTimeMillis();
        String status = LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight
                + "\r\n";
        List<Member> allServers = getServers();
        if (!contains(EnvUtil.getLocalAddress())) {
            Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}",
                    EnvUtil.getLocalAddress(), allServers);
            return;
        }
        if (allServers.size() > 0 && !EnvUtil.getLocalAddress().contains(IPUtil.localHostIP())) {
            for (Member server : allServers) {
                if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {
                    continue;
                }
                // This metadata information exists from 1.3.0 onwards "version"
                if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
                    Loggers.SRV_LOG.debug(
                            "[SERVER-STATUS] target {} has extend val {} = {}, use new api report status",
                            server.getAddress(), MemberMetaDataConstants.VERSION,
                            server.getExtendVal(MemberMetaDataConstants.VERSION));
                    continue;
                }
                Message msg = new Message();
                msg.setData(status);
                // Send this node's status to the other node
                synchronizer.send(server.getAddress(), msg);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
    } finally {
        GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
    }
}

// ServerStatusSynchronizer#send
public void send(final String serverIP, Message msg) {
    if (StringUtils.isEmpty(serverIP)) {
        return;
    }
    final Map<String, String> params = new HashMap<String, String>(2);
    params.put("serverStatus", msg.getData());
    // Send a request telling the other node that this node is healthy
    String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/operator/server/status";
    if (IPUtil.containsPort(serverIP)) {
        url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                + "/operator/server/status";
    }
    try {
        HttpClient.asyncHttpGet(url, null, params, new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                if (!result.ok()) {
                    Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}",
                            serverIP);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}",
                        serverIP, throwable);
            }

            @Override
            public void onCancel() {
            }
        });
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serverStatus, remote server: {}", serverIP, e);
    }
}
```
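The status payload is a single #-separated line. This sketch builds it with the same weight rule (half the available processors, never less than 1) and shows one way a receiver could split it back into fields. The class ServerStatusLineSketch and its parse method are hypothetical, since the receiving handler is not shown in this article.

```java
/** Builds and splits the "site#address#timestamp#weight\r\n" status line (hypothetical helper). */
class ServerStatusLineSketch {

    /** Parsed fields of one status line. */
    static final class ServerStatus {
        final String site;
        final String address;
        final long timestamp;
        final int weight;

        ServerStatus(String site, String address, long timestamp, int weight) {
            this.site = site;
            this.address = address;
            this.timestamp = timestamp;
            this.weight = weight;
        }
    }

    /** Same weight rule as the reporter: half the CPU count, at least 1. */
    static int weight(int availableProcessors) {
        return Math.max(1, availableProcessors / 2);
    }

    /** Same layout as the status string built in ServerStatusReporter#run. */
    static String format(String site, String address, long timestamp, int weight) {
        return site + "#" + address + "#" + timestamp + "#" + weight + "\r\n";
    }

    /** Hypothetical receiver-side parsing; trim() drops the trailing \r\n. */
    static ServerStatus parse(String line) {
        String[] parts = line.trim().split("#");
        if (parts.length != 4) {
            throw new IllegalArgumentException("unexpected status line: " + line);
        }
        return new ServerStatus(parts[0], parts[1], Long.parseLong(parts[2]), Integer.parseInt(parts[3]));
    }
}
```

For example, an 8-core node at 10.0.0.1:8848 reports a weight of 4, and a single-core node still reports 1.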

Summary:

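When a Nacos server starts, it launches a scheduled task that periodically tells the other nodes in the cluster about its own status. The status is a line of the form site#address#timestamp#weight, where the weight is half the number of available processors (at least 1), and it is sent as an HTTP GET to /operator/server/status on each peer. Peers whose member metadata already carries a version (Nacos 1.3.0 and later) are skipped, because they receive status through the newer reporting API.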
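Note that the reporter does not use a fixed-rate schedule: it re-registers itself in the finally block after every run. The sketch below shows that pattern with a plain ScheduledExecutorService in place of Nacos' GlobalExecutor. SelfReschedulingReporter is a hypothetical name, and the real report body is replaced by a counter that fails on every second run.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** "Reschedule yourself in finally" pattern, as used by ServerStatusReporter (hypothetical class). */
class SelfReschedulingReporter implements Runnable {

    private final ScheduledExecutorService executor;
    private final long periodMillis;
    final AtomicInteger runs = new AtomicInteger();

    SelfReschedulingReporter(ScheduledExecutorService executor, long periodMillis) {
        this.executor = executor;
        this.periodMillis = periodMillis;
    }

    @Override
    public void run() {
        try {
            report(runs.incrementAndGet());
        } catch (Exception e) {
            // The real reporter logs the failure; the schedule must not die with it.
        } finally {
            try {
                // Re-register for the next round, like GlobalExecutor.registerServerStatusReporter.
                executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException shutDown) {
                // The executor has been shut down: stop rescheduling.
            }
        }
    }

    /** Hypothetical report step: fails on every second run to simulate a send error. */
    private void report(int runNumber) {
        if (runNumber % 2 == 0) {
            throw new IllegalStateException("simulated send failure on run " + runNumber);
        }
        // ... build the status line and send it to every peer ...
    }
}
```

Rescheduling in finally keeps the loop alive even when a run throws. With scheduleAtFixedRate, an exception would suppress all subsequent executions. It also lets each run re-read the period, as the original does with switchDomain.getServerStatusSynchronizationPeriodMillis().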

3 Instance Data Synchronization Across the Cluster

Core class: ServiceReporter    Core method: run


```java
public void run() {
    try {
        Map<String, Set<String>> allServiceNames = getAllServiceNames();
        if (allServiceNames.size() <= 0) {
            //ignore
            return;
        }
        for (String namespaceId : allServiceNames.keySet()) {
            ServiceChecksum checksum = new ServiceChecksum(namespaceId);
            for (String serviceName : allServiceNames.get(namespaceId)) {
                // 1. Only include services whose heartbeat check runs on this node
                if (!distroMapper.responsible(serviceName)) {
                    continue;
                }
                Service service = getService(namespaceId, serviceName);
                if (service == null || service.isEmpty()) {
                    continue;
                }
                service.recalculateChecksum();
                // 2. Record the service's current checksum
                checksum.addItem(serviceName, service.getChecksum());
            }
            Message msg = new Message();
            msg.setData(JacksonUtils.toJson(checksum));
            Collection<Member> sameSiteServers = memberManager.allMembers();
            if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                return;
            }
            for (Member server : sameSiteServers) {
                if (server.getAddress().equals(NetUtils.localServer())) {
                    continue;
                }
                // 3. Send the latest service status to the other node
                synchronizer.send(server.getAddress(), msg);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.error("[DOMAIN-STATUS] Exception while sending service status", e);
    } finally {
        GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(),
                TimeUnit.MILLISECONDS);
    }
}

// ServiceStatusSynchronizer#send
public void send(final String serverIP, Message msg) {
    if (serverIP == null) {
        return;
    }
    Map<String, String> params = new HashMap<String, String>(10);
    params.put("statuses", msg.getData());
    params.put("clientIP", NetUtils.localServer());
    // Synchronize service health status
    String url = "http://" + serverIP + ":" + EnvUtil.getPort() + EnvUtil.getContextPath()
            + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/service/status";
    if (IPUtil.containsPort(serverIP)) {
        url = "http://" + serverIP + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT
                + "/service/status";
    }
    try {
        HttpClient.asyncHttpPostLarge(url, null, JacksonUtils.toJson(params), new Callback<String>() {
            @Override
            public void onReceive(RestResult<String> result) {
                if (!result.ok()) {
                    Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: {}",
                            serverIP);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP,
                        throwable);
            }

            @Override
            public void onCancel() {
            }
        });
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("[STATUS-SYNCHRONIZE] failed to request serviceStatus, remote server: " + serverIP, e);
    }
}
```
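run() calls service.recalculateChecksum(), but the article does not show how the checksum is computed. As an illustration only, the sketch below assumes the checksum is an MD5 digest over the service's instances, each rendered as a hypothetical "ip:port_healthy" string and sorted first so that ordering does not matter. The real string format in Nacos may differ. The point is that adding or removing an instance, or changing its health, produces a different checksum, so peers can exchange short digests instead of full instance lists.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical per-service checksum: MD5 over sorted instance entries. */
class ServiceChecksumSketch {

    static String checksum(List<String> instanceEntries) {
        List<String> sorted = new ArrayList<>(instanceEntries);
        Collections.sort(sorted); // same instances in any order -> same checksum
        StringBuilder sb = new StringBuilder();
        for (String entry : sorted) {
            sb.append(entry).append(',');
        }
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(sb.toString().getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest)); // 32 lowercase hex chars
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }
}
```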

Summary:

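Each node runs heartbeat checks only for the services it owns, so it reports only those services to the rest of the cluster. For each namespace, it recalculates the checksum of every owned, non-empty service and POSTs the resulting service-name-to-checksum map to /service/status on every other node. Because each service is owned by one node, these per-node reports together cover all services and keep instance state synchronized across the cluster.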
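The sender side of this exchange reduces to a filter over the node's own checksums. The sketch below models it with plain maps. ChecksumSyncSketch and both of its methods are hypothetical. The staleServices method goes beyond the code shown above and assumes the receiving node compares each reported checksum with its own copy to find services that are missing or out of date.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Hypothetical model of the per-namespace checksum report and a receiver-side diff. */
class ChecksumSyncSketch {

    /** Sender: keep one checksum per service, only for services this node is responsible for. */
    static Map<String, String> buildReport(Map<String, String> localChecksums, Predicate<String> responsible) {
        Map<String, String> report = new TreeMap<>(); // sorted for deterministic output
        for (Map.Entry<String, String> e : localChecksums.entrySet()) {
            if (responsible.test(e.getKey())) {
                report.put(e.getKey(), e.getValue());
            }
        }
        return report;
    }

    /** Receiver (ASSUMED behavior): services whose local copy is missing or differs from the owner's. */
    static List<String> staleServices(Map<String, String> report, Map<String, String> localChecksums) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, String> e : report.entrySet()) {
            if (!e.getValue().equals(localChecksums.get(e.getKey()))) {
                stale.add(e.getKey());
            }
        }
        return stale;
    }
}
```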


