Skip to content

Commit

Permalink
Make a copy and re-assign consumer config if a consumer assignment wa…
Browse files Browse the repository at this point in the history
…s passed in.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Sep 1, 2021
1 parent d809b02 commit ba168dd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
40 changes: 23 additions & 17 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,26 @@ const (
JsDefaultMaxAckPending = 20_000
)

// Helper function to set consumer config defaults from above.
func setConsumerConfigDefaults(config *ConsumerConfig) {
// Set to default if not specified.
if config.DeliverSubject == _EMPTY_ && config.MaxWaiting == 0 {
config.MaxWaiting = JSWaitQueueDefaultMax
}
// Setup proper default for ack wait if we are in explicit ack mode.
if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) {
config.AckWait = JsAckWaitDefault
}
// Setup default of -1, meaning no limit for MaxDeliver.
if config.MaxDeliver == 0 {
config.MaxDeliver = -1
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
config.MaxAckPending = JsDefaultMaxAckPending
}
}

func (mset *stream) addConsumer(config *ConsumerConfig) (*consumer, error) {
return mset.addConsumerWithAssignment(config, _EMPTY_, nil)
}
Expand All @@ -291,6 +311,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
return nil, NewJSConsumerConfigRequiredError()
}

// Make sure we have sane defaults.
setConsumerConfigDefaults(config)

if len(config.Description) > JSMaxDescriptionLen {
return nil, NewJSConsumerDescriptionTooLongError(JSMaxDescriptionLen)
}
Expand Down Expand Up @@ -329,10 +352,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
if config.MaxWaiting < 0 {
return nil, NewJSConsumerMaxWaitingNegativeError()
}
// Set to default if not specified.
if config.MaxWaiting == 0 {
config.MaxWaiting = JSWaitQueueDefaultMax
}
if config.Heartbeat > 0 {
return nil, NewJSConsumerHBRequiresPushError()
}
Expand All @@ -354,19 +373,6 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
}
}

// Setup proper default for ack wait if we are in explicit ack mode.
if config.AckWait == 0 && (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) {
config.AckWait = JsAckWaitDefault
}
// Setup default of -1, meaning no limit for MaxDeliver.
if config.MaxDeliver == 0 {
config.MaxDeliver = -1
}
// Set proper default for max ack pending if we are ack explicit and none has been set.
if (config.AckPolicy == AckExplicit || config.AckPolicy == AckAll) && config.MaxAckPending == 0 {
config.MaxAckPending = JsDefaultMaxAckPending
}

// As best we can make sure the filtered subject is valid.
if config.FilterSubject != _EMPTY_ {
subjects, hasExt := mset.allSubjects()
Expand Down
3 changes: 3 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3045,6 +3045,9 @@ func (s *Server) jsConsumerCreate(sub *subscription, c *client, a *Account, subj
return
}

// Make sure we have sane defaults.
setConsumerConfigDefaults(&req.Config)

// Determine if we should proceed here when we are in clustered mode.
if s.JetStreamIsClustered() {
if req.Config.Direct {
Expand Down

0 comments on commit ba168dd

Please sign in to comment.