Skip to content

Commit

Permalink
Improve error message when attempting to change consumer type (#6408)
Browse files Browse the repository at this point in the history
Before this change, an attempt to change consumer type (by either
setting or removing deliver subject), resulted in `max waiting can not
be updated` error as `MaxWaiting` has a default value for pull
consumers. This ensures that changing consumer type is checked before
max waiting to return a more useful error message.

Signed-off-by: Piotr Piotrowski
[piotr@synadia.com](mailto:piotr@synadia.com)
  • Loading branch information
derekcollison authored Jan 27, 2025
2 parents 654d051 + d408266 commit 52df7e6
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 3 deletions.
7 changes: 4 additions & 3 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2099,9 +2099,6 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
if cfg.FlowControl != ncfg.FlowControl {
return errors.New("flow control can not be updated")
}
if cfg.MaxWaiting != ncfg.MaxWaiting {
return errors.New("max waiting can not be updated")
}

// Deliver Subject is conditional on if its bound.
if cfg.DeliverSubject != ncfg.DeliverSubject {
Expand All @@ -2116,6 +2113,10 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
}
}

if cfg.MaxWaiting != ncfg.MaxWaiting {
return errors.New("max waiting can not be updated")
}

// Check if BackOff is defined, MaxDeliver is within range.
if lbo := len(ncfg.BackOff); lbo > 0 && ncfg.MaxDeliver != -1 && lbo > ncfg.MaxDeliver {
return NewJSConsumerMaxDeliverBackoffError()
Expand Down
41 changes: 41 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9279,6 +9279,47 @@ func TestJetStreamPullConsumerMaxWaiting(t *testing.T) {
}
}

func TestJetStreamChangeConsumerType(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"test.*"}})
require_NoError(t, err)

// create pull consumer
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "pull",
AckPolicy: nats.AckExplicitPolicy,
})
require_NoError(t, err)

// cannot update pull -> push
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
Name: "pull",
AckPolicy: nats.AckExplicitPolicy,
DeliverSubject: "foo",
})
require_Contains(t, err.Error(), "can not update pull consumer to push based")

// create push consumer
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
Name: "push",
AckPolicy: nats.AckExplicitPolicy,
DeliverSubject: "foo",
})
require_NoError(t, err)

// cannot change push -> pull
_, err = js.UpdateConsumer("TEST", &nats.ConsumerConfig{
Name: "push",
AckPolicy: nats.AckExplicitPolicy,
})
require_Contains(t, err.Error(), "can not update push consumer to pull based")
}

////////////////////////////////////////
// Benchmark placeholders
// TODO(dlc) - move
Expand Down

0 comments on commit 52df7e6

Please sign in to comment.