Skip to content

Commit

Permalink
Fix JS API in-flight metric (#6373)
Browse files Browse the repository at this point in the history
After a drain this would have been misreporting, as we did not remove
drained entries from the `apiInflight` count.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison authored Jan 14, 2025
2 parents 587d910 + 296edb7 commit 74f005d
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
7 changes: 5 additions & 2 deletions server/ipqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,14 +246,16 @@ func (q *ipQueue[T]) size() uint64 {
}

// Empty the queue and consumes the notification signal if present.
// Returns the number of items that were drained from the queue.
// Note that this could cause a reader go routine that has been
// notified that there is something in the queue (reading from queue's `ch`)
// may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
func (q *ipQueue[T]) drain() {
func (q *ipQueue[T]) drain() int {
if q == nil {
return
return 0
}
q.Lock()
olen := len(q.elts) - q.pos
q.elts, q.pos, q.sz = nil, 0, 0
// Consume the signal if it was present to reduce the chance of a reader
// routine to be think that there is something in the queue...
Expand All @@ -262,6 +264,7 @@ func (q *ipQueue[T]) drain() {
default:
}
q.Unlock()
return olen
}

// Since the length of the queue goes to 0 after a pop(), it is good to
Expand Down
5 changes: 3 additions & 2 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
limit := atomic.LoadInt64(&js.queueLimit)
if pending >= int(limit) {
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
s.jsAPIRoutedReqs.drain()
drained := int64(s.jsAPIRoutedReqs.drain())
atomic.AddInt64(&js.apiInflight, -drained)

s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
TypedEvent: TypedEvent{
Expand All @@ -904,7 +905,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
},
Server: s.Name(),
Domain: js.config.Domain,
Dropped: int64(pending),
Dropped: drained,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1962,7 +1962,7 @@ runner:
// just will remove them from the central monitoring map
queues := []interface {
unregister()
drain()
drain() int
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.drain()
Expand Down

0 comments on commit 74f005d

Please sign in to comment.