Nacos 进阶篇---Nacos服务端怎么维护不健康的微服务实例 ?(七)

embedded/2024/11/14 21:51:18/
一、引言

  在 Nacos 后台管理服务列表中,我们可以看到微服务列表,其中有一栏叫“健康实例数”    (如下图),表示对应的客户端实例信息是否可用状态。

 

那Nacos服务端是怎么感知客户端的状态是否可用呢 ?

本章重点:

  • 实例心跳接口做了哪些事情 ?
  • 服务端是怎么维护不健康的实例的,怎么下线不健康实例的,做了哪些操作 ?

二、目录     

目录

一、引言

二、目录        

三、服务端实例心跳接口源码分析

四、服务端实例心跳健康检查定时任务源码分析

五、总结


   

三、服务端实例心跳接口源码分析

主线任务:实例心跳接口做了哪些事情 ?

 在客户端服务发起注册的时候 (在第二章节),会开启一个心跳任务,每5s发送一次健康心跳检查,告诉服务端我这个服务还活着。(前面已经讲过

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {if (NAMING_LOGGER.isDebugEnabled()) {NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());}// 组装请求参数Map<String, String> params = new HashMap<String, String>(8);Map<String, String> bodyMap = new HashMap<String, String>(2);if (!lightBeatEnabled) {bodyMap.put("beat", JacksonUtils.toJson(beatInfo));}params.put(CommonParams.NAMESPACE_ID, namespaceId);params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());params.put("ip", beatInfo.getIp());params.put("port", String.valueOf(beatInfo.getPort()));// 发送实例心跳接口请求String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);return JacksonUtils.toObj(result);
}

服务端接受到实例心跳接口,会现在内存注册表中找 Instance,如果找不到会重新注册。然后提交一个 clientBeatProcessor 异步任务,更改 lastBeat 属性

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {// 省略部分代码// 获取请求参数namespaceId、serviceNameString namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);NamingUtils.checkServiceNameFormat(serviceName);Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);// 通过namespaceId、serviceName、ip、port、clusterName 从内存注册表当中获取对应的 Instance 实例对象Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);// 如果 instance 为空,那么会重新注册if (instance == null) {if (clientBeat == null) {result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);return result;}instance = new Instance();instance.setPort(clientBeat.getPort());instance.setIp(clientBeat.getIp());instance.setWeight(clientBeat.getWeight());instance.setMetadata(clientBeat.getMetadata());instance.setClusterName(clusterName);instance.setServiceName(serviceName);instance.setInstanceId(instance.getInstanceId());instance.setEphemeral(clientBeat.isEphemeral());// 这里调用重新注册的方法serviceManager.registerInstance(namespaceId, serviceName, instance);}// 通过namespaceId、serviceName获取对应的 ServiceService service = serviceManager.getService(namespaceId, serviceName);if (service == null) {throw new NacosException(NacosException.SERVER_ERROR,"service not found: " + serviceName + "@" + namespaceId);}if (clientBeat == null) {clientBeat = new RsInfo();clientBeat.setIp(ip);clientBeat.setPort(port);clientBeat.setCluster(clusterName);}// 重点:开启异步任务,更改 lastBeat 属性service.processClientBeat(clientBeat);// 省略部分代码return result;
}

接着往下看重点 service.processClientBeat() 任务,这个方法会开启一个异步任务,异步任务的话肯定会有run 方法,那我们直接看 clientBeatProcessor 对象中的 run 方法

public void processClientBeat(final RsInfo rsInfo) {ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();clientBeatProcessor.setService(this);clientBeatProcessor.setRsInfo(rsInfo);// 立即执行HealthCheckReactor.scheduleNow(clientBeatProcessor);
}

在异步任务当中,首先会获取当前节点下所有的临时实例,然后通过 ip+port 找到当前 instance,然后把 instance 中的 lastBeat属性更改为当前时间,并且如果 该 instance 为不健康状态,更改为健康状态

public class ClientBeatProcessor implements Runnable {public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15);private RsInfo rsInfo;private Service service;@JsonIgnorepublic PushService getPushService() {return ApplicationUtils.getBean(PushService.class);}public RsInfo getRsInfo() {return rsInfo;}public void setRsInfo(RsInfo rsInfo) {this.rsInfo = rsInfo;}public Service getService() {return service;}public void setService(Service service) {this.service = service;}@Overridepublic void run() {Service service = this.service;if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());}// 本小节重点方法// 获取当前 ip、clusterNameString ip = rsInfo.getIp();String clusterName = rsInfo.getCluster();int port = rsInfo.getPort();Cluster cluster = service.getClusterMap().get(clusterName);// 获取当前 cluster 下所有的临时实例List<Instance> instances = cluster.allIPs(true);// 遍历临时实例for (Instance instance : instances) {// 通过判断ip、port,确认是否是当前 instance 的实例if (instance.getIp().equals(ip) && instance.getPort() == port) {if (Loggers.EVT_LOG.isDebugEnabled()) {Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());}// 把 lastBeat属性更改为当前时间instance.setLastBeat(System.currentTimeMillis());if (!instance.isMarked()) {// 如果 instance 为不健康状态,更改为健康状态if (!instance.isHealthy()) {instance.setHealthy(true);Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",cluster.getService().getName(), ip, port, cluster.getName(),UtilsAndCommons.LOCALHOST_SITE);getPushService().serviceChanged(service);}}}}}
}

小结

     首先在 客户端服务发起注册的时候 (在第二章节),会开启一个心跳任务,每5s发送一次健康心跳检查,告诉服务端我这个服务还活着。(前面已经讲过)

    那么服务端接受到了 实例心跳接口的请求,会现在内存注册表中找 Instance,如果找不到会重新注册。然后提交一个 clientBeatProcessor 异步任务,在异步任务当中,首先会找到当前集群下的所有临时实例,然后通过 ip +port 找到当前instance 实例,把当前instance 中的 lastBeat属性更改为当前时间,如果 instance 为不健康状态,更改为健康状态,到此实例心跳接口就结束了。

四、服务端实例心跳健康检查定时任务源码分析

主线任务:服务端是怎么维护不健康的实例的,怎么下线不健康实例的,做了哪些操作 ?

     这块代码是在服务端 register(注册)接口当中的,之前分析过 register 注册逻辑,因为这块是分支代码,前面没细看。

   我们来看下 createEmptyService 这个方法了,里面有个异步任务,作用就是:检查有哪些客户端是不健康的状态,如果不健康就需要对它进行处理

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {// 不知道是创建了一个什么服务createEmptyService(namespaceId, serviceName, instance.isEphemeral());// 根据namespaceId、serviceName获取 Service服务Service service = getService(namespaceId, serviceName);// service为空就抛出异常if (service == null) {throw new NacosException(NacosException.INVALID_PARAM,"service not found, namespace: " + namespaceId + ", service: " + serviceName);}// 上面都是分支代码// 主线任务:添加服务实例addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

我们直接看重点代码,直接跳到开启异步任务这里。上面的代码流程:createEmptyService()-> createServiceIfAbsent()-> putServiceAndInit(service) -> service.init();

public void init() {// 开启异步延时任务 clientBeatCheckTask ,每5s执行一次HealthCheckReactor.scheduleCheck(clientBeatCheckTask);for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {entry.getValue().setService(this);entry.getValue().init();}
}

本章重点,开启了一个 clientBeatCheckTask 异步任务。

@Override
public void run() {try {// 本章重点// 获取全部临时实例List<Instance> instances = service.allIPs(true);for (Instance instance : instances) {// 当前时间 - instance中 lastBeat属性时间  > 15sif (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {if (!instance.isMarked()) {if (instance.isHealthy()) {// 如果这个 instance 实例还是健康状态,就更改为 "不健康状态"!instance.setHealthy(false);Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",instance.getIp(), instance.getPort(), instance.getClusterName(),service.getName(), UtilsAndCommons.LOCALHOST_SITE,instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat());// 事件发布监听事件,通过 upd 协议发送通知getPushService().serviceChanged(service);ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));}}}}if (!getGlobalConfig().isExpireInstance()) {return;}// 这里还是遍历 临时实例for (Instance instance : instances) {if (instance.isMarked()) {continue;}// 当前时间 - instance中 lastBeat属性时间  > 30sif (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(),JacksonUtils.toJson(instance));// 直接从注册表中删除当前 instancedeleteIp(instance);}}} catch (Exception e) {Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);}}

小结:

  • 第一个循环的作用,为了筛选出不健康的 Instance 实例,并且把 Instance 中的 healthy  属性改为 false。那么怎么筛选出不健康的实例的 ?利用的就是 Instance 中的 lastBeat 属性。如果是健康的实例,那么客户端就会每5s调一次实例心跳接口,更新 lastBeat 属性为当前时间。如果是不健康的实例,那么 Instance 实例 中的 lastBeat 属性是不会变化的,一旦 lastBeat 跟当前时间比超过 15s,就会被认定为不健康的实例。
  • 第二个循环的作用,找出那些 Instance 是需要删除的,如果 lastBeat 跟当前时间比超过 30s,Nacos 会把该 Instance 从注册表当中进行删除。
五、总结

总结:

     本章讲了Nacos怎么维护整个微服务实例健康状态的流程,在客户端发起注册服务时会有心跳任务,每5s给服务端发送一次心态,服务端会把该 Instance 实例中的lastBeat 属性更新为当前时间。并且在服务端实例注册的时候,会开启心跳健康检查任务,把 lastBeat 跟当前时间比超过 15s,就会被标识为不健康的实例,把lastBeat 跟当前时间比超过 30s,Nacos 会把该 Instance 从注册表当中进行删除

最后的最后,别忘了把源码分析图补充完整: 


http://www.ppmy.cn/embedded/42206.html

相关文章

反序列化漏洞(JBoss、apache log4、apache Shiro、JWT)Weblogic未授权访问、代码执行、任意上传

1.1什么是反序列化 就是把一个对象变成可以传输的字符串&#xff0c;目的就是为了方便传输。假设&#xff0c;我们写了一个class&#xff0c;这个class里面存有一些变量。当这个class被实例化了之后&#xff0c;在使用过程中里面的一些变量值发生了改变。以后在某些时候还会用到…

Qt | 布局部件拉伸(Stretch)原理及大小策略

Qt | 布局管理器基础(QLayout)01、简介 1、部件的大小策略 sizePolicy、大小限制、拉伸因子(Stretch Factors)的含义 ①、部件的大小策略、大小提示、拉伸因子从三个方面对布局内的部件怎样进行拉伸以填 满布局进行了说明。 ②、拉伸因子:描述了各个部件在进行拉伸时,多个…

Vitis HLS 学习笔记--避免使用多重访问指针

目录 1. 简介 2. 代码解析 2.1 pointer_stream_bad 2.2 pointer_stream_better 2.3 pointer_stream_good 3. 总结 1. 简介 本文将探讨在 Vitis HLS 中正确处理多重访问指针重要性以及其对功能的影响。通过分析示例代码&#xff0c;我们将讨论在如何正确处理指针访问&…

如何开发一个基于通义千问-14B的对话应用

目录 一:开发流程 二:安装 一:开发流程 1:安装环境 需要安装python依赖环境 2:配置项目 我们利用Langchain-Chatchat和Qwen1.5-14B-Chat-GPTQ-Int4来实现一个对话项目应用 3:启动项目 二:安装 1:安装环境 初始化一个python环境: conda create -n qwen-chat py…

【kubernetes】多 master 高可用集群架构部署

目录 前言 一、环境部署 二、master02 节点部署 1、拷贝相关文件 2、修改配置文件 3、启动各服务并设置开机自启 4、 查看node节点状态 三、负载均衡部署 1、部署 nginx 服务 1.1 编译安装 nginx 1.2 修改 nginx 配置文件 2、部署 keepalived 服务 2.1 yum安装 ke…

使用FFmpeg进行多媒体处理的完整指南

介绍&#xff1a; FFmpeg是一个开源的多媒体处理工具&#xff0c;它提供了丰富的功能&#xff0c;能够处理音频和视频文件。本篇博客将介绍如何安装FFmpeg并演示一些常见的用法&#xff0c;帮助你更好地利用这个强大的工具。 正文&#xff1a; 安装FFmpeg 首先&#xff0c;你需…

如何使用Docker快速运行Firefox并实现远程访问本地火狐浏览器

文章目录 1. 部署Firefox2. 本地访问Firefox3. Linux安装Cpolar4. 配置Firefox公网地址5. 远程访问Firefox6. 固定Firefox公网地址7. 固定地址访问Firefox Firefox是一款免费开源的网页浏览器&#xff0c;由Mozilla基金会开发和维护。它是第一个成功挑战微软Internet Explorer浏…

RoctetMQ使用(2):在项目中使用

一、导入相关依赖 在项目中引入MQ客户端依赖&#xff0c;依赖版本最好和RocketMQ版本一致。 <!-- rocket客户端--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version&…