Skip to content

Commit

Permalink
Fix consumer starting sequence with start time and multiple filters (#…
Browse files Browse the repository at this point in the history
…6082)

fixes #6076 

When starting by time option was used , a different codepath was taken
when calculating starting sequence for the consumer. That codepath had
off by one bug.

Signed-off-by: Tomasz Pietrek <tomasz@nats.io>
  • Loading branch information
derekcollison authored Nov 7, 2024
2 parents e2ece84 + c984e57 commit dd0bedd
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5350,7 +5350,7 @@ func (o *consumer) selectStartingSeqNo() {
for _, filter := range o.subjf {
// Use first sequence since this is more optimized atm.
ss := o.mset.store.FilteredState(state.FirstSeq, filter.subject)
if ss.First > o.sseq && ss.First < nseq {
if ss.First >= o.sseq && ss.First < nseq {
nseq = ss.First
}
}
Expand Down
50 changes: 50 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2315,6 +2315,56 @@ func TestJetStreamConsumerOverflow(t *testing.T) {
msg, err = maxAckPending50.NextMsg(time.Second)
require_NoError(t, err)
require_NotNil(t, msg)
}

func TestJetStreamConsumerMultipleFitersWithStartDate(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

past := time.Now().Add(-90 * time.Second)

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"events.>"},
})
require_NoError(t, err)

sendStreamMsg(t, nc, "events.foo", "msg-1")
sendStreamMsg(t, nc, "events.bar", "msg-2")
sendStreamMsg(t, nc, "events.baz", "msg-3")
sendStreamMsg(t, nc, "events.biz", "msg-4")
sendStreamMsg(t, nc, "events.faz", "msg-5")
sendStreamMsg(t, nc, "events.foo", "msg-6")
sendStreamMsg(t, nc, "events.biz", "msg-7")

for _, test := range []struct {
name string
filterSubjects []string
startTime time.Time
expectedMessages uint64
expectedStreamSequence uint64
}{
{"Single-Filter-first-sequence", []string{"events.foo"}, past, 2, 0},
{"Multiple-Filter-first-sequence", []string{"events.foo", "events.bar", "events.baz"}, past, 4, 0},
{"Multiple-Filters-second-subject", []string{"events.bar", "events.baz"}, past, 2, 1},
{"Multiple-Filters-first-last-subject", []string{"events.foo", "events.biz"}, past, 4, 0},
{"Multiple-Filters-in-future", []string{"events.foo", "events.biz"}, time.Now().Add(1 * time.Minute), 0, 7},
} {
t.Run(test.name, func(t *testing.T) {
info, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
Durable: test.name,
FilterSubjects: test.filterSubjects,
DeliverPolicy: nats.DeliverByStartTimePolicy,
OptStartTime: &test.startTime,
})
require_NoError(t, err)
require_Equal(t, test.expectedStreamSequence, info.Delivered.Stream)
require_Equal(t, test.expectedMessages, info.NumPending)
})
}

}

Expand Down

0 comments on commit dd0bedd

Please sign in to comment.