源码篇--Nacos服务--中章(8):Nacos服务端感知客户端实例变更(集群数据校验)-4

news/2024/9/23 5:00:05/

文章目录

  • 前言
  • 一、集群数据校验:
  • 二、数据校验过程
    • 2.1 心跳定时任务:
    • 2.2 客户端版本数据发送:
      • 2.2.1 任务的添加:
      • 2.2.2 任务的执行:
    • 2.3 服务端本数据处理:
    • 2.4 客户度数据全量推送:
  • 总结


前言

本文对Nacos 集群节点间实例数据校验过程进行介绍,服务端版本 3.0.13。


一、集群数据校验:

在这里插入图片描述

在 Distro 集群启动之后,各台机器之间会定期的发送心跳(每隔5s发送一次)。心跳信息主要为各个机器上的所有数据的元信息(之所以使用元信息,是因为需要保证网络中数据传输的量级维持在一个较低水平)。这种数据校验会以心跳的形式进行,即每台机器在固定时间间隔会向其他机器发起一次数据校验请求。

一旦在数据校验过程中,某台机器发现其他机器上的数据与本地数据不一致,则会发起一次全量拉取请求,将数据补齐。

二、数据校验过程

2.1 心跳定时任务:

在DistroProtocol 对象构建时,调用startDistroTask() 方法,改方法中 startVerifyTask() 每隔5s 发送一次心跳检查;

java">public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder,DistroTaskEngineHolder distroTaskEngineHolder) {this.memberManager = memberManager;this.distroComponentHolder = distroComponentHolder;this.distroTaskEngineHolder = distroTaskEngineHolder;// 开始任务startDistroTask();
}private void startDistroTask() {if (EnvUtil.getStandaloneMode()) {isInitialized = true;return;}// 校验的定时任务 每隔5s 发送一次startVerifyTask();// 启动加载任务startLoadTask();
}
private void startVerifyTask() {GlobalExecutor.schedulePartitionDataTimedSync(new DistroVerifyTimedTask(memberManager, distroComponentHolder,distroTaskEngineHolder.getExecuteWorkersManager()),DistroConfig.getInstance().getVerifyIntervalMillis());
}

2.2 客户端版本数据发送:

2.2.1 任务的添加:

DistroVerifyTimedTask 类中通过 run() 方法将添加进入的任务,进行调度

java">@Override
public void run() {try {// 获取集群内其它节点List<Member> targetServer = serverMemberManager.allMembersWithoutSelf();if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("server list is: {}", targetServer);}for (String each : distroComponentHolder.getDataStorageTypes()) {// 发送数据校验请求verifyForDataStorage(each, targetServer);}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] verify task failed.", e);}
}

verifyForDataStorage 会给集群中的每个节点都发送 注册在改节点的客户端 id 及版本信息

java">private void verifyForDataStorage(String type, List<Member> targetServer) {// 获取数据存储DistroDataStorage dataStorage = distroComponentHolder.findDataStorage(type);if (!dataStorage.isFinishInitial()) {Loggers.DISTRO.warn("data storage {} has not finished initial step, do not send verify data",dataStorage.getClass().getSimpleName());return;}// 获取需要校验的数据List<DistroData> verifyData = dataStorage.getVerifyData();if (null == verifyData || verifyData.isEmpty()) {return;}for (Member member : targetServer) {// 遍历集群节点// 获取 代理DistroTransportAgent agent = distroComponentHolder.findTransportAgent(type);if (null == agent) {continue;}// 添加任务executeTaskExecuteEngine.addTask(member.getAddress() + type,new DistroVerifyExecuteTask(agent, verifyData, member.getAddress(), type));}
}

这里将任务添加 TaskExecuteWorker 中,最终放入到BlockingQueue queue 队列中,后续由线程从队列中获取任务并执行

java">@Override
public void addTask(Object tag, AbstractExecuteTask task) {// 任务执行器NacosTaskProcessor processor = getProcessor(tag);if (null != processor) {processor.process(task);return;}//  对tag 进行hash %  executeWorkers.length 获取到任务要放入到数组中的哪个workerTaskExecuteWorker worker = getWorker(tag);// 放入任务worker.process(task);}

2.2.2 任务的执行:

发送DistroVerifyExecuteTask 对象任务 ,在DistroVerifyExecuteTask类中的run() 方法进行处理

java">@Override
public void run() {// 任务执行-- 客户端版本号校验for (DistroData each : verifyData) {try {if (transportAgent.supportCallbackTransport()) {// 遍历要校验的客户端数据给多 对应的服务端doSyncVerifyDataWithCallback(each);} else {doSyncVerifyData(each);}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] verify data for type {} to {} failed.", resourceType, targetServer, e);}}
}
private void doSyncVerifyDataWithCallback(DistroData data) {transportAgent.syncVerifyData(data, targetServer, new DistroVerifyCallback());
}
@Override
public void syncVerifyData(DistroData verifyData, String targetServer, DistroCallback callback) {// 节点校验if (isNoExistTarget(targetServer)) {callback.onSuccess();return;}// 构建 DistroDataRequest 请求DistroDataRequest request = new DistroDataRequest(verifyData, DataOperation.VERIFY);Member member = memberManager.find(targetServer);if (checkTargetServerStatusUnhealthy(member)) {Loggers.DISTRO.warn("[DISTRO] Cancel distro verify caused by target server {} unhealthy, key: {}", targetServer,verifyData.getDistroKey());callback.onFailed(null);return;}try {DistroVerifyCallbackWrapper wrapper = new DistroVerifyCallbackWrapper(targetServer,verifyData.getDistroKey().getResourceKey(), callback, member);// 请求发送 DistroDataRequestHandler 处理校验的请求clusterRpcClientProxy.asyncRequest(member, request, wrapper);} catch (NacosException nacosException) {callback.onFailed(nacosException);}
}

2.3 服务端本数据处理:

发送DistroDataRequest 请求,事件类型为VERIFY; 由 DistroDataRequestHandler# handle 处理校验的请求

java">@Override
public DistroDataResponse handle(DistroDataRequest request, RequestMeta meta) throws NacosException {try {// 获取操作类型switch (request.getDataOperation()) {case VERIFY:// 校验 请求处理return handleVerify(request.getDistroData(), meta);case SNAPSHOT:// 返回改节点下的注册实例信息return handleSnapshot();case ADD:case CHANGE:case DELETE:// 实例变化类型return handleSyncData(request.getDistroData());case QUERY:return handleQueryData(request.getDistroData());default:return new DistroDataResponse();}} catch (Exception e) {Loggers.DISTRO.error("[DISTRO-FAILED] distro handle with exception", e);DistroDataResponse result = new DistroDataResponse();result.setErrorCode(ResponseCode.FAIL.getCode());result.setMessage("handle distro request with exception");return result;}
}

版本号校验

java">private DistroDataResponse handleVerify(DistroData distroData, RequestMeta meta) {DistroDataResponse result = new DistroDataResponse();// 校验if (!distroProtocol.onVerify(distroData, meta.getClientIp())) {// 校验失败,版本号不一致result.setErrorInfo(ResponseCode.FAIL.getCode(), "[DISTRO-FAILED] distro data verify failed");}//return result;
}
/*** Receive verify data, find processor to process.** @param distroData    verify data* @param sourceAddress source server address, might be get data from source server* @return true if verify data successfully, otherwise false*/
public boolean onVerify(DistroData distroData, String sourceAddress) {if (Loggers.DISTRO.isDebugEnabled()) {Loggers.DISTRO.debug("[DISTRO] Receive verify data type: {}, key: {}", distroData.getType(),distroData.getDistroKey());}String resourceType = distroData.getDistroKey().getResourceType();// 获取数据处理器DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType);if (null == dataProcessor) {Loggers.DISTRO.warn("[DISTRO] Can't find verify data process for received data {}", resourceType);return false;}return dataProcessor.processVerifyData(distroData, sourceAddress);
}

2.4 客户度数据全量推送:

当校验失败,即出现集群节点,相同实例的版本号不一致的情况,节点返回false 状态;客户端重新发送实例注册的请求;校验结果会被DistroVerifyCallbackWrapper# onResponse 处理;

java">@Override
public void onResponse(Response response) {if (checkResponse(response)) {NamingTpsMonitor.distroVerifySuccess(member.getAddress(), member.getIp());distroCallback.onSuccess();} else {// 数据校验失败Loggers.DISTRO.info("Target {} verify client {} failed, sync new client", targetServer, clientId);// 发送 ClientVerifyFailedEvent 校验失败事件NotifyCenter.publishEvent(new ClientEvent.ClientVerifyFailedEvent(clientId, targetServer));NamingTpsMonitor.distroVerifyFail(member.getAddress(), member.getIp());distroCallback.onFailed(null);}}

发现校验失败 发送 ClientVerifyFailedEvent 事件,DistroClientDataProcessor # onEvent 方法进行处理:

java">/**
* ap 模式* Nacos 每个节点是平等的都可以处理写请求,同时把新数据同步到其他节点。* 每个节点只负责部分数据,定时发送自己负责数据的校验值到其他节点来保持数据一致性。* 每个节点独立处理读请求,及时从本地发出响应。* @param event {@link Event}*/
@Override
public void onEvent(Event event) {// 单机模式直接返回if (EnvUtil.getStandaloneMode()) {return;}if (event instanceof ClientEvent.ClientVerifyFailedEvent) {// 客户端检验失败syncToVerifyFailedServer((ClientEvent.ClientVerifyFailedEvent) event);} else {// distro 协议将实例信息发送给集群内其它节点syncToAllServer((ClientEvent) event);}
}private void syncToVerifyFailedServer(ClientEvent.ClientVerifyFailedEvent event) {Client client = clientManager.getClient(event.getClientId());if (isInvalidClient(client)) {return;}DistroKey distroKey = new DistroKey(client.getClientId(), TYPE);// Verify failed data should be sync directly.// 校验失败重新发送,推送节点下所有实例信息给到对应的节点,事件类型为 ADDdistroProtocol.syncToTarget(distroKey, DataOperation.ADD, event.getTargetServer(), 0L);
}

推送节点下注册的实例信息到对应节点,实际走的是实例注册的流程,详细可以查看: 源码篇–Nacos服务–中章(8):Nacos服务端感知客户端实例变更-3,文章的 2.1 实例注册信息通知:


总结

本文对集群内节点间的心跳监测,以及实例信息的检查和同步做介绍。


http://www.ppmy.cn/news/1443735.html

相关文章

赋能智慧校园!A3D数字孪生可视化,轻量又高效!

放假之后&#xff0c;学生们会逐步返学&#xff0c;大量人员出入校园&#xff0c;安全更是不容忽视&#xff0c;如何在短时间内对大批人员及设施进行智能监管&#xff1f;数字化转型是关键手段&#xff0c;我们可以融合线上线下数据&#xff0c;搭建3D立体的智慧校园&#xff0…

WebGL/Cesium 大空间相机抖动 RTE(Relative to Eye)实现原理简析

在浏览器中渲染大尺寸 3D 模型&#xff1a;Speckle 处理空间抖动的方法 WebGL/Cesium 大空间相机抖动 RTE(Relative to Eye)实现原理简析 注: 相机空间和视图空间 概念等效混用 1、实现的关键代码 const material new THREE.RawShaderMaterial({uniforms: {cameraPostion: {…

人人开源框架运行

Getting started renrenio/renren-fast-vue Wiki GitHub 人人开源 1.启动navicat&#xff1a;新建一个数据库renren-fast&#xff0c;字符集为utf-8,utf-8mb3或者utf-8mb4&#xff0c;排序规则不选 2.数据库操作在renren-fast数据库中选择表&#xff0c;运行renren-fast-ma…

【C++ STL序列容器】list 双向链表

文章目录 【 1. 基本原理 】【 2. list 的创建 】2.1 创建1个空的 list2.2 创建一个包含 n 个元素的 list&#xff08;默认值&#xff09;2.3 创建一个包含 n 个元素的 list&#xff08;赋初值&#xff09;2.4 通过1个 list 初始化另一个 list2.5 拷贝其他类型容器的指定元素创…

Java各种Map实现类以及特点

目录 1. HashMap 2. LinkedHashMap 3. TreeMap 4. Hashtable 5. ConcurrentHashMap 6. WeakHashMap 7. IdentityHashMap 8. EnumMap 1. HashMap 特性: 无序集合&#xff0c;基于哈希表实现。允许存储null键和null值。不保证映射的顺序&#xff1b;顺序可能随时间发生变…

机器人--机械臂的组成

控制器 控制器-----机器人的大脑。 简单视频介绍 怎么发挥作用的 组成 固定的硬件可实现的软件控制器。 作用 1. 控制器&#xff1a;动作指令信号的输出装置。 2. 驱动器&#xff1a;接收控制器发出的指令&#xff0c;并驱动马达(电机)的运动。 3. 伺服马达&#xff1a;…

Linux cmake 初窥【1】

1.开发背景 linux 下编译程序需要用到对应的 Makefile&#xff0c;用于编译应用程序&#xff0c;但是 Makefile 的语法过于繁杂&#xff0c;甚至有些反人类&#xff0c;所以这里引用了cmake&#xff0c;cmake 其中一个主要功能就是用于生成 Makefile&#xff0c;cmake 的语法更…

笔记:编写程序,绘制一个展示马尾松、樟树、杉木、 桂花 4 个树种不同季节的细根生物量的误差棒图。

文章目录 前言一、分析题目二、什么是误差棒图&#xff1f;二、编写程序总结 前言 编写程序&#xff0c;绘制一个展示马尾松、樟树、杉木、 桂花 4 个树种不同季节的细根生物量的误差棒图&#xff0c;实现过程如下&#xff1a; &#xff08;1&#xff09; 导入 matplotlib.pyp…