Skip to content

Commit

Permalink
Merge pull request #10920 from tbg/rawnode-ready
Browse files Browse the repository at this point in the history
raft: require app to consume result from Ready()
  • Loading branch information
tbg authored Jul 24, 2019
2 parents 8b752ef + 721127d commit d137fa9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 25 deletions.
4 changes: 2 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (n *node) run(rn *RawNode) {
// handled first, but it's generally good to emit larger Readys plus
// it simplifies testing (by emitting less frequently and more
// predictably).
rd = rn.Ready()
rd = rn.readyWithoutAccept()
readyc = n.readyc
}

Expand Down Expand Up @@ -387,7 +387,7 @@ func (n *node) run(rn *RawNode) {
rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
rn.commitReady(rd)
rn.Advance(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
Expand Down
34 changes: 11 additions & 23 deletions raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,17 @@ func (rn *RawNode) Step(m pb.Message) error {

// Ready returns the outstanding work that the application needs to handle. This
// includes appending and applying entries or a snapshot, updating the HardState,
// and sending messages. Ready() is a read-only operation, that is, it does not
// require the caller to actually handle the result. Typically, a caller will
// want to handle the Ready and must pass the Ready to Advance *after* having
// done so. While a Ready is being handled, the RawNode must not be used for
// operations that may alter its state. For example, it is illegal to call
// Ready, followed by Step, followed by Advance.
// and sending messages. The returned Ready() *must* be handled and subsequently
// passed back via Advance().
func (rn *RawNode) Ready() Ready {
rd := rn.newReady()
rd := rn.readyWithoutAccept()
rn.acceptReady(rd)
return rd
}

func (rn *RawNode) newReady() Ready {
// readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there
// is no obligation that the Ready must be handled.
func (rn *RawNode) readyWithoutAccept() Ready {
return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
}

Expand All @@ -149,15 +148,6 @@ func (rn *RawNode) acceptReady(rd Ready) {
rn.raft.msgs = nil
}

// commitReady is called when the consumer of the RawNode has successfully
// handled a Ready (having previously called acceptReady).
func (rn *RawNode) commitReady(rd Ready) {
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
rn.raft.advance(rd)
}

// HasReady called when RawNode user need to check if any Ready pending.
// Checking logic in this method should be consistent with Ready.containsUpdates().
func (rn *RawNode) HasReady() bool {
Expand All @@ -183,12 +173,10 @@ func (rn *RawNode) HasReady() bool {
// Advance notifies the RawNode that the application has applied and saved progress in the
// last Ready results.
func (rn *RawNode) Advance(rd Ready) {
// Advance combines accept and commit. Callers can't mutate the RawNode
// between the call to Ready and the matching call to Advance, or the work
// done in acceptReady will clobber potentially newer data that has not been
// emitted in a Ready yet.
rn.acceptReady(rd)
rn.commitReady(rd)
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
rn.raft.advance(rd)
}

// Status returns the current status of the given group. This allocates, see
Expand Down
34 changes: 34 additions & 0 deletions raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,3 +924,37 @@ func BenchmarkStatus(b *testing.B) {
})
}
}

func TestRawNodeConsumeReady(t *testing.T) {
// Check that readyWithoutAccept() does not call acceptReady (which resets
// the messages) but Ready() does.
s := NewMemoryStorage()
rn := newTestRawNode(1, []uint64{1}, 3, 1, s)
m1 := pb.Message{Context: []byte("foo")}
m2 := pb.Message{Context: []byte("bar")}

// Inject first message, make sure it's visible via readyWithoutAccept.
rn.raft.msgs = append(rn.raft.msgs, m1)
rd := rn.readyWithoutAccept()
if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
}
if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m1) {
t.Fatalf("expected only m1 in raft.msgs, got %+v", rn.raft.msgs)
}
// Now call Ready() which should move the message into the Ready (as opposed
// to leaving it in both places).
rd = rn.Ready()
if len(rn.raft.msgs) > 0 {
t.Fatalf("messages not reset: %+v", rn.raft.msgs)
}
if len(rd.Messages) != 1 || !reflect.DeepEqual(rd.Messages[0], m1) {
t.Fatalf("expected only m1 sent, got %+v", rd.Messages)
}
// Add a message to raft to make sure that Advance() doesn't drop it.
rn.raft.msgs = append(rn.raft.msgs, m2)
rn.Advance(rd)
if len(rn.raft.msgs) != 1 || !reflect.DeepEqual(rn.raft.msgs[0], m2) {
t.Fatalf("expected only m2 in raft.msgs, got %+v", rn.raft.msgs)
}
}

0 comments on commit d137fa9

Please sign in to comment.