From b29198ea441b030aec53ad7e90257f299fc0998a Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 26 Nov 2024 17:18:47 +0100 Subject: [PATCH] [FIXED] Do not overwrite ordered consumer deliver policy if start time is set Signed-off-by: Piotr Piotrowski --- jetstream/ordered.go | 8 ++---- jetstream/test/ordered_test.go | 50 ++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/jetstream/ordered.go b/jetstream/ordered.go index af598f2d5..5fe656e9b 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -631,6 +631,9 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig { c.cfg.DeliverPolicy == DeliverAllPolicy { cfg.OptStartSeq = 0 + } else if c.cfg.DeliverPolicy == DeliverByStartTimePolicy { + cfg.OptStartSeq = 0 + cfg.OptStartTime = c.cfg.OptStartTime } else { cfg.OptStartSeq = c.cfg.OptStartSeq } @@ -638,11 +641,6 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig { if cfg.DeliverPolicy == DeliverLastPerSubjectPolicy && len(c.cfg.FilterSubjects) == 0 { cfg.FilterSubjects = []string{">"} } - if c.cfg.OptStartTime != nil { - cfg.OptStartSeq = 0 - cfg.DeliverPolicy = DeliverByStartTimePolicy - cfg.OptStartTime = c.cfg.OptStartTime - } return cfg } diff --git a/jetstream/test/ordered_test.go b/jetstream/test/ordered_test.go index a6e524b68..5a6231b2d 100644 --- a/jetstream/test/ordered_test.go +++ b/jetstream/test/ordered_test.go @@ -2051,6 +2051,56 @@ func TestOrderedConsumerConfig(t *testing.T) { HeadersOnly: true, }, }, + { + name: "both start seq and time set, deliver policy start seq", + config: jetstream.OrderedConsumerConfig{ + FilterSubjects: []string{"foo.a", "foo.b"}, + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + OptStartSeq: 10, + OptStartTime: &time.Time{}, + ReplayPolicy: jetstream.ReplayOriginalPolicy, + InactiveThreshold: 10 * time.Second, + HeadersOnly: true, + }, + expected: jetstream.ConsumerConfig{ + FilterSubjects: []string{"foo.a", "foo.b"}, + OptStartSeq: 10, + OptStartTime: nil, + DeliverPolicy: jetstream.DeliverByStartSequencePolicy, + AckPolicy: jetstream.AckNonePolicy, + MaxDeliver: -1, + MaxWaiting: 512, + InactiveThreshold: 10 * time.Second, + Replicas: 1, + MemoryStorage: true, + HeadersOnly: true, + }, + }, + { + name: "both start seq and time set, deliver policy start time", + config: jetstream.OrderedConsumerConfig{ + FilterSubjects: []string{"foo.a", "foo.b"}, + DeliverPolicy: jetstream.DeliverByStartTimePolicy, + OptStartSeq: 10, + OptStartTime: &time.Time{}, + ReplayPolicy: jetstream.ReplayOriginalPolicy, + InactiveThreshold: 10 * time.Second, + HeadersOnly: true, + }, + expected: jetstream.ConsumerConfig{ + FilterSubjects: []string{"foo.a", "foo.b"}, + OptStartSeq: 0, + OptStartTime: &time.Time{}, + DeliverPolicy: jetstream.DeliverByStartTimePolicy, + AckPolicy: jetstream.AckNonePolicy, + MaxDeliver: -1, + MaxWaiting: 512, + InactiveThreshold: 10 * time.Second, + Replicas: 1, + MemoryStorage: true, + HeadersOnly: true, + }, + }, } for _, test := range tests {