Skip to content

Commit

Permalink
Only encode & send advisories when there is interest (#6341)
Browse files Browse the repository at this point in the history
This stops us from spending time & CPU cycles encoding advisory JSONs
when there is no one listening for them.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison authored Jan 8, 2025
2 parents 2daf627 + 48c3c47 commit baf0641
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 60 deletions.
84 changes: 26 additions & 58 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1588,8 +1588,23 @@ func (o *consumer) unsubscribe(sub *subscription) {

// We need to make sure we protect access to the outq.
// Do all advisory sends here.
func (o *consumer) sendAdvisory(subj string, msg []byte) {
o.outq.sendMsg(subj, msg)
func (o *consumer) sendAdvisory(subject string, e any) {
if o.acc == nil {
return
}

// If there is no one listening for this advisory then save ourselves the effort
// and don't bother encoding the JSON or sending it.
if sl := o.acc.sl; (sl != nil && !sl.HasInterest(subject)) && !o.srv.hasGatewayInterest(o.acc.Name, subject) {
return
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.outq.sendMsg(subject, j)
}

func (o *consumer) sendDeleteAdvisoryLocked() {
Expand All @@ -1605,13 +1620,8 @@ func (o *consumer) sendDeleteAdvisoryLocked() {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}

func (o *consumer) sendPinnedAdvisoryLocked(group string) {
Expand All @@ -1629,13 +1639,8 @@ func (o *consumer) sendPinnedAdvisoryLocked(group string) {
Group: group,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerPinnedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)

}
func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) {
Expand All @@ -1653,13 +1658,8 @@ func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) {
Reason: reason,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerUnpinnedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)

}

Expand All @@ -1679,13 +1679,8 @@ func (o *consumer) sendCreateAdvisory() {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}

func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
Expand All @@ -1705,13 +1700,8 @@ func (o *consumer) sendPauseAdvisoryLocked(cfg *ConsumerConfig) {
e.Paused = time.Now().Before(e.PauseUntil)
}

j, err := json.Marshal(e)
if err != nil {
return
}

subj := JSAdvisoryConsumerPausePre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}

// Created returns created time.
Expand Down Expand Up @@ -2652,12 +2642,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.sendAdvisory(o.nakEventT, j)
o.sendAdvisory(o.nakEventT, e)

// Check to see if we have delays attached.
if len(nak) > len(AckNak) {
Expand Down Expand Up @@ -2732,15 +2717,8 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
// We had an error during the marshal, so we can't send the advisory,
// but we still need to tell the caller that the ack was processed.
return ackedInPlace
}

subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
return ackedInPlace
}

Expand Down Expand Up @@ -3052,12 +3030,7 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.sendAdvisory(o.ackEventT, j)
o.sendAdvisory(o.ackEventT, e)
}

// Process an ACK.
Expand Down Expand Up @@ -3946,12 +3919,7 @@ func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
Domain: o.srv.getOpts().JetStreamDomain,
}

j, err := json.Marshal(e)
if err != nil {
return
}

o.sendAdvisory(o.deliveryExcEventT, j)
o.sendAdvisory(o.deliveryExcEventT, e)
}

// Check if the candidate subject matches a filter if its present.
Expand Down
31 changes: 31 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5229,3 +5229,34 @@ func TestJetStreamClusterMetaStepdownPreferred(t *testing.T) {
require_Equal(t, ErrorIdentifier(apiresp.Error.ErrCode), JSClusterNoPeersErrF)
})
}

func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

subj := "$JS.ADVISORY.TEST"
s1 := c.servers[0]
s2 := c.servers[1]

// On the first server, see if we think the advisory will be published.
require_False(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))

// On the second server, subscribe to the advisory subject.
nc, _ := jsClientConnect(t, s2)
defer nc.Close()

_, err := nc.Subscribe(subj, func(_ *nats.Msg) {})
require_NoError(t, err)

// Wait for the interest to propagate to the first server.
checkFor(t, time.Second, 25*time.Millisecond, func() error {
if !s1.GlobalAccount().sl.HasInterest(subj) {
return fmt.Errorf("expected interest in %q, not yet found", subj)
}
return nil
})

// On the first server, try and publish the advisory again. THis time
// it should succeed.
require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test"))
}
14 changes: 12 additions & 2 deletions server/jetstream_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@ import (
"time"
)

func (s *Server) publishAdvisory(acc *Account, subject string, adv any) {
// publishAdvisory sends the given advisory into the account. Returns true if
// it was sent, false if not (i.e. due to lack of interest or a marshal error).
func (s *Server) publishAdvisory(acc *Account, subject string, adv any) bool {
if acc == nil {
acc = s.SystemAccount()
if acc == nil {
return
return false
}
}

// If there is no one listening for this advisory then save ourselves the effort
// and don't bother encoding the JSON or sending it.
if sl := acc.sl; (sl != nil && !sl.HasInterest(subject)) && !s.hasGatewayInterest(acc.Name, subject) {
return false
}

ej, err := json.Marshal(adv)
if err == nil {
err = s.sendInternalAccountMsg(acc, subject, ej)
Expand All @@ -34,6 +43,7 @@ func (s *Server) publishAdvisory(acc *Account, subject string, adv any) {
} else {
s.Warnf("Advisory could not be serialized for account %q: %v", acc.Name, err)
}
return err == nil
}

// JSAPIAudit is an advisory about administrative actions taken on JetStream
Expand Down

0 comments on commit baf0641

Please sign in to comment.