diff --git a/server/events.go b/server/events.go index b21a734ae66..16d4e819227 100644 --- a/server/events.go +++ b/server/events.go @@ -314,6 +314,15 @@ type ClientInfo struct { Nonce string `json:"nonce,omitempty"` } +// forAssignmentSnap returns the minimum amount of ClientInfo we need for assignment snapshots. +func (ci *ClientInfo) forAssignmentSnap() *ClientInfo { + return &ClientInfo{ + Account: ci.Account, + Service: ci.Service, + Cluster: ci.Cluster, + } +} + // ServerStats hold various statistics that we will periodically send out. type ServerStats struct { Start time.Time `json:"start"` diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 680909d5b90..95b42a2ad1b 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1542,7 +1542,7 @@ func (js *jetStream) metaSnapshot() []byte { for _, asa := range cc.streams { for _, sa := range asa { wsa := writeableStreamAssignment{ - Client: sa.Client, + Client: sa.Client.forAssignmentSnap(), Created: sa.Created, Config: sa.Config, Group: sa.Group, @@ -1555,7 +1555,9 @@ func (js *jetStream) metaSnapshot() []byte { if ca.pending { continue } - wsa.Consumers = append(wsa.Consumers, ca) + cca := *ca + cca.Client = cca.Client.forAssignmentSnap() + wsa.Consumers = append(wsa.Consumers, &cca) nca++ } streams = append(streams, wsa) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 54c93755ea9..0e66f3b987c 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4231,3 +4231,164 @@ func TestJetStreamClusterHardKillAfterStreamAdd(t *testing.T) { _, err = js.StreamInfo("TEST") require_NoError(t, err) } + +func TestJetStreamClusterStreamConsumerStateResetAfterRecreate(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + stream := "test:0" + config := &nats.StreamConfig{ + Name: stream, + Subjects: []string{"test.0.*"}, + Replicas: 3, + Retention: nats.WorkQueuePolicy, + MaxMsgs: 100_000, + Discard: nats.DiscardNew, + Duplicates: 5 * time.Second, + Storage: nats.MemoryStorage, + } + consumer := "A:0:0" + subject := "test.0.0" + var ( + duration = 30 * time.Minute + producerMsgs = 200_000 + producerMsgSize = 1024 + payload = []byte(strings.Repeat("A", producerMsgSize)) + wg sync.WaitGroup + n atomic.Uint64 + canPublish atomic.Bool + ) + createStream := func(t *testing.T) { + t.Helper() + _, err := js.AddStream(config) + require_NoError(t, err) + consumer := &nats.ConsumerConfig{ + Durable: consumer, + Replicas: 3, + MaxAckPending: 100_000, + MaxWaiting: 100_000, + FilterSubject: subject, + AckPolicy: nats.AckExplicitPolicy, + } + _, err = js.AddConsumer(stream, consumer) + require_NoError(t, err) + } + deleteStream := func(t *testing.T) { + err := js.DeleteStream(stream) + require_NoError(t, err) + } + stopPublishing := func() { + canPublish.Store(false) + } + resumePublishing := func() { + canPublish.Store(true) + } + // Setup stream + ctx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + createStream(t) + // Setup producer + resumePublishing() + wg.Add(1) + go func() { + defer wg.Done() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + for range time.NewTicker(1 * time.Millisecond).C { + select { + case <-ctx.Done(): + return + default: + } + if !canPublish.Load() { + continue + } + _, err := js.Publish("test.0.0", payload, nats.AckWait(200*time.Millisecond)) + if err == nil { + if nn := n.Add(1); int(nn) >= producerMsgs { + return + } + } + } + }() + // Setup consumer + acked := make(chan struct{}, 100) + wg.Add(1) + go func() { + defer wg.Done() + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + Attempts: + for attempts := 0; attempts < 10; attempts++ { + _, err := js.ConsumerInfo(stream, consumer) + if err != nil { + t.Logf("WRN: Failed creating pull subscriber: %v - %v - %v - %v", + subject, stream, consumer, err) + time.Sleep(200 * time.Millisecond) + continue + } + break Attempts + } + sub, err := js.PullSubscribe(subject, "", nats.Bind(stream, consumer)) + if err != nil { + t.Logf("WRN: Failed creating pull subscriber: %v - %v - %v - %v", + subject, stream, consumer, err) + return + } + require_NoError(t, err) + for range time.NewTicker(100 * time.Millisecond).C { + select { + case <-ctx.Done(): + return + default: + } + msgs, err := sub.Fetch(1, nats.MaxWait(200*time.Millisecond)) + if err != nil { + continue + } + for _, msg := range msgs { + time.AfterFunc(3*time.Second, func() { + select { + case <-ctx.Done(): + return + default: + } + msg.Ack() + acked <- struct{}{} + }) + } + msgs, err = sub.Fetch(10, nats.MaxWait(200*time.Millisecond)) + if err != nil { + continue + } + for _, msg := range msgs { + msg.Ack() + } + } + }() + // Let publish and consume to happen for a bit. + time.Sleep(2 * time.Second) + // Recreate the stream + deleteStream(t) + stopPublishing() + createStream(t) + for i := 0; i < 3; i++ { + js.Publish("test.0.0", payload, nats.AckWait(200*time.Millisecond)) + } + select { + case <-time.After(5 * time.Second): + t.Fatal("Timed out waiting for ack") + case <-acked: + time.Sleep(2 * time.Second) + } + sinfo, err := js.StreamInfo(stream) + require_NoError(t, err) + cinfo, err := js.ConsumerInfo(stream, consumer) + require_NoError(t, err) + cancel() + if cinfo.Delivered.Stream > sinfo.State.LastSeq { + t.Fatalf("Consumer Stream sequence is ahead of Stream LastSeq: consumer=%d, stream=%d", cinfo.Delivered.Stream, sinfo.State.LastSeq) + } + wg.Wait() +}