【ETCD】【源码阅读】深入解析 raftNode.start方法实现

news/2024/12/14 13:39:50/

让我们从源码层面逐步分析这段代码。这段代码是 ETCD Raft 实现中,raftNodestart 方法,负责启动一个 Raft 节点,并在一个新的 goroutine 中处理 Raft 相关的事件。以下是对代码的逐步分析:

代码总体结构

func (r *raftNode) start(rh *raftReadyHandler) {internalTimeout := time.Secondgo func() {defer r.onStop()  // 确保 goroutine 停止时调用 onStop 方法islead := false  // 用来记录节点是否是领导者for {select {case <-r.ticker.C:r.tick()  // 定时器事件,触发 tick 操作case rd := <-r.Ready():  // 等待 Raft 操作准备好// 处理 Raft 操作case <-r.stopped:  // 如果 raftNode 被停止return}}}()
}

1. 启动 Goroutine

go func() {defer r.onStop()
  • 这部分代码使用 go 关键字在一个新的 goroutine 中异步执行 Raft 节点的操作。defer r.onStop() 确保在 goroutine 停止时执行 onStop() 方法进行清理。

2. 初始化领导者标志

islead := false
  • 这行代码用于初始化 islead 标志,记录当前节点是否是 Raft 集群中的领导者。

3. 事件循环

for {select {case <-r.ticker.C:r.tick()  // 每当 ticker 定时器触发时,执行 tick 操作case rd := <-r.Ready():  // 获取 Raft 准备好的日志条目// 处理 Raft 准备好的日志条目case <-r.stopped:  // 监视是否收到停止信号return}
}
  • 这部分是事件循环,select 用来监听多个事件:
    • r.ticker.C:定时器事件,用于执行 tick 操作。
    • r.Ready():从 Raft 引擎获取准备好的日志条目并处理。
    • r.stopped:接收到停止信号时退出 goroutine。

4. 处理 Raft Ready 数据

case rd := <-r.Ready():if rd.SoftState != nil {newLeader := rd.SoftState.Lead != raft.None && rh.getLead() != rd.SoftState.Leadif newLeader {leaderChanges.Inc()  // 记录领导者变更}if rd.SoftState.Lead == raft.None {hasLeader.Set(0)} else {hasLeader.Set(1)}rh.updateLead(rd.SoftState.Lead)islead = rd.RaftState == raft.StateLeaderif islead {isLeader.Set(1)} else {isLeader.Set(0)}rh.updateLeadership(newLeader)r.td.Reset()}
  • rd := <-r.Ready():从 Raft 中读取准备好的日志条目和状态。
  • 通过 rd.SoftState 判断是否发生了领导者变更,并更新领导者相关的信息(例如,islead 是否为领导者)。
  • leaderChanges.Inc():增加领导者变更计数。
  • hasLeader.Set()isLeader.Set():分别更新是否存在领导者以及当前节点是否是领导者的状态。
  • rh.updateLead()rh.updateLeadership():更新领导者信息。
  • r.td.Reset():重置一个定时器。

5. 处理 Raft ReadState

if len(rd.ReadStates) != 0 {select {case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:case <-time.After(internalTimeout):r.lg.Warn("timed out sending read state", zap.Duration("timeout", internalTimeout))case <-r.stopped:return}
}
  • 处理 ReadState,如果有待处理的读取请求,尝试将其发送到 r.readStateC 信道。如果在 internalTimeout 时间内没有完成,则会记录警告日志。

6. 应用日志条目和快照

notifyc := make(chan struct{}, 1)
ap := apply{entries:  rd.CommittedEntries,snapshot: rd.Snapshot,notifyc:  notifyc,
}
updateCommittedIndex(&ap, rh)waitWALSync := shouldWaitWALSync(rd)
if waitWALSync {if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))}
}
  • 创建一个 apply 对象,表示已提交的日志条目和快照。
  • updateCommittedIndex(&ap, rh):更新已提交的日志索引。
  • shouldWaitWALSync(rd):判断是否需要等待 WAL(Write Ahead Log)同步。
  • 如果需要同步,则调用 r.storage.Save() 将硬状态和日志条目保存到存储中。

7. 处理领导者和快照

if islead {r.transport.Send(r.processMessages(rd.Messages))  // 领导者处理消息并发送
}if !raft.IsEmptySnap(rd.Snapshot) {if err := r.storage.SaveSnap(rd.Snapshot); err != nil {r.lg.Fatal("failed to save Raft snapshot", zap.Error(err))}
}if !waitWALSync {if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {r.lg.Fatal("failed to save Raft hard state and entries", zap.Error(err))}
}
  • 如果当前节点是领导者(islead),则处理 Raft 消息并发送。
  • 如果有快照,保存快照数据 (r.storage.SaveSnap(rd.Snapshot)),确保恢复后能够正确加载快照。

8. 同步存储和释放资源

if err := r.storage.Sync(); err != nil {r.lg.Fatal("failed to sync Raft snapshot", zap.Error(err))
}notifyc <- struct{}{}
r.raftStorage.ApplySnapshot(rd.Snapshot)
r.lg.Info("applied incoming Raft snapshot", zap.Uint64("snapshot-index", rd.Snapshot.Metadata.Index))
if err := r.storage.Release(rd.Snapshot); err != nil {r.lg.Fatal("failed to release Raft wal", zap.Error(err))
}
  • 调用 r.storage.Sync() 确保存储同步。
  • 应用快照 (r.raftStorage.ApplySnapshot(rd.Snapshot)) 并记录日志。
  • 释放存储中的快照资源 (r.storage.Release(rd.Snapshot)).

9. 将日志条目添加到存储中

r.raftStorage.Append(rd.Entries)
  • 将 Raft 条目追加到存储中。

10. 处理非领导者节点的消息

if !islead {msgs := r.processMessages(rd.Messages)notifyc <- struct{}{}select {case notifyc <- struct{}{}:case <-r.stopped:return}r.transport.Send(msgs)
} else {notifyc <- struct{}{}
}
  • 如果当前节点不是领导者(!islead),则处理 Raft 消息,并发送到网络。
  • 其他情况(领导者节点)发送通知。

11. 推进 Raft 状态

r.Advance()
  • 调用 r.Advance(),推动 Raft 状态,更新日志。

总结

此方法用于启动 Raft 节点并在 goroutine 中异步运行 Raft 状态机。它通过定时器定期触发 tick 操作,并通过 Ready() 方法获取 Raft 的操作数据进行处理。它处理日志条目、快照、领导者变更等 Raft 相关的核心操作。每个 Raft 操作都会被同步到存储,并根据节点角色(领导者或跟随者)执行不同的行为。


http://www.ppmy.cn/news/1555027.html

相关文章

数据结构——跳表

目录 1.什么是跳表-skiplist 2.skiplist的效率如何保证&#xff1f; 3.skiplist的实现 4.skiplist跟平衡搜索树和哈希表的对比 1.什么是跳表-skiplist skiplist本质上也是一种查找结构&#xff0c;用于解决算法中的查找问题&#xff0c;跟平衡搜索树和哈希表的价值是一样的…

2024年鄂州初级、中级职称公示了

本次鄂州市工程、中职教师、技工学校教师、档案、农业、新闻、艺术、群文、文博等专业中级和初级职称公示了&#xff0c;通过共有371人&#xff0c;公示时间为2024年12月9日至2024年12月13日。 本次公示工程类中级职称人员有210人通过评审&#xff0c;助理职称有35人通过评审。…

重生之我在异世界学智力题(6)

大家好&#xff0c;这里是小编的博客频道 小编的博客&#xff1a;就爱学编程 很高兴在CSDN这个大家庭与大家相识&#xff0c;希望能在这里与大家共同进步&#xff0c;共同收获更好的自己&#xff01;&#xff01;&#xff01; 本文目录 引言海盗分金币问题&#xff08;1&#x…

手机IP谜团:一个设备,两个IP?

在日常使用手机上网的过程中&#xff0c;有时我们会发现手机竟然显示了两个IP地址&#xff0c;这让人不禁感到困惑。那么&#xff0c;一个手机为何会出现两个IP呢&#xff1f;这背后究竟隐藏着什么原因&#xff1f;虎观代理小二将为您详细解析这一现象&#xff0c;并探讨其可能…

RK3576 Android14,内存大于4G时UVC应用无法申请内存

最近有个项目需要将Linux虚拟成UVC摄像头&#xff0c;开发过程中遇到一个奇怪的事情&#xff0c;通过V4l2框架接口申请内存时&#xff0c;相同的板子&#xff0c;只是内存一个4G一个8G。4G的内存可以申请成功&#xff0c;8G就不行。提示“内存不足” 内存更大反而内存不足&…

DevExpress WPF中文教程:Grid - 如何移动和调整列大小?(一)

DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…

cron服务执行定时任务

参考链接 cron表达式在线解析&#xff1a;quartz/Cron/Crontab表达式在线生成工具-BeJSON.com 定时任务运行时报错解决方法 运行脚本报 权限不够问题&#xff1a; 可以在脚本文件夹下直接执行 如下指令运行RequestAPI.sh 脚本 ./RequestAPI.sh 如果出现权限不够问…

Apache Seatunnel Web 使用指南

Apache Seatunnel Web 使用指南 项目地址:https://gitcode.com/gh_mirrors/sea/seatunnel-web 项目介绍 Apache Seatunnel Web 是一个强大的数据集成平台&#xff0c;旨在简化数据管道的构建和管理过程。它基于著名的 Apache Seatunnel&#xff08;原名 Flatten&#xff09;…