Skip to content

Commit d14e61b

Browse files
authored
Merge pull request #136 from pav-kv/send-append-cleanup
raft: clean-up maybeSendAppend area
2 parents 026484c + c55352f commit d14e61b

File tree

3 files changed

+51
-49
lines changed

3 files changed

+51
-49
lines changed

raft.go

+43-37
Original file line numberDiff line numberDiff line change
@@ -603,65 +603,71 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
603603
return false
604604
}
605605

606-
lastIndex, nextIndex := pr.Next-1, pr.Next
607-
lastTerm, errt := r.raftLog.term(lastIndex)
606+
prevIndex := pr.Next - 1
607+
prevTerm, err := r.raftLog.term(prevIndex)
608+
if err != nil {
609+
// The log probably got truncated at >= pr.Next, so we can't catch up the
610+
// follower log anymore. Send a snapshot instead.
611+
return r.maybeSendSnapshot(to, pr)
612+
}
608613

609614
var ents []pb.Entry
610-
var erre error
611615
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
612616
// Otherwise, if we had a full Inflights and all inflight messages were in
613617
// fact dropped, replication to that follower would stall. Instead, an empty
614618
// MsgApp will eventually reach the follower (heartbeats responses prompt the
615619
// leader to send an append), allowing it to be acked or rejected, both of
616620
// which will clear out Inflights.
617621
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
618-
ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize)
622+
ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize)
619623
}
620-
621624
if len(ents) == 0 && !sendIfEmpty {
622625
return false
623626
}
624-
625-
if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
626-
if !pr.RecentActive {
627-
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
628-
return false
629-
}
630-
631-
snapshot, err := r.raftLog.snapshot()
632-
if err != nil {
633-
if err == ErrSnapshotTemporarilyUnavailable {
634-
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
635-
return false
636-
}
637-
panic(err) // TODO(bdarnell)
638-
}
639-
if IsEmptySnap(snapshot) {
640-
panic("need non-empty snapshot")
641-
}
642-
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
643-
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
644-
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
645-
pr.BecomeSnapshot(sindex)
646-
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
647-
648-
r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
649-
return true
627+
// TODO(pav-kv): move this check up to where err is returned.
628+
if err != nil { // send a snapshot if we failed to get the entries
629+
return r.maybeSendSnapshot(to, pr)
650630
}
651631

652632
// Send the actual MsgApp otherwise, and update the progress accordingly.
653-
if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil {
654-
r.logger.Panicf("%x: %v", r.id, err)
655-
}
656-
// NB: pr has been updated, but we make sure to only use its old values below.
657633
r.send(pb.Message{
658634
To: to,
659635
Type: pb.MsgApp,
660-
Index: lastIndex,
661-
LogTerm: lastTerm,
636+
Index: prevIndex,
637+
LogTerm: prevTerm,
662638
Entries: ents,
663639
Commit: r.raftLog.committed,
664640
})
641+
pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)))
642+
return true
643+
}
644+
645+
// maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given
646+
// node. Returns true iff the snapshot message has been emitted successfully.
647+
func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool {
648+
if !pr.RecentActive {
649+
r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
650+
return false
651+
}
652+
653+
snapshot, err := r.raftLog.snapshot()
654+
if err != nil {
655+
if err == ErrSnapshotTemporarilyUnavailable {
656+
r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
657+
return false
658+
}
659+
panic(err) // TODO(bdarnell)
660+
}
661+
if IsEmptySnap(snapshot) {
662+
panic("need non-empty snapshot")
663+
}
664+
sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
665+
r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
666+
r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
667+
pr.BecomeSnapshot(sindex)
668+
r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
669+
670+
r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot})
665671
return true
666672
}
667673

raft_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -2901,7 +2901,7 @@ func TestRecvMsgUnreachable(t *testing.T) {
29012901
// set node 2 to state replicate
29022902
r.trk.Progress[2].Match = 3
29032903
r.trk.Progress[2].BecomeReplicate()
2904-
r.trk.Progress[2].OptimisticUpdate(5)
2904+
r.trk.Progress[2].Next = 6
29052905

29062906
r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
29072907

tracker/progress.go

+7-11
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,15 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
137137

138138
// UpdateOnEntriesSend updates the progress on the given number of consecutive
139139
// entries being sent in a MsgApp, with the given total bytes size, appended at
140-
// and after the given log index.
141-
func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) error {
140+
// log indices >= pr.Next.
141+
//
142+
// Must be used with StateProbe or StateReplicate.
143+
func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) {
142144
switch pr.State {
143145
case StateReplicate:
144146
if entries > 0 {
145-
last := nextIndex + uint64(entries) - 1
146-
pr.OptimisticUpdate(last)
147-
pr.Inflights.Add(last, bytes)
147+
pr.Next += uint64(entries)
148+
pr.Inflights.Add(pr.Next-1, bytes)
148149
}
149150
// If this message overflows the in-flights tracker, or it was already full,
150151
// consider this message being a probe, so that the flow is paused.
@@ -157,9 +158,8 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes, nextIndex uint64) er
157158
pr.MsgAppFlowPaused = true
158159
}
159160
default:
160-
return fmt.Errorf("sending append in unhandled state %s", pr.State)
161+
panic(fmt.Sprintf("sending append in unhandled state %s", pr.State))
161162
}
162-
return nil
163163
}
164164

165165
// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
@@ -176,10 +176,6 @@ func (pr *Progress) MaybeUpdate(n uint64) bool {
176176
return updated
177177
}
178178

179-
// OptimisticUpdate signals that appends all the way up to and including index n
180-
// are in-flight. As a result, Next is increased to n+1.
181-
func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
182-
183179
// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
184180
// arguments are the index of the append message rejected by the follower, and
185181
// the hint that we want to decrease to.

0 commit comments

Comments
 (0)