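Contents
1 Introduction
1.1 What is a CP architecture?
1.2 Characteristics of the CP architecture in Nacos
1.3 Pros and cons
1.4 Use cases
2 Leader election in the CP architecture
2.1 Election flow
2.2 Summary
3 Leader heartbeats in the CP architecture
3.1 The leader sends heartbeats
3.2 Followers receive heartbeats
3.3 Summary
4 Service registration in the CP architecture
4.1 Registration flow
4.2 Summary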
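1 Introduction
Nacos (Dynamic Naming and Configuration Service) is a service discovery and configuration management platform open-sourced by Alibaba. It provides service registration, service discovery and configuration management, and is widely used in microservice architectures. Nacos supports two distributed consistency models: CP and AP. This article focuses on the CP architecture.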
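1.1 What is a CP architecture?
A CP architecture favors Consistency and Partition tolerance from the CAP theorem. When the system hits a network partition, a CP system keeps the data consistent at the cost of Availability: it prefers to keep every node's data consistent, even if that means temporarily refusing some requests.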
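1.2 Characteristics of the CP architecture in Nacos
• Strong consistency: in CP mode, data in a Nacos cluster is strongly consistent. A write takes effect only after a majority of the nodes has acknowledged it, so every successful write has already been replicated to a majority of the cluster.
• Raft protocol: Nacos implements CP mode on top of the Raft consensus protocol, a distributed consensus algorithm that keeps data consistent across nodes. Raft elects a leader that handles writes, while the other nodes act as followers that replicate the data. The code walked through in this article is Nacos's older, self-implemented Raft (RaftCore): instead of a replicated log it compares per-key timestamps, and its versionJudgement checks shut it down once every cluster member runs the newer protocol.
• Leader/follower structure: in CP mode the leader handles all write requests and replicates them to the followers. If the leader becomes unavailable, the remaining nodes elect a new one.
• Reliability: because a write succeeds only after a majority has acknowledged it, CP mode offers higher data reliability and suits scenarios with strict consistency requirements, such as financial or order systems.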
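The "majority" in these bullets is a simple quorum rule. The sketch below uses a hypothetical helper class, Quorum (not part of Nacos), and assumes the usual definition of a majority for an n-node cluster, n / 2 + 1 with integer division. It also derives how many node failures the cluster can tolerate while still accepting writes.

```java
/** Hypothetical helper illustrating majority (quorum) arithmetic; not a Nacos class. */
final class Quorum {
    private Quorum() {
    }

    /** Smallest number of nodes that forms a strict majority of clusterSize (assumed n / 2 + 1). */
    static int majority(int clusterSize) {
        if (clusterSize <= 0) {
            throw new IllegalArgumentException("clusterSize must be positive");
        }
        return clusterSize / 2 + 1;
    }

    /** How many nodes may fail while the remaining ones can still form a majority. */
    static int tolerableFailures(int clusterSize) {
        return clusterSize - majority(clusterSize);
    }

    /** Whether a write acknowledged by `acks` nodes (leader included) counts as successful. */
    static boolean isCommitted(int acks, int clusterSize) {
        return acks >= majority(clusterSize);
    }
}
```

For example, a 3-node cluster needs 2 acknowledgements and survives 1 failure, while a 4-node cluster needs 3 and still survives only 1, which is why odd cluster sizes are usually preferred.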
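1.3 Pros and cons
Pros:
• Consistency: data stays strongly consistent across the distributed environment, which reduces inconsistency problems.
• Reliability: because a write is accepted only when a majority agrees, replicas do not diverge or hold conflicting data under node failures or network partitions.
Cons:
• Lower availability: during a network partition or node failures the service may be unable to process requests in time; if no majority is reachable, writes fail.
• Performance overhead: every write has to wait for a majority of nodes to acknowledge it, so write performance can be comparatively low.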
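To see where the write cost comes from, assume the leader replicates to all followers in parallel and counts itself as one acknowledgement. The write then completes when the (majority − 1)-th fastest follower answers. The sketch below computes that latency under these assumptions; the class name and the latency figures are illustrative only, and an unreachable follower is modeled as Long.MAX_VALUE.

```java
import java.util.Arrays;

/** Illustrative model of majority-write latency; names and numbers are hypothetical. */
final class MajorityWriteLatency {
    private MajorityWriteLatency() {
    }

    /**
     * followerLatenciesMs: round-trip time of each follower's acknowledgement
     * (Long.MAX_VALUE for an unreachable follower). The leader acknowledges itself
     * instantly, so the write needs majority - 1 follower acks and finishes when the
     * slowest of those fastest acks arrives.
     */
    static long commitLatencyMs(long[] followerLatenciesMs) {
        int clusterSize = followerLatenciesMs.length + 1;
        // majority (clusterSize / 2 + 1) minus the leader's own acknowledgement
        int followerAcksNeeded = clusterSize / 2;
        if (followerAcksNeeded == 0) {
            return 0; // single-node cluster: the leader alone is a majority
        }
        long[] sorted = followerLatenciesMs.clone();
        Arrays.sort(sorted);
        return sorted[followerAcksNeeded - 1];
    }
}
```

With followers answering in 30, 10, 50 and 20 ms, a 5-node cluster commits after 20 ms regardless of the 50 ms straggler; if both followers of a 3-node cluster are unreachable, the result is Long.MAX_VALUE, meaning the write never commits.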
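1.4 Use cases
The CP architecture suits applications with very strict consistency requirements, for example:
• Financial systems: every transaction must be accurate.
• Order processing systems: data must stay strictly consistent while orders are created, modified and so on.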
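2 Leader election in the CP architecture
2.1 Election flow
Core class: RaftCore.java, core method: init. Its first step registers the election task (MasterElection); that task's run method and sendVote follow it below.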
```java
public void init() throws Exception {
    Loggers.RAFT.info("initializing Raft sub-system");
    final long start = System.currentTimeMillis();
    raftStore.loadDatums(notifier, datums);
    setTerm(NumberUtils.toLong(raftStore.loadMeta().getProperty("term"), 0L));
    Loggers.RAFT.info("cache loaded, datum count: {}, current term: {}", datums.size(), peers.getTerm());
    initialized = true;
    Loggers.RAFT.info("finish to load data from disk, cost: {} ms.", (System.currentTimeMillis() - start));
    // 1. Leader election task
    masterTask = GlobalExecutor.registerMasterElection(new MasterElection());
    // 2. Heartbeat task
    heartbeatTask = GlobalExecutor.registerHeartbeat(new HeartBeat());
    versionJudgement.registerObserver(isAllNewVersion -> {
        stopWork = isAllNewVersion;
        if (stopWork) {
            try {
                shutdown();
                raftListener.removeOldRaftMetadata();
            } catch (NacosException e) {
                throw new NacosRuntimeException(NacosException.SERVER_ERROR, e);
            }
        }
    }, 100);
    NotifyCenter.registerSubscriber(notifier);
    Loggers.RAFT.info("timer started: leader timeout ms: {}, heart-beat timeout ms: {}",
            GlobalExecutor.LEADER_TIMEOUT_MS, GlobalExecutor.HEARTBEAT_INTERVAL_MS);
}

// MasterElection.run
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }
        // 1. Count down the election timeout; do nothing until it expires
        RaftPeer local = peers.local();
        local.leaderDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        if (local.leaderDueMs > 0) {
            return;
        }
        // reset timeout
        local.resetLeaderDue();
        local.resetHeartbeatDue();
        // 2. Start a vote
        sendVote();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while master election {}", e);
    }
}

private void sendVote() {
    RaftPeer local = peers.get(NetUtils.localServer());
    Loggers.RAFT.info("leader timeout, start voting,leader: {}, term: {}", JacksonUtils.toJson(getLeader()),
            local.term);
    peers.reset();
    // 1. Increment the election term
    local.term.incrementAndGet();
    // 2. Vote for itself
    local.voteFor = local.ip;
    // 3. Switch to the candidate state
    local.state = RaftPeer.State.CANDIDATE;
    Map<String, String> params = new HashMap<>(1);
    params.put("vote", JacksonUtils.toJson(local));
    // 4. Send vote requests to every other node
    for (final String server : peers.allServersWithoutMySelf()) {
        final String url = buildUrl(server, API_VOTE);
        try {
            HttpClient.asyncHttpPost(url, null, params, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.error("NACOS-RAFT vote failed: {}, url: {}", result.getCode(), url);
                        return;
                    }
                    RaftPeer peer = JacksonUtils.toObj(result.getData(), RaftPeer.class);
                    Loggers.RAFT.info("received approve from peer: {}", JacksonUtils.toJson(peer));
                    // 5. Tally the votes; the node holding a majority becomes leader
                    peers.decideLeader(peer);
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("error while sending vote to server: {}", server, throwable);
                }

                @Override
                public void onCancel() {
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.warn("error while sending vote to server: {}", server);
        }
    }
}
```
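MasterElection.run does not literally sleep: it is a periodic task that subtracts TICK_PERIOD_MS from leaderDueMs and only votes once the counter reaches zero. The sketch below models that countdown with a randomized reset. The class name and all three constants are assumptions for illustration (the real values live in Nacos's GlobalExecutor), and the "base timeout plus random jitter" reset mirrors the idea rather than any particular Nacos version.

```java
import java.util.Random;

/** Hypothetical model of the tick-driven election timeout; constants are assumed values. */
final class ElectionTimer {
    static final long TICK_PERIOD_MS = 500;       // assumed tick interval
    static final long LEADER_TIMEOUT_MS = 15_000; // assumed base election timeout
    static final long RANDOM_MS = 5_000;          // assumed upper bound of the random jitter

    private final Random random;
    private long leaderDueMs;

    ElectionTimer(Random random) {
        this.random = random;
        reset();
    }

    /** Base timeout plus jitter in [0, RANDOM_MS), so nodes rarely expire at the same moment. */
    void reset() {
        leaderDueMs = LEADER_TIMEOUT_MS + random.nextInt((int) RANDOM_MS);
    }

    /** Called once per tick; returns true when this node should start an election. */
    boolean tick() {
        leaderDueMs -= TICK_PERIOD_MS;
        if (leaderDueMs > 0) {
            return false;
        }
        reset(); // plays the role of resetLeaderDue() before sendVote()
        return true;
    }

    long remainingMs() {
        return leaderDueMs;
    }
}
```

With these assumed constants a node votes after 30 to 40 ticks, and two nodes started at the same time usually fire on different ticks.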
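2.2 Summary
- Each node first waits for a randomized election timeout (leaderDueMs, counted down by TICK_PERIOD_MS on every tick); the randomness makes it unlikely that all nodes time out together.
- When its timeout expires, a node increments its election term, votes for itself and switches to the candidate state.
- It then sends vote requests to all other nodes.
- If more than half of the nodes vote for it, it becomes the leader; if no node wins a majority, a new round starts when the next timeout expires.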
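The majority check can be sketched as a vote tally: every vote response reports whom that peer voted for, and a candidate becomes leader once its vote count reaches the majority. The class below is a hypothetical simplification that assumes one vote per peer per term; it does not reproduce RaftPeerSet.decideLeader line for line.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified vote tally for a single election term. */
final class VoteTally {
    private final int clusterSize;
    // peer -> the candidate that peer voted for in this term
    private final Map<String, String> votes = new HashMap<>();

    VoteTally(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    /** Records (or overwrites) the vote reported by one peer. */
    void record(String peer, String votedFor) {
        votes.put(peer, votedFor);
    }

    /** Returns the candidate holding a strict majority of the votes, if any. */
    Optional<String> leader() {
        int majority = clusterSize / 2 + 1;
        Map<String, Integer> counts = new HashMap<>();
        for (String candidate : votes.values()) {
            int count = counts.merge(candidate, 1, Integer::sum);
            if (count >= majority) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```

A 2-2 split in a 4-node cluster yields no leader, which is exactly the "no majority, start another round" case above; the randomized timeout makes a repeated split less likely.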
3 Leader heartbeats in the CP architecture
3.1 The leader sends heartbeats
Core class: RaftCore.java, core method: init. This is the same init method shown in 2.1; its second step registers the heartbeat task with GlobalExecutor.registerHeartbeat(new HeartBeat()). The task's run method and sendBeat:
```java
// HeartBeat.run (registered by RaftCore.init)
public void run() {
    try {
        if (stopWork) {
            return;
        }
        if (!peers.isReady()) {
            return;
        }
        RaftPeer local = peers.local();
        local.heartbeatDueMs -= GlobalExecutor.TICK_PERIOD_MS;
        if (local.heartbeatDueMs > 0) {
            return;
        }
        local.resetHeartbeatDue();
        // Send the heartbeat
        sendBeat();
    } catch (Exception e) {
        Loggers.RAFT.warn("[RAFT] error while sending beat {}", e);
    }
}

private void sendBeat() throws IOException, InterruptedException {
    RaftPeer local = peers.local();
    // 1. Only the leader sends heartbeats (and never in standalone mode)
    if (EnvUtil.getStandaloneMode() || local.state != RaftPeer.State.LEADER) {
        return;
    }
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("[RAFT] send beat with {} keys.", datums.size());
    }
    // The leader resets its own election timer so it never starts an election itself
    local.resetLeaderDue();
    // build data
    ObjectNode packet = JacksonUtils.createEmptyJsonNode();
    packet.replace("peer", JacksonUtils.transferToJsonNode(local));
    ArrayNode array = JacksonUtils.createEmptyArrayNode();
    if (switchDomain.isSendBeatOnly()) {
        Loggers.RAFT.info("[SEND-BEAT-ONLY] {}", switchDomain.isSendBeatOnly());
    }
    // 2. Build the payload: for each datum, only its abbreviated key and its timestamp
    if (!switchDomain.isSendBeatOnly()) {
        for (Datum datum : datums.values()) {
            ObjectNode element = JacksonUtils.createEmptyJsonNode();
            if (KeyBuilder.matchServiceMetaKey(datum.key)) {
                element.put("key", KeyBuilder.briefServiceMetaKey(datum.key));
            } else if (KeyBuilder.matchInstanceListKey(datum.key)) {
                element.put("key", KeyBuilder.briefInstanceListkey(datum.key));
            }
            element.put("timestamp", datum.timestamp.get());
            array.add(element);
        }
    }
    packet.replace("datums", array);
    // broadcast
    Map<String, String> params = new HashMap<String, String>(1);
    params.put("beat", JacksonUtils.toJson(packet));
    // 3. GZIP-compress the message
    String content = JacksonUtils.toJson(params);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    GZIPOutputStream gzip = new GZIPOutputStream(out);
    gzip.write(content.getBytes(StandardCharsets.UTF_8));
    gzip.close();
    byte[] compressedBytes = out.toByteArray();
    String compressedContent = new String(compressedBytes, StandardCharsets.UTF_8);
    if (Loggers.RAFT.isDebugEnabled()) {
        Loggers.RAFT.debug("raw beat data size: {}, size of compressed data: {}", content.length(),
                compressedContent.length());
    }
    // 4. Send the message to every other node
    for (final String server : peers.allServersWithoutMySelf()) {
        try {
            final String url = buildUrl(server, API_BEAT);
            if (Loggers.RAFT.isDebugEnabled()) {
                Loggers.RAFT.debug("send beat to server " + server);
            }
            HttpClient.asyncHttpPostLarge(url, null, compressedBytes, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.error("NACOS-RAFT beat failed: {}, peer: {}", result.getCode(), server);
                        MetricsMonitor.getLeaderSendBeatFailedException().increment();
                        return;
                    }
                    peers.update(JacksonUtils.toObj(result.getData(), RaftPeer.class));
                    if (Loggers.RAFT.isDebugEnabled()) {
                        Loggers.RAFT.debug("receive beat response from: {}", url);
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("NACOS-RAFT error while sending heart-beat to peer: {} {}", server,
                            throwable);
                    MetricsMonitor.getLeaderSendBeatFailedException().increment();
                }

                @Override
                public void onCancel() {
                }
            });
        } catch (Exception e) {
            Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
            MetricsMonitor.getLeaderSendBeatFailedException().increment();
        }
    }
}
```
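Step 3 compresses the JSON payload with java.util.zip.GZIPOutputStream, and the follower's controller decompresses it again (IoUtils.tryDecompress) before parsing. A JDK-only round trip, using a hypothetical class name and a made-up payload, looks like this:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical helper showing a GZIP round trip of a heartbeat-like payload. */
final class BeatCodec {
    private BeatCodec() {
    }

    static byte[] compress(String content) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Closing the GZIP stream (via try-with-resources) writes the GZIP trailer
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(content.getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }

    static String decompress(byte[] compressed) throws IOException {
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int n;
            while ((n = gzip.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }
}
```

The real compressed size is compressedBytes.length. The compressedContent string in sendBeat is only used for the debug log, and its length is not a reliable byte count, because arbitrary GZIP bytes are not valid UTF-8.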
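3.2 Followers receive heartbeats
Core class: RaftController.java, core method: beat, which decompresses the request and hands it to RaftCore.receivedBeat.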
```java
// RaftController.beat
public JsonNode beat(HttpServletRequest request, HttpServletResponse response) throws Exception {
    if (versionJudgement.allMemberIsNewVersion()) {
        throw new IllegalStateException("old raft protocol already stop");
    }
    String entity = new String(IoUtils.tryDecompress(request.getInputStream()), StandardCharsets.UTF_8);
    String value = URLDecoder.decode(entity, "UTF-8");
    value = URLDecoder.decode(value, "UTF-8");
    JsonNode json = JacksonUtils.toObj(value);
    // 1. Process the heartbeat
    RaftPeer peer = raftCore.receivedBeat(JacksonUtils.toObj(json.get("beat").asText()));
    return JacksonUtils.transferToJsonNode(peer);
}

// RaftCore.receivedBeat
public RaftPeer receivedBeat(JsonNode beat) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    final RaftPeer local = peers.local();
    final RaftPeer remote = new RaftPeer();
    JsonNode peer = beat.get("peer");
    remote.ip = peer.get("ip").asText();
    remote.state = RaftPeer.State.valueOf(peer.get("state").asText());
    remote.term.set(peer.get("term").asLong());
    remote.heartbeatDueMs = peer.get("heartbeatDueMs").asLong();
    remote.leaderDueMs = peer.get("leaderDueMs").asLong();
    remote.voteFor = peer.get("voteFor").asText();
    // 1. Only accept heartbeats sent by a leader
    if (remote.state != RaftPeer.State.LEADER) {
        Loggers.RAFT.info("[RAFT] invalid state from master, state: {}, remote peer: {}", remote.state,
                JacksonUtils.toJson(remote));
        throw new IllegalArgumentException("invalid state from master, state: " + remote.state);
    }
    // Reject heartbeats from a leader whose term is older than ours
    if (local.term.get() > remote.term.get()) {
        Loggers.RAFT.info("[RAFT] out of date beat, beat-from-term: {}, beat-to-term: {}, remote peer: {}, and leaderDueMs: {}",
                remote.term.get(), local.term.get(), JacksonUtils.toJson(remote), local.leaderDueMs);
        throw new IllegalArgumentException("out of date beat, beat-from-term: " + remote.term.get()
                + ", beat-to-term: " + local.term.get());
    }
    // 2. If this node is not a follower yet, become one
    if (local.state != RaftPeer.State.FOLLOWER) {
        Loggers.RAFT.info("[RAFT] make remote as leader, remote peer: {}", JacksonUtils.toJson(remote));
        // mk follower
        local.state = RaftPeer.State.FOLLOWER;
        local.voteFor = remote.ip;
    }
    final JsonNode beatDatums = beat.get("datums");
    // A valid heartbeat resets the election timer, so this follower does not start an election
    local.resetLeaderDue();
    local.resetHeartbeatDue();
    peers.makeLeader(remote);
    if (!switchDomain.isSendBeatOnly()) {
        Map<String, Integer> receivedKeysMap = new HashMap<>(datums.size());
        for (Map.Entry<String, Datum> entry : datums.entrySet()) {
            // 3. Mark every local key with 0 (not yet seen in this heartbeat)
            receivedKeysMap.put(entry.getKey(), 0);
        }
        // now check datums
        List<String> batch = new ArrayList<>();
        int processedCount = 0;
        if (Loggers.RAFT.isDebugEnabled()) {
            Loggers.RAFT.debug("[RAFT] received beat with {} keys, RaftCore.datums' size is {}, remote server: {}, term: {}, local term: {}",
                    beatDatums.size(), datums.size(), remote.ip, remote.term, local.term);
        }
        // 4. Walk through the keys sent by the leader
        for (Object object : beatDatums) {
            processedCount = processedCount + 1;
            JsonNode entry = (JsonNode) object;
            String key = entry.get("key").asText();
            final String datumKey;
            if (KeyBuilder.matchServiceMetaKey(key)) {
                datumKey = KeyBuilder.detailServiceMetaKey(key);
            } else if (KeyBuilder.matchInstanceListKey(key)) {
                datumKey = KeyBuilder.detailInstanceListkey(key);
            } else {
                // ignore corrupted key:
                continue;
            }
            long timestamp = entry.get("timestamp").asLong();
            // 5. Mark the key as present on the leader
            receivedKeysMap.put(datumKey, 1);
            try {
                // 6. The local copy is at least as new as the leader's: skip it
                //    (except for the last entry, which must still flush the pending batch)
                if (datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp
                        && processedCount < beatDatums.size()) {
                    continue;
                }
                // 7. The key is missing locally or the local copy is stale: queue it
                if (!(datums.containsKey(datumKey) && datums.get(datumKey).timestamp.get() >= timestamp)) {
                    batch.add(datumKey);
                }
                // 8. Keep collecting until the batch holds 50 keys or this is the last entry
                if (batch.size() < 50 && processedCount < beatDatums.size()) {
                    continue;
                }
                String keys = StringUtils.join(batch, ",");
                if (batch.size() <= 0) {
                    continue;
                }
                Loggers.RAFT.info("get datums from leader: {}, batch size is {}, processedCount is {}"
                        + ", datums' size is {}, RaftCore.datums' size is {}", getLeader().ip, batch.size(),
                        processedCount, beatDatums.size(), datums.size());
                // update datum entry
                // 9. Fetch the stale keys from the leader
                String url = buildUrl(remote.ip, API_GET);
                Map<String, String> queryParam = new HashMap<>(1);
                queryParam.put("keys", URLEncoder.encode(keys, "UTF-8"));
                HttpClient.asyncHttpGet(url, null, queryParam, new Callback<String>() {
                    @Override
                    public void onReceive(RestResult<String> result) {
                        if (!result.ok()) {
                            return;
                        }
                        List<JsonNode> datumList = JacksonUtils.toObj(result.getData(),
                                new TypeReference<List<JsonNode>>() {
                                });
                        for (JsonNode datumJson : datumList) {
                            Datum newDatum = null;
                            OPERATE_LOCK.lock();
                            try {
                                Datum oldDatum = getDatum(datumJson.get("key").asText());
                                // Re-check under the lock: skip if the local copy is already as new
                                if (oldDatum != null
                                        && datumJson.get("timestamp").asLong() <= oldDatum.timestamp.get()) {
                                    Loggers.RAFT.info("[NACOS-RAFT] timestamp is smaller than that of mine, key: {}, remote: {}, local: {}",
                                            datumJson.get("key").asText(),
                                            datumJson.get("timestamp").asLong(), oldDatum.timestamp);
                                    continue;
                                }
                                if (KeyBuilder.matchServiceMetaKey(datumJson.get("key").asText())) {
                                    Datum<Service> serviceDatum = new Datum<>();
                                    serviceDatum.key = datumJson.get("key").asText();
                                    serviceDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    serviceDatum.value = JacksonUtils.toObj(datumJson.get("value").toString(),
                                            Service.class);
                                    newDatum = serviceDatum;
                                }
                                if (KeyBuilder.matchInstanceListKey(datumJson.get("key").asText())) {
                                    Datum<Instances> instancesDatum = new Datum<>();
                                    instancesDatum.key = datumJson.get("key").asText();
                                    instancesDatum.timestamp.set(datumJson.get("timestamp").asLong());
                                    instancesDatum.value = JacksonUtils.toObj(datumJson.get("value").toString(),
                                            Instances.class);
                                    newDatum = instancesDatum;
                                }
                                if (newDatum == null || newDatum.value == null) {
                                    Loggers.RAFT.error("receive null datum: {}", datumJson);
                                    continue;
                                }
                                // Persist to disk, update memory and notify listeners
                                raftStore.write(newDatum);
                                datums.put(newDatum.key, newDatum);
                                notifier.notify(newDatum.key, DataOperation.CHANGE, newDatum.value);
                                local.resetLeaderDue();
                                if (local.term.get() + 100 > remote.term.get()) {
                                    getLeader().term.set(remote.term.get());
                                    local.term.set(getLeader().term.get());
                                } else {
                                    local.term.addAndGet(100);
                                }
                                raftStore.updateTerm(local.term.get());
                                Loggers.RAFT.info("data updated, key: {}, timestamp: {}, from {}, local term: {}",
                                        newDatum.key, newDatum.timestamp, JacksonUtils.toJson(remote), local.term);
                            } catch (Throwable e) {
                                Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader, datum: {}",
                                        newDatum, e);
                            } finally {
                                OPERATE_LOCK.unlock();
                            }
                        }
                        try {
                            TimeUnit.MILLISECONDS.sleep(200);
                        } catch (InterruptedException e) {
                            Loggers.RAFT.error("[RAFT-BEAT] Interrupted error ", e);
                        }
                        return;
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        Loggers.RAFT.error("[RAFT-BEAT] failed to sync datum from leader", throwable);
                    }

                    @Override
                    public void onCancel() {
                    }
                });
                batch.clear();
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to handle beat entry, key: {}", datumKey);
            }
        }
        List<String> deadKeys = new ArrayList<>();
        // 10. Keys still marked 0 were absent from the leader's heartbeat: the leader deleted them
        for (Map.Entry<String, Integer> entry : receivedKeysMap.entrySet()) {
            if (entry.getValue() == 0) {
                deadKeys.add(entry.getKey());
            }
        }
        for (String deadKey : deadKeys) {
            try {
                deleteDatum(deadKey);
            } catch (Exception e) {
                Loggers.RAFT.error("[NACOS-RAFT] failed to remove entry, key={} {}", deadKey, e);
            }
        }
    }
    return local;
}
```
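Stripped of HTTP, JSON and locking, steps 3 to 10 amount to a diff between two key-to-timestamp maps. The sketch below is a hypothetical, synchronous simplification (BeatDiff is not a Nacos class): it returns the keys to fetch from the leader, grouped into batches of at most 50 as in the code above, and the keys to delete locally. Like the original, a local timestamp equal to the leader's counts as up to date.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical, synchronous model of the follower-side heartbeat reconciliation. */
final class BeatDiff {
    static final int BATCH_SIZE = 50;

    final List<List<String>> fetchBatches = new ArrayList<>();
    final List<String> deadKeys = new ArrayList<>();

    /**
     * local:  key -> timestamp held by this follower
     * leader: key -> timestamp announced in the leader's heartbeat
     */
    static BeatDiff compute(Map<String, Long> local, Map<String, Long> leader) {
        BeatDiff diff = new BeatDiff();
        List<String> batch = new ArrayList<>();
        for (Map.Entry<String, Long> entry : leader.entrySet()) {
            Long localTs = local.get(entry.getKey());
            // Missing locally, or the local copy is older than the leader's: fetch it
            if (localTs == null || localTs < entry.getValue()) {
                batch.add(entry.getKey());
                if (batch.size() == BATCH_SIZE) {
                    diff.fetchBatches.add(batch);
                    batch = new ArrayList<>();
                }
            }
        }
        if (!batch.isEmpty()) {
            diff.fetchBatches.add(batch); // flush the last, partially filled batch
        }
        // Local keys the leader no longer announces were deleted on the leader
        for (String key : local.keySet()) {
            if (!leader.containsKey(key)) {
                diff.deadKeys.add(key);
            }
        }
        return diff;
    }
}
```

The Nacos code additionally fetches each batch asynchronously and re-checks the timestamp under OPERATE_LOCK before writing, which this sketch leaves out.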
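3.3 Summary
- The leader sends a heartbeat to every follower at a fixed interval. The heartbeat is GZIP-compressed and mainly carries each datum's key and timestamp, not its value.
- A follower accepts a heartbeat only from a leader whose term is not older than its own. Accepting it turns the node into a follower and resets its election timer, which is why followers do not start an election while the leader is alive.
- The follower compares the keys and timestamps with its local data and pulls every missing or stale datum from the leader (in batches of up to 50 keys) to update its local copy.
- If a local key does not appear in the leader's heartbeat, the leader has deleted it, so the follower deletes it as well.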
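The acceptance rules in the second point can be condensed into a small state transition. The sketch below is hypothetical: it uses its own Role enum and BeatReceiver class instead of Nacos's RaftPeer, and a counter stands in for resetLeaderDue(). Like receivedBeat, it rejects non-leaders and older terms by throwing, and otherwise follows the sender.

```java
/** Hypothetical model of the heartbeat acceptance checks; not Nacos's RaftPeer. */
final class BeatReceiver {
    enum Role { FOLLOWER, CANDIDATE, LEADER }

    Role role = Role.CANDIDATE;
    long term;
    String voteFor;
    String leader;
    int timerResets;

    BeatReceiver(long term) {
        this.term = term;
    }

    /** Throws IllegalArgumentException on rejection, as receivedBeat does. */
    void onBeat(String senderIp, Role senderRole, long senderTerm) {
        if (senderRole != Role.LEADER) {
            throw new IllegalArgumentException("invalid state from master, state: " + senderRole);
        }
        if (term > senderTerm) {
            throw new IllegalArgumentException("out of date beat, beat-from-term: " + senderTerm
                    + ", beat-to-term: " + term);
        }
        if (role != Role.FOLLOWER) {
            role = Role.FOLLOWER;
            voteFor = senderIp;
        }
        leader = senderIp;
        timerResets++; // stands in for resetLeaderDue(): no election while beats keep arriving
    }
}
```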
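4 Service registration in the CP architecture
4.1 Registration flow
Core class: RaftConsistencyServiceImpl.java, core method: put, which delegates to RaftCore.signalPublish.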
```java
// RaftConsistencyServiceImpl.put
public void put(String key, Record value) throws NacosException {
    checkIsStopWork();
    try {
        // 1. Register the service data through Raft
        raftCore.signalPublish(key, value);
    } catch (Exception e) {
        Loggers.RAFT.error("Raft put failed.", e);
        throw new NacosException(NacosException.SERVER_ERROR, "Raft put failed, key:" + key + ", value:" + value,
                e);
    }
}

// RaftCore.signalPublish
public void signalPublish(String key, Record value) throws Exception {
    if (stopWork) {
        throw new IllegalStateException("old raft protocol already stop work");
    }
    // 1. Check whether this node is the leader
    if (!isLeader()) {
        ObjectNode params = JacksonUtils.createEmptyJsonNode();
        params.put("key", key);
        params.replace("value", JacksonUtils.transferToJsonNode(value));
        Map<String, String> parameters = new HashMap<>(1);
        parameters.put("key", key);
        final RaftPeer leader = getLeader();
        // 2. Not the leader: forward the request to the leader
        raftProxy.proxyPostLarge(leader.ip, API_PUB, params.toString(), parameters);
        return;
    }
    OPERATE_LOCK.lock();
    try {
        final long start = System.currentTimeMillis();
        final Datum datum = new Datum();
        datum.key = key;
        datum.value = value;
        // Timestamp acts as a version: 1 for a new key, otherwise the previous value plus one
        if (getDatum(key) == null) {
            datum.timestamp.set(1L);
        } else {
            datum.timestamp.set(getDatum(key).timestamp.incrementAndGet());
        }
        ObjectNode json = JacksonUtils.createEmptyJsonNode();
        json.replace("datum", JacksonUtils.transferToJsonNode(datum));
        json.replace("source", JacksonUtils.transferToJsonNode(peers.local()));
        // 3. Persist the change to the local file and publish a change event
        onPublish(datum, peers.local());
        final String content = json.toString();
        // 4. Replicate to the other nodes; acknowledgements from a majority are enough
        final CountDownLatch latch = new CountDownLatch(peers.majorityCount());
        for (final String server : peers.allServersIncludeMyself()) {
            if (isLeader(server)) {
                // The leader counts as one acknowledgement
                latch.countDown();
                continue;
            }
            final String url = buildUrl(server, API_ON_PUB);
            HttpClient.asyncHttpPostLarge(url, Arrays.asList("key", key), content, new Callback<String>() {
                @Override
                public void onReceive(RestResult<String> result) {
                    if (!result.ok()) {
                        Loggers.RAFT.warn("[RAFT] failed to publish data to peer, datumId={}, peer={}, http code={}",
                                datum.key, server, result.getCode());
                        return;
                    }
                    latch.countDown();
                }

                @Override
                public void onError(Throwable throwable) {
                    Loggers.RAFT.error("[RAFT] failed to publish data to peer", throwable);
                }

                @Override
                public void onCancel() {
                }
            });
        }
        // 5. No majority within the timeout: throw an exception
        if (!latch.await(UtilsAndCommons.RAFT_PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS)) {
            // only majority servers return success can we consider this update success
            Loggers.RAFT.error("data publish failed, caused failed to notify majority, key={}", key);
            throw new IllegalStateException("data publish failed, caused failed to notify majority, key=" + key);
        }
        long end = System.currentTimeMillis();
        Loggers.RAFT.info("signalPublish cost {} ms, key: {}", (end - start), key);
    } finally {
        OPERATE_LOCK.unlock();
    }
}
```
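The replication pattern in steps 4 and 5 can be tried in isolation: a CountDownLatch sized to the majority, counted down once by the leader itself and once per successful follower reply, then awaited with a timeout. The sketch below replaces the HTTP calls with simulated follower delays on an executor; the class name, delays and timeout are hypothetical, and majority is assumed to be n / 2 + 1.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical simulation of "wait until a majority acknowledges, or time out". */
final class MajorityPublish {
    private MajorityPublish() {
    }

    /**
     * followerDelaysMs: simulated ack delay per follower; a negative value means that follower fails.
     * Returns true if a majority (leader included) acknowledged within timeoutMs.
     */
    static boolean publish(List<Long> followerDelaysMs, long timeoutMs) throws InterruptedException {
        int clusterSize = followerDelaysMs.size() + 1;
        CountDownLatch latch = new CountDownLatch(clusterSize / 2 + 1);
        latch.countDown(); // the leader has already written locally and counts itself
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, followerDelaysMs.size()));
        try {
            for (long delay : followerDelaysMs) {
                pool.execute(() -> {
                    if (delay < 0) {
                        return; // simulated failure: no countDown, like a non-OK HTTP result
                    }
                    try {
                        Thread.sleep(delay);
                        latch.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            pool.shutdownNow(); // interrupt simulated stragglers still sleeping
        }
    }
}
```

As in signalPublish, a timeout only means the caller gets an error: neither the sketch nor the code above undoes the leader's local write or the followers that did acknowledge.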
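4.2 Summary
- When a client sends a service registration request to a follower, the follower forwards it to the leader.
- When the leader receives the registration, it assigns the datum a timestamp (1 for a new key, otherwise the previous timestamp plus one), writes it to its local file and memory through onPublish, and broadcasts it to all followers so that they store it too.
- The registration succeeds once more than half of the nodes (the leader counts as one) acknowledge within RAFT_PUBLISH_TIMEOUT; otherwise the leader throws an exception. There is no separate commit step, and the leader's own local write is not rolled back when the majority is not reached.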
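The per-key timestamp assigned in signalPublish works as a version number, and followers only apply a datum whose timestamp is newer than their own copy (the "timestamp is smaller than that of mine" check in 3.2). A minimal in-memory sketch under those assumptions, with hypothetical class and method names:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of timestamp-versioned datums. */
final class VersionedStore {
    private final Map<String, Long> timestamps = new HashMap<>();
    private final Map<String, String> values = new HashMap<>();

    /** Leader side: 1 for a new key, otherwise the previous timestamp plus one. */
    long publish(String key, String value) {
        long ts = timestamps.getOrDefault(key, 0L) + 1;
        timestamps.put(key, ts);
        values.put(key, value);
        return ts;
    }

    /** Follower side: apply only if strictly newer than the local copy. */
    boolean apply(String key, String value, long ts) {
        Long current = timestamps.get(key);
        if (current != null && ts <= current) {
            return false; // stale or duplicate update, ignored
        }
        timestamps.put(key, ts);
        values.put(key, value);
        return true;
    }

    String get(String key) {
        return values.get(key);
    }
}
```

If an older update (v1) reaches a follower after a newer one (v2), it is ignored, so out-of-order delivery cannot roll a follower back to an older value.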