Skip to content

Commit

Permalink
Fix consumer start sequence when sequence not yet in stream
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Aug 13, 2024
1 parent 1e1169b commit ec54164
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 6 deletions.
16 changes: 10 additions & 6 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4983,12 +4983,16 @@ func (o *consumer) selectStartingSeqNo() {
o.sseq = o.cfg.OptStartSeq
}

if state.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < state.FirstSeq {
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
// Only clip the sseq if the OptStartSeq is not provided, otherwise
// it's possible that the stream just doesn't contain OptStartSeq yet.
if o.cfg.OptStartSeq == 0 {
if state.FirstSeq == 0 {
o.sseq = 1
} else if o.sseq < state.FirstSeq {
o.sseq = state.FirstSeq
} else if o.sseq > state.LastSeq {
o.sseq = state.LastSeq + 1
}
}
}

Expand Down
56 changes: 56 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23869,3 +23869,59 @@ func TestJetStreamConsumerInfoNumPending(t *testing.T) {
require_NoError(t, err)
require_Equal(t, ci.NumPending, 100)
}

func TestJetStreamConsumerStartSequenceNotInStream(t *testing.T) {
// This test is checking that we still correctly set the start
// sequence of a consumer if that start sequence doesn't appear
// in the stream yet. Previously this would have been clipped
// back to between the first and last seq from the stream state.

s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"test"},
})
require_NoError(t, err)

sub, err := js.PullSubscribe("test", "test_consumer", nats.StartSequence(10))
require_NoError(t, err)

stream, err := s.gacc.lookupStream("TEST")
require_NoError(t, err)
consumer := stream.lookupConsumer("test_consumer")

func() {
consumer.mu.RLock()
defer consumer.mu.RUnlock()

require_Equal(t, consumer.dseq, 1)
require_Equal(t, consumer.sseq, 10)
}()

for i := 1; i <= 10; i++ {
_, err = js.Publish("test", []byte{byte(i)})
require_NoError(t, err)
}

msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_Len(t, len(msgs), 1)
require_Equal(t, msgs[0].Data[0], 10)

require_NoError(t, msgs[0].AckSync())

func() {
consumer.mu.RLock()
defer consumer.mu.RUnlock()

require_Equal(t, consumer.dseq, 2)
require_Equal(t, consumer.adflr, 1)
require_Equal(t, consumer.sseq, 11)
require_Equal(t, consumer.asflr, 10)
}()
}

0 comments on commit ec54164

Please sign in to comment.