Skip to content

Commit

Permalink
Fix to raft-33: commit entries on receiving heartbeat
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-scott committed Aug 8, 2015
1 parent 08a96a8 commit 687a244
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 20 deletions.
26 changes: 10 additions & 16 deletions src/main/scala/pl/project13/scala/akka/raft/Follower.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,26 +69,20 @@ private[raft] trait Follower {
implicit val self = m.clusterSelf // todo this is getting pretty crap, revert to having Cluster awareness a trait IMO

if (leaderIsLagging(msg, m)) {
if (msg.isNotHeartbeat) {
log.info("Rejecting write (Leader is lagging) of: " + msg + "; " + replicatedLog)
leader ! AppendRejected(m.currentTerm)
}
log.info("Rejecting write (Leader is lagging) of: " + msg + "; " + replicatedLog)
leader ! AppendRejected(m.currentTerm)
return stay()
}

senderIsCurrentLeader()
if (msg.isHeartbeat) {
acceptHeartbeat()

} else {
log.debug("Appending: " + msg.entries)
leader ! append(msg.entries, m)
replicatedLog = commitUntilLeadersIndex(m, msg)

val meta = maybeUpdateConfiguration(m, msg.entries.map(_.command))
val metaWithUpdatedTerm = meta.copy(currentTerm = replicatedLog.lastTerm)
acceptHeartbeat() using metaWithUpdatedTerm
}

log.debug("Appending: " + msg.entries)
leader ! append(msg.entries, m)
replicatedLog = commitUntilLeadersIndex(m, msg)

val meta = maybeUpdateConfiguration(m, msg.entries.map(_.command))
val metaWithUpdatedTerm = meta.copy(currentTerm = replicatedLog.lastTerm)
acceptHeartbeat() using metaWithUpdatedTerm
}

def leaderIsLagging(msg: AppendEntries[Command], m: Meta): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ private[protocol] trait RaftProtocol extends Serializable {
entries: immutable.Seq[Entry[T]],
leaderCommitId: Int
) extends RaftMessage {

def isHeartbeat = entries.isEmpty
def isNotHeartbeat = !isHeartbeat

override def toString = s"""AppendEntries(term:$term,prevLog:($prevLogTerm,$prevLogIndex),entries:$entries)"""
}

Expand Down

0 comments on commit 687a244

Please sign in to comment.