Skip to content

Commit

Permalink
Fix drift in WAL, truncate AppendEntry without quorum
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Oct 3, 2024
1 parent 426d4b9 commit 2a3232f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
1 change: 0 additions & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3347,7 +3347,6 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) {

// If this term is greater than ours.
if ae.term > n.term {
n.pterm = ae.pterm
n.term = ae.term
n.vote = noVote
if isNew {
Expand Down
66 changes: 66 additions & 0 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"bytes"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -941,3 +942,68 @@ func TestNRGRemoveLeaderPeerDeadlockBug(t *testing.T) {
return errors.New("Leader has not moved")
})
}

func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

rg := c.createRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

var err error
var scratch [1024]byte

// Simulate leader storing an AppendEntry in WAL but being hard killed before it can propose to its peers.
n := rg.leader().node().(*raft)
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
entries := []*Entry{newEntry(EntryNormal, esm)}
n.Lock()
ae := n.buildAppendEntry(entries)
ae.buf, err = ae.encode(scratch[:])
require_NoError(t, err)
err = n.storeToWAL(ae)
require_NoError(t, err)
n.Unlock()

// Stop the leader so it moves to another one.
n.shutdown(false)

// Wait for another leader to be picked
rg.waitOnLeader()

// Restart the previous leader that contains the stored AppendEntry without quorum.
for _, a := range rg {
if a.node().ID() == n.ID() {
sa := a.(*stateAdder)
sa.restart()
break
}
}

// The previous leader's WAL should truncate to remove the AppendEntry only it has.
// Eventually all WALs for all peers must match.
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
var expected [][]byte
for _, a := range rg {
an := a.node().(*raft)
var state StreamState
an.wal.FastState(&state)
if len(expected) > 0 && int(state.LastSeq-state.FirstSeq+1) != len(expected) {
return fmt.Errorf("WAL is different: too many entries")
}
for index := state.FirstSeq; index <= state.LastSeq; index++ {
ae, err := an.loadEntry(index)
if err != nil {
return err
}
seq := int(index)
if len(expected) < seq {
expected = append(expected, ae.buf)
} else if !bytes.Equal(expected[seq-1], ae.buf) {
return fmt.Errorf("WAL is different: stored bytes differ")
}
}
}
return nil
})
}

0 comments on commit 2a3232f

Please sign in to comment.