Skip to content

Commit

Permalink
Cherry-picks for 2.10.18-RC.4 (#5664)
Browse files Browse the repository at this point in the history
Includes:

- #5662
- #5663
- #5660
  • Loading branch information
wallyqs authored Jul 16, 2024
2 parents 5405206 + 9a57e69 commit ff36cf0
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 7 deletions.
38 changes: 32 additions & 6 deletions server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,51 +1065,64 @@ func (s *Server) Node() string {
// Tradeoff is subscription and interest graph events vs connect and
// disconnect events, etc.
func (s *Server) initEventTracking() {
if !s.EventsEnabled() {
// Capture sys in case we are shutdown while setting up.
s.mu.RLock()
sys := s.sys
s.mu.RUnlock()

if sys == nil || sys.client == nil || sys.account == nil {
return
}
// Create a system hash which we use for other servers to target us specifically.
s.sys.shash = getHash(s.info.Name)
sys.shash = getHash(s.info.Name)

// This will be for all inbox responses.
subject := fmt.Sprintf(inboxRespSubj, s.sys.shash, "*")
subject := fmt.Sprintf(inboxRespSubj, sys.shash, "*")
if _, err := s.sysSubscribe(subject, s.inboxReply); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
s.sys.inboxPre = subject
sys.inboxPre = subject
// This is for remote updates for connection accounting.
subject = fmt.Sprintf(accConnsEventSubjOld, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil {
s.Errorf("Error setting up internal tracking for %s: %v", subject, err)
return
}
// This will be for responses for account info that we send out.
subject = fmt.Sprintf(connsRespSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for broad requests to respond with number of subscriptions for a given subject.
if _, err := s.sysSubscribe(accNumSubsReqSubj, s.noInlineCallback(s.nsubsRequest)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for statsz from others.
subject = fmt.Sprintf(serverStatsSubj, "*")
if sub, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
} else {
// Keep track of this one.
s.sys.remoteStatsSub = sub
sys.remoteStatsSub = sub
}

// Listen for all server shutdowns.
subject = fmt.Sprintf(shutdownEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for servers entering lame-duck mode.
// NOTE: This currently is handled in the same way as a server shutdown, but has
// a different subject in case we need to handle differently in future.
subject = fmt.Sprintf(lameDuckEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// Listen for account claims updates.
subscribeToUpdate := true
Expand All @@ -1120,13 +1133,15 @@ func (s *Server) initEventTracking() {
for _, sub := range []string{accUpdateEventSubjOld, accUpdateEventSubjNew} {
if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.noInlineCallback(s.accountClaimUpdate)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
}
}
// Listen for ping messages that will be sent to all servers for statsz.
// This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below
if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
monSrvc := map[string]sysMsgHandler{
"IDZ": s.idzReq,
Expand Down Expand Up @@ -1180,10 +1195,12 @@ func (s *Server) initEventTracking() {
subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
subject = fmt.Sprintf(serverPingReqSubj, name)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
}
extractAccount := func(subject string) (string, error) {
Expand Down Expand Up @@ -1276,6 +1293,7 @@ func (s *Server) initEventTracking() {
for name, req := range monAccSrvc {
if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), s.noInlineCallback(req)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
}

Expand All @@ -1284,6 +1302,7 @@ func (s *Server) initEventTracking() {
// is only one that will answer. This breaks tests since we still forward on remote server connect.
if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}

// For now only the STATZ subject has an account specific ping equivalent.
Expand All @@ -1301,39 +1320,46 @@ func (s *Server) initEventTracking() {
})
})); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}

// Listen for updates when leaf nodes connect for a given account. This will
// force any gateway connections to move to `modeInterestOnly`
subject = fmt.Sprintf(leafNodeConnectEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.leafNodeConnected)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
return
}
// For tracking remote latency measurements.
subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash)
subject = fmt.Sprintf(remoteLatencyEventSubj, sys.shash)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteLatencyUpdate)); err != nil {
s.Errorf("Error setting up internal latency tracking: %v", err)
return
}
// This is for simple debugging of number of subscribers that exist in the system.
if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil {
s.Errorf("Error setting up internal debug service for subscribers: %v", err)
return
}

// Listen for requests to reload the server configuration.
subject = fmt.Sprintf(serverReloadReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.reloadConfig)); err != nil {
s.Errorf("Error setting up server reload handler: %v", err)
return
}

// Client connection kick
subject = fmt.Sprintf(clientKickReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.kickClient)); err != nil {
s.Errorf("Error setting up client kick service: %v", err)
return
}
// Client connection LDM
subject = fmt.Sprintf(clientLDMReqSubj, s.info.ID)
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.ldmClient)); err != nil {
s.Errorf("Error setting up client LDM service: %v", err)
return
}
}

Expand Down
10 changes: 10 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4098,6 +4098,16 @@ func (s *Server) isLameDuckMode() bool {
return s.ldm
}

// LameDuckShutdown will perform a lame duck shutdown of NATS, whereby
// the client listener is closed, existing client connections are
// kicked, Raft leaderships are transferred, JetStream is shutdown
// and then finally shutdown the the NATS Server itself.
// This function blocks and will not return until the NATS Server
// has completed the entire shutdown operation.
func (s *Server) LameDuckShutdown() {
s.lameDuckMode()
}

// This function will close the client listener then close the clients
// at some interval to avoid a reconnect storm.
// We will also transfer any raft leaders and shutdown JetStream.
Expand Down
14 changes: 13 additions & 1 deletion server/stree/stree.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (t *SubjectTree[T]) Empty() *SubjectTree[T] {

// Insert a value into the tree. Will return if the value was updated and if so the old value.
func (t *SubjectTree[T]) Insert(subject []byte, value T) (*T, bool) {
if t == nil {
return nil, false
}

old, updated := t.insert(&t.root, subject, value, 0)
if !updated {
t.size++
Expand All @@ -60,6 +64,10 @@ func (t *SubjectTree[T]) Insert(subject []byte, value T) (*T, bool) {

// Find will find the value and return it or false if it was not found.
func (t *SubjectTree[T]) Find(subject []byte) (*T, bool) {
if t == nil {
return nil, false
}

var si int
for n := t.root; n != nil; {
if n.isLeaf() {
Expand Down Expand Up @@ -88,6 +96,10 @@ func (t *SubjectTree[T]) Find(subject []byte) (*T, bool) {

// Delete will delete the item and return its value, or not found if it did not exist.
func (t *SubjectTree[T]) Delete(subject []byte) (*T, bool) {
if t == nil {
return nil, false
}

val, deleted := t.delete(&t.root, subject, 0)
if deleted {
t.size--
Expand All @@ -97,7 +109,7 @@ func (t *SubjectTree[T]) Delete(subject []byte) (*T, bool) {

// Match will match against a subject that can have wildcards and invoke the callback func for each matched value.
func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) {
if len(filter) == 0 || cb == nil {
if t == nil || t.root == nil || len(filter) == 0 || cb == nil {
return
}
// We need to break this up into chunks based on wildcards, either pwc '*' or fwc '>'.
Expand Down
11 changes: 11 additions & 0 deletions server/stree/stree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,3 +764,14 @@ func TestSubjectTreeMatchNoCallbackDupe(t *testing.T) {
})
}
}

func TestSubjectTreeNilNoPanic(t *testing.T) {
var st *SubjectTree[int]
st.Match([]byte("foo"), func(_ []byte, _ *int) {})
_, found := st.Find([]byte("foo"))
require_False(t, found)
_, found = st.Delete([]byte("foo"))
require_False(t, found)
_, found = st.Insert([]byte("foo"), 22)
require_False(t, found)
}

0 comments on commit ff36cf0

Please sign in to comment.