Skip to content

Commit

Permalink
Do not start additional goroutines from consumer deleteNotActive (#…
Browse files Browse the repository at this point in the history
…6344)

Since `deleteNotActive` already runs in its own goroutine via
`time.AfterFunc`, we just create more work for the scheduler by having
extra goroutines running on top of that.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison authored Jan 8, 2025
2 parents baf0641 + b37c63e commit edf1b3e
Showing 1 changed file with 34 additions and 27 deletions.
61 changes: 34 additions & 27 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,8 @@ var (
consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval
)

// deleteNotActive must only be called from time.AfterFunc or in its own
// goroutine, as it can block on clean-up.
func (o *consumer) deleteNotActive() {
o.mu.Lock()
if o.mset == nil {
Expand Down Expand Up @@ -1853,6 +1855,16 @@ func (o *consumer) deleteNotActive() {
acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct
o.mu.Unlock()

// Useful for pprof.
setGoRoutineLabels(pprofLabels{
"account": acc,
"stream": stream,
"consumer": name,
})

// We will delete locally regardless.
defer o.delete()

// If we are clustered, check if we still have this consumer assigned.
// If we do forward a proposal to delete ourselves to the metacontroller leader.
if !isDirect && s.JetStreamIsClustered() {
Expand All @@ -1875,38 +1887,33 @@ func (o *consumer) deleteNotActive() {
if ca != nil && cc != nil {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
if js.shuttingDown {
js.mu.RUnlock()
return
}
nca := js.consumerAssignment(acc, stream, name)
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
if js.shuttingDown {
js.mu.RUnlock()
// Make sure this is not a new consumer with the same name.
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < consumerNotActiveMaxInterval {
interval *= 2
ticker.Reset(interval)
}
continue
}
// We saw that consumer has been removed, all done.
return
}
}()
nca := js.consumerAssignment(acc, stream, name)
js.mu.RUnlock()
// Make sure this is not a new consumer with the same name.
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < consumerNotActiveMaxInterval {
interval *= 2
ticker.Reset(interval)
}
continue
}
// We saw that consumer has been removed, all done.
return
}
}
}

// We will delete here regardless.
o.delete()
}

func (o *consumer) watchGWinterest() {
Expand Down

0 comments on commit edf1b3e

Please sign in to comment.