From ba168ddf9f6fde3614be46ce07abe43e7f2ef42a Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 1 Sep 2021 08:40:24 -0700 Subject: [PATCH] Make a copy and re-assign consumer config if a consumer assignment was passed in. Signed-off-by: Derek Collison --- server/consumer.go | 40 +++++++++++++++++++++++----------------- server/jetstream_api.go | 3 +++ 2 files changed, 26 insertions(+), 17 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index ec772f8604e..ec69f404618 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -271,6 +271,26 @@ const ( JsDefaultMaxAckPending = 20_000 ) +// Helper function to set consumer config defaults from above. +func setConsumerConfigDefaults(config *ConsumerConfig) { + // Set to default if not specified. + if config.DeliverSubject == _EMPTY_ && config.MaxWaiting == 0 { + config.MaxWaiting = JSWaitQueueDefaultMax + } + // Setup proper default for ack wait if we are in explicit ack mode. + if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) { + config.AckWait = JsAckWaitDefault + } + // Setup default of -1, meaning no limit for MaxDeliver. + if config.MaxDeliver == 0 { + config.MaxDeliver = -1 + } + // Set proper default for max ack pending if we are ack explicit and none has been set. + if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 { + config.MaxAckPending = JsDefaultMaxAckPending + } +} + func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) { return mset.addConsumerWithAssignment(config, _EMPTY_, nil) } @@ -291,6 +311,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri return nil, NewJSConsumerConfigRequiredError() } + // Make sure we have sane defaults. + setConsumerConfigDefaults(config) + if len(config.Description) > JSMaxDescriptionLen { return nil, NewJSConsumerDescriptionTooLongError(JSMaxDescriptionLen) } @@ -329,10 +352,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri if config.MaxWaiting < 0 { return nil, NewJSConsumerMaxWaitingNegativeError() } - // Set to default if not specified. - if config.MaxWaiting == 0 { - config.MaxWaiting = JSWaitQueueDefaultMax - } if config.Heartbeat > 0 { return nil, NewJSConsumerHBRequiresPushError() } @@ -354,19 +373,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } } - // Setup proper default for ack wait if we are in explicit ack mode. - if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) { - config.AckWait = JsAckWaitDefault - } - // Setup default of -1, meaning no limit for MaxDeliver. - if config.MaxDeliver == 0 { - config.MaxDeliver = -1 - } - // Set proper default for max ack pending if we are ack explicit and none has been set. - if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 { - config.MaxAckPending = JsDefaultMaxAckPending - } - // As best we can make sure the filtered subject is valid. if config.FilterSubject != _EMPTY_ { subjects, hasExt := mset.allSubjects() diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 6806c65e01b..1a55213d04b 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -3045,6 +3045,9 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj return } + // Make sure we have sane defaults. + setConsumerConfigDefaults(&req.Config) + // Determine if we should proceed here when we are in clustered mode. if s.JetStreamIsClustered() { if req.Config.Direct {