论文很重要 raft-zh_cn/raft-zh_cn.md at master · maemual/raft-zh_cn · GitHub
Part 3A: leader election (moderate)
实现 Raft 的领导者选举和心跳机制(AppendEntries
RPC,无日志条目)。第 3A 部分的目标是实现以下功能:
- 集群中能够成功选举出一个领导者。
- 如果没有节点故障,当前领导者能够保持其领导地位。
- 如果当前领导者发生故障,或者其与其他节点之间的通信中断,集群能够选举出新的领导者接替其位置。
type LogEntry struct {Term int // 用于区分不同的Leader任期CommandValid bool // 当前指令是否有效。如果无效,follower 可以拒绝复制Command interface{} // 表示可以存储任意类型的指令。
}type Role intconst (Leader Role = iotaFollowerCandidate
)// A Go object implementing a single Raft peer.
type Raft struct {mu sync.Mutex // Lock to protect shared access to this peer's statepeers []*labrpc.ClientEnd // RPC end points of all peerspersister *Persister // Object to hold this peer's persisted stateme int // this peer's index into peers[]dead int32 // set by Kill()// Your data here (3A, 3B, 3C).// Look at the paper's Figure 2 for a description of what// state a Raft server must maintain.log []LogEntrycurrentTerm intvotedFor introle RoleelectionStart time.TimeelectionTimeout time.DurationcommitIndex int //已知已提交的最高的日志条目的索引(初始值为0,单调递增)lastApplied int //已知已应用到状态机的日志条目的索引(初始值为0,单调递增)nextIndex []int //对于每一台服务器,发送到该服务器的下一个日志条目的索引(初始值为领导人最后的日志条目的索引+1)matchIndex []int //对于每一台服务器,已知的已经复制到该服务器的最高日志条目的索引(初始值为0,单调递增)
}func Make(peers []*labrpc.ClientEnd, me int,persister *Persister, applyCh chan ApplyMsg) *Raft {rf := &Raft{}rf.peers = peersrf.persister = persisterrf.me = merf.role = Followerrf.electionStart = time.Now()rf.votedFor = -1rf.currentTerm = 0rf.commitIndex = 0rf.lastApplied = 0rf.log = make([]LogEntry, 1)rand.Seed(time.Now().UnixNano())rf.electionTimeout = time.Duration(450+rand.Intn(150)) * time.Millisecondrf.nextIndex = make([]int, len(rf.peers))rf.matchIndex = make([]int, len(rf.peers))// Your initialization code here (3A, 3B, 3C).// initialize from state persisted before a crashrf.readPersist(persister.ReadRaftState())// start ticker goroutine to start electionsgo rf.ticker()return rf
在ticker()中,题目提到"不要使用Go的 time.Timer 或 time.Ticker ,它们很难正确使用",所以用原本代码框架中的time.Sleep()来实现定时操作,sleep的时间也是leader心跳的间隔时间,对于节点选举超时的定时器,用time.Since(rf.electionStart) >= rf.electionTimeout实现。
func (rf *Raft) ticker() {for rf.killed() == false {// Your code here (3A)// Check if a leader election should be started.rf.mu.Lock()// 如果是 Follower 或 Candidate,检查是否超时if rf.role == Follower {if time.Since(rf.electionStart) >= rf.electionTimeout {// 超时,开始选举rf.BecomeCandidate()rf.startElection()}}if rf.role == Candidate {if time.Since(rf.electionStart) >= rf.electionTimeout {rf.electionStart = time.Now()rf.startElection()}}// 如果是 Leader,定期发送心跳if rf.role == Leader {rf.sendHeartbeat()}rf.mu.Unlock()// pause for a random amount of time between 50 and 350// milliseconds.ms := 40 + (rand.Int63() % 100)time.Sleep(time.Duration(ms) * time.Millisecond)}
实现追加条目(AppendEntries)RPC以及请求投票(RequestVote)RPC,参数和返回值的字段以及方法的逻辑在论文中均有记录,这里要注意的是什么时候要进行选举超时定时器的重置rf.electionStart = time.Now(),对每个节点发送心跳和请求投票都需要用groutine,不同发送操作是异步的。若成为Leader,则之前必须是候选人。
type RequestVoteArgs struct {Term int // 候选人的任期号CandidateID int // 请求选票的候选人的 IDLastLogIndex int // 候选人的最后日志条目的索引值LastLogTerm int // 候选人的最后日志条目的任期值
}type RequestVoteReply struct {Term int // 当前任期号,以便于候选人去更新自己的任期号VoteGranted bool // 候选人赢得了此张选票时为真
}func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {// Your code here (3A, 3B).rf.mu.Lock()defer rf.mu.Unlock()if args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.VoteGranted = falsereturn}if args.Term > rf.currentTerm {rf.BecomeFollower(args.Term, -1)}if rf.votedFor == -1 || rf.votedFor == args.CandidateID {DPrintf("candidate %d get vote from %d", args.CandidateID, rf.me)rf.votedFor = args.CandidateIDrf.electionStart = time.Now()reply.VoteGranted = true} else {//处理两个节点同为候选人的情况reply.VoteGranted = false}
}func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {ok := rf.peers[server].Call("Raft.RequestVote", args, reply)return ok
type AppendEntriesArgs struct {Term int // 领导人的任期号LeaderID int // 领导人的 IDPrevLogIndex int // 紧邻新日志条目之前的那个日志条目的索引值PrevLogTerm int // 紧邻新日志条目之前的那个日志条目的任期值Entries []LogEntry // 准备存储的日志条目(表示心跳时为空;一次性发送多个是为了提高效率)LeaderCommit int // 领导人已经提交的日志的索引值
}type AppendEntriesReply struct {Term int // 当前的任期号,用于领导人更新自己Success bool // 如果 Follower 包含了匹配上 `PrevLogIndex` 和 `PrevLogTerm` 的日志条目时为 true
}func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {rf.mu.Lock()defer rf.mu.Unlock()if args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.Success = falsereturn}rf.votedFor = args.LeaderIDrf.electionStart = time.Now()reply.Success = truereturn}func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {return rf.peers[server].Call("Raft.AppendEntries", args, reply)
func (rf *Raft) BecomeFollower(term, votedFor int) {rf.role = Followerrf.currentTerm = termrf.votedFor = votedForrf.electionStart = time.Now()DPrintf("Pod %d become follower by %d when term %d ", rf.me, votedFor, rf.currentTerm)
}func (rf *Raft) BecomeCandidate() {rf.role = Candidaterf.currentTerm++rf.votedFor = rf.merf.electionStart = time.Now()DPrintf("Pod %d become candidate when term %d ", rf.me, rf.currentTerm)
}func (rf *Raft) BecomeLeader() {rf.role = Leaderfor i := range rf.peers {rf.nextIndex[i] = len(rf.log) // 领导者的 nextIndex 为日志最后一条之后rf.matchIndex[i] = 0 // 初始 matchIndex 为 0}DPrintf("Pod %d become leader when term %d ", rf.me, rf.currentTerm)
}func (rf *Raft) sendHeartbeat() {term := rf.currentTermleaderCommit := rf.commitIndex// 遍历所有 Follower,发送 AppendEntries RPCfor i := range rf.peers {if i != rf.me {go func(server int) {prevLogIndex := rf.nextIndex[server] - 1prevLogTerm := 0if prevLogIndex >= 0 {prevLogTerm = rf.log[prevLogIndex].Term}args := &AppendEntriesArgs{Term: term,LeaderID: rf.me,PrevLogIndex: prevLogIndex,PrevLogTerm: prevLogTerm,Entries: nil, // 心跳中无日志条目LeaderCommit: leaderCommit,}reply := &AppendEntriesReply{}if rf.sendAppendEntries(server, args, reply) {if !reply.Success && reply.Term > rf.currentTerm {rf.BecomeFollower(reply.Term, -1)}}}(i)}}rf.electionStart = time.Now()
}func (rf *Raft) startElection() {// 统计自己的票数votes := 1term := rf.currentTerm// 获取当前日志信息lastLogIndex := len(rf.log) - 1lastLogTerm := 0if lastLogIndex >= 0 {lastLogTerm = rf.log[lastLogIndex].Term}// 遍历所有其他节点,发送 RequestVote RPCfor i := range rf.peers {if i != rf.me {go func(server int) {args := &RequestVoteArgs{Term: term,CandidateID: rf.me,LastLogIndex: lastLogIndex,LastLogTerm: lastLogTerm,}reply := &RequestVoteReply{}if rf.sendRequestVote(server, args, reply) {if reply.Term > rf.currentTerm || !reply.VoteGranted {rf.BecomeFollower(reply.Term, -1)return}if reply.VoteGranted {votes++if votes > len(rf.peers)/2 && rf.role == Candidate {rf.BecomeLeader()rf.sendHeartbeat()}}}}(i)}}
Part 3B: log (hard)