diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index ce17ddf51b2..d504b642c7e 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3990,3 +3990,165 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T } } } + +func TestJetStreamClusterDesyncAfterPublishToLeaderWithoutQuorum(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + si, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + streamLeader := si.Cluster.Leader + streamLeaderServer := c.serverByName(streamLeader) + nc.Close() + nc, js = jsClientConnect(t, streamLeaderServer) + defer nc.Close() + + servers := slices.DeleteFunc([]string{"S-1", "S-2", "S-3"}, func(s string) bool { + return s == streamLeader + }) + + // Stop followers so further publishes will not have quorum. + followerName1 := servers[0] + followerName2 := servers[1] + followerServer1 := c.serverByName(followerName1) + followerServer2 := c.serverByName(followerName2) + followerServer1.Shutdown() + followerServer2.Shutdown() + followerServer1.WaitForShutdown() + followerServer2.WaitForShutdown() + + // Although this request will time out, it will be added to the stream leader's WAL. + _, err = js.Publish("foo", []byte("first"), nats.AckWait(time.Second)) + require_NotNil(t, err) + require_Equal(t, err, nats.ErrTimeout) + + // Now shut down the leader as well. + nc.Close() + streamLeaderServer.Shutdown() + streamLeaderServer.WaitForShutdown() + + // Only restart the (previous) followers. + followerServer1 = c.restartServer(followerServer1) + c.restartServer(followerServer2) + c.waitOnStreamLeader(globalAccountName, "TEST") + + nc, js = jsClientConnect(t, followerServer1) + defer nc.Close() + + // Publishing a message will now have quorum. + pubAck, err := js.Publish("foo", []byte("first, this is a retry")) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, 1) + + // Bring up the previous stream leader. + c.restartServer(streamLeaderServer) + c.waitOnAllCurrent() + c.waitOnStreamLeader(globalAccountName, "TEST") + + // Check all servers ended up with the last published message, which had quorum. + for _, s := range c.servers { + c.waitOnStreamCurrent(s, globalAccountName, "TEST") + + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + state := mset.state() + require_Equal(t, state.Msgs, 1) + require_Equal(t, state.Bytes, 55) + } +} + +func TestJetStreamClusterPreserveWALDuringCatchupWithMatchingTerm(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo.>"}, + Replicas: 3, + }) + nc.Close() + require_NoError(t, err) + + // Pick one server that will only store a part of the messages in its WAL. + rs := c.randomNonStreamLeader(globalAccountName, "TEST") + ts := time.Now().UnixNano() + + // Manually add 3 append entries to each node's WAL, except for one node who is one behind. + var scratch [1024]byte + for _, s := range c.servers { + for _, n := range s.raftNodes { + rn := n.(*raft) + if rn.accName == globalAccountName { + for i := uint64(0); i < 3; i++ { + // One server will be one behind and need to catchup. + if s.Name() == rs.Name() && i >= 2 { + break + } + + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, i, ts, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + rn.Lock() + ae := rn.buildAppendEntry(entries) + ae.buf, err = ae.encode(scratch[:]) + require_NoError(t, err) + err = rn.storeToWAL(ae) + rn.Unlock() + require_NoError(t, err) + } + } + } + } + + // Restart all. + c.stopAll() + c.restartAll() + c.waitOnAllCurrent() + c.waitOnStreamLeader(globalAccountName, "TEST") + + rs = c.serverByName(rs.Name()) + + // Check all servers ended up with all published messages, which had quorum. + for _, s := range c.servers { + c.waitOnStreamCurrent(s, globalAccountName, "TEST") + + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + state := mset.state() + require_Equal(t, state.Msgs, 3) + require_Equal(t, state.Bytes, 99) + } + + // Check that the first two published messages came from our WAL, and + // the last came from a catchup by another leader. + for _, n := range rs.raftNodes { + rn := n.(*raft) + if rn.accName == globalAccountName { + ae, err := rn.loadEntry(2) + require_NoError(t, err) + require_True(t, ae.leader == rn.ID()) + + ae, err = rn.loadEntry(3) + require_NoError(t, err) + require_True(t, ae.leader == rn.ID()) + + ae, err = rn.loadEntry(4) + require_NoError(t, err) + require_True(t, ae.leader != rn.ID()) + } + } +} diff --git a/server/raft.go b/server/raft.go index 09141e152b0..ed72e2a3893 100644 --- a/server/raft.go +++ b/server/raft.go @@ -3049,10 +3049,10 @@ func (n *raft) catchupStalled() bool { if n.catchup == nil { return false } - if n.catchup.pindex == n.pindex { + if n.catchup.pindex == n.commit { return time.Since(n.catchup.active) > 2*time.Second } - n.catchup.pindex = n.pindex + n.catchup.pindex = n.commit n.catchup.active = time.Now() return false } @@ -3071,7 +3071,7 @@ func (n *raft) createCatchup(ae *appendEntry) string { cterm: ae.pterm, cindex: ae.pindex, pterm: n.pterm, - pindex: n.pindex, + pindex: n.commit, active: time.Now(), } inbox := n.newCatchupInbox() @@ -3241,7 +3241,7 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { if n.catchupStalled() { n.debug("Catchup may be stalled, will request again") inbox = n.createCatchup(ae) - ar = newAppendEntryResponse(n.pterm, n.pindex, n.id, false) + ar = newAppendEntryResponse(n.pterm, n.commit, n.id, false) } n.Unlock() if ar != nil { @@ -3282,13 +3282,15 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } if (isNew && ae.pterm != n.pterm) || ae.pindex != n.pindex { - // Check if this is a lower or equal index than what we were expecting. - if ae.pindex <= n.pindex { + // Check if this is a lower index than what we were expecting. + if ae.pindex < n.pindex { n.debug("AppendEntry detected pindex less than ours: %d:%d vs %d:%d", ae.pterm, ae.pindex, n.pterm, n.pindex) var ar *appendEntryResponse + // An AppendEntry is stored at seq=ae.pindex+1. This can be checked when eae != nil, eae.pindex==ae.pindex. + seq := ae.pindex + 1 var success bool - if eae, _ := n.loadEntry(ae.pindex); eae == nil { + if eae, _ := n.loadEntry(seq); eae == nil { // If terms are equal, and we are not catching up, we have simply already processed this message. // So we will ACK back to the leader. This can happen on server restarts based on timings of snapshots. if ae.pterm == n.pterm && !catchingUp { @@ -3296,14 +3298,18 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { } else { n.resetWAL() } - } else { - // If terms mismatched, or we got an error loading, delete that entry and all others past it. + } else if eae.term != ae.term { + // If terms mismatched, delete that entry and all others past it. // Make sure to cancel any catchups in progress. // Truncate will reset our pterm and pindex. Only do so if we have an entry. n.truncateWAL(eae.pterm, eae.pindex) + } else { + success = true + } + // Cancel regardless if truncated/unsuccessful. + if !success { + n.cancelCatchup() } - // Cancel regardless. - n.cancelCatchup() // Create response. ar = newAppendEntryResponse(ae.pterm, ae.pindex, n.id, success) @@ -3377,11 +3383,11 @@ func (n *raft) processAppendEntry(ae *appendEntry, sub *subscription) { return } else { - n.debug("AppendEntry did not match %d %d with %d %d", ae.pterm, ae.pindex, n.pterm, n.pindex) - if ae.pindex > n.pindex { + n.debug("AppendEntry did not match %d %d with %d %d (commit %d)", ae.pterm, ae.pindex, n.pterm, n.pindex, n.commit) + if ae.pindex > n.commit { // Setup our state for catching up. inbox := n.createCatchup(ae) - ar := newAppendEntryResponse(n.pterm, n.pindex, n.id, false) + ar := newAppendEntryResponse(n.pterm, n.commit, n.id, false) n.Unlock() n.sendRPC(ae.reply, inbox, ar.encode(arbuf)) arPool.Put(ar) diff --git a/server/raft_test.go b/server/raft_test.go index 15fb321e325..90d0959093e 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -823,8 +823,8 @@ func TestNRGTermDoesntRollBackToPtermOnCatchup(t *testing.T) { require_Equal(t, rn.term, 2) if !rn.Leader() { - rn.truncateWAL(1, 6) // This will overwrite rn.term, so... - rn.term = 2 // ... we'll set it back manually. + rn.truncateWAL(1, 6) + require_Equal(t, rn.term, 2) // rn.term must stay the same require_Equal(t, rn.pterm, 1) require_Equal(t, rn.pindex, 6) } @@ -984,68 +984,94 @@ func TestNRGRemoveLeaderPeerDeadlockBug(t *testing.T) { } func TestNRGWALEntryWithoutQuorumMustTruncate(t *testing.T) { - c := createJetStreamClusterExplicit(t, "R3S", 3) - defer c.shutdown() - - rg := c.createRaftGroup("TEST", 3, newStateAdder) - rg.waitOnLeader() - - var err error - var scratch [1024]byte - - // Simulate leader storing an AppendEntry in WAL but being hard killed before it can propose to its peers. - n := rg.leader().node().(*raft) - esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) - entries := []*Entry{newEntry(EntryNormal, esm)} - n.Lock() - ae := n.buildAppendEntry(entries) - ae.buf, err = ae.encode(scratch[:]) - require_NoError(t, err) - err = n.storeToWAL(ae) - n.Unlock() - require_NoError(t, err) - - // Stop the leader so it moves to another one. - n.shutdown(false) - - // Wait for another leader to be picked - rg.waitOnLeader() - - // Restart the previous leader that contains the stored AppendEntry without quorum. - for _, a := range rg { - if a.node().ID() == n.ID() { - sa := a.(*stateAdder) - sa.restart() - break - } + tests := []struct { + title string + modify func(rg smGroup) + }{ + { + // state equals, only need to remove the entry + title: "equal", + modify: func(rg smGroup) {}, + }, + { + // state diverged, need to replace the entry + title: "diverged", + modify: func(rg smGroup) { + rg.leader().(*stateAdder).proposeDelta(11) + }, + }, } - // The previous leader's WAL should truncate to remove the AppendEntry only it has. - // Eventually all WALs for all peers must match. - checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { - var expected [][]byte - for _, a := range rg { - an := a.node().(*raft) - var state StreamState - an.wal.FastState(&state) - if len(expected) > 0 && int(state.LastSeq-state.FirstSeq+1) != len(expected) { - return fmt.Errorf("WAL is different: too many entries") - } - for index := state.FirstSeq; index <= state.LastSeq; index++ { - ae, err := an.loadEntry(index) - if err != nil { - return err - } - seq := int(index) - if len(expected) < seq { - expected = append(expected, ae.buf) - } else if !bytes.Equal(expected[seq-1], ae.buf) { - return fmt.Errorf("WAL is different: stored bytes differ") + for _, test := range tests { + t.Run(test.title, func(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createRaftGroup("TEST", 3, newStateAdder) + rg.waitOnLeader() + + var err error + var scratch [1024]byte + + // Simulate leader storing an AppendEntry in WAL but being hard killed before it can propose to its peers. + n := rg.leader().node().(*raft) + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + n.Lock() + ae := n.buildAppendEntry(entries) + ae.buf, err = ae.encode(scratch[:]) + require_NoError(t, err) + err = n.storeToWAL(ae) + n.Unlock() + require_NoError(t, err) + + // Stop the leader so it moves to another one. + n.shutdown(false) + + // Wait for another leader to be picked + rg.waitOnLeader() + + // Make a modification, specific to this test. + test.modify(rg) + + // Restart the previous leader that contains the stored AppendEntry without quorum. + for _, a := range rg { + if a.node().ID() == n.ID() { + sa := a.(*stateAdder) + sa.restart() + break } } - } - return nil - }) + + // The previous leader's WAL should truncate to remove the AppendEntry only it has. + // Eventually all WALs for all peers must match. + checkFor(t, 5*time.Second, 200*time.Millisecond, func() error { + var expected [][]byte + for _, a := range rg { + an := a.node().(*raft) + var state StreamState + an.wal.FastState(&state) + if len(expected) > 0 && int(state.LastSeq-state.FirstSeq+1) != len(expected) { + return fmt.Errorf("WAL is different: too many entries") + } + // Loop over all entries in the WAL, checking if the contents for all RAFT nodes are equal. + for index := state.FirstSeq; index <= state.LastSeq; index++ { + ae, err := an.loadEntry(index) + if err != nil { + return err + } + seq := int(index) + if len(expected) < seq { + expected = append(expected, ae.buf) + } else if !bytes.Equal(expected[seq-1], ae.buf) { + return fmt.Errorf("WAL is different: stored bytes differ") + } + } + } + return nil + }) + }) + } } func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { @@ -1088,3 +1114,132 @@ func TestNRGTermNoDecreaseAfterWALReset(t *testing.T) { } } } + +func TestNRGCatchupDoesNotTruncateUncommittedEntriesWithQuorum(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + s := c.servers[0] // RunBasicJetStreamServer not available + + ms, err := newMemStore(&StreamConfig{Name: "TEST", Storage: MemoryStorage}) + require_NoError(t, err) + cfg := &RaftConfig{Name: "TEST", Store: t.TempDir(), Log: ms} + + err = s.bootstrapRaftNode(cfg, nil, false) + require_NoError(t, err) + n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{}) + require_NoError(t, err) + + // An AppendEntry is encoded into a buffer and that's stored into the WAL. + // This is a helper function to generate that buffer. + encode := func(ae *appendEntry) *appendEntry { + buf, err := ae.encode(nil) + require_NoError(t, err) + ae.buf = buf + return ae + } + + // Create a sample entry, the content doesn't matter, just that it's stored. + esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true) + entries := []*Entry{newEntry(EntryNormal, esm)} + + nats0 := "S1Nunr6R" // "nats-0" + nats1 := "yrzKKRBu" // "nats-1" + + // Timeline, for first leader + aeInitial := encode(&appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries}) + aeHeartbeat := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: nil}) + aeUncommitted := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 1, entries: entries}) + aeNoQuorum := encode(&appendEntry{leader: nats0, term: 1, commit: 1, pterm: 1, pindex: 2, entries: entries}) + + // Timeline, after leader change + aeMissed := encode(&appendEntry{leader: nats1, term: 2, commit: 1, pterm: 1, pindex: 2, entries: entries}) + aeCatchupTrigger := encode(&appendEntry{leader: nats1, term: 2, commit: 1, pterm: 2, pindex: 3, entries: entries}) + aeHeartbeat2 := encode(&appendEntry{leader: nats1, term: 2, commit: 2, pterm: 2, pindex: 4, entries: nil}) + aeHeartbeat3 := encode(&appendEntry{leader: nats1, term: 2, commit: 4, pterm: 2, pindex: 4, entries: nil}) + + // Initial case is simple, just store the entry. + n.processAppendEntry(aeInitial, n.aesub) + require_Equal(t, n.wal.State().Msgs, 1) + entry, err := n.loadEntry(1) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // Heartbeat, makes sure commit moves up. + n.processAppendEntry(aeHeartbeat, n.aesub) + require_Equal(t, n.commit, 1) + + // We get one entry that has quorum (but we don't know that yet), so it stays uncommitted for a bit. + n.processAppendEntry(aeUncommitted, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We get one entry that has NO quorum (but we don't know that yet). + n.processAppendEntry(aeNoQuorum, n.aesub) + require_Equal(t, n.wal.State().Msgs, 3) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We've just had a leader election, and we missed one message from the previous leader, we should catchup. + n.processAppendEntry(aeCatchupTrigger, n.aesub) + require_Equal(t, n.wal.State().Msgs, 3) + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.commit + + // Make sure our WAL was not truncated, and we're still catching up. + aeUncommitted.leader = nats1 + aeUncommitted = encode(aeUncommitted) + n.processAppendEntry(aeUncommitted, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 3) + require_True(t, n.catchup != nil) + // Our entry should not be touched, so the 'leader' should've stayed the same. + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We now notice the leader indicated a different entry at the (no quorum) index, should truncate. + n.processAppendEntry(aeMissed, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup == nil) + + // We get a heartbeat that prompts us to catchup again. + n.processAppendEntry(aeHeartbeat2, n.aesub) + require_Equal(t, n.wal.State().Msgs, 2) + require_Equal(t, n.commit, 1) // Commit should not change, as we missed an item. + require_True(t, n.catchup != nil) + require_Equal(t, n.catchup.pterm, 1) // n.pterm + require_Equal(t, n.catchup.pindex, 1) // n.commit + + // We get the uncommitted entry again, it should stay the same. + n.processAppendEntry(aeUncommitted, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 2) + require_True(t, n.catchup != nil) + // Our entry should still stay the same. + entry, err = n.loadEntry(2) + require_NoError(t, err) + require_Equal(t, entry.leader, nats0) + + // We now get the missed append entry, store it. + n.processAppendEntry(aeMissed, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 3) + require_True(t, n.catchup != nil) + entry, err = n.loadEntry(3) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // We now get the entry that initially triggered us to catchup again, it should be added. + n.processAppendEntry(aeCatchupTrigger, n.catchup.sub) + require_Equal(t, n.wal.State().Msgs, 4) + require_True(t, n.catchup != nil) + entry, err = n.loadEntry(4) + require_NoError(t, err) + require_Equal(t, entry.leader, nats1) + + // Heartbeat, makes sure we commit (and reset catchup, as we're now up-to-date). + n.processAppendEntry(aeHeartbeat3, n.aesub) + require_Equal(t, n.commit, 4) + require_True(t, n.catchup == nil) +}