nacos1.4-CP架构源码

ops/2024/10/18 14:23:21/

本文主要介绍nacos1.4的CP架构,nacos通过raft协议(半数以上成功)来控制集群的强一致性,在源代码中使用到countdownlatch锁来控制半数以上成功。

1.Raft协议

演示网址:http://thesecretlivesofdata.com/raft/

分区容错性:针对多节点的系统,分区指网络分区(由于网络原因节点之间无法通信同步数据),容错指系统节点出现出现分去了,对外依然要提供服务,不能因为分区而导致整个系统不能提供服务。

 在集群状态下,CP保证半数以上写入成功才算成功。(一半以上投票的机制)

在nacos中,ephemeral默认为true,表示临时实例,使用的是AP架构,只将实例写到内存;非临时实例除了写到内存,还会写到磁盘文件。

Raft中有三种节点状态:

1)follower

2)candidate

3)leader

follower -> candidate:follower向其他节点发送 向自己投票的请求,如果超过半数,则认为自己被选举成功;

zookeeper与nacos的 Leader选举的区别

nacos:raft中每个节点有休眠时间,谁先苏醒,谁先申请投票自己

zookeeper:ZAB每个节点都会进行向自己的投票的动作,然后会进行比对!

所有的修改操作都得在leader节点进行!

raft是一个两阶段写的过程,ZAB也是 

1.先写leader,leader广播收到的新数据

2.半数以上返回确认,leader提交到日志,然后将commit同步给follower(日志提交)

选举的超时时间,随机150到300ms之间

先苏醒的节点开始向自己投票,然后告诉其他节点来向自己投票。其他节点都还没开始投票,那就同意这次投票(其他节点重置休眠时间)

新选举的leader向其他节点发送心跳机制(心跳包 包含数据)进行同步!(每次心跳都会重置休眠时间)

注意:如果两个节点同时苏醒,那就重新开始,因为票数相同。重新休眠!

节点的数据同步是通过下一次心跳将同步信息同步给follower节点

Raft奇数个节点可以解决集群脑裂的问题。

2.向Leader节点写数据,并同步

RaftConsistencyServiceImpl#put

java">@Overridepublic void put(String key, Record value) throws NacosException {checkIsStopWork();try {raftCore.signalPublish(key, value);} catch (Exception e) {Loggers.RAFT.error("Raft put failed.", e);throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,e);}}

 写新实例,并进行同步

写新实例到leader,然后leader先将该实例写到日志,然后写到leader的内存;

然后leader将数据同步到其他节点,半数以上同步成功,则成功,否则抛出异常。

(小Bug,虽然从节点抛出了异常,但是主节点仍然写成功了,后面的nacos版本通过jraft协议进行了完善)

 涉及到的代码如下:

java">public void signalPublish(String key, Record value) throws Exception {if (stopWork) {throw new IllegalStateException("old raft protocol already stop work");}if (!isLeader()) {// 判断当前节点是不是leaderObjectNode params = JacksonUtils.createEmptyJsonNode();params.put("key", key);params.replace("value", JacksonUtils.transferToJsonNode(value));Map<String, String> parameters = new HashMap<>(1);parameters.put("key", key);final RaftPeer leader = getLeader();raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);// 如果不是leader,则转发到Leader节点return;}OPERATE_LOCK.lock();try {final long start = System.currentTimeMillis();final Datum datum = new Datum();datum.key = key;datum.value = value;if (getDatum(key) == null) {datum.timestamp.set(1L);} else {datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());}ObjectNode json = JacksonUtils.createEmptyJsonNode();json.replace("datum", JacksonUtils.transferToJsonNode(datum));json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));onPublish(datum, peers.local());// leader写完磁盘文件,同步内存final String content = json.toString();final CountDownLatch latch = new CountDownLatch(peers.majorityCount());// leader同步数据for (final String server : peers.allServersIncludeMyself()) {if (isLeader(server)) {latch.countDown();continue;}final String url = buildUrl(server, API_ON_PUB);// nacos原生 只有一步提交// 向其他节点同步数据HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",datum.key, server, result.getCode());return;}latch.countDown();}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);}@Overridepublic void onCancel() {}});}// 虽然抛出了异常,但是主节点写成功了// 半数以上写成功了if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {// only majority servers return success can we consider this update successLoggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);}long end = System.currentTimeMillis();Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);} finally {OPERATE_LOCK.unlock();}}

3.集群数据一致性的源码

RaftCore#init():核心的代码入口

加载数据,写入缓存

包含两个定时任务:选举任务;心跳任务。核心代码如下:

java">@PostConstruct
public void init() throws Exception {Loggers.RAFT.info("initializing Raft sub-system");final long start = System.currentTimeMillis();raftStore.loadDatums(notifier, datums);// 加载数据,写到缓存setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());initialized = true;Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));masterTask = GlobalExecutor.registerMasterElection(new MasterElection());// leader选举任务 500msheartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());// 心跳任务 500msversionJudgement.registerObserver(isAllNewVersion -> {stopWork = isAllNewVersion;if (stopWork) {try {shutdown();raftListener.removeOldRaftMetadata();} catch (NacosException e) {throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);}}}, 100);NotifyCenter.registerSubscriber(notifier);Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}

两个定时任务:

masterTask = GlobalExecutor.registerMasterElection(new MasterElection());// leader选举任务 500ms

heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());// 心跳任务 500ms

选举的核心代码:

苏醒的节点向其他节点发起投票请求,然后判断是否到达半数。在这个过程中会重置心跳时间,选举时间

用回调函数来判断是否多于半数(用到countdownlatch来控制),MasterElection#run的核心代码:

java">    @Overridepublic void run() {try {if (stopWork) {return;}if (!peers.isReady()) {return;}RaftPeer local = peers.local();local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;// 随机休眠if (local.leaderDueMs > 0) {return;}// reset timeoutlocal.resetLeaderDue();local.resetHeartbeatDue();sendVote();// 发起投票} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while master election {}", e);}}

发起选举的核心代码:

java">    private void sendVote() {RaftPeer local = peers.get(NetUtils.localServer());Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),local.term);peers.reset();local.term.incrementAndGet();local.voteFor = local.ip;local.state = RaftPeer.State.CANDIDATE;// 组装投票的信息Map<String, String> params = new HashMap<>(1);params.put("vote", JacksonUtils.toJson(local));for (final String server : peers.allServersWithoutMySelf()) {// 发送投票final String url = buildUrl(server, API_VOTE);try {HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {// 调用http请求@Overridepublic void onReceive(RestResult<String> result) {if (!result.ok()) {Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);return;}RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);// 投票信息Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));peers.decideLeader(peer);// 判断其他节点发回来的消息,判断是否多于半数}@Overridepublic void onError(Throwable throwable) {Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);}@Overridepublic void onCancel() {}});} catch (Exception e) {Loggers.RAFT.warn("error while sending vote to server: {}", server);}}}

心跳的核心代码:

只有主节点可以发送心跳,入口方法run->sendBeat

java">@Override
public void run() {try {if (stopWork) {return;}if (!peers.isReady()) {return;}RaftPeer local = peers.local();local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;if (local.heartbeatDueMs > 0) {return;}local.resetHeartbeatDue();sendBeat();// 发起心跳} catch (Exception e) {Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);}}

onBeat:将所有注册信息的key和时间戳放到element中,然后进行压缩,然后发给其他节点

RaftController#beat:接收心跳信息

java">@PostMapping("/beat")
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {if (versionJudgement.allMemberIsNewVersion()) {throw new IllegalStateException("old raft protocol already stop");}String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);String value = URLDecoder.decode(entity, "UTF-8");value = URLDecoder.decode(value, "UTF-8");JsonNode json = JacksonUtils.toObj(value);RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));return JacksonUtils.transferToJsonNode(peer);
}

批量处理同步的数据,一批一批的回调leader进行同步;本地删除状态为0的数据(主节点的该数据修改了)。

java">if (batch.size() < 50 && processedCount < beatDatums.size()) {// 批量处理同步数据continue;
}

RaftCore#receivedBeat

java">List<String> deadKeys = new ArrayList<>();
for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {if (entry.getValue() == 0) {deadKeys.add(entry.getKey());}
}for (String deadKey : deadKeys) {try {deleteDatum(deadKey);} catch (Exception e) {Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);}
}


http://www.ppmy.cn/ops/122523.html

相关文章

Linux云计算 |【第四阶段】RDBMS1-DAY5

主要内容&#xff1a; 试图概述&#xff08;创建视图VIEW、修改、查看、删除&#xff09;、变量&#xff08;全局变量、会话变量、用户变量、局部变量&#xff09;、存储过程&#xff08;创建、调用、删除存储过程&#xff09;、流程控制结构&#xff08;分支结构&#xff1a;…

【Verilog学习日常】—牛客网刷题—Verilog进阶挑战—VL25

输入序列连续的序列检测 描述 请编写一个序列检测模块&#xff0c;检测输入信号a是否满足01110001序列&#xff0c;当信号满足该序列&#xff0c;给出指示信号match。 模块的接口信号图如下&#xff1a; 模块的时序图如下&#xff1a; 请使用Verilog HDL实现以上功能&#x…

用Sklearn和Statsmodels来做linear_regression和Logistic_regression注意事项

用Sklearn和Statsmodels来做linear_regression和Logistic_regression注意事项&#xff0c;区别。主要在于 intercept 项&#xff0c;和 regularization。 X np.array([-1, 0, 1]) # 自变量 Y np.array([-2, 0, 5]) # 因变量一、Linear regression 的截距项 又叫 intercep…

操作系统_名词_文件下载_反弹SHELL_防火墙绕过

操作系统 操作系统-用途&命令&权限&用户&防火墙 1、个人计算机&服务器用机 2、windows&Linux常见命令 3、文件权限&服务权限&用户权限 4、系统用户&用户组&服务用户等分类 5、自带防火墙出站&入站规则策略协议 实用案例1&#x…

Leetcode—152. 乘积最大子数组【中等】

2024每日刷题&#xff08;174&#xff09; Leetcode—152. 乘积最大子数组 C实现代码 class Solution { public:int maxProduct(vector<int>& nums) {int n nums.size();int mx nums[0];int mn nums[0];int ans mx;for(int i 1; i < n; i) {const int prem…

认知战认知作战:认知战与安全挑战中方企业在海外的应对策略分析

认知战认知作战&#xff1a;认知战与安全挑战中方企业在海外的应对策略分析 关键词&#xff1a;认知战, 中方企业, 恐怖袭击, 安全挑战, 信息传播, 社会责任, 风险管理, 国际合作,认知作战,新质生产力,人类命运共同体,认知战,认知域,认知战研究中心,认知战争,认知战战术,认知战…

Redis——分布式锁

在一个分布式系统中&#xff0c;只要涉及到多个节点访问同一个公共资源的时候&#xff0c;就需要加锁来实现互斥&#xff0c;从而达到线程安全的问题。 但是呢&#xff0c;分布式系统不同一些&#xff0c;因为分布式系统部署在不同的服务器上&#xff0c;很可能大量的请求打到…

City Builder Urban 城市都市街道建筑场景模型

目前拥有178项优质资产。 城市建设者:Urban一个高质量的资产包,专为快速的纽约式城市建设而设计,与所有渲染管道兼容。 资产 56个带LOD的街道和屋顶道具 13个可堆叠的建筑部件与LOD混合搭配 10个不同尺寸的建筑装饰/分离器,总共40个装饰 请参阅秋季列表的技术细节 1个带有C…