Nacos Server-Side Health Checks, Part 5

news/2024/11/30 9:55:19/


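🕐 Nacos Client Service Registration Source Code Analysis, Part 1

🕑 Nacos Client Service Registration Source Code Analysis, Part 2

🕒 Nacos Client Service Registration Source Code Analysis, Part 3

🕓 Nacos Server-Side Service Registration Source Code Analysis, Part 4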


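The previous part walked through the whole service registration flow on the server: how the instance information sent by a client is turned into a Client model entity, and how the server-side Service, the client model Client, and the instance information (Instance) are linked to one another.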


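Long-Lived Connections

Parts 3 and 4, while examining how the client implements registration, looked at getExecuteClientProxy in the NamingClientProxyDelegate proxy class, which decides the concrete protocol used for the current call.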

private NamingClientProxy getExecuteClientProxy(Instance instance) {
    // Ephemeral instances use the gRPC long-lived connection;
    // persistent instances use HTTP short connections.
    return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
}

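So how do long-lived and short connections differ?

With a long-lived connection, once the connection is established it can carry many request and response packets. When there is no traffic, both ends have to send link-detection (keepalive) packets so that the state of the link is known in near real time. A short connection, by contrast, is closed after a single request/response exchange.

Since version 2.0, Nacos uses gRPC long-lived connections in place of the earlier HTTP short-connection requests.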

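The NamingClientProxy interface handles the low-level communication and calls the server's interfaces. It has three implementations:

  • NamingClientProxyDelegate: a proxy class that delegates the methods of NacosNamingService and, depending on the request, chooses HTTP or gRPC to talk to the server.
  • NamingGrpcClientProxy: communicates over a gRPC long-lived connection.
  • NamingHttpClientProxy: communicates over HTTP short connections.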

NamingClientProxyDelegate chooses the protocol according to whether the instance is ephemeral:

  • Ephemeral instance: gRPC
  • Persistent instance: HTTP
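The selection rule above can be sketched in isolation. This is a minimal illustration, not the Nacos implementation: ClientProxy, GrpcProxy, HttpProxy, and ProxySelector are hypothetical stand-ins for the real proxy classes, and only the ephemeral/persistent decision is modeled.

```java
// Hypothetical, simplified stand-ins for the Nacos proxy types (not the real API).
interface ClientProxy {
    String protocol();
}

final class GrpcProxy implements ClientProxy {
    @Override
    public String protocol() {
        return "grpc";
    }
}

final class HttpProxy implements ClientProxy {
    @Override
    public String protocol() {
        return "http";
    }
}

final class ProxySelector {
    private final ClientProxy grpcProxy = new GrpcProxy();
    private final ClientProxy httpProxy = new HttpProxy();

    // Ephemeral instances go over the long-lived gRPC connection,
    // persistent instances go over short HTTP requests.
    ClientProxy select(boolean ephemeral) {
        return ephemeral ? grpcProxy : httpProxy;
    }
}
```

Calling `new ProxySelector().select(true)` returns the gRPC stand-in, and `select(false)` returns the HTTP one, which mirrors the ternary in getExecuteClientProxy.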

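Health Checks

In the earlier 1.x versions, ephemeral instances used the Distro protocol with in-memory storage, and the client sent heartbeats to the registry to keep itself in the healthy state. Persistent instances used the Raft protocol with persistent storage, and the server periodically opened a TCP connection to the client to perform the health check.

From 2.0 onward, nothing much changes for persistent instances, but ephemeral instances no longer use heartbeats. Instead, whether an instance is healthy is decided by whether its long-lived connection is still alive.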
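The difference between the two styles of health judgment for ephemeral instances can be sketched as follows. This is an illustration under assumptions: HealthJudge and both method names are hypothetical, the heartbeat timeout is left as a parameter rather than a Nacos default, and the set of live connection ids stands in for whatever the server keeps for open connections.

```java
import java.util.Set;

// Hypothetical helper contrasting the two approaches; not Nacos code.
final class HealthJudge {

    // 1.x style: the instance stays healthy while its last heartbeat is recent enough.
    static boolean healthyByHeartbeat(long lastBeatMillis, long nowMillis, long timeoutMillis) {
        return nowMillis - lastBeatMillis <= timeoutMillis;
    }

    // 2.x style: the instance is healthy as long as its connection is still registered.
    static boolean healthyByConnection(Set<String> liveConnectionIds, String connectionId) {
        return liveConnectionIds.contains(connectionId);
    }
}
```

In the first style the client must keep sending heartbeats even when nothing changes; in the second, an idle but open connection is enough, and the server only needs a way to detect dead connections.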

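ConnectionManager manages the long-lived connections of all clients.

Every 3 seconds it looks for clients that have had no communication for 20 seconds or more and sends each of them a ClientDetectionRequest probe. If the client responds successfully within 1 second, the check passes; otherwise the unregister method is called to remove the Connection.

If a client keeps communicating with the server, the server never needs to probe it actively.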

Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();

@PostConstruct
public void start() {
    // Start the task that expels unhealthy connections.
    RpcScheduledExecutor.COMMON_SERVER_EXECUTOR.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                int totalCount = connections.size();
                Loggers.REMOTE_DIGEST.info("Connection check task start");
                MetricsMonitor.getLongConnectionMonitor().set(totalCount);
                // Collect outdated connections (idle for 20s).
                Set<Map.Entry<String, Connection>> entries = connections.entrySet();
                int currentSdkClientCount = currentSdkClientCount();
                boolean isLoaderClient = loadClient >= 0;
                int currentMaxClient = isLoaderClient ? loadClient : connectionLimitRule.countLimit;
                int expelCount = currentMaxClient < 0 ? 0 : Math.max(currentSdkClientCount - currentMaxClient, 0);

                Loggers.REMOTE_DIGEST.info("Total count ={}, sdkCount={},clusterCount={}, currentLimit={}, toExpelCount={}",
                        totalCount, currentSdkClientCount, (totalCount - currentSdkClientCount),
                        currentMaxClient + (isLoaderClient ? "(loaderCount)" : ""), expelCount);

                List<String> expelClient = new LinkedList<>();
                Map<String, AtomicInteger> expelForIp = new HashMap<>(16);

                //1. calculate expel count  of ip.
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    String appName = client.getMetaInfo().getAppName();
                    String clientIp = client.getMetaInfo().getClientIp();
                    if (client.getMetaInfo().isSdkSource() && !expelForIp.containsKey(clientIp)) {
                        //get limit for current ip.
                        int countLimitOfIp = connectionLimitRule.getCountLimitOfIp(clientIp);
                        if (countLimitOfIp < 0) {
                            int countLimitOfApp = connectionLimitRule.getCountLimitOfApp(appName);
                            countLimitOfIp = countLimitOfApp < 0 ? countLimitOfIp : countLimitOfApp;
                        }
                        if (countLimitOfIp < 0) {
                            countLimitOfIp = connectionLimitRule.getCountLimitPerClientIpDefault();
                        }
                        if (countLimitOfIp >= 0 && connectionForClientIp.containsKey(clientIp)) {
                            AtomicInteger currentCountIp = connectionForClientIp.get(clientIp);
                            if (currentCountIp != null && currentCountIp.get() > countLimitOfIp) {
                                expelForIp.put(clientIp, new AtomicInteger(currentCountIp.get() - countLimitOfIp));
                            }
                        }
                    }
                }

                Loggers.REMOTE_DIGEST.info("Check over limit for ip limit rule, over limit ip count={}", expelForIp.size());
                if (expelForIp.size() > 0) {
                    Loggers.REMOTE_DIGEST.info("Over limit ip expel info, {}", expelForIp);
                }

                Set<String> outDatedConnections = new HashSet<>();
                long now = System.currentTimeMillis();

                //2.get expel connection for ip limit.
                for (Map.Entry<String, Connection> entry : entries) {
                    Connection client = entry.getValue();
                    String clientIp = client.getMetaInfo().getClientIp();
                    AtomicInteger integer = expelForIp.get(clientIp);
                    if (integer != null && integer.intValue() > 0) {
                        integer.decrementAndGet();
                        expelClient.add(client.getMetaInfo().getConnectionId());
                        expelCount--;
                    } else if (now - client.getMetaInfo().getLastActiveTime() >= KEEP_ALIVE_TIME) {
                        outDatedConnections.add(client.getMetaInfo().getConnectionId());
                    }
                }

                //3. if total count is still over limit.
                if (expelCount > 0) {
                    for (Map.Entry<String, Connection> entry : entries) {
                        Connection client = entry.getValue();
                        if (!expelForIp.containsKey(client.getMetaInfo().clientIp)
                                && client.getMetaInfo().isSdkSource() && expelCount > 0) {
                            expelClient.add(client.getMetaInfo().getConnectionId());
                            expelCount--;
                            outDatedConnections.remove(client.getMetaInfo().getConnectionId());
                        }
                    }
                }

                String serverIp = null;
                String serverPort = null;
                if (StringUtils.isNotBlank(redirectAddress) && redirectAddress.contains(Constants.COLON)) {
                    String[] split = redirectAddress.split(Constants.COLON);
                    serverIp = split[0];
                    serverPort = split[1];
                }

                for (String expelledClientId : expelClient) {
                    try {
                        Connection connection = getConnection(expelledClientId);
                        if (connection != null) {
                            ConnectResetRequest connectResetRequest = new ConnectResetRequest();
                            connectResetRequest.setServerIp(serverIp);
                            connectResetRequest.setServerPort(serverPort);
                            connection.asyncRequest(connectResetRequest, null);
                            Loggers.REMOTE_DIGEST.info("Send connection reset request , connection id = {},recommendServerIp={}, recommendServerPort={}",
                                    expelledClientId, connectResetRequest.getServerIp(),
                                    connectResetRequest.getServerPort());
                        }
                    } catch (ConnectionAlreadyClosedException e) {
                        unregister(expelledClientId);
                    } catch (Exception e) {
                        Loggers.REMOTE_DIGEST.error("Error occurs when expel connection, expelledClientId:{}", expelledClientId, e);
                    }
                }

                //4.client active detection.
                Loggers.REMOTE_DIGEST.info("Out dated connection ,size={}", outDatedConnections.size());
                // Asynchronously probe every connection that needs to be checked.
                if (CollectionUtils.isNotEmpty(outDatedConnections)) {
                    Set<String> successConnections = new HashSet<>();
                    final CountDownLatch latch = new CountDownLatch(outDatedConnections.size());
                    for (String outDateConnectionId : outDatedConnections) {
                        try {
                            Connection connection = getConnection(outDateConnectionId);
                            if (connection != null) {
                                ClientDetectionRequest clientDetectionRequest = new ClientDetectionRequest();
                                connection.asyncRequest(clientDetectionRequest, new RequestCallBack() {
                                    @Override
                                    public Executor getExecutor() {
                                        return null;
                                    }

                                    @Override
                                    public long getTimeout() {
                                        return 1000L;
                                    }

                                    @Override
                                    public void onResponse(Response response) {
                                        latch.countDown();
                                        if (response != null && response.isSuccess()) {
                                            connection.freshActiveTime();
                                            successConnections.add(outDateConnectionId);
                                        }
                                    }

                                    @Override
                                    public void onException(Throwable e) {
                                        latch.countDown();
                                    }
                                });
                                Loggers.REMOTE_DIGEST.info("[{}]send connection active request ", outDateConnectionId);
                            } else {
                                latch.countDown();
                            }
                        } catch (ConnectionAlreadyClosedException e) {
                            latch.countDown();
                        } catch (Exception e) {
                            Loggers.REMOTE_DIGEST.error("[{}]Error occurs when check client active detection ,error={}",
                                    outDateConnectionId, e);
                            latch.countDown();
                        }
                    }

                    latch.await(3000L, TimeUnit.MILLISECONDS);
                    Loggers.REMOTE_DIGEST.info("Out dated connection check successCount={}", successConnections.size());

                    // Unregister every client that did not respond successfully.
                    for (String outDateConnectionId : outDatedConnections) {
                        if (!successConnections.contains(outDateConnectionId)) {
                            Loggers.REMOTE_DIGEST.info("[{}]Unregister Out dated connection....", outDateConnectionId);
                            unregister(outDateConnectionId);
                        }
                    }
                }

                //reset loader client
                if (isLoaderClient) {
                    loadClient = -1;
                    redirectAddress = null;
                }

                Loggers.REMOTE_DIGEST.info("Connection check task end");
            } catch (Throwable e) {
                Loggers.REMOTE.error("Error occurs during connection check... ", e);
            }
        }
    }, 1000L, 3000L, TimeUnit.MILLISECONDS);
}

// Unregister (remove) a connection.
public synchronized void unregister(String connectionId) {
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        Loggers.REMOTE_DIGEST.info("[{}]Connection unregistered successfully. ", connectionId);
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}
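Stripped of the connection-limit handling, step 4 of the task above comes down to "probe only idle connections, refresh those that answer, remove the rest". The sketch below shows that core under simplifying assumptions: IdleConnectionSweeper and its methods are hypothetical, the probe is a synchronous Predicate standing in for the asynchronous ClientDetectionRequest with its 1 second timeout, and time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical, simplified model of the idle-connection check; not Nacos code.
final class IdleConnectionSweeper {
    // Connections idle for at least this long are probed (20s, as in the text).
    static final long KEEP_ALIVE_MILLIS = 20_000L;

    // connectionId -> timestamp of the last communication, in milliseconds.
    private final Map<String, Long> lastActive = new ConcurrentHashMap<>();

    void register(String connectionId, long nowMillis) {
        lastActive.put(connectionId, nowMillis);
    }

    boolean contains(String connectionId) {
        return lastActive.containsKey(connectionId);
    }

    // One sweep: skip recently active connections, probe idle ones,
    // refresh the active time on success and remove the connection otherwise.
    List<String> sweep(long nowMillis, Predicate<String> probe) {
        List<String> removed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastActive.entrySet()) {
            String id = entry.getKey();
            if (nowMillis - entry.getValue() < KEEP_ALIVE_MILLIS) {
                continue; // still active, no probe needed
            }
            if (probe.test(id)) {
                lastActive.put(id, nowMillis);
            } else {
                lastActive.remove(id);
                removed.add(id);
            }
        }
        return removed;
    }
}
```

In a real server such a sweep would be driven by a scheduler, for example `ScheduledExecutorService.scheduleWithFixedDelay` with a 3 second delay, and the probe would be an asynchronous request collected with a latch, as the original task does.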

After the connection is removed, ConnectionBasedClientManager, which extends ClientConnectionEventListener, removes the Client and publishes a ClientDisconnectEvent.

@Override
public boolean clientDisconnected(String clientId) {
    Loggers.SRV_LOG.info("Client connection {} disconnect, remove instances and subscribers", clientId);
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}

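ClientDisconnectEvent triggers several follow-up actions:

1) Distro protocol: the removal of the client's data is synchronized to the other nodes.

2) Two index caches are cleared: the mapping between Service and publishing Client in ClientServiceIndexesManager, and the mapping between Service and Instance in ServiceStorage.

3) Service subscription: ClientDisconnectEvent indirectly triggers a ServiceChangedEvent, which pushes the service change to subscribed clients.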
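The fan-out described above, where one disconnect event reaches several independent handlers, can be sketched with a tiny publish/subscribe helper. This is an assumption-laden illustration: DisconnectFanOut is hypothetical and much simpler than Nacos's NotifyCenter (it is synchronous and carries only a client id), and the three subscribers merely record what the real handlers would do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal event fan-out; not the NotifyCenter API.
final class DisconnectFanOut {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver the disconnected client's id to every subscriber, in subscription order.
    void publish(String clientId) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(clientId);
        }
    }
}

final class DisconnectFanOutDemo {
    public static void main(String[] args) {
        DisconnectFanOut bus = new DisconnectFanOut();
        bus.subscribe(id -> System.out.println("sync removal of " + id + " to other nodes"));
        bus.subscribe(id -> System.out.println("clear index caches for " + id));
        bus.subscribe(id -> System.out.println("notify subscribers about services of " + id));
        bus.publish("connection-1");
    }
}
```

The publisher does not need to know who reacts to the disconnect; each concern (data sync, index cleanup, subscriber push) registers its own handler.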

