diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 85ac8e4c75..2d90693ec0 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -4970,23 +4970,13 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { } // Process the change. - if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader { + if err := js.processConsumerLeaderChange(o, isLeader); err == nil { // Check our state if we are under an interest based stream. if mset := o.getStream(); mset != nil { var ss StreamState mset.store.FastState(&ss) o.checkStateForInterestStream(&ss) } - // Do a snapshot. - doSnapshot(true) - // Synchronize followers to our state. Only send out if we have state and nothing pending. - if n != nil { - if _, _, applied := n.Progress(); applied > 0 && aq.len() == 0 { - if snap, err := o.store.EncodedState(); err == nil { - n.SendSnapshot(snap) - } - } - } } // We may receive a leader change after the consumer assignment which would cancel us diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index b3ef31b1b6..a33c03a9b8 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -18,6 +18,7 @@ package server import ( "context" + "encoding/binary" "encoding/json" "errors" "fmt" @@ -4457,3 +4458,130 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T } } } + +func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "CONSUMER", + Replicas: 3, + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + // Add a message and let the consumer ack it, this moves the consumer's RAFT applied up to 1. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + sub, err := js.PullSubscribe("foo", "CONSUMER") + require_NoError(t, err) + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + err = msgs[0].AckSync() + require_NoError(t, err) + + // We don't need the client anymore. + nc.Close() + + lookupConsumer := func(s *Server) *consumer { + t.Helper() + mset, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + acc, err := mset.lookupStream("TEST") + require_NoError(t, err) + o := acc.lookupConsumer("CONSUMER") + require_NotNil(t, o) + return o + } + + // Grab current consumer leader before moving all into observer mode. + cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER") + for _, s := range c.servers { + // Put all consumer's RAFT into observer mode, this will prevent all servers from trying to become leader. + o := lookupConsumer(s) + o.node.SetObserver(true) + if s != cl { + // For all followers, pause apply so they only store messages in WAL but not apply and possibly snapshot. + err = o.node.PauseApply() + require_NoError(t, err) + } + } + + updateDeliveredBuffer := func() []byte { + var b [4*binary.MaxVarintLen64 + 1]byte + b[0] = byte(updateDeliveredOp) + n := 1 + n += binary.PutUvarint(b[n:], 100) + n += binary.PutUvarint(b[n:], 100) + n += binary.PutUvarint(b[n:], 1) + n += binary.PutVarint(b[n:], time.Now().UnixNano()) + return b[:n] + } + + updateAcksBuffer := func() []byte { + var b [2*binary.MaxVarintLen64 + 1]byte + b[0] = byte(updateAcksOp) + n := 1 + n += binary.PutUvarint(b[n:], 100) + n += binary.PutUvarint(b[n:], 100) + return b[:n] + } + + // Store an uncommitted entry into our WAL, which will be committed and applied later. + co := lookupConsumer(cl) + rn := co.node.(*raft) + rn.Lock() + entries := []*Entry{{EntryNormal, updateDeliveredBuffer()}, {EntryNormal, updateAcksBuffer()}} + ae := encode(t, rn.buildAppendEntry(entries)) + err = rn.storeToWAL(ae) + minPindex := rn.pindex + rn.Unlock() + require_NoError(t, err) + + // Simulate leader change, we do this so we can check what happens in the upper layer logic. + rn.leadc <- true + rn.SetObserver(false) + + // Since upper layer is async, we don't know whether it will or will not act on the leader change. + // Wait for some time to check if it does. + time.Sleep(2 * time.Second) + rn.RLock() + maxPindex := rn.pindex + rn.RUnlock() + + r := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER") + ro := lookupConsumer(r) + rn = ro.node.(*raft) + + checkFor(t, 5*time.Second, time.Second, func() error { + rn.RLock() + defer rn.RUnlock() + if rn.pindex < maxPindex { + return fmt.Errorf("rn.pindex too low, expected %d, got %d", maxPindex, rn.pindex) + } + return nil + }) + + // We should only have 'Normal' entries. + // If we'd get a 'Snapshot' entry, that would mean it had incomplete state and would be reverting committed state. + var state StreamState + rn.wal.FastState(&state) + for seq := minPindex; seq <= maxPindex; seq++ { + ae, err = rn.loadEntry(seq) + require_NoError(t, err) + for _, entry := range ae.entries { + require_Equal(t, entry.Type, EntryNormal) + } + } +}