diff --git a/server/events.go b/server/events.go index 3ab75c56d7..2ebfc5ebac 100644 --- a/server/events.go +++ b/server/events.go @@ -1065,44 +1065,56 @@ func (s *Server) Node() string { // Tradeoff is subscription and interest graph events vs connect and // disconnect events, etc. func (s *Server) initEventTracking() { - if !s.EventsEnabled() { + // Capture sys in case we are shutdown while setting up. + s.mu.RLock() + sys := s.sys + s.mu.RUnlock() + + if sys == nil || sys.client == nil || sys.account == nil { return } // Create a system hash which we use for other servers to target us specifically. - s.sys.shash = getHash(s.info.Name) + sys.shash = getHash(s.info.Name) // This will be for all inbox responses. - subject := fmt.Sprintf(inboxRespSubj, s.sys.shash, "*") + subject := fmt.Sprintf(inboxRespSubj, sys.shash, "*") if _, err := s.sysSubscribe(subject, s.inboxReply); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } - s.sys.inboxPre = subject + sys.inboxPre = subject // This is for remote updates for connection accounting. subject = fmt.Sprintf(accConnsEventSubjOld, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil { s.Errorf("Error setting up internal tracking for %s: %v", subject, err) + return } // This will be for responses for account info that we send out. subject = fmt.Sprintf(connsRespSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for broad requests to respond with number of subscriptions for a given subject. if _, err := s.sysSubscribe(accNumSubsReqSubj, s.noInlineCallback(s.nsubsRequest)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for statsz from others. subject = fmt.Sprintf(serverStatsSubj, "*") if sub, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } else { // Keep track of this one. - s.sys.remoteStatsSub = sub + sys.remoteStatsSub = sub } + // Listen for all server shutdowns. subject = fmt.Sprintf(shutdownEventSubj, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for servers entering lame-duck mode. // NOTE: This currently is handled in the same way as a server shutdown, but has @@ -1110,6 +1122,7 @@ func (s *Server) initEventTracking() { subject = fmt.Sprintf(lameDuckEventSubj, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for account claims updates. subscribeToUpdate := true @@ -1120,6 +1133,7 @@ func (s *Server) initEventTracking() { for _, sub := range []string{accUpdateEventSubjOld, accUpdateEventSubjNew} { if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.noInlineCallback(s.accountClaimUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } } } @@ -1127,6 +1141,7 @@ func (s *Server) initEventTracking() { // This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } monSrvc := map[string]sysMsgHandler{ "IDZ": s.idzReq, @@ -1180,10 +1195,12 @@ func (s *Server) initEventTracking() { subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name) if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } subject = fmt.Sprintf(serverPingReqSubj, name) if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } } extractAccount := func(subject string) (string, error) { @@ -1276,6 +1293,7 @@ func (s *Server) initEventTracking() { for name, req := range monAccSrvc { if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } } @@ -1284,6 +1302,7 @@ func (s *Server) initEventTracking() { // is only one that will answer. This breaks tests since we still forward on remote server connect. if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // For now only the STATZ subject has an account specific ping equivalent. @@ -1301,6 +1320,7 @@ func (s *Server) initEventTracking() { }) })); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for updates when leaf nodes connect for a given account. This will @@ -1308,32 +1328,38 @@ func (s *Server) initEventTracking() { subject = fmt.Sprintf(leafNodeConnectEventSubj, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.leafNodeConnected)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // For tracking remote latency measurements. - subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash) + subject = fmt.Sprintf(remoteLatencyEventSubj, sys.shash) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteLatencyUpdate)); err != nil { s.Errorf("Error setting up internal latency tracking: %v", err) + return } // This is for simple debugging of number of subscribers that exist in the system. if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil { s.Errorf("Error setting up internal debug service for subscribers: %v", err) + return } // Listen for requests to reload the server configuration. subject = fmt.Sprintf(serverReloadReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.reloadConfig)); err != nil { s.Errorf("Error setting up server reload handler: %v", err) + return } // Client connection kick subject = fmt.Sprintf(clientKickReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.kickClient)); err != nil { s.Errorf("Error setting up client kick service: %v", err) + return } // Client connection LDM subject = fmt.Sprintf(clientLDMReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.ldmClient)); err != nil { s.Errorf("Error setting up client LDM service: %v", err) + return } } diff --git a/server/server.go b/server/server.go index 8a27695898..cc3130ebe5 100644 --- a/server/server.go +++ b/server/server.go @@ -4098,6 +4098,16 @@ func (s *Server) isLameDuckMode() bool { return s.ldm } +// LameDuckShutdown will perform a lame duck shutdown of NATS, whereby +// the client listener is closed, existing client connections are +// kicked, Raft leaderships are transferred, JetStream is shutdown +// and then finally shutdown the the NATS Server itself. +// This function blocks and will not return until the NATS Server +// has completed the entire shutdown operation. +func (s *Server) LameDuckShutdown() { + s.lameDuckMode() +} + // This function will close the client listener then close the clients // at some interval to avoid a reconnect storm. // We will also transfer any raft leaders and shutdown JetStream. diff --git a/server/stree/stree.go b/server/stree/stree.go index fd72b3bac0..d0835bf5d1 100644 --- a/server/stree/stree.go +++ b/server/stree/stree.go @@ -51,6 +51,10 @@ func (t *SubjectTree[T]) Empty() *SubjectTree[T] { // Insert a value into the tree. Will return if the value was updated and if so the old value. func (t *SubjectTree[T]) Insert(subject []byte, value T) (*T, bool) { + if t == nil { + return nil, false + } + old, updated := t.insert(&t.root, subject, value, 0) if !updated { t.size++ @@ -60,6 +64,10 @@ func (t *SubjectTree[T]) Insert(subject []byte, value T) (*T, bool) { // Find will find the value and return it or false if it was not found. func (t *SubjectTree[T]) Find(subject []byte) (*T, bool) { + if t == nil { + return nil, false + } + var si int for n := t.root; n != nil; { if n.isLeaf() { @@ -88,6 +96,10 @@ func (t *SubjectTree[T]) Find(subject []byte) (*T, bool) { // Delete will delete the item and return its value, or not found if it did not exist. func (t *SubjectTree[T]) Delete(subject []byte) (*T, bool) { + if t == nil { + return nil, false + } + val, deleted := t.delete(&t.root, subject, 0) if deleted { t.size-- @@ -97,7 +109,7 @@ func (t *SubjectTree[T]) Delete(subject []byte) (*T, bool) { // Match will match against a subject that can have wildcards and invoke the callback func for each matched value. func (t *SubjectTree[T]) Match(filter []byte, cb func(subject []byte, val *T)) { - if len(filter) == 0 || cb == nil { + if t == nil || t.root == nil || len(filter) == 0 || cb == nil { return } // We need to break this up into chunks based on wildcards, either pwc '*' or fwc '>'. diff --git a/server/stree/stree_test.go b/server/stree/stree_test.go index 7ce59e287e..7177cd6d94 100644 --- a/server/stree/stree_test.go +++ b/server/stree/stree_test.go @@ -764,3 +764,14 @@ func TestSubjectTreeMatchNoCallbackDupe(t *testing.T) { }) } } + +func TestSubjectTreeNilNoPanic(t *testing.T) { + var st *SubjectTree[int] + st.Match([]byte("foo"), func(_ []byte, _ *int) {}) + _, found := st.Find([]byte("foo")) + require_False(t, found) + _, found = st.Delete([]byte("foo")) + require_False(t, found) + _, found = st.Insert([]byte("foo"), 22) + require_False(t, found) +}