Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: clean-up maybeSendAppend area #136

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 43 additions & 37 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,65 +603,71 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
return false
}

lastIndex, nextIndex := pr.Next-1, pr.Next
lastTerm, errt := r.raftLog.term(lastIndex)
prevIndex := pr.Next - 1
prevTerm, err := r.raftLog.term(prevIndex)
if err != nil {
// The log probably got truncated at >= pr.Next, so we can't catch up the
// follower log anymore. Send a snapshot instead.
return r.maybeSendSnapshot(to, pr)
}

var ents []pb.Entry
var erre error
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
// Otherwise, if we had a full Inflights and all inflight messages were in
// fact dropped, replication to that follower would stall. Instead, an empty
// MsgApp will eventually reach the follower (heartbeats responses prompt the
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize)
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
}

if len(ents) == 0 && !sendIfEmpty {
return false
}

if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}

snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
// TODO(pav-kv): move this check up to where err is returned.
if err != nil { // send a snapshot if we failed to get the entries
return r.maybeSendSnapshot(to, pr)
}

// Send the actual MsgApp otherwise, and update the progress accordingly.
if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil {
r.logger.Panicf("%x: %v", r.id, err)
}
// NB: pr has been updated, but we make sure to only use its old values below.
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: lastIndex,
LogTerm: lastTerm,
Index: prevIndex,
LogTerm: prevTerm,
Entries: ents,
Commit: r.raftLog.committed,
})
pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)))
return true
}

// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
// node. Returns true iff the snapshot message has been emitted successfully.
func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
if !pr.RecentActive {
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
return false
}

snapshot, err := r.raftLog.snapshot()
if err != nil {
if err == ErrSnapshotTemporarilyUnavailable {
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
return false
}
panic(err) // TODO(bdarnell)
}
if IsEmptySnap(snapshot) {
panic("need non-empty snapshot")
}
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
pr.BecomeSnapshot(sindex)
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)

r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
return true
}

Expand Down
2 changes: 1 addition & 1 deletion raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2901,7 +2901,7 @@ func TestRecvMsgUnreachable(t *testing.T) {
// set node 2 to state replicate
r.trk.Progress[2].Match = 3
r.trk.Progress[2].BecomeReplicate()
r.trk.Progress[2].OptimisticUpdate(5)
r.trk.Progress[2].Next = 6

r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})

Expand Down
18 changes: 7 additions & 11 deletions tracker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) {

// UpdateOnEntriesSend updates the progress on the given number of consecutive
// entries being sent in a MsgApp, with the given total bytes size, appended at
// and after the given log index.
func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error {
// log indices >= pr.Next.
//
// Must be used with StateProbe or StateReplicate.
func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) {
switch pr.State {
case StateReplicate:
if entries > 0 {
last := nextIndex + uint64(entries) - 1
pr.OptimisticUpdate(last)
pr.Inflights.Add(last, bytes)
pr.Next += uint64(entries)
pr.Inflights.Add(pr.Next-1, bytes)
}
// If this message overflows the in-flights tracker, or it was already full,
// consider this message being a probe, so that the flow is paused.
Expand All @@ -171,9 +172,8 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) er
pr.MsgAppFlowPaused = true
}
default:
return fmt.Errorf("sending append in unhandled state %s", pr.State)
panic(fmt.Sprintf("sending append in unhandled state %s", pr.State))
}
return nil
}

// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
Expand All @@ -190,10 +190,6 @@ func (pr *Progress) MaybeUpdate(n uint64) bool {
return updated
}

// OptimisticUpdate signals that appends all the way up to and including index n
// are in-flight. As a result, Next is increased to n+1.
func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }

// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
// arguments are the index of the append message rejected by the follower, and
// the hint that we want to decrease to.
Expand Down
Loading