diff --git a/server/raft.go b/server/raft.go index e5812982ca4..8c79dac729a 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3347,7 +3347,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { // If this term is greater than ours. if ae.term > n.term { - n.pterm = ae.pterm n.term = ae.term n.vote = noVote if isNew { diff --git a/server/raft_test.go b/server/raft_test.go index 56299ea17df..c810d189a79 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -14,6 +14,7 @@ package server import ( + "bytes" "errors" "fmt" "math" @@ -941,3 +942,68 @@ func TestNRGRemoveLeaderPeerDeadlockBug(t *testing.T) { return errors.New("Leader has not moved") }) } + +func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + var err error + var scratch [1024]byte + + // Simulate leader storing an AppendEntry in WAL but being hard killed before it can propose to its peers. + n := rg.leader().node().(*raft) + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + n.Lock() + ae := n.buildAppendEntry(entries) + ae.buf, err = ae.encode(scratch[:]) + require_NoError(t, err) + err = n.storeToWAL(ae) + n.Unlock() + require_NoError(t, err) + + // Stop the leader so it moves to another one. + n.shutdown(false) + + // Wait for another leader to be picked + rg.waitOnLeader() + + // Restart the previous leader that contains the stored AppendEntry without quorum. + for _, a := range rg { + if a.node().ID() == n.ID() { + sa := a.(*stateAdder) + sa.restart() + break + } + } + + // The previous leader's WAL should truncate to remove the AppendEntry only it has. + // Eventually all WALs for all peers must match. + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + var expected [][]byte + for _, a := range rg { + an := a.node().(*raft) + var state StreamState + an.wal.FastState(&state) + if len(expected) > 0 && int(state.LastSeq-state.FirstSeq+1) != len(expected) { + return fmt.Errorf("WAL is different: too many entries") + } + for index := state.FirstSeq; index <= state.LastSeq; index++ { + ae, err := an.loadEntry(index) + if err != nil { + return err + } + seq := int(index) + if len(expected) < seq { + expected = append(expected, ae.buf) + } else if !bytes.Equal(expected[seq-1], ae.buf) { + return fmt.Errorf("WAL is different: stored bytes differ") + } + } + } + return nil + }) +}