From 6977981bb7c836457bbdf1387fc3b8821cd23259 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 19 Dec 2024 16:37:29 +0000 Subject: [PATCH] [IMPROVED] Add extra option to OrderedConsumerConfig (#1737) Signed-off-by: Michael Wain --- jetstream/consumer_config.go | 5 +++++ jetstream/ordered.go | 1 + jetstream/test/ordered_test.go | 4 ++++ 3 files changed, 10 insertions(+) diff --git a/jetstream/consumer_config.go b/jetstream/consumer_config.go index 4e2e3d6e0..65f581d8e 100644 --- a/jetstream/consumer_config.go +++ b/jetstream/consumer_config.go @@ -263,6 +263,11 @@ type ( // Maximum number of attempts for the consumer to be recreated in a // single recreation cycle. Defaults to unlimited. MaxResetAttempts int + + // Metadata is a set of application-defined key-value pairs for + // associating metadata on the consumer. This feature requires + // nats-server v2.10.0 or later. + Metadata map[string]string `json:"metadata,omitempty"` } // DeliverPolicy determines from which point to start delivering messages. diff --git a/jetstream/ordered.go b/jetstream/ordered.go index ed5ef6ac4..055b5ef46 100644 --- a/jetstream/ordered.go +++ b/jetstream/ordered.go @@ -611,6 +611,7 @@ func (c *orderedConsumer) getConsumerConfig() *ConsumerConfig { Replicas: 1, HeadersOnly: c.cfg.HeadersOnly, MemoryStorage: true, + Metadata: c.cfg.Metadata, } if len(c.cfg.FilterSubjects) == 1 { cfg.FilterSubject = c.cfg.FilterSubjects[0] diff --git a/jetstream/test/ordered_test.go b/jetstream/test/ordered_test.go index a40867ac5..97125674f 100644 --- a/jetstream/test/ordered_test.go +++ b/jetstream/test/ordered_test.go @@ -2006,6 +2006,7 @@ func TestOrderedConsumerConfig(t *testing.T) { ReplayPolicy: jetstream.ReplayOriginalPolicy, InactiveThreshold: 10 * time.Second, HeadersOnly: true, + Metadata: map[string]string{"foo": "a"}, }, expected: jetstream.ConsumerConfig{ FilterSubjects: []string{"foo.a", "foo.b"}, @@ -2018,6 +2019,7 @@ func TestOrderedConsumerConfig(t *testing.T) { Replicas: 1, MemoryStorage: true, HeadersOnly: true, + Metadata: map[string]string{"foo": "a"}, }, }, { @@ -2029,6 +2031,7 @@ func TestOrderedConsumerConfig(t *testing.T) { ReplayPolicy: jetstream.ReplayOriginalPolicy, InactiveThreshold: 10 * time.Second, HeadersOnly: true, + Metadata: map[string]string{"foo": "a"}, }, expected: jetstream.ConsumerConfig{ FilterSubjects: []string{"foo.a", "foo.b"}, @@ -2041,6 +2044,7 @@ func TestOrderedConsumerConfig(t *testing.T) { Replicas: 1, MemoryStorage: true, HeadersOnly: true, + Metadata: map[string]string{"foo": "a"}, }, }, {