diff --git a/server/events.go b/server/events.go index ff940571a0..83f9a85031 100644 --- a/server/events.go +++ b/server/events.go @@ -1066,44 +1066,56 @@ func (s *Server) Node() string { // Tradeoff is subscription and interest graph events vs connect and // disconnect events, etc. func (s *Server) initEventTracking() { - if !s.EventsEnabled() { + // Capture sys in case we are shutdown while setting up. + s.mu.RLock() + sys := s.sys + s.mu.RUnlock() + + if sys == nil || sys.client == nil || sys.account == nil { return } // Create a system hash which we use for other servers to target us specifically. - s.sys.shash = getHash(s.info.Name) + sys.shash = getHash(s.info.Name) // This will be for all inbox responses. - subject := fmt.Sprintf(inboxRespSubj, s.sys.shash, "*") + subject := fmt.Sprintf(inboxRespSubj, sys.shash, "*") if _, err := s.sysSubscribe(subject, s.inboxReply); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } - s.sys.inboxPre = subject + sys.inboxPre = subject // This is for remote updates for connection accounting. subject = fmt.Sprintf(accConnsEventSubjOld, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil { s.Errorf("Error setting up internal tracking for %s: %v", subject, err) + return } // This will be for responses for account info that we send out. subject = fmt.Sprintf(connsRespSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteConnsUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for broad requests to respond with number of subscriptions for a given subject. if _, err := s.sysSubscribe(accNumSubsReqSubj, s.noInlineCallback(s.nsubsRequest)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for statsz from others. subject = fmt.Sprintf(serverStatsSubj, "*") if sub, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } else { // Keep track of this one. - s.sys.remoteStatsSub = sub + sys.remoteStatsSub = sub } + // Listen for all server shutdowns. subject = fmt.Sprintf(shutdownEventSubj, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for servers entering lame-duck mode. // NOTE: This currently is handled in the same way as a server shutdown, but has @@ -1111,6 +1123,7 @@ func (s *Server) initEventTracking() { subject = fmt.Sprintf(lameDuckEventSubj, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for account claims updates. subscribeToUpdate := true @@ -1121,6 +1134,7 @@ func (s *Server) initEventTracking() { for _, sub := range []string{accUpdateEventSubjOld, accUpdateEventSubjNew} { if _, err := s.sysSubscribe(fmt.Sprintf(sub, "*"), s.noInlineCallback(s.accountClaimUpdate)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } } } @@ -1128,6 +1142,7 @@ func (s *Server) initEventTracking() { // This subscription is kept for backwards compatibility. Got replaced by ...PING.STATZ from below if _, err := s.sysSubscribe(serverStatsPingReqSubj, s.noInlineCallback(s.statszReq)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } monSrvc := map[string]sysMsgHandler{ "IDZ": s.idzReq, @@ -1181,10 +1196,12 @@ func (s *Server) initEventTracking() { subject = fmt.Sprintf(serverDirectReqSubj, s.info.ID, name) if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } subject = fmt.Sprintf(serverPingReqSubj, name) if _, err := s.sysSubscribe(subject, s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } } extractAccount := func(subject string) (string, error) { @@ -1277,6 +1294,7 @@ func (s *Server) initEventTracking() { for name, req := range monAccSrvc { if _, err := s.sysSubscribe(fmt.Sprintf(accDirectReqSubj, "*", name), s.noInlineCallback(req)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } } @@ -1285,6 +1303,7 @@ func (s *Server) initEventTracking() { // is only one that will answer. This breaks tests since we still forward on remote server connect. if _, err := s.sysSubscribe(fmt.Sprintf(userDirectReqSubj, "*"), s.userInfoReq); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // For now only the STATZ subject has an account specific ping equivalent. @@ -1302,6 +1321,7 @@ func (s *Server) initEventTracking() { }) })); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // Listen for updates when leaf nodes connect for a given account. This will @@ -1309,32 +1329,38 @@ func (s *Server) initEventTracking() { subject = fmt.Sprintf(leafNodeConnectEventSubj, "*") if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.leafNodeConnected)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) + return } // For tracking remote latency measurements. - subject = fmt.Sprintf(remoteLatencyEventSubj, s.sys.shash) + subject = fmt.Sprintf(remoteLatencyEventSubj, sys.shash) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteLatencyUpdate)); err != nil { s.Errorf("Error setting up internal latency tracking: %v", err) + return } // This is for simple debugging of number of subscribers that exist in the system. if _, err := s.sysSubscribeInternal(accSubsSubj, s.noInlineCallback(s.debugSubscribers)); err != nil { s.Errorf("Error setting up internal debug service for subscribers: %v", err) + return } // Listen for requests to reload the server configuration. subject = fmt.Sprintf(serverReloadReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.reloadConfig)); err != nil { s.Errorf("Error setting up server reload handler: %v", err) + return } // Client connection kick subject = fmt.Sprintf(clientKickReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.kickClient)); err != nil { s.Errorf("Error setting up client kick service: %v", err) + return } // Client connection LDM subject = fmt.Sprintf(clientLDMReqSubj, s.info.ID) if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.ldmClient)); err != nil { s.Errorf("Error setting up client LDM service: %v", err) + return } }