From 947495588398610eedf557a909f8c75f1d316c11 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Oct 2024 16:51:25 +0100 Subject: [PATCH 1/3] Revert "Fix consumer start sequence when sequence not yet in stream" This reverts commit ec54164df357fb842ae2b636c56ad99f059bf418. See discussion nats-io/nats-server#6005. --- server/consumer.go | 16 +++++------- server/jetstream_test.go | 56 ---------------------------------------- 2 files changed, 6 insertions(+), 66 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index eb9d13ed71..9af1c5b2b3 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -5031,16 +5031,12 @@ func (o *consumer) selectStartingSeqNo() { o.sseq = o.cfg.OptStartSeq } - // Only clip the sseq if the OptStartSeq is not provided, otherwise - // it's possible that the stream just doesn't contain OptStartSeq yet. - if o.cfg.OptStartSeq == 0 { - if state.FirstSeq == 0 { - o.sseq = 1 - } else if o.sseq < state.FirstSeq { - o.sseq = state.FirstSeq - } else if o.sseq > state.LastSeq { - o.sseq = state.LastSeq + 1 - } + if state.FirstSeq == 0 { + o.sseq = 1 + } else if o.sseq < state.FirstSeq { + o.sseq = state.FirstSeq + } else if o.sseq > state.LastSeq { + o.sseq = state.LastSeq + 1 } } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 6249a34d68..43fdcfdeb8 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -24000,62 +24000,6 @@ func TestJetStreamConsumerInfoNumPending(t *testing.T) { require_Equal(t, ci.NumPending, 100) } -func TestJetStreamConsumerStartSequenceNotInStream(t *testing.T) { - // This test is checking that we still correctly set the start - // sequence of a consumer if that start sequence doesn't appear - // in the stream yet. Previously this would have been clipped - // back to between the first and last seq from the stream state. - - s := RunBasicJetStreamServer(t) - defer s.Shutdown() - - nc, js := jsClientConnect(t, s) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: "TEST", - Subjects: []string{"test"}, - }) - require_NoError(t, err) - - sub, err := js.PullSubscribe("test", "test_consumer", nats.StartSequence(10)) - require_NoError(t, err) - - stream, err := s.gacc.lookupStream("TEST") - require_NoError(t, err) - consumer := stream.lookupConsumer("test_consumer") - - func() { - consumer.mu.RLock() - defer consumer.mu.RUnlock() - - require_Equal(t, consumer.dseq, 1) - require_Equal(t, consumer.sseq, 10) - }() - - for i := 1; i <= 10; i++ { - _, err = js.Publish("test", []byte{byte(i)}) - require_NoError(t, err) - } - - msgs, err := sub.Fetch(1) - require_NoError(t, err) - require_Len(t, len(msgs), 1) - require_Equal(t, msgs[0].Data[0], 10) - - require_NoError(t, msgs[0].AckSync()) - - func() { - consumer.mu.RLock() - defer consumer.mu.RUnlock() - - require_Equal(t, consumer.dseq, 2) - require_Equal(t, consumer.adflr, 1) - require_Equal(t, consumer.sseq, 11) - require_Equal(t, consumer.asflr, 10) - }() -} - func TestJetStreamInterestStreamWithDuplicateMessages(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown() From 72a15fc82143cbbab719110a9aadcbaf31330081 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Oct 2024 16:52:36 +0100 Subject: [PATCH 2/3] Add unit tests that prove sources and mirrors have their consumer start sequences clipped Signed-off-by: Neil Twigg --- server/jetstream_test.go | 84 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 43fdcfdeb8..9f8cdeb72e 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -24371,3 +24371,87 @@ func TestIsJSONObjectOrArray(t *testing.T) { }) } } + +func TestJetStreamSourcingClipStartSeq(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "ORIGIN", + Subjects: []string{"test"}, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + _, err := js.Publish("test", nil) + require_NoError(t, err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "SOURCING", + Sources: []*nats.StreamSource{ + { + Name: "ORIGIN", + OptStartSeq: 20, + }, + }, + }) + require_NoError(t, err) + + // Wait for sourcing consumer to be created. + time.Sleep(time.Second) + + mset, err := s.GlobalAccount().lookupStream("ORIGIN") + require_NoError(t, err) + require_True(t, mset != nil) + require_Len(t, len(mset.consumers), 1) + for _, o := range mset.consumers { + // Should have been clipped back to below 20 as only + // 10 messages in the origin stream. + require_Equal(t, o.sseq, 11) + } +} + +func TestJetStreamMirroringClipStartSeq(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "ORIGIN", + Subjects: []string{"test"}, + }) + require_NoError(t, err) + + for i := 0; i < 10; i++ { + _, err := js.Publish("test", nil) + require_NoError(t, err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "MIRRORING", + Mirror: &nats.StreamSource{ + Name: "ORIGIN", + OptStartSeq: 20, + }, + }) + require_NoError(t, err) + + // Wait for mirroring consumer to be created. + time.Sleep(time.Second) + + mset, err := s.GlobalAccount().lookupStream("ORIGIN") + require_NoError(t, err) + require_True(t, mset != nil) + require_Len(t, len(mset.consumers), 1) + for _, o := range mset.consumers { + // Should have been clipped back to below 20 as only + // 10 messages in the origin stream. + require_Equal(t, o.sseq, 11) + } +} From 035811710764a04b4d7a4579849baf01f06ec740 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 17 Oct 2024 17:31:31 +0100 Subject: [PATCH 3/3] Fix `TestJetStreamClusterConsumeWithStartSequence` to account for clipping of `OptStartSeq` Signed-off-by: Neil Twigg --- server/jetstream_cluster_4_test.go | 38 +++++++++++++++++------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 32f807dfca..a5a3f8f236 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -3402,6 +3402,8 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { // This is the success condition for all sub-tests below var ExpectedMsgId = "" checkMessage := func(t *testing.T, msg *nats.Msg) { + t.Helper() + msgMeta, err := msg.Metadata() require_NoError(t, err) @@ -3414,6 +3416,8 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { } checkRawMessage := func(t *testing.T, msg *nats.RawStreamMsg) { + t.Helper() + // Check sequence number require_Equal(t, msg.Sequence, ChosenSeq) @@ -3447,7 +3451,23 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { }) require_NoError(t, err) - // Setup: create subscriptions before stream is populated + // Setup: populate stream + buf := make([]byte, 100) + for i := uint64(1); i <= NumMessages; i++ { + msgId := nuid.Next() + pubAck, err := js.Publish(StreamSubjectPrefix+strconv.Itoa(int(i)), buf, nats.MsgId(msgId)) + require_NoError(t, err) + + // Verify assumption made in tests below + require_Equal(t, pubAck.Sequence, i) + + if i == ChosenSeq { + // Save the expected message id for the chosen message + ExpectedMsgId = msgId + } + } + + // Setup: create subscriptions, needs to be after stream creation or OptStartSeq could be clipped var preCreatedSub, preCreatedSubDurable *nats.Subscription { preCreatedSub, err = js.PullSubscribe( @@ -3483,22 +3503,6 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { }() } - // Setup: populate stream - buf := make([]byte, 100) - for i := uint64(1); i <= NumMessages; i++ { - msgId := nuid.Next() - pubAck, err := js.Publish(StreamSubjectPrefix+strconv.Itoa(int(i)), buf, nats.MsgId(msgId)) - require_NoError(t, err) - - // Verify assumption made in tests below - require_Equal(t, pubAck.Sequence, i) - - if i == ChosenSeq { - // Save the expected message id for the chosen message - ExpectedMsgId = msgId - } - } - // Tests various ways to consume the stream starting at the ChosenSeq sequence t.Run(