From 53d43dee4fe49c65b7fba827ea671ed8de0ecc30 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sun, 7 Jul 2013 21:31:58 -0700 Subject: [PATCH] fix election timeout problem --- server.go | 52 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/server.go b/server.go index 49edd003fe4..31468895b8e 100644 --- a/server.go +++ b/server.go @@ -433,10 +433,13 @@ func (s *Server) sendAsync(value interface{}) *event { // 1.Receiving valid AppendEntries RPC, or // 2.Granting vote to candidate func (s *Server) followerLoop() { + s.setState(Follower) + timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2) for { var err error + var update bool select { case e := <-s.c: if e.target == &stopValue { @@ -444,18 +447,25 @@ func (s *Server) followerLoop() { } else if _, ok := e.target.(Command); ok { err = NotLeaderError } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue = s.processAppendEntriesRequest(req) + e.returnValue, update = s.processAppendEntriesRequest(req) } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue = s.processRequestVoteRequest(req) + e.returnValue, update = s.processRequestVoteRequest(req) } // Callback to event. e.c <- err - case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2): + case <-timeoutChan: s.setState(Candidate) } + // Converts to candidate if election timeout elapses without either: + // 1.Receiving valid AppendEntries RPC, or + // 2.Granting vote to candidate + if update { + timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2) + } + // Exit loop on state change. if s.State() != Follower { break @@ -515,9 +525,9 @@ func (s *Server) candidateLoop() { } else if _, ok := e.target.(Command); ok { err = NotLeaderError } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue = s.processAppendEntriesRequest(req) + e.returnValue, _ = s.processAppendEntriesRequest(req) } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue = s.processRequestVoteRequest(req) + e.returnValue, _ = s.processRequestVoteRequest(req) } // Callback to event. @@ -568,11 +578,11 @@ func (s *Server) leaderLoop() { s.processCommand(command, e) continue } else if req, ok := e.target.(*AppendEntriesRequest); ok { - e.returnValue = s.processAppendEntriesRequest(req) + e.returnValue, _ = s.processAppendEntriesRequest(req) } else if resp, ok := e.target.(*AppendEntriesResponse); ok { s.processAppendEntriesResponse(resp) } else if req, ok := e.target.(*RequestVoteRequest); ok { - e.returnValue = s.processRequestVoteRequest(req) + e.returnValue, _ = s.processRequestVoteRequest(req) } // Callback to event. @@ -645,10 +655,10 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse } // Processes the "append entries" request. -func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) *AppendEntriesResponse { +func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) { if req.Term < s.currentTerm { s.debugln("server.ae.error: stale term") - return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()) + return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), false } // Update term and leader. @@ -657,22 +667,22 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) *AppendE // Reject if log doesn't contain a matching previous entry. if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil { s.debugln("server.ae.truncate.error: ", err) - return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()) + return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true } // Append entries to the log. if err := s.log.appendEntries(req.Entries); err != nil { s.debugln("server.ae.append.error: ", err) - return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()) + return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true } // Commit up to the commit index. if err := s.log.setCommitIndex(req.CommitIndex); err != nil { s.debugln("server.ae.commit.error: ", err) - return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()) + return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true } - return newAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()) + return newAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), true } // Processes the "append entries" response from the peer. This is only @@ -732,33 +742,35 @@ func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse { } // Processes a "request vote" request. -func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) *RequestVoteResponse { +func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) { // If the request is coming from an old term then reject it. if req.Term < s.currentTerm { s.debugln("server.rv.error: stale term") - return newRequestVoteResponse(s.currentTerm, false) + return newRequestVoteResponse(s.currentTerm, false), false } s.setCurrentTerm(req.Term, "", false) // If we've already voted for a different candidate then don't vote for this candidate. if s.votedFor != "" && s.votedFor != req.CandidateName { - s.debugln("server.rv.error: duplicate vote: ", req.CandidateName) - return newRequestVoteResponse(s.currentTerm, false) + s.debugln("server.rv.error: duplicate vote: ", req.CandidateName, + " already vote for ", s.votedFor) + return newRequestVoteResponse(s.currentTerm, false), false } // If the candidate's log is not at least as up-to-date as our last log then don't vote. lastIndex, lastTerm := s.log.lastInfo() if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm { - s.debugln("server.rv.error: out of date log: ", req.CandidateName) - return newRequestVoteResponse(s.currentTerm, false) + s.debugln("server.rv.error: out of date log: ", req.CandidateName, + "[", lastIndex, "]", " [", req.LastLogIndex, "]") + return newRequestVoteResponse(s.currentTerm, false), false } // If we made it this far then cast a vote and reset our election time out. s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term) s.votedFor = req.CandidateName - return newRequestVoteResponse(s.currentTerm, true) + return newRequestVoteResponse(s.currentTerm, true), true } //--------------------------------------