diff --git a/server/consumer.go b/server/consumer.go index bcba89d956..5990585174 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -4983,12 +4983,16 @@ func (o *consumer) selectStartingSeqNo() { o.sseq = o.cfg.OptStartSeq } - if state.FirstSeq == 0 { - o.sseq = 1 - } else if o.sseq < state.FirstSeq { - o.sseq = state.FirstSeq - } else if o.sseq > state.LastSeq { - o.sseq = state.LastSeq + 1 + // Only clip the sseq if the OptStartSeq is not provided, otherwise + // it's possible that the stream just doesn't contain OptStartSeq yet. + if o.cfg.OptStartSeq == 0 { + if state.FirstSeq == 0 { + o.sseq = 1 + } else if o.sseq < state.FirstSeq { + o.sseq = state.FirstSeq + } else if o.sseq > state.LastSeq { + o.sseq = state.LastSeq + 1 + } } } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 8146182df6..8d2f8bf873 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -23869,3 +23869,59 @@ func TestJetStreamConsumerInfoNumPending(t *testing.T) { require_NoError(t, err) require_Equal(t, ci.NumPending, 100) } + +func TestJetStreamConsumerStartSequenceNotInStream(t *testing.T) { + // This test is checking that we still correctly set the start + // sequence of a consumer if that start sequence doesn't appear + // in the stream yet. Previously this would have been clipped + // back to between the first and last seq from the stream state. + + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"test"}, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("test", "test_consumer", nats.StartSequence(10)) + require_NoError(t, err) + + stream, err := s.gacc.lookupStream("TEST") + require_NoError(t, err) + consumer := stream.lookupConsumer("test_consumer") + + func() { + consumer.mu.RLock() + defer consumer.mu.RUnlock() + + require_Equal(t, consumer.dseq, 1) + require_Equal(t, consumer.sseq, 10) + }() + + for i := 1; i <= 10; i++ { + _, err = js.Publish("test", []byte{byte(i)}) + require_NoError(t, err) + } + + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Len(t, len(msgs), 1) + require_Equal(t, msgs[0].Data[0], 10) + + require_NoError(t, msgs[0].AckSync()) + + func() { + consumer.mu.RLock() + defer consumer.mu.RUnlock() + + require_Equal(t, consumer.dseq, 2) + require_Equal(t, consumer.adflr, 1) + require_Equal(t, consumer.sseq, 11) + require_Equal(t, consumer.asflr, 10) + }() +}