Skip to content

Commit

Permalink
Merge pull request #59 from coreos/master
Browse files Browse the repository at this point in the history
Fix Election timeout
  • Loading branch information
benbjohnson committed Jul 8, 2013
2 parents d309819 + 53d43de commit c5bebba
Showing 1 changed file with 32 additions and 20 deletions.
52 changes: 32 additions & 20 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,29 +433,39 @@ func (s *Server) sendAsync(value interface{}) *event {
// 1.Receiving valid AppendEntries RPC, or
// 2.Granting vote to candidate
func (s *Server) followerLoop() {

s.setState(Follower)
timeoutChan := afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)

for {
var err error
var update bool
select {
case e := <-s.c:
if e.target == &stopValue {
s.setState(Stopped)
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue = s.processAppendEntriesRequest(req)
e.returnValue, update = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue = s.processRequestVoteRequest(req)
e.returnValue, update = s.processRequestVoteRequest(req)
}

// Callback to event.
e.c <- err

case <-afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2):
case <-timeoutChan:
s.setState(Candidate)
}

// Converts to candidate if election timeout elapses without either:
// 1.Receiving valid AppendEntries RPC, or
// 2.Granting vote to candidate
if update {
timeoutChan = afterBetween(s.ElectionTimeout(), s.ElectionTimeout()*2)
}

// Exit loop on state change.
if s.State() != Follower {
break
Expand Down Expand Up @@ -515,9 +525,9 @@ func (s *Server) candidateLoop() {
} else if _, ok := e.target.(Command); ok {
err = NotLeaderError
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue = s.processAppendEntriesRequest(req)
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue = s.processRequestVoteRequest(req)
e.returnValue, _ = s.processRequestVoteRequest(req)
}

// Callback to event.
Expand Down Expand Up @@ -568,11 +578,11 @@ func (s *Server) leaderLoop() {
s.processCommand(command, e)
continue
} else if req, ok := e.target.(*AppendEntriesRequest); ok {
e.returnValue = s.processAppendEntriesRequest(req)
e.returnValue, _ = s.processAppendEntriesRequest(req)
} else if resp, ok := e.target.(*AppendEntriesResponse); ok {
s.processAppendEntriesResponse(resp)
} else if req, ok := e.target.(*RequestVoteRequest); ok {
e.returnValue = s.processRequestVoteRequest(req)
e.returnValue, _ = s.processRequestVoteRequest(req)
}

// Callback to event.
Expand Down Expand Up @@ -645,10 +655,10 @@ func (s *Server) AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
}

// Processes the "append entries" request.
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) *AppendEntriesResponse {
func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) (*AppendEntriesResponse, bool) {
if req.Term < s.currentTerm {
s.debugln("server.ae.error: stale term")
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex())
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), false
}

// Update term and leader.
Expand All @@ -657,22 +667,22 @@ func (s *Server) processAppendEntriesRequest(req *AppendEntriesRequest) *AppendE
// Reject if log doesn't contain a matching previous entry.
if err := s.log.truncate(req.PrevLogIndex, req.PrevLogTerm); err != nil {
s.debugln("server.ae.truncate.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex())
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true
}

// Append entries to the log.
if err := s.log.appendEntries(req.Entries); err != nil {
s.debugln("server.ae.append.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex())
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true
}

// Commit up to the commit index.
if err := s.log.setCommitIndex(req.CommitIndex); err != nil {
s.debugln("server.ae.commit.error: ", err)
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex())
return newAppendEntriesResponse(s.currentTerm, false, s.log.CommitIndex()), true
}

return newAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex())
return newAppendEntriesResponse(s.currentTerm, true, s.log.CommitIndex()), true
}

// Processes the "append entries" response from the peer. This is only
Expand Down Expand Up @@ -732,33 +742,35 @@ func (s *Server) RequestVote(req *RequestVoteRequest) *RequestVoteResponse {
}

// Processes a "request vote" request.
func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) *RequestVoteResponse {
func (s *Server) processRequestVoteRequest(req *RequestVoteRequest) (*RequestVoteResponse, bool) {
// If the request is coming from an old term then reject it.
if req.Term < s.currentTerm {
s.debugln("server.rv.error: stale term")
return newRequestVoteResponse(s.currentTerm, false)
return newRequestVoteResponse(s.currentTerm, false), false
}

s.setCurrentTerm(req.Term, "", false)

// If we've already voted for a different candidate then don't vote for this candidate.
if s.votedFor != "" && s.votedFor != req.CandidateName {
s.debugln("server.rv.error: duplicate vote: ", req.CandidateName)
return newRequestVoteResponse(s.currentTerm, false)
s.debugln("server.rv.error: duplicate vote: ", req.CandidateName,
" already vote for ", s.votedFor)
return newRequestVoteResponse(s.currentTerm, false), false
}

// If the candidate's log is not at least as up-to-date as our last log then don't vote.
lastIndex, lastTerm := s.log.lastInfo()
if lastIndex > req.LastLogIndex || lastTerm > req.LastLogTerm {
s.debugln("server.rv.error: out of date log: ", req.CandidateName)
return newRequestVoteResponse(s.currentTerm, false)
s.debugln("server.rv.error: out of date log: ", req.CandidateName,
"[", lastIndex, "]", " [", req.LastLogIndex, "]")
return newRequestVoteResponse(s.currentTerm, false), false
}

// If we made it this far then cast a vote and reset our election time out.
s.debugln("server.rv.vote: ", s.name, " votes for", req.CandidateName, "at term", req.Term)
s.votedFor = req.CandidateName

return newRequestVoteResponse(s.currentTerm, true)
return newRequestVoteResponse(s.currentTerm, true), true
}

//--------------------------------------
Expand Down

0 comments on commit c5bebba

Please sign in to comment.