Skip to content

Commit

Permalink
Use popOne in JS API routed request workers (#6355)
Browse files Browse the repository at this point in the history
This updates the JS API workers to use `popOne`, so that if the queue
stacks up at all, the recovery is fairer across workers, rather than
single workers stealing the entire queue.

This was extracted from #6342.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison authored Jan 9, 2025
2 parents 733315a + f1e52bd commit 1dcd873
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
7 changes: 4 additions & 3 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,8 +922,10 @@ func (s *Server) processJSAPIRoutedRequests() {
for {
select {
case <-queue.ch:
reqs := queue.pop()
for _, r := range reqs {
// Only pop one item at a time here, otherwise if the system is recovering
// from queue buildup, then one worker will pull off all the tasks and the
// others will be starved of work.
for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
client.pa = r.pa
start := time.Now()
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
Expand All @@ -932,7 +934,6 @@ func (s *Server) processJSAPIRoutedRequests() {
}
atomic.AddInt64(&js.apiInflight, -1)
}
queue.recycle(&reqs)
case <-s.quitCh:
return
}
Expand Down
66 changes: 66 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5265,3 +5265,69 @@ func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) {
// it should succeed.
require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))
}

func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, _ := jsClientConnect(t, c.randomNonLeader())
defer nc.Close()

// We only run 16 JetStream API workers.
mp := runtime.GOMAXPROCS(0)
if mp > 16 {
mp = 16
}

leader := c.leader()
ljs := leader.js.Load()

// Take the JS lock, which allows the JS API queue to build up.
ljs.mu.Lock()
defer ljs.mu.Unlock()

count := JSDefaultRequestQueueLimit - 1
ch := make(chan *nats.Msg, count)

inbox := nc.NewRespInbox()
_, err := nc.ChanSubscribe(inbox, ch)
require_NoError(t, err)

// To ensure a fair starting line, we need to submit as many tasks as
// there are JS workers whilst holding the JS lock. This will ensure that
// each JS API worker is properly wedged.
msg := &nats.Msg{
Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"),
Reply: "no_one_here",
}
for i := 0; i < mp; i++ {
require_NoError(t, nc.PublishMsg(msg))
}

// Then we want to submit a fixed number of tasks, big enough to fill
// the queue, so that we can measure them.
msg = &nats.Msg{
Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"),
Reply: inbox,
}
for i := 0; i < count; i++ {
require_NoError(t, nc.PublishMsg(msg))
}
checkFor(t, 5*time.Second, 25*time.Millisecond, func() error {
if queued := leader.jsAPIRoutedReqs.len(); queued != count {
return fmt.Errorf("expected %d queued requests, got %d", count, queued)
}
return nil
})

// Now we're going to release the lock and start timing. The workers
// will now race to clear the queues and we'll wait to see how long
// it takes for them all to respond.
start := time.Now()
ljs.mu.Unlock()
for i := 0; i < count; i++ {
<-ch
}
ljs.mu.Lock()
t.Logf("Took %s to clear %d items", time.Since(start), count)
}

0 comments on commit 1dcd873

Please sign in to comment.