From 9484363fe4860c81116b8ad1edd58822aacb2267 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 27 Jun 2024 11:12:50 -0600 Subject: [PATCH 01/16] [IMPROVED] Routing: reduce chances of duplicate implicit routes This is an alternate approach to the PR #5484 from @wjordan. Using the code in that PR with the test added in this PR, I could still see duplicate routes (up to 125 in one of the matrix), and still had a data race (that could have easily be fixed). The main issue is that the increment happens in connectToRoute, which is running from a go routine, so there were still chances for duplicates. Instead, I took the approach that those duplicates were the result of way too many gossip protocols. Suppose that you have servers A and B already connected. C connects to A. A gossips to B that it should connect to C. When that happened, B would gossip to A the server C and C would gossip to A the server B, which all that was unnecessary. It would grow quite fast with the size of the cluster (that is, several thousands for a cluster size of 15 or so). Resolves #5483 Signed-off-by: Ivan Kozlovic --- server/route.go | 198 +++++++++++++++++++++++++----------------- server/routes_test.go | 186 +++++++++++++++++++++++++++++++++++++-- server/server.go | 1 + 3 files changed, 300 insertions(+), 85 deletions(-) diff --git a/server/route.go b/server/route.go index 0341f79868..d200d26264 100644 --- a/server/route.go +++ b/server/route.go @@ -98,6 +98,9 @@ type route struct { // Selected compression mode, which may be different from the // server configured mode. compression string + // Transient value used to set the Info.NoGossip when initiating + // an implicit route and sending to the remote. + noGossip bool } type connectInfo struct { @@ -700,12 +703,15 @@ func (c *client) processRouteInfo(info *Info) { return } + var sendDelayedInfo bool + // First INFO, check if this server is configured for compression because // if that is the case, we need to negotiate it with the remote server. if needsCompression(opts.Cluster.Compression.Mode) { accName := bytesToString(c.route.accName) // If we did not yet negotiate... - if !c.flags.isSet(compressionNegotiated) { + compNeg := c.flags.isSet(compressionNegotiated) + if !compNeg { // Prevent from getting back here. c.flags.set(compressionNegotiated) // Release client lock since following function will need server lock. @@ -722,24 +728,21 @@ func (c *client) processRouteInfo(info *Info) { } // No compression because one side does not want/can't, so proceed. c.mu.Lock() - } else if didSolicit { - // The other side has switched to compression, so we can now set - // the first ping timer and send the delayed INFO for situations - // where it was not already sent. - c.setFirstPingTimer() - if !routeShouldDelayInfo(accName, opts) { - cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression) - // Need to release and then reacquire... + // Check that the connection did not close if the lock was released. + if c.isClosed() { c.mu.Unlock() - s.sendDelayedRouteInfo(c, accName, cm) - c.mu.Lock() + return } } - // Check that the connection did not close if the lock was released. - if c.isClosed() { - c.mu.Unlock() - return + // We can set the ping timer after we just negotiated compression above, + // or for solicited routes if we already negotiated. + if !compNeg || didSolicit { + c.setFirstPingTimer() } + // When compression is configured, we delay the initial INFO for any + // solicited route. 
So we need to send the delayed INFO simply based + // on the didSolicit boolean. + sendDelayedInfo = didSolicit } else { // Coming from an old server, the Compression field would be the empty // string. For servers that are configured with CompressionNotSupported, @@ -749,6 +752,10 @@ func (c *client) processRouteInfo(info *Info) { } else { c.route.compression = CompressionOff } + // When compression is not configured, we delay the initial INFO only + // for solicited pooled routes, so use the same check that we did when + // we decided to delay in createRoute(). + sendDelayedInfo = didSolicit && routeShouldDelayInfo(bytesToString(c.route.accName), opts) } // Mark that the INFO protocol has been received, so we can detect updates. @@ -825,11 +832,15 @@ func (c *client) processRouteInfo(info *Info) { } accName := string(c.route.accName) + // Capture the noGossip value and reset it here. + noGossip := c.route.noGossip + c.route.noGossip = false + // Check to see if we have this remote already registered. // This can happen when both servers have routes to each other. c.mu.Unlock() - if added := s.addRoute(c, didSolicit, info, accName); added { + if added := s.addRoute(c, didSolicit, sendDelayedInfo, noGossip, info, accName); added { if accName != _EMPTY_ { c.Debugf("Registering remote route %q for account %q", info.ID, accName) } else { @@ -863,7 +874,7 @@ func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName, if needsCompression(cm) { // Generate an INFO with the chosen compression mode. s.mu.Lock() - infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0) + infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0, false) s.mu.Unlock() // If we solicited, then send this INFO protocol BEFORE switching @@ -892,29 +903,9 @@ func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName, c.mu.Unlock() return true, nil } - // We are not using compression, set the ping timer. - c.mu.Lock() - c.setFirstPingTimer() - c.mu.Unlock() - // If this is a solicited route, we need to send the INFO if it was not - // done during createRoute() and will not be done in addRoute(). - if didSolicit && !routeShouldDelayInfo(accName, opts) { - cm = compressionModeForInfoProtocol(&opts.Cluster.Compression, cm) - s.sendDelayedRouteInfo(c, accName, cm) - } return false, nil } -func (s *Server) sendDelayedRouteInfo(c *client, accName, cm string) { - s.mu.Lock() - infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0) - s.mu.Unlock() - - c.mu.Lock() - c.enqueueProto(infoProto) - c.mu.Unlock() -} - // Possibly sends local subscriptions interest to this route // based on changes in the remote's Export permissions. func (s *Server) updateRemoteRoutePerms(c *client, info *Info) { @@ -1050,7 +1041,7 @@ func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { if info.AuthRequired { r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password) } - s.startGoRoutine(func() { s.connectToRoute(r, false, true, info.RouteAccount) }) + s.startGoRoutine(func() { s.connectToRoute(r, Implicit, true, info.NoGossip, info.RouteAccount) }) // If we are processing an implicit route from a route that does not // support pooling/pinned-accounts, we won't receive an INFO for each of // the pinned-accounts that we would normally receive. 
In that case, just @@ -1060,7 +1051,7 @@ func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { rURL := r for _, an := range opts.Cluster.PinnedAccounts { accName := an - s.startGoRoutine(func() { s.connectToRoute(rURL, false, true, accName) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, Implicit, true, info.NoGossip, accName) }) } } } @@ -1112,11 +1103,39 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { b, _ := json.Marshal(info) infoJSON := []byte(fmt.Sprintf(InfoProto, b)) + // If this is for a pinned account, we will try to send the gossip + // through our pinned account routes, but fall back to the other + // routes in case we don't have one for a given remote. + accRemotes := map[string]struct{}{} + if info.RouteAccount != _EMPTY_ { + if remotes, ok := s.accRoutes[info.RouteAccount]; ok { + for remoteID, r := range remotes { + if r == nil { + continue + } + accRemotes[remoteID] = struct{}{} + r.mu.Lock() + // Do not send to a remote that does not support pooling/pinned-accounts. + if remoteID != info.ID && !r.route.noPool { + r.enqueueProto(infoJSON) + } + r.mu.Unlock() + } + } + } + s.forEachRemote(func(r *client) { r.mu.Lock() + remoteID := r.route.remoteID + if info.RouteAccount != _EMPTY_ { + if _, processed := accRemotes[remoteID]; processed { + r.mu.Unlock() + return + } + } // If this is a new route for a given account, do not send to a server // that does not support pooling/pinned-accounts. - if r.route.remoteID != info.ID && + if remoteID != info.ID && (info.RouteAccount == _EMPTY_ || (info.RouteAccount != _EMPTY_ && !r.route.noPool)) { r.enqueueProto(infoJSON) } @@ -1695,17 +1714,12 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra c.enqueueProto(buf) } -func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *client { +func (s *Server) createRoute(conn net.Conn, rURL *url.URL, rtype RouteType, noGossip bool, accName string) *client { // Snapshot server options. opts := s.getOpts() didSolicit := rURL != nil - r := &route{didSolicit: didSolicit, poolIdx: -1} - for _, route := range opts.Routes { - if rURL != nil && (strings.EqualFold(rURL.Host, route.Host)) { - r.routeType = Explicit - } - } + r := &route{routeType: rtype, didSolicit: didSolicit, poolIdx: -1, noGossip: noGossip} c := &client{srv: s, nc: conn, opts: ClientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r, start: time.Now()} @@ -1722,7 +1736,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, accName string) *clie // the incoming INFO from the remote. Also delay if configured for compression. delayInfo := didSolicit && (compressionConfigured || routeShouldDelayInfo(accName, opts)) if !delayInfo { - infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0) + infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0, noGossip) } authRequired := s.routeInfo.AuthRequired tlsRequired := s.routeInfo.TLSRequired @@ -1855,7 +1869,7 @@ func routeShouldDelayInfo(accName string, opts *Options) bool { // To be used only when a route is created (to send the initial INFO protocol). // // Server lock held on entry. 
-func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int) []byte { +func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int, noGossip bool) []byte { // New proto wants a nonce (although not used in routes, that is, not signed in CONNECT) var raw [nonceLen]byte nonce := raw[:] @@ -1865,11 +1879,11 @@ func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolI if s.getOpts().Cluster.Compression.Mode == CompressionS2Auto { compression = CompressionS2Auto } - ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression = string(nonce), accName, poolIdx, compression + ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.NoGossip = string(nonce), accName, poolIdx, compression, noGossip infoJSON := generateInfoJSON(&s.routeInfo) // Clear now that it has been serialized. Will prevent nonce to be included in async INFO that we may send. // Same for some other fields. - ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression = _EMPTY_, _EMPTY_, 0, _EMPTY_ + ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.NoGossip = _EMPTY_, _EMPTY_, 0, _EMPTY_, false return infoJSON } @@ -1878,7 +1892,7 @@ const ( _EMPTY_ = "" ) -func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string) bool { +func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, info *Info, accName string) bool { id := info.ID var acc *Account @@ -1964,12 +1978,19 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string c.mu.Lock() idHash := c.route.idHash cid := c.cid + if sendDelayedInfo { + cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression) + c.enqueueProto(s.generateRouteInitialInfoJSON(accName, cm, 0, noGossip)) + } if c.last.IsZero() { c.last = time.Now() } if acc != nil { c.acc = acc } + // This will be true if this is a route that was initiated from the + // gossip protocol (basically invoked from processImplicitRoute). + fromGossip := didSolicit && c.route.routeType == Implicit c.mu.Unlock() // Store this route with key being the route id hash + account name @@ -1978,8 +1999,20 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string // Now that we have registered the route, we can remove from the temp map. s.removeFromTempClients(cid) - // Notify other routes about this new route - s.forwardNewRouteInfoToKnownServers(info) + // We will not gossip if we are an implicit route created due to + // gossip, or if the remote instructed us not to gossip. + if !fromGossip && !info.NoGossip { + if !didSolicit { + // If the connection was accepted, instruct the neighbors to + // set Info.NoGossip to true also when sending their own INFO + // protocol. In normal situations, any implicit route would + // set their Info.NoGossip to true, but we do this to solve + // a very specific situation. For some background, see test + // TestRouteImplicitJoinsSeparateGroups. + info.NoGossip = true + } + s.forwardNewRouteInfoToKnownServers(info) + } // Send subscription interest s.sendSubsToRoute(c, -1, accName) @@ -2060,9 +2093,9 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string rHash := c.route.hash rn := c.route.remoteName url := c.route.url - // For solicited routes, we need now to send the INFO protocol. 
- if didSolicit { - c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, c.route.compression, idx)) + if sendDelayedInfo { + cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression) + c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, cm, idx, noGossip)) } if c.last.IsZero() { c.last = time.Now() @@ -2100,8 +2133,13 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string } // we don't need to send if the only route is the one we just accepted. - if len(s.routes) > 1 { - // Now let the known servers know about this new route + // For other checks, see other call to forwardNewRouteInfoToKnownServers + // in the handling of pinned account above. + fromGossip := didSolicit && rtype == Implicit + if len(s.routes) > 1 && !fromGossip && !info.NoGossip { + if !didSolicit { + info.NoGossip = true + } s.forwardNewRouteInfoToKnownServers(info) } @@ -2137,7 +2175,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string s.grWG.Done() return } - s.connectToRoute(url, rtype == Explicit, true, _EMPTY_) + s.connectToRoute(url, rtype, true, noGossip, _EMPTY_) }) } } @@ -2550,7 +2588,7 @@ func (s *Server) startRouteAcceptLoop() { } // Start the accept loop in a different go routine. - go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, _EMPTY_) }, nil) + go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, Implicit, false, _EMPTY_) }, nil) // Solicit Routes if applicable. This will not block. s.solicitRoutes(opts.Routes, opts.Cluster.PinnedAccounts) @@ -2592,14 +2630,13 @@ func (s *Server) StartRouting(clientListenReady chan struct{}) { } func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType, accName string) { - tryForEver := rtype == Explicit // If A connects to B, and B to A (regardless if explicit or // implicit - due to auto-discovery), and if each server first // registers the route on the opposite TCP connection, the // two connections will end-up being closed. // Add some random delay to reduce risk of repeated failures. delay := time.Duration(rand.Intn(100)) * time.Millisecond - if tryForEver { + if rtype == Explicit { delay += DEFAULT_ROUTE_RECONNECT } select { @@ -2608,7 +2645,7 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType, accName string s.grWG.Done() return } - s.connectToRoute(rURL, tryForEver, false, accName) + s.connectToRoute(rURL, rtype, false, false, accName) } // Checks to make sure the route is still valid. @@ -2621,21 +2658,26 @@ func (s *Server) routeStillValid(rURL *url.URL) bool { return false } -func (s *Server) connectToRoute(rURL *url.URL, tryForEver, firstConnect bool, accName string) { +func (s *Server) connectToRoute(rURL *url.URL, rtype RouteType, firstConnect, noGossip bool, accName string) { + defer s.grWG.Done() + if rURL == nil { + return + } + // For explicit routes, we will try to connect until we succeed. For implicit + // we will try only based on the number of ConnectRetries optin. + tryForEver := rtype == Explicit + // Snapshot server options. 
opts := s.getOpts() - defer s.grWG.Done() - const connErrFmt = "Error trying to connect to route (attempt %v): %v" - s.mu.Lock() + s.mu.RLock() resolver := s.routeResolver excludedAddresses := s.routesToSelf - s.mu.Unlock() + s.mu.RUnlock() - attempts := 0 - for s.isRunning() && rURL != nil { + for attempts := 0; s.isRunning(); { if tryForEver { if !s.routeStillValid(rURL) { return @@ -2689,7 +2731,7 @@ func (s *Server) connectToRoute(rURL *url.URL, tryForEver, firstConnect bool, ac // We have a route connection here. // Go ahead and create it and exit this func. - s.createRoute(conn, rURL, accName) + s.createRoute(conn, rURL, rtype, noGossip, accName) return } } @@ -2718,13 +2760,13 @@ func (s *Server) solicitRoutes(routes []*url.URL, accounts []string) { s.saveRouteTLSName(routes) for _, r := range routes { route := r - s.startGoRoutine(func() { s.connectToRoute(route, true, true, _EMPTY_) }) + s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, false, _EMPTY_) }) } // Now go over possible per-account routes and create them. for _, an := range accounts { for _, r := range routes { route, accName := r, an - s.startGoRoutine(func() { s.connectToRoute(route, true, true, accName) }) + s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, false, accName) }) } } } @@ -2846,7 +2888,7 @@ func (s *Server) removeRoute(c *client) { opts = s.getOpts() rURL *url.URL noPool bool - didSolicit bool + rtype RouteType ) c.mu.Lock() cid := c.cid @@ -2865,7 +2907,7 @@ func (s *Server) removeRoute(c *client) { connectURLs = r.connectURLs wsConnectURLs = r.wsConnURLs rURL = r.url - didSolicit = r.didSolicit + rtype = r.routeType } c.mu.Unlock() if accName != _EMPTY_ { @@ -2928,12 +2970,12 @@ func (s *Server) removeRoute(c *client) { // this remote was a "no pool" route, attempt to reconnect. 
if noPool { if s.routesPoolSize > 1 { - s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, _EMPTY_) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, false, _EMPTY_) }) } if len(opts.Cluster.PinnedAccounts) > 0 { for _, an := range opts.Cluster.PinnedAccounts { accName := an - s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, accName) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, false, accName) }) } } } diff --git a/server/routes_test.go b/server/routes_test.go index acb626cd46..97c790d9d8 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -738,15 +738,10 @@ func TestClientConnectToRoutePort(t *testing.T) { } type checkDuplicateRouteLogger struct { - sync.Mutex + DummyLogger gotDuplicate bool } -func (l *checkDuplicateRouteLogger) Noticef(format string, v ...any) {} -func (l *checkDuplicateRouteLogger) Errorf(format string, v ...any) {} -func (l *checkDuplicateRouteLogger) Warnf(format string, v ...any) {} -func (l *checkDuplicateRouteLogger) Fatalf(format string, v ...any) {} -func (l *checkDuplicateRouteLogger) Tracef(format string, v ...any) {} func (l *checkDuplicateRouteLogger) Debugf(format string, v ...any) { l.Lock() defer l.Unlock() @@ -3357,7 +3352,8 @@ func TestRoutePoolAndPerAccountWithOlderServer(t *testing.T) { type testDuplicateRouteLogger struct { DummyLogger - ch chan struct{} + ch chan struct{} + count int } func (l *testDuplicateRouteLogger) Noticef(format string, args ...any) { @@ -3369,6 +3365,9 @@ func (l *testDuplicateRouteLogger) Noticef(format string, args ...any) { case l.ch <- struct{}{}: default: } + l.Mutex.Lock() + l.count++ + l.Mutex.Unlock() } // This test will make sure that a server with pooling does not @@ -4228,3 +4227,176 @@ func TestRouteNoRaceOnClusterNameNegotiation(t *testing.T) { s1.Shutdown() } } + +func TestRouteImplicitNotTooManyDuplicates(t *testing.T) { + for _, test := range []struct { + name string + pooling bool + compression bool + }{ + {"no pooling-no compression", false, false}, + {"no pooling-compression", false, true}, + {"pooling-no compression", true, false}, + {"pooling-compression", true, true}, + } { + t.Run(test.name, func(t *testing.T) { + o := DefaultOptions() + o.ServerName = "SEED" + if !test.pooling { + o.Cluster.PoolSize = -1 + } + if !test.compression { + o.Cluster.Compression.Mode = CompressionOff + } + seed := RunServer(o) + defer seed.Shutdown() + + dl := &testDuplicateRouteLogger{} + + servers := make([]*Server, 0, 10) + for i := 0; i < cap(servers); i++ { + io := DefaultOptions() + io.ServerName = fmt.Sprintf("IMPLICIT_%d", i+1) + io.NoLog = false + io.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o.Cluster.Port)) + if !test.pooling { + io.Cluster.PoolSize = -1 + } + if !test.compression { + io.Cluster.Compression.Mode = CompressionOff + } + is, err := NewServer(io) + require_NoError(t, err) + // Will do defer of shutdown later. + is.SetLogger(dl, true, false) + is.Start() + servers = append(servers, is) + } + + allServers := make([]*Server, 0, len(servers)+1) + allServers = append(allServers, seed) + allServers = append(allServers, servers...) + + // Let's make sure that we wait for each server to be ready. + for _, s := range allServers { + if !s.ReadyForConnections(2 * time.Second) { + t.Fatalf("Server %q is not ready for connections", s) + } + } + + // Do the defer of shutdown of all servers this way instead of individual + // defers when starting them. It takes less time for the servers to shutdown + // this way. 
+ defer func() { + for _, s := range allServers { + s.Shutdown() + } + }() + checkClusterFormed(t, allServers...) + + dl.Mutex.Lock() + count := dl.count + dl.Mutex.Unlock() + // Getting duplicates should not be considered fatal, it is an optimization + // to reduce the occurrences of those. But to make sure we don't have a + // regression, we will fail the test if we get say more than 20 or so ( + // without the code change, we would get more than 500 of duplicates). + if count > 20 { + t.Fatalf("Got more duplicates than anticipated: %v", count) + } + }) + } +} + +func TestRouteImplicitJoinsSeparateGroups(t *testing.T) { + // The test TestRouteImplicitNotTooManyDuplicates makes sure that we do + // not have too many duplicate routes cases when processing implicit routes. + // This test is to ensure that the code changes to reduce the number + // of duplicate routes does not prevent the formation of the cluster + // with the given setup (which is admittedly not good since a disconnect + // between some routes would not result in a reconnect leading to a full mesh). + // Still, original code was able to create the original full mesh, so we want + // to make sure that this is still possible. + for _, test := range []struct { + name string + pooling bool + compression bool + }{ + {"no pooling-no compression", false, false}, + {"no pooling-compression", false, true}, + {"pooling-no compression", true, false}, + {"pooling-compression", true, true}, + } { + t.Run(test.name, func(t *testing.T) { + setOpts := func(o *Options) { + if !test.pooling { + o.Cluster.PoolSize = -1 + } + if !test.compression { + o.Cluster.Compression.Mode = CompressionOff + } + } + + // Create a cluster s1/s2/s3 + o1 := DefaultOptions() + o1.ServerName = "S1" + setOpts(o1) + s1 := RunServer(o1) + defer s1.Shutdown() + + o2 := DefaultOptions() + o2.ServerName = "S2" + setOpts(o2) + o2.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port)) + s2 := RunServer(o2) + defer s2.Shutdown() + + tmpl := ` + server_name: "S3" + listen: "127.0.0.1:-1" + cluster { + name: "abc" + listen: "127.0.0.1:-1" + %s + %s + routes: ["nats://127.0.0.1:%d"%s] + } + ` + var poolCfg string + var compressionCfg string + if !test.pooling { + poolCfg = "pool_size: -1" + } + if !test.compression { + compressionCfg = "compression: off" + } + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, poolCfg, compressionCfg, o1.Cluster.Port, _EMPTY_))) + s3, _ := RunServerWithConfig(conf) + defer s3.Shutdown() + + checkClusterFormed(t, s1, s2, s3) + + // Now s4 and s5 connected to each other, but not linked to s1/s2/s3 + o4 := DefaultOptions() + o4.ServerName = "S4" + setOpts(o4) + s4 := RunServer(o4) + defer s4.Shutdown() + + o5 := DefaultOptions() + o5.ServerName = "S5" + setOpts(o5) + o5.Routes = RoutesFromStr(fmt.Sprintf("nats://127.0.0.1:%d", o4.Cluster.Port)) + s5 := RunServer(o5) + defer s5.Shutdown() + + checkClusterFormed(t, s4, s5) + + // Now add a route from s3 to s4 and make sure that we have a full mesh. 
+ routeToS4 := fmt.Sprintf(`, "nats://127.0.0.1:%d"`, o4.Cluster.Port) + reloadUpdateConfig(t, s3, conf, fmt.Sprintf(tmpl, poolCfg, compressionCfg, o1.Cluster.Port, routeToS4)) + + checkClusterFormed(t, s1, s2, s3, s4, s5) + }) + } +} diff --git a/server/server.go b/server/server.go index cc3130ebe5..dd9a12f23e 100644 --- a/server/server.go +++ b/server/server.go @@ -98,6 +98,7 @@ type Info struct { RoutePoolIdx int `json:"route_pool_idx,omitempty"` RouteAccount string `json:"route_account,omitempty"` RouteAccReqID string `json:"route_acc_add_reqid,omitempty"` + NoGossip bool `json:"no_gossip,omitempty"` // Gateways Specific Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO) From 2df3224cddd6b685c44e675d40ebc8958a3039a9 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 18 Jul 2024 14:19:38 +0100 Subject: [PATCH 02/16] Fix panic on `/raftz` when shutting down Signed-off-by: Neil Twigg --- server/monitor.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index b72ee09d57..54a9ed838f 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -3787,8 +3787,14 @@ func (s *Server) HandleRaftz(w http.ResponseWriter, r *http.Request) { gfilter := r.URL.Query().Get("group") afilter := r.URL.Query().Get("acc") - if afilter == "" { - afilter = s.SystemAccount().Name + if afilter == _EMPTY_ { + if sys := s.SystemAccount(); sys != nil { + afilter = sys.Name + } else { + w.WriteHeader(404) + w.Write([]byte("System account not found, the server may be shutting down")) + return + } } groups := map[string]RaftNode{} From d08d1f0f29ea08f957d8604e24be4dd55f9c3496 Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Thu, 18 Jul 2024 01:24:47 +0900 Subject: [PATCH 03/16] Add ldflag to srv_pkg_non_js_tests tests Signed-off-by: Waldemar Quevedo --- scripts/runTestsOnTravis.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/runTestsOnTravis.sh b/scripts/runTestsOnTravis.sh index 7561bd72dd..f60a952831 100755 --- a/scripts/runTestsOnTravis.sh +++ b/scripts/runTestsOnTravis.sh @@ -107,7 +107,8 @@ elif [ "$1" = "srv_pkg_non_js_tests" ]; then # by using `skip_js_tests`, MQTT tests by using `skip_mqtt_tests` and # message tracing tests by using `skip_msgtrace_tests`. - go test -race -v -p=1 ./server/... -tags=skip_store_tests,skip_js_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast + # Also including the ldflag with the version since this includes the `TestVersionMatchesTag`. + go test -race -v -p=1 ./server/... 
-ldflags="-X=github.com/nats-io/nats-server/v2/server.serverVersion=$TRAVIS_TAG" -tags=skip_store_tests,skip_js_tests,skip_mqtt_tests,skip_msgtrace_tests -count=1 -vet=off -timeout=30m -failfast elif [ "$1" = "non_srv_pkg_tests" ]; then From d8c2b388e33066499e5a063297f1636714a098c5 Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Tue, 25 Jun 2024 08:40:46 -0700 Subject: [PATCH 04/16] Leak using just JS --- server/mqtt_ex_leak_investigate_test.go | 198 ++++++++++++++++++++++++ server/mqtt_ex_test_test.go | 15 ++ server/mqtt_test.go | 51 +++--- 3 files changed, 245 insertions(+), 19 deletions(-) create mode 100644 server/mqtt_ex_leak_investigate_test.go diff --git a/server/mqtt_ex_leak_investigate_test.go b/server/mqtt_ex_leak_investigate_test.go new file mode 100644 index 0000000000..9749694ff2 --- /dev/null +++ b/server/mqtt_ex_leak_investigate_test.go @@ -0,0 +1,198 @@ +// Copyright 2024 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !skip_mqtt_tests +// +build !skip_mqtt_tests + +package server + +import ( + "runtime" + "sync" + "testing" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nuid" +) + +func TestJetstreamConsumerLeak(t *testing.T) { + + QOS := byte(2) + NSubscribers := 1000 + NConcurrentSubscribers := 100 + + clusterConf := ` + listen: 127.0.0.1:-1 + + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + leafnodes { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3) + cl.waitOnLeader() + + s := cl.randomNonLeader() + testMQTTInitializeStreams(t, s) + + // Write the memory profile before starting the test + // w, _ := os.Create("before.pprof") + // pprof.WriteHeapProfile(w) + + before := &runtime.MemStats{} + runtime.GC() + runtime.ReadMemStats(before) + + testMQTTConnSubReceiveDiscConcurrent(t, s, QOS, NSubscribers, NConcurrentSubscribers, testMQTTConnSubDiscJS) + + // Sleep for a few seconds to see if some timers kick in and help cleanup? 
+ time.Sleep(10 * time.Second) + + after := &runtime.MemStats{} + runtime.GC() + runtime.ReadMemStats(after) + + limit := before.HeapInuse + 100*1024*1024 // 100MB + if after.HeapInuse > limit { + t.Fatalf("Memory usage too high: %v", after.HeapInuse) + } + + // runtime.GC() + // w, _ = os.Create("after.pprof") + // pprof.WriteHeapProfile(w) + +} + +func testMQTTInitializeStreams(t *testing.T, server *Server) { + nc, js := jsClientConnect(t, server, nats.UserInfo("one", "p")) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: mqttStreamName, + Subjects: []string{mqttStreamSubjectPrefix + ">"}, + Storage: nats.FileStorage, + Retention: nats.InterestPolicy, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + _, err = js.AddStream(&nats.StreamConfig{ + Name: mqttOutStreamName, + Subjects: []string{mqttOutSubjectPrefix + ">"}, + Storage: nats.FileStorage, + Retention: nats.InterestPolicy, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } +} + +func testMQTTConnSubDiscJS(t *testing.T, server *Server, QOS byte, iSub int) { + nc, js := jsClientConnect(t, server, nats.UserInfo("one", "p")) + defer nc.Close() + + // make sure the MQTT streams are accessible to us + _, err := js.StreamInfo(mqttStreamName) + if err != nil { + t.Fatalf("Error on JetStream stream info: %v", err) + } + _, err = js.StreamInfo(mqttOutStreamName) + if err != nil { + t.Fatalf("Error on JetStream stream info: %v", err) + } + + start := time.Now() + pubrelConsumerName := mqttPubRelConsumerDurablePrefix + nuid.Next() + _, err = js.AddConsumer(mqttOutStreamName, &nats.ConsumerConfig{ + DeliverSubject: "pubrel-delivery_" + nuid.Next(), + Durable: pubrelConsumerName, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverNewPolicy, + FilterSubject: mqttPubRelSubjectPrefix + nuid.Next(), + AckWait: mqttDefaultAckWait, + MaxAckPending: mqttDefaultMaxAckPending, + MemoryStorage: false, + }) + if err != nil { + t.Fatalf("Error on JetStream consumer creation: %v", err) + } + + subConsumerName := "sessid_" + nuid.Next() + _, err = js.AddConsumer(mqttStreamName, &nats.ConsumerConfig{ + DeliverSubject: "inbox", + Durable: subConsumerName, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverNewPolicy, + FilterSubject: mqttStreamSubjectPrefix + "subject", + AckWait: mqttDefaultAckWait, + MaxAckPending: mqttDefaultMaxAckPending, + MemoryStorage: false, + }) + if err != nil { + t.Fatalf("Error on JetStream consumer creation: %v", err) + } + t.Logf("<>/<> SUB %v: Now %v, created 2 consumers", iSub, time.Since(start)) + + err = js.DeleteConsumer(mqttOutStreamName, pubrelConsumerName) + if err != nil { + t.Fatalf("Error on JetStream consumer deletion: %v", err) + } + err = js.DeleteConsumer(mqttStreamName, subConsumerName) + if err != nil { + t.Fatalf("Error on JetStream consumer deletion: %v", err) + } + t.Logf("SUB %v: Now %v, deleted 2 consumers", iSub, time.Since(start)) +} + +func testMQTTConnSubReceiveDiscConcurrent( + t *testing.T, server *Server, QOS byte, NSubscribers int, NConcurrentSubscribers int, + subf func(t *testing.T, server *Server, QOS byte, n int), +) { + ConcurrentSubscribers := make(chan struct{}, NConcurrentSubscribers) + for i := 0; i < NConcurrentSubscribers; i++ { + ConcurrentSubscribers <- struct{}{} + } + + wg := sync.WaitGroup{} + wg.Add(NSubscribers) + // Start concurrent subscribers. Each will receive 1 to 3 messages, then quit. 
+ go func() { + for iSub := 0; iSub < NSubscribers; { + // wait for a slot to open up + <-ConcurrentSubscribers + iSub++ + go func(c int) { + subf(t, server, QOS, c) + wg.Done() + ConcurrentSubscribers <- struct{}{} + }(iSub) + } + }() + wg.Wait() +} diff --git a/server/mqtt_ex_test_test.go b/server/mqtt_ex_test_test.go index 9acad55877..44a4135581 100644 --- a/server/mqtt_ex_test_test.go +++ b/server/mqtt_ex_test_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "io" + "net" "os" "os/exec" "strconv" @@ -37,6 +38,7 @@ type mqttTarget struct { clusters []*cluster configs []mqttTestConfig all []mqttDial + allNATS []string } type mqttTestConfig struct { @@ -194,6 +196,16 @@ func (d mqttDial) Get() (u, p, s, c string) { return u, p, s, c } +func (d mqttDial) GetHostPort() (host string, port int) { + _, _, s, _ := d.Get() + host, portS, err := net.SplitHostPort(s) + if err != nil { + return s, 0 + } + port, _ = strconv.Atoi(portS) + return host, port +} + func (d mqttDial) Name() string { _, _, _, c := d.Get() return c @@ -285,13 +297,16 @@ func mqttMakeTestCluster(size int, domain string) func(tb testing.TB) *mqttTarge cl.waitOnLeader() all := []mqttDial{} + allNATS := []string{} for _, s := range cl.servers { all = append(all, mqttNewDialForServer(s, "one", "p")) + allNATS = append(allNATS, string(mqttNewDial("one", "p", s.getOpts().Host, s.getOpts().Port, ""))) } return &mqttTarget{ clusters: []*cluster{cl}, all: all, + allNATS: allNATS, configs: []mqttTestConfig{ { name: "publish to one", diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 09b372b910..8508379f11 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -329,8 +329,11 @@ func testMQTTRunServer(t testing.TB, o *Options) *Server { if err != nil { t.Fatalf("Error creating server: %v", err) } - l := &DummyLogger{} - s.SetLogger(l, true, true) + // l := &DummyLogger{} + // s.SetLogger(l, true, true) + o.Debug = false + o.Trace = false + s.ConfigureLogger() s.Start() if err := s.readyForConnections(3 * time.Second); err != nil { testMQTTShutdownServer(s) @@ -1907,23 +1910,7 @@ func TestMQTTParseSub(t *testing.T) { func testMQTTSub(t testing.TB, pi uint16, c net.Conn, r *mqttReader, filters []*mqttFilter, expected []byte) { t.Helper() - w := newMQTTWriter(0) - pkLen := 2 // for pi - for i := 0; i < len(filters); i++ { - f := filters[i] - pkLen += 2 + len(f.filter) + 1 - } - w.WriteByte(mqttPacketSub | mqttSubscribeFlags) - w.WriteVarInt(pkLen) - w.WriteUint16(pi) - for i := 0; i < len(filters); i++ { - f := filters[i] - w.WriteBytes([]byte(f.filter)) - w.WriteByte(f.qos) - } - if _, err := testMQTTWrite(c, w.Bytes()); err != nil { - t.Fatalf("Error writing SUBSCRIBE protocol: %v", err) - } + testMQTTSubNoAck(t, pi, c, r, filters, expected) b, pl := testMQTTReadPacket(t, r) if pt := b & mqttPacketMask; pt != mqttPacketSubAck { t.Fatalf("Expected SUBACK packet %x, got %x", mqttPacketSubAck, pt) @@ -1945,6 +1932,27 @@ func testMQTTSub(t testing.TB, pi uint16, c net.Conn, r *mqttReader, filters []* } } +func testMQTTSubNoAck(t testing.TB, pi uint16, c net.Conn, r *mqttReader, filters []*mqttFilter, expected []byte) { + t.Helper() + w := newMQTTWriter(0) + pkLen := 2 // for pi + for i := 0; i < len(filters); i++ { + f := filters[i] + pkLen += 2 + len(f.filter) + 1 + } + w.WriteByte(mqttPacketSub | mqttSubscribeFlags) + w.WriteVarInt(pkLen) + w.WriteUint16(pi) + for i := 0; i < len(filters); i++ { + f := filters[i] + w.WriteBytes([]byte(f.filter)) + w.WriteByte(f.qos) + } + if _, err := testMQTTWrite(c, w.Bytes()); 
err != nil { + t.Fatalf("Error writing SUBSCRIBE protocol: %v", err) + } +} + func TestMQTTSubAck(t *testing.T) { o := testMQTTDefaultOptions() s := testMQTTRunServer(t, o) @@ -2098,6 +2106,11 @@ func testMQTTReadPubPacket(t testing.TB, r *mqttReader) (flags byte, pi uint16, if pt := b & mqttPacketMask; pt != mqttPacketPub { t.Fatalf("Expected PUBLISH packet %x, got %x", mqttPacketPub, pt) } + return testMQTTReadPubPacketEx(t, r, b, pl) +} + +func testMQTTReadPubPacketEx(t testing.TB, r *mqttReader, b byte, pl int) (flags byte, pi uint16, topic string, payload []byte) { + t.Helper() flags = b & mqttPacketFlagMask start := r.pos topic, err := r.readString("topic name") From 3f8ca985ddffc2d2205bea9dd7cd74467aa2f45e Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Wed, 26 Jun 2024 12:13:54 -0700 Subject: [PATCH 05/16] [ci skip] Small test updates Signed-off-by: Derek Collison --- server/mqtt_ex_leak_investigate_test.go | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/server/mqtt_ex_leak_investigate_test.go b/server/mqtt_ex_leak_investigate_test.go index 9749694ff2..c05091f538 100644 --- a/server/mqtt_ex_leak_investigate_test.go +++ b/server/mqtt_ex_leak_investigate_test.go @@ -17,7 +17,9 @@ package server import ( + "os" "runtime" + "runtime/pprof" "sync" "testing" "time" @@ -54,14 +56,17 @@ func TestJetstreamConsumerLeak(t *testing.T) { } ` cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3) + defer cl.shutdown() + cl.waitOnLeader() s := cl.randomNonLeader() testMQTTInitializeStreams(t, s) // Write the memory profile before starting the test - // w, _ := os.Create("before.pprof") - // pprof.WriteHeapProfile(w) + w, _ := os.Create("before.pprof") + pprof.WriteHeapProfile(w) + w.Close() before := &runtime.MemStats{} runtime.GC() @@ -72,6 +77,11 @@ func TestJetstreamConsumerLeak(t *testing.T) { // Sleep for a few seconds to see if some timers kick in and help cleanup? 
time.Sleep(10 * time.Second) + runtime.GC() + w, _ = os.Create("after.pprof") + pprof.WriteHeapProfile(w) + w.Close() + after := &runtime.MemStats{} runtime.GC() runtime.ReadMemStats(after) @@ -80,11 +90,6 @@ func TestJetstreamConsumerLeak(t *testing.T) { if after.HeapInuse > limit { t.Fatalf("Memory usage too high: %v", after.HeapInuse) } - - // runtime.GC() - // w, _ = os.Create("after.pprof") - // pprof.WriteHeapProfile(w) - } func testMQTTInitializeStreams(t *testing.T, server *Server) { @@ -188,9 +193,11 @@ func testMQTTConnSubReceiveDiscConcurrent( <-ConcurrentSubscribers iSub++ go func(c int) { + defer func() { + ConcurrentSubscribers <- struct{}{} + wg.Done() + }() subf(t, server, QOS, c) - wg.Done() - ConcurrentSubscribers <- struct{}{} }(iSub) } }() From a51e54b263d189be89e220f6cdfd14b03b11ff8e Mon Sep 17 00:00:00 2001 From: Lev Brouk Date: Thu, 27 Jun 2024 10:04:24 -0700 Subject: [PATCH 06/16] Moved to jetstream_cluster_4_test.go and simplified --- server/jetstream_cluster_4_test.go | 117 ++++++++++++++ server/mqtt_ex_leak_investigate_test.go | 205 ------------------------ server/mqtt_ex_test_test.go | 15 -- server/mqtt_test.go | 51 +++--- 4 files changed, 136 insertions(+), 252 deletions(-) delete mode 100644 server/mqtt_ex_leak_investigate_test.go diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 8b4abd03e8..9642aeb8d1 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -23,6 +23,7 @@ import ( "math/rand" "os" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -1839,3 +1840,119 @@ func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) { } } } + +// https://github.com/nats-io/nats-server/pull/5600 +func TestJetStreamClusterConsumerLeak(t *testing.T) { + N := 2000 // runs in under 10s, but significant enough to see the difference. + NConcurrent := 100 + + clusterConf := ` + listen: 127.0.0.1:-1 + + server_name: %s + jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} + + leafnodes { + listen: 127.0.0.1:-1 + } + + cluster { + name: %s + listen: 127.0.0.1:%d + routes = [%s] + } + + accounts { + ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } + $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] } + } +` + + cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3) + defer cl.shutdown() + cl.waitOnLeader() + + s := cl.randomNonLeader() + + // Create the test stream. + streamName := "LEAK_TEST_STREAM" + nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p")) + defer nc.Close() + _, err := js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{"$SOMETHING.>"}, + Storage: nats.FileStorage, + Retention: nats.InterestPolicy, + Replicas: 3, + }) + if err != nil { + t.Fatalf("Error creating stream: %v", err) + } + + concurrent := make(chan struct{}, NConcurrent) + for i := 0; i < NConcurrent; i++ { + concurrent <- struct{}{} + } + errors := make(chan error, N) + + wg := sync.WaitGroup{} + wg.Add(N) + + // Gather the stats for comparison. 
+ before := &runtime.MemStats{} + runtime.GC() + runtime.ReadMemStats(before) + + for i := 0; i < N; { + // wait for a slot to open up + <-concurrent + i++ + go func() { + defer func() { + concurrent <- struct{}{} + wg.Done() + }() + + nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p")) + defer nc.Close() + + consumerName := "sessid_" + nuid.Next() + _, err := js.AddConsumer(streamName, &nats.ConsumerConfig{ + DeliverSubject: "inbox", + Durable: consumerName, + AckPolicy: nats.AckExplicitPolicy, + DeliverPolicy: nats.DeliverNewPolicy, + FilterSubject: "$SOMETHING.ELSE.subject", + AckWait: 30 * time.Second, + MaxAckPending: 1024, + }) + if err != nil { + errors <- fmt.Errorf("Error on JetStream consumer creation: %v", err) + return + } + + err = js.DeleteConsumer(streamName, consumerName) + if err != nil { + errors <- fmt.Errorf("Error on JetStream consumer deletion: %v", err) + } + }() + } + + wg.Wait() + if len(errors) > 0 { + for err := range errors { + t.Fatalf("%v", err) + } + } + + after := &runtime.MemStats{} + runtime.GC() + runtime.ReadMemStats(after) + + // Before https://github.com/nats-io/nats-server/pull/5600 this test was + // adding 180Mb+ to HeapInuse. Now it's under 40Mb (ran locally on a Mac) + limit := before.HeapInuse + 100*1024*1024 // 100MB + if after.HeapInuse > before.HeapInuse+limit { + t.Fatalf("Extra memory usage too high: %v", after.HeapInuse-before.HeapInuse) + } +} diff --git a/server/mqtt_ex_leak_investigate_test.go b/server/mqtt_ex_leak_investigate_test.go deleted file mode 100644 index c05091f538..0000000000 --- a/server/mqtt_ex_leak_investigate_test.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright 2024 The NATS Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !skip_mqtt_tests -// +build !skip_mqtt_tests - -package server - -import ( - "os" - "runtime" - "runtime/pprof" - "sync" - "testing" - "time" - - "github.com/nats-io/nats.go" - "github.com/nats-io/nuid" -) - -func TestJetstreamConsumerLeak(t *testing.T) { - - QOS := byte(2) - NSubscribers := 1000 - NConcurrentSubscribers := 100 - - clusterConf := ` - listen: 127.0.0.1:-1 - - server_name: %s - jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'} - - leafnodes { - listen: 127.0.0.1:-1 - } - - cluster { - name: %s - listen: 127.0.0.1:%d - routes = [%s] - } - - accounts { - ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled } - $SYS { users = [ { user: "admin", pass: "s3cr3t!" 
} ] } - } -` - cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3) - defer cl.shutdown() - - cl.waitOnLeader() - - s := cl.randomNonLeader() - testMQTTInitializeStreams(t, s) - - // Write the memory profile before starting the test - w, _ := os.Create("before.pprof") - pprof.WriteHeapProfile(w) - w.Close() - - before := &runtime.MemStats{} - runtime.GC() - runtime.ReadMemStats(before) - - testMQTTConnSubReceiveDiscConcurrent(t, s, QOS, NSubscribers, NConcurrentSubscribers, testMQTTConnSubDiscJS) - - // Sleep for a few seconds to see if some timers kick in and help cleanup? - time.Sleep(10 * time.Second) - - runtime.GC() - w, _ = os.Create("after.pprof") - pprof.WriteHeapProfile(w) - w.Close() - - after := &runtime.MemStats{} - runtime.GC() - runtime.ReadMemStats(after) - - limit := before.HeapInuse + 100*1024*1024 // 100MB - if after.HeapInuse > limit { - t.Fatalf("Memory usage too high: %v", after.HeapInuse) - } -} - -func testMQTTInitializeStreams(t *testing.T, server *Server) { - nc, js := jsClientConnect(t, server, nats.UserInfo("one", "p")) - defer nc.Close() - - _, err := js.AddStream(&nats.StreamConfig{ - Name: mqttStreamName, - Subjects: []string{mqttStreamSubjectPrefix + ">"}, - Storage: nats.FileStorage, - Retention: nats.InterestPolicy, - Replicas: 3, - }) - if err != nil { - t.Fatalf("Error creating stream: %v", err) - } - _, err = js.AddStream(&nats.StreamConfig{ - Name: mqttOutStreamName, - Subjects: []string{mqttOutSubjectPrefix + ">"}, - Storage: nats.FileStorage, - Retention: nats.InterestPolicy, - Replicas: 3, - }) - if err != nil { - t.Fatalf("Error creating stream: %v", err) - } -} - -func testMQTTConnSubDiscJS(t *testing.T, server *Server, QOS byte, iSub int) { - nc, js := jsClientConnect(t, server, nats.UserInfo("one", "p")) - defer nc.Close() - - // make sure the MQTT streams are accessible to us - _, err := js.StreamInfo(mqttStreamName) - if err != nil { - t.Fatalf("Error on JetStream stream info: %v", err) - } - _, err = js.StreamInfo(mqttOutStreamName) - if err != nil { - t.Fatalf("Error on JetStream stream info: %v", err) - } - - start := time.Now() - pubrelConsumerName := mqttPubRelConsumerDurablePrefix + nuid.Next() - _, err = js.AddConsumer(mqttOutStreamName, &nats.ConsumerConfig{ - DeliverSubject: "pubrel-delivery_" + nuid.Next(), - Durable: pubrelConsumerName, - AckPolicy: nats.AckExplicitPolicy, - DeliverPolicy: nats.DeliverNewPolicy, - FilterSubject: mqttPubRelSubjectPrefix + nuid.Next(), - AckWait: mqttDefaultAckWait, - MaxAckPending: mqttDefaultMaxAckPending, - MemoryStorage: false, - }) - if err != nil { - t.Fatalf("Error on JetStream consumer creation: %v", err) - } - - subConsumerName := "sessid_" + nuid.Next() - _, err = js.AddConsumer(mqttStreamName, &nats.ConsumerConfig{ - DeliverSubject: "inbox", - Durable: subConsumerName, - AckPolicy: nats.AckExplicitPolicy, - DeliverPolicy: nats.DeliverNewPolicy, - FilterSubject: mqttStreamSubjectPrefix + "subject", - AckWait: mqttDefaultAckWait, - MaxAckPending: mqttDefaultMaxAckPending, - MemoryStorage: false, - }) - if err != nil { - t.Fatalf("Error on JetStream consumer creation: %v", err) - } - t.Logf("<>/<> SUB %v: Now %v, created 2 consumers", iSub, time.Since(start)) - - err = js.DeleteConsumer(mqttOutStreamName, pubrelConsumerName) - if err != nil { - t.Fatalf("Error on JetStream consumer deletion: %v", err) - } - err = js.DeleteConsumer(mqttStreamName, subConsumerName) - if err != nil { - t.Fatalf("Error on JetStream consumer deletion: %v", err) - } - t.Logf("SUB %v: Now %v, 
deleted 2 consumers", iSub, time.Since(start)) -} - -func testMQTTConnSubReceiveDiscConcurrent( - t *testing.T, server *Server, QOS byte, NSubscribers int, NConcurrentSubscribers int, - subf func(t *testing.T, server *Server, QOS byte, n int), -) { - ConcurrentSubscribers := make(chan struct{}, NConcurrentSubscribers) - for i := 0; i < NConcurrentSubscribers; i++ { - ConcurrentSubscribers <- struct{}{} - } - - wg := sync.WaitGroup{} - wg.Add(NSubscribers) - // Start concurrent subscribers. Each will receive 1 to 3 messages, then quit. - go func() { - for iSub := 0; iSub < NSubscribers; { - // wait for a slot to open up - <-ConcurrentSubscribers - iSub++ - go func(c int) { - defer func() { - ConcurrentSubscribers <- struct{}{} - wg.Done() - }() - subf(t, server, QOS, c) - }(iSub) - } - }() - wg.Wait() -} diff --git a/server/mqtt_ex_test_test.go b/server/mqtt_ex_test_test.go index 44a4135581..9acad55877 100644 --- a/server/mqtt_ex_test_test.go +++ b/server/mqtt_ex_test_test.go @@ -21,7 +21,6 @@ import ( "encoding/json" "fmt" "io" - "net" "os" "os/exec" "strconv" @@ -38,7 +37,6 @@ type mqttTarget struct { clusters []*cluster configs []mqttTestConfig all []mqttDial - allNATS []string } type mqttTestConfig struct { @@ -196,16 +194,6 @@ func (d mqttDial) Get() (u, p, s, c string) { return u, p, s, c } -func (d mqttDial) GetHostPort() (host string, port int) { - _, _, s, _ := d.Get() - host, portS, err := net.SplitHostPort(s) - if err != nil { - return s, 0 - } - port, _ = strconv.Atoi(portS) - return host, port -} - func (d mqttDial) Name() string { _, _, _, c := d.Get() return c @@ -297,16 +285,13 @@ func mqttMakeTestCluster(size int, domain string) func(tb testing.TB) *mqttTarge cl.waitOnLeader() all := []mqttDial{} - allNATS := []string{} for _, s := range cl.servers { all = append(all, mqttNewDialForServer(s, "one", "p")) - allNATS = append(allNATS, string(mqttNewDial("one", "p", s.getOpts().Host, s.getOpts().Port, ""))) } return &mqttTarget{ clusters: []*cluster{cl}, all: all, - allNATS: allNATS, configs: []mqttTestConfig{ { name: "publish to one", diff --git a/server/mqtt_test.go b/server/mqtt_test.go index 8508379f11..09b372b910 100644 --- a/server/mqtt_test.go +++ b/server/mqtt_test.go @@ -329,11 +329,8 @@ func testMQTTRunServer(t testing.TB, o *Options) *Server { if err != nil { t.Fatalf("Error creating server: %v", err) } - // l := &DummyLogger{} - // s.SetLogger(l, true, true) - o.Debug = false - o.Trace = false - s.ConfigureLogger() + l := &DummyLogger{} + s.SetLogger(l, true, true) s.Start() if err := s.readyForConnections(3 * time.Second); err != nil { testMQTTShutdownServer(s) @@ -1910,7 +1907,23 @@ func TestMQTTParseSub(t *testing.T) { func testMQTTSub(t testing.TB, pi uint16, c net.Conn, r *mqttReader, filters []*mqttFilter, expected []byte) { t.Helper() - testMQTTSubNoAck(t, pi, c, r, filters, expected) + w := newMQTTWriter(0) + pkLen := 2 // for pi + for i := 0; i < len(filters); i++ { + f := filters[i] + pkLen += 2 + len(f.filter) + 1 + } + w.WriteByte(mqttPacketSub | mqttSubscribeFlags) + w.WriteVarInt(pkLen) + w.WriteUint16(pi) + for i := 0; i < len(filters); i++ { + f := filters[i] + w.WriteBytes([]byte(f.filter)) + w.WriteByte(f.qos) + } + if _, err := testMQTTWrite(c, w.Bytes()); err != nil { + t.Fatalf("Error writing SUBSCRIBE protocol: %v", err) + } b, pl := testMQTTReadPacket(t, r) if pt := b & mqttPacketMask; pt != mqttPacketSubAck { t.Fatalf("Expected SUBACK packet %x, got %x", mqttPacketSubAck, pt) @@ -1932,27 +1945,6 @@ func testMQTTSub(t testing.TB, pi uint16, c 
net.Conn, r *mqttReader, filters []* } } -func testMQTTSubNoAck(t testing.TB, pi uint16, c net.Conn, r *mqttReader, filters []*mqttFilter, expected []byte) { - t.Helper() - w := newMQTTWriter(0) - pkLen := 2 // for pi - for i := 0; i < len(filters); i++ { - f := filters[i] - pkLen += 2 + len(f.filter) + 1 - } - w.WriteByte(mqttPacketSub | mqttSubscribeFlags) - w.WriteVarInt(pkLen) - w.WriteUint16(pi) - for i := 0; i < len(filters); i++ { - f := filters[i] - w.WriteBytes([]byte(f.filter)) - w.WriteByte(f.qos) - } - if _, err := testMQTTWrite(c, w.Bytes()); err != nil { - t.Fatalf("Error writing SUBSCRIBE protocol: %v", err) - } -} - func TestMQTTSubAck(t *testing.T) { o := testMQTTDefaultOptions() s := testMQTTRunServer(t, o) @@ -2106,11 +2098,6 @@ func testMQTTReadPubPacket(t testing.TB, r *mqttReader) (flags byte, pi uint16, if pt := b & mqttPacketMask; pt != mqttPacketPub { t.Fatalf("Expected PUBLISH packet %x, got %x", mqttPacketPub, pt) } - return testMQTTReadPubPacketEx(t, r, b, pl) -} - -func testMQTTReadPubPacketEx(t testing.TB, r *mqttReader, b byte, pl int) (flags byte, pi uint16, topic string, payload []byte) { - t.Helper() flags = b & mqttPacketFlagMask start := r.pos topic, err := r.readString("topic name") From e8b0ee7af8042ab1266129fb10aca81ed3190495 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 23 Jul 2024 14:41:25 +0100 Subject: [PATCH 07/16] NRG: Don't delete on-disk state if failing to create internal subs If we failed to start the Raft group subscriptions then we were calling `shutdown` with the `shouldDelete` flag set, which would nuke the state on disk, blowing away the WAL, the term and vote etc. However, this could happen if a Raft group tried to be started while the server was shutting down. When this happened, we would see a log entry saying `Error creating raft group: system account not setup` and then the Raft state would get deleted, so after a restart, all state was lost. This PR changes `shouldDelete` to false so that we preserve the state on disk for the next startup. Signed-off-by: Neil Twigg --- server/raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/raft.go b/server/raft.go index 347d788eb3..165cbbe91f 100644 --- a/server/raft.go +++ b/server/raft.go @@ -499,7 +499,7 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe // If we fail to do this for some reason then this is fatal — we cannot // continue setting up or the Raft node may be partially/totally isolated. if err := n.createInternalSubs(); err != nil { - n.shutdown(true) + n.shutdown(false) return nil, err } From 3b7bf7c952b2a364faeb3ef678a22fa393fd51d6 Mon Sep 17 00:00:00 2001 From: "R.I.Pienaar" Date: Wed, 24 Jul 2024 15:23:17 +0300 Subject: [PATCH 08/16] Send the correct struct to the restore API While validating the ideas in ADR-44 the proposed improvements caught the fact that a snapshot request was being sent to a restore API call. 
Tests passed because there was enough overlap in the structs but strictly should have been a failure due to the invalid request Signed-off-by: R.I.Pienaar --- server/jetstream_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index e8ef300f8e..6ff7ca0923 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -4258,6 +4258,7 @@ func TestJetStreamSnapshotsAPI(t *testing.T) { state = mset.state() mset.delete() + req, _ = json.Marshal(rreq) rmsg, err = nc2.Request(strings.ReplaceAll(JSApiStreamRestoreT, JSApiPrefix, "$JS.domain.API"), req, time.Second) if err != nil { t.Fatalf("Unexpected error on snapshot request: %v", err) From 138c8e5889df80f3752165c69075213bc52d4ab8 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Wed, 24 Jul 2024 15:23:54 +0100 Subject: [PATCH 09/16] Fix WQ retention issue due to `checkInterestState` / `isInterestRetention` Signed-off-by: Neil Twigg --- server/jetstream_cluster_4_test.go | 47 ++++++++++++++++++++++++++++++ server/stream.go | 2 +- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 9642aeb8d1..92c88bf12c 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -1956,3 +1956,50 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) { t.Fatalf("Extra memory usage too high: %v", after.HeapInuse-before.HeapInuse) } } + +func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "wq_stream", + Subjects: []string{"something.>"}, + Storage: nats.FileStorage, + Retention: nats.WorkQueuePolicy, + Replicas: 3, + }) + require_NoError(t, err) + + for i := 0; i < 100; i++ { + n := (i % 5) + 1 + _, err := js.Publish(fmt.Sprintf("something.%d", n), nil) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe( + "something.5", + "wq_consumer_5", + nats.BindStream("wq_stream"), + nats.ConsumerReplicas(3), + ) + require_NoError(t, err) + + for { + msgs, _ := sub.Fetch(5) + if len(msgs) == 0 { + break + } + for _, msg := range msgs { + require_NoError(t, msg.AckSync()) + } + } + + si, err := js.StreamInfo("wq_stream") + require_NoError(t, err) + require_Equal(t, si.State.Msgs, 80) + require_Equal(t, si.State.NumDeleted, 20) + require_Equal(t, si.State.NumSubjects, 4) +} diff --git a/server/stream.go b/server/stream.go index a09afdbf32..5f4df467fa 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5363,7 +5363,7 @@ func (mset *stream) checkInterestState() { func (mset *stream) isInterestRetention() bool { mset.mu.RLock() defer mset.mu.RUnlock() - return mset.cfg.Retention != LimitsPolicy + return mset.cfg.Retention == InterestPolicy } // NumConsumers reports on number of active consumers for this stream. 
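
The next patch in this series adds a matchAck check so that, when reconciling
consumer state on interest-based retention streams, a message is only acked
(and therefore eligible for removal) on behalf of a consumer whose filter
actually matches the message's subject. A minimal standalone sketch of that
idea follows; the miniStream/miniConsumer types and the literal-only subject
comparison are simplifications for illustration, not the server's actual
structures or wildcard-aware subject matching.

package main

import "fmt"

// miniStream stands in for a stream store: sequence -> subject.
type miniStream struct {
	msgs map[uint64]string
}

// miniConsumer stands in for a consumer with an optional subject filter.
type miniConsumer struct {
	filter string // empty means "match everything"
}

// matchAck mirrors the intent of the real matchAck: a filtered consumer only
// accounts for (and may trigger removal of) messages whose subject it matches.
func (c *miniConsumer) matchAck(s *miniStream, seq uint64) bool {
	subj, ok := s.msgs[seq]
	if !ok {
		return false
	}
	if c.filter == "" {
		return true
	}
	// Literal comparison only; the server performs full wildcard-aware
	// subject matching here.
	return subj == c.filter
}

func main() {
	s := &miniStream{msgs: map[uint64]string{
		1: "something.1",
		2: "something.5",
		3: "something.2",
	}}
	filtered := &miniConsumer{filter: "something.5"}
	for seq := uint64(1); seq <= 3; seq++ {
		// Only seq 2 should be acked on behalf of this filtered consumer.
		fmt.Printf("seq %d (%s): matchAck = %v\n", seq, s.msgs[seq], filtered.matchAck(s, seq))
	}
}
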
From f52627921da85501a250d35eb004537d6ede3d7c Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Thu, 25 Jul 2024 01:33:26 -0700 Subject: [PATCH 10/16] When checking consumer state for interest policy streams, make sure match filters (#5699) Signed-off-by: Derek Collison Signed-off-by: Derek Collison --- server/consumer.go | 26 +++++++++++++- server/stream.go | 85 +--------------------------------------------- 2 files changed, 26 insertions(+), 85 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index f797073275..6eb0b4745b 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -2898,6 +2898,28 @@ func (o *consumer) isFiltered() bool { return false } +// Check if we would have matched and needed an ack for this store seq. +// This is called for interest based retention streams to remove messages. +func (o *consumer) matchAck(sseq uint64) bool { + o.mu.RLock() + defer o.mu.RUnlock() + + // Check if we are filtered, and if so check if this is even applicable to us. + if o.isFiltered() { + if o.mset == nil { + return false + } + var svp StoreMsg + if _, err := o.mset.store.LoadMsg(sseq, &svp); err != nil { + return false + } + if !o.isFilteredMatch(svp.subj) { + return false + } + } + return true +} + // Check if we need an ack for this store seq. // This is called for interest based retention streams to remove messages. func (o *consumer) needAck(sseq uint64, subj string) bool { @@ -5499,7 +5521,9 @@ func (o *consumer) checkStateForInterestStream() error { } for seq := ss.FirstSeq; asflr > 0 && seq <= asflr; seq++ { - mset.ackMsg(o, seq) + if o.matchAck(seq) { + mset.ackMsg(o, seq) + } } o.mu.RLock() diff --git a/server/stream.go b/server/stream.go index 5f4df467fa..9137254607 100644 --- a/server/stream.go +++ b/server/stream.go @@ -5272,98 +5272,15 @@ func (mset *stream) checkInterestState() { return } - var zeroAcks []*consumer - var lowAckFloor uint64 = math.MaxUint64 - for _, o := range mset.getConsumers() { o.checkStateForInterestStream() - - o.mu.Lock() - if o.isLeader() { - // We need to account for consumers with ack floor of zero. - // We will collect them and see if we need to check pending below. - if o.asflr == 0 { - zeroAcks = append(zeroAcks, o) - } else if o.asflr < lowAckFloor { - lowAckFloor = o.asflr - } - } else { - // We are a follower so only have the store state, so read that in. - state, err := o.store.State() - if err != nil { - // On error we will not have enough information to process correctly so bail. - o.mu.Unlock() - return - } - // We need to account for consumers with ack floor of zero. - if state.AckFloor.Stream == 0 { - zeroAcks = append(zeroAcks, o) - } else if state.AckFloor.Stream < lowAckFloor { - lowAckFloor = state.AckFloor.Stream - } - // We are a follower here but if we detect a drift from when we were previous leader correct here. - if o.asflr > state.AckFloor.Stream || o.sseq > state.Delivered.Stream+1 { - o.applyState(state) - } - } - o.mu.Unlock() } - - // If nothing was set we can bail. - if lowAckFloor == math.MaxUint64 { - return - } - - // Capture our current state. - // ok to do so without lock. - var state StreamState - mset.store.FastState(&state) - - if lowAckFloor <= state.FirstSeq { - return - } - - // Do not want to hold stream lock if calculating numPending. - // Check if we had any zeroAcks, we will need to check them. 
- for _, o := range zeroAcks { - var np uint64 - o.mu.RLock() - if o.isLeader() { - np = uint64(o.numPending()) - } else { - np, _ = o.calculateNumPending() - } - o.mu.RUnlock() - // This means we have pending and can not remove anything at this time. - if np > 0 { - return - } - } - - mset.mu.Lock() - defer mset.mu.Unlock() - - // Check which purge we need to perform. - if lowAckFloor <= state.LastSeq || state.Msgs == 0 { - // Purge the stream to lowest ack floor + 1 - mset.store.PurgeEx(_EMPTY_, lowAckFloor+1, 0) - } else { - // Here we have a low ack floor higher then our last seq. - // So we will just do normal purge. - mset.store.Purge() - } - - // Make sure to reset our local lseq. - mset.store.FastState(&state) - mset.lseq = state.LastSeq - // Also make sure we clear any pending acks. - mset.clearAllPreAcksBelowFloor(state.FirstSeq) } func (mset *stream) isInterestRetention() bool { mset.mu.RLock() defer mset.mu.RUnlock() - return mset.cfg.Retention == InterestPolicy + return mset.cfg.Retention != LimitsPolicy } // NumConsumers reports on number of active consumers for this stream. From 34c98b68c343b4b0cfed60c9386f02f6051e1e77 Mon Sep 17 00:00:00 2001 From: Byron Ruth Date: Fri, 26 Jul 2024 07:50:21 -0400 Subject: [PATCH 11/16] [jsz] Add StreamLeaderOnly filter option The stream state and replica info is only guaranteed to be accurate when returned by the leader of a given stream. This new option returns stream details only for the stream in which the server is the leader for that stream. For systems with many streams this can significantly reduce the amount of data returned when scraping across all servers since non-leader details will likely be ignored. Fix #5698 Signed-off-by: Byron Ruth --- server/monitor.go | 53 +++++++++++++++++++++++++----------------- server/monitor_test.go | 19 +++++++++++++++ 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/server/monitor.go b/server/monitor.go index 54a9ed838f..e3d020e0f4 100644 --- a/server/monitor.go +++ b/server/monitor.go @@ -2724,15 +2724,16 @@ func (s *Server) accountInfo(accName string) (*AccountInfo, error) { // JSzOptions are options passed to Jsz type JSzOptions struct { - Account string `json:"account,omitempty"` - Accounts bool `json:"accounts,omitempty"` - Streams bool `json:"streams,omitempty"` - Consumer bool `json:"consumer,omitempty"` - Config bool `json:"config,omitempty"` - LeaderOnly bool `json:"leader_only,omitempty"` - Offset int `json:"offset,omitempty"` - Limit int `json:"limit,omitempty"` - RaftGroups bool `json:"raft,omitempty"` + Account string `json:"account,omitempty"` + Accounts bool `json:"accounts,omitempty"` + Streams bool `json:"streams,omitempty"` + Consumer bool `json:"consumer,omitempty"` + Config bool `json:"config,omitempty"` + LeaderOnly bool `json:"leader_only,omitempty"` + Offset int `json:"offset,omitempty"` + Limit int `json:"limit,omitempty"` + RaftGroups bool `json:"raft,omitempty"` + StreamLeaderOnly bool `json:"stream_leader_only,omitempty"` } // HealthzOptions are options passed to Healthz @@ -2806,7 +2807,7 @@ type JSInfo struct { AccountDetails []*AccountDetail `json:"account_details,omitempty"` } -func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft bool) *AccountDetail { +func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg, optRaft, optStreamLeader bool) *AccountDetail { jsa.mu.RLock() acc := jsa.account name := acc.GetName() @@ -2852,6 +2853,10 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, 
optConsumers, optCfg, c := stream.config() cfg = &c } + // Skip if we are only looking for stream leaders. + if optStreamLeader && ci != nil && ci.Leader != s.Name() { + continue + } sdet := StreamDetail{ Name: stream.name(), Created: stream.createdTime(), @@ -2907,7 +2912,7 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) { if !ok { return nil, fmt.Errorf("account %q not jetstream enabled", acc) } - return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups), nil + return s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly), nil } // helper to get cluster info from node via dummy group @@ -3034,7 +3039,7 @@ func (s *Server) Jsz(opts *JSzOptions) (*JSInfo, error) { } // if wanted, obtain accounts/streams/consumer for _, jsa := range accounts { - detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups) + detail := s.accountDetail(jsa, opts.Streams, opts.Consumer, opts.Config, opts.RaftGroups, opts.StreamLeaderOnly) jsi.AccountDetails = append(jsi.AccountDetails, detail) } return jsi, nil @@ -3078,16 +3083,22 @@ func (s *Server) HandleJsz(w http.ResponseWriter, r *http.Request) { return } + sleader, err := decodeBool(w, r, "stream-leader-only") + if err != nil { + return + } + l, err := s.Jsz(&JSzOptions{ - r.URL.Query().Get("acc"), - accounts, - streams, - consumers, - config, - leader, - offset, - limit, - rgroups, + Account: r.URL.Query().Get("acc"), + Accounts: accounts, + Streams: streams, + Consumer: consumers, + Config: config, + LeaderOnly: leader, + Offset: offset, + Limit: limit, + RaftGroups: rgroups, + StreamLeaderOnly: sleader, }) if err != nil { w.WriteHeader(http.StatusBadRequest) diff --git a/server/monitor_test.go b/server/monitor_test.go index 79c82adacb..eec4d3229c 100644 --- a/server/monitor_test.go +++ b/server/monitor_test.go @@ -4414,6 +4414,25 @@ func TestMonitorJsz(t *testing.T) { } } }) + t.Run("stream-leader-only", func(t *testing.T) { + // First server + info := readJsInfo(monUrl1 + "?streams=true&stream-leader-only=1") + for _, a := range info.AccountDetails { + for _, s := range a.Streams { + if s.Cluster.Leader != srvs[0].serverName() { + t.Fatalf("expected stream leader to be %s but got %s", srvs[0].serverName(), s.Cluster.Leader) + } + } + } + info = readJsInfo(monUrl2 + "?streams=true&stream-leader-only=1") + for _, a := range info.AccountDetails { + for _, s := range a.Streams { + if s.Cluster.Leader != srvs[1].serverName() { + t.Fatalf("expected stream leader to be %s but got %s", srvs[0].serverName(), s.Cluster.Leader) + } + } + } + }) t.Run("consumers", func(t *testing.T) { for _, url := range []string{monUrl1, monUrl2} { info := readJsInfo(url + "?acc=ACC&consumers=true") From 60f5131be431b3f7ec8e9e9199149ab3bb02126f Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Fri, 26 Jul 2024 17:15:10 +0100 Subject: [PATCH 12/16] De-flake `TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer` Signed-off-by: Neil Twigg --- server/raft_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/raft_test.go b/server/raft_test.go index beb15d6348..02ba3bb84c 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -382,7 +382,7 @@ func TestNRGUnsuccessfulVoteRequestDoesntResetElectionTimer(t *testing.T) { leaderOriginal := leader.etlr followerOriginal := follower.etlr vr := &voteRequest{ - term: follower.term, + term: follower.term - 1, lastTerm: follower.term - 1, lastIndex: 0, candidate: follower.id, From 
7e986c0bd6c1da95a05fbe11f75272d829b8c463 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Sun, 28 Jul 2024 17:41:53 +0200 Subject: [PATCH 13/16] Fix 'checkSkipFirstBlock' with '_EMPTY_' filter Signed-off-by: Maurice van Veen --- server/filestore.go | 5 +++++ server/filestore_test.go | 22 ++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/server/filestore.go b/server/filestore.go index e059d28bec..a6b97b6f62 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -2610,6 +2610,11 @@ func (fs *fileStore) FilteredState(sseq uint64, subj string) SimpleState { // This is used to see if we can selectively jump start blocks based on filter subject and a floor block index. // Will return -1 if no matches at all. func (fs *fileStore) checkSkipFirstBlock(filter string, wc bool) (int, int) { + if filter == _EMPTY_ { + filter = fwcs + wc = true + } + start, stop := uint32(math.MaxUint32), uint32(0) if wc { fs.psim.Match(stringToBytes(filter), func(_ []byte, psi *psi) { diff --git a/server/filestore_test.go b/server/filestore_test.go index e3f40a89c9..b7adb8d77d 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -7248,6 +7248,28 @@ func TestFileStoreCheckSkipFirstBlockBug(t *testing.T) { require_NoError(t, err) } +// https://github.com/nats-io/nats-server/issues/5705 +func TestFileStoreCheckSkipFirstBlockEmptyFilter(t *testing.T) { + sd := t.TempDir() + fs, err := newFileStore( + FileStoreConfig{StoreDir: sd, BlockSize: 128}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + msg := []byte("hello") + // Create 4 blocks, each block holds 2 msgs + for i := 0; i < 4; i++ { + fs.StoreMsg("foo.22.bar", nil, msg) + fs.StoreMsg("foo.22.baz", nil, msg) + } + require_Equal(t, fs.numMsgBlocks(), 4) + + nbi, lbi := fs.checkSkipFirstBlock(_EMPTY_, false) + require_Equal(t, nbi, 0) + require_Equal(t, lbi, 3) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks /////////////////////////////////////////////////////////////////////////// From ef6815cc38a0d942c66eb7c1925d7ce79810f334 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 28 Jul 2024 12:43:30 -0700 Subject: [PATCH 14/16] When LoadNextMsg misses, make sure to consult psim but conservatively. We had a use case with millions of subjects and the last sequence checked being in the next to the last block. The consumer had a wildcard that matched lots of entries that were behind where we were. This would burn alot of cpu and when a stream had lots of consumers and they shift leadership this would introduce some instability due to all the cpu cycles. Signed-off-by: Derek Collison --- server/filestore.go | 5 ++++- server/filestore_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/server/filestore.go b/server/filestore.go index a6b97b6f62..f2e8242aad 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -6429,7 +6429,10 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store // Nothing found in this block. We missed, if first block (bi) check psim. // Similar to above if start <= first seq. // TODO(dlc) - For v2 track these by filter subject since they will represent filtered consumers. - if i == bi { + // We should not do this at all if we are already on the last block. + // Also if we are a wildcard do not check if large subject space. 
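+	// wcMaxSizeToCheck bounds how large the per-subject index (psim) may be
+	// before a wildcard filter skips this optimization: checkSkipFirstBlock
+	// walks every matching psim entry, which becomes expensive once millions
+	// of subjects are tracked.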
+ const wcMaxSizeToCheck = 64 * 1024 + if i == bi && i < len(fs.blks)-1 && (!wc || fs.psim.Size() < wcMaxSizeToCheck) { nbi, lbi := fs.checkSkipFirstBlock(filter, wc) // Nothing available. if nbi < 0 || lbi <= bi { diff --git a/server/filestore_test.go b/server/filestore_test.go index b7adb8d77d..fa3b92df91 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -7528,6 +7528,37 @@ func Benchmark_FileStoreLoadNextMsgVerySparseMsgsInBetweenWithWildcard(b *testin } } +func Benchmark_FileStoreLoadNextManySubjectsWithWildcardNearLastBlock(b *testing.B) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: b.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*.*"}, Storage: FileStorage}) + require_NoError(b, err) + defer fs.Stop() + + // Small om purpose. + msg := []byte("ok") + + // Make first msg one that would match as well. + fs.StoreMsg("foo.1.baz", nil, msg) + // Add in a bunch of msgs. + // We need to make sure we have a range of subjects that could kick in a linear scan. + for i := 0; i < 1_000_000; i++ { + subj := fmt.Sprintf("foo.%d.bar", rand.Intn(100_000)+2) + fs.StoreMsg(subj, nil, msg) + } + // Make last msg one that would match as well. + fs.StoreMsg("foo.1.baz", nil, msg) + + b.ResetTimer() + + var smv StoreMsg + for i := 0; i < b.N; i++ { + // Make sure not first seq. + _, _, err := fs.LoadNextMsg("foo.*.baz", true, 999_990, &smv) + require_NoError(b, err) + } +} + func Benchmark_FileStoreLoadNextMsgVerySparseMsgsLargeTail(b *testing.B) { fs, err := newFileStore( FileStoreConfig{StoreDir: b.TempDir()}, From 61a37f0fc03748d83ab707556b5fd2ac25ee4d64 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Mon, 29 Jul 2024 11:19:18 +0100 Subject: [PATCH 15/16] Fix panic in `getAccAndResultFromCache` Signed-off-by: Neil Twigg --- server/client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/client.go b/server/client.go index 99134bd0c5..e68d70fa94 100644 --- a/server/client.go +++ b/server/client.go @@ -5436,7 +5436,9 @@ func (c *client) getAccAndResultFromCache() (*Account, *SublistResult) { if !ok { if c.kind == ROUTER && len(c.route.accName) > 0 { - acc = c.acc + if acc = c.acc; acc == nil { + return nil, nil + } } else { // Match correct account and sublist. if acc, _ = c.srv.LookupAccount(string(c.pa.account)); acc == nil { From bf174d06357446b0c6e316f9073b814ef52d9d20 Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Mon, 29 Jul 2024 14:33:21 -0700 Subject: [PATCH 16/16] Make sure to account for tombstone in rbytes to avoid potential excessive compact attempts. Signed-off-by: Derek Collison --- server/filestore.go | 22 ++++++++++++-- server/filestore_test.go | 66 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index f2e8242aad..975e312ed8 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4034,7 +4034,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( // All other more thorough cleanup will happen in syncBlocks logic. // Note that we do not have to store empty records for the deleted, so don't use to calculate. // TODO(dlc) - This should not be inline, should kick the sync routine. 
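	// Compacting inline rewrites this block without its deleted records. It is
	// only worthwhile when enough space would be reclaimed, and it must not be
	// done on the last block, which is still being appended to.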
- if mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes && !isLastBlock { + if !isLastBlock && mb.shouldCompactInline() { mb.compact() } } @@ -4096,6 +4096,21 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) ( return true, nil } +// Tests whether we should try to compact this block while inline removing msgs. +// We will want rbytes to be over the minimum and have a 2x potential savings. +// Lock should be held. +func (mb *msgBlock) shouldCompactInline() bool { + return mb.rbytes > compactMinimum && mb.bytes*2 < mb.rbytes +} + +// Tests whether we should try to compact this block while running periodic sync. +// We will want rbytes to be over the minimum and have a 2x potential savings. +// Ignores 2MB minimum. +// Lock should be held. +func (mb *msgBlock) shouldCompactSync() bool { + return mb.bytes*2 < mb.rbytes +} + // This will compact and rewrite this block. This should only be called when we know we want to rewrite this block. // This should not be called on the lmb since we will prune tail deleted messages which could cause issues with // writing new messages. We will silently bail on any issues with the underlying block and let someone else detect. @@ -4989,6 +5004,9 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte } // Write index mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit) + } else { + // Make sure to account for tombstones in rbytes. + mb.rbytes += rl } fch, werr := mb.fch, mb.werr @@ -5332,7 +5350,7 @@ func (fs *fileStore) syncBlocks() { // Check if we should compact here as well. // Do not compact last mb. var needsCompact bool - if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.rbytes > mb.bytes { + if mb != lmb && mb.ensureRawBytesLoaded() == nil && mb.shouldCompactSync() { needsCompact = true markDirty = true } diff --git a/server/filestore_test.go b/server/filestore_test.go index fa3b92df91..a03d63e00a 100644 --- a/server/filestore_test.go +++ b/server/filestore_test.go @@ -7270,6 +7270,72 @@ func TestFileStoreCheckSkipFirstBlockEmptyFilter(t *testing.T) { require_Equal(t, lbi, 3) } +// https://github.com/nats-io/nats-server/issues/5702 +func TestFileStoreTombstoneRbytes(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir(), BlockSize: 1024}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + // Block can hold 24 msgs. + // So will fill one block and half of the other + msg := []byte("hello") + for i := 0; i < 34; i++ { + fs.StoreMsg("foo.22", nil, msg) + } + require_True(t, fs.numMsgBlocks() > 1) + // Now delete second half of first block which will place tombstones in second blk. + for seq := 11; seq <= 24; seq++ { + fs.RemoveMsg(uint64(seq)) + } + // Now check that rbytes has been properly accounted for in second block. + fs.mu.RLock() + blk := fs.blks[1] + fs.mu.RUnlock() + + blk.mu.RLock() + bytes, rbytes := blk.bytes, blk.rbytes + blk.mu.RUnlock() + require_True(t, rbytes > bytes) +} + +func TestFileStoreMsgBlockShouldCompact(t *testing.T) { + fs, err := newFileStore( + FileStoreConfig{StoreDir: t.TempDir()}, + StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}) + require_NoError(t, err) + defer fs.Stop() + + // 127 fit into a block. 
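+	// With 64KiB payloads and per-record overhead, 127 messages fill a block,
+	// so the 190 messages stored below span two blocks, and removing sequences
+	// 64 through 127 clears roughly the second half of the first block.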
+ msg := bytes.Repeat([]byte("Z"), 64*1024) + for i := 0; i < 190; i++ { + fs.StoreMsg("foo.22", nil, msg) + } + require_True(t, fs.numMsgBlocks() > 1) + // Now delete second half of first block which will place tombstones in second blk. + for seq := 64; seq <= 127; seq++ { + fs.RemoveMsg(uint64(seq)) + } + fs.mu.RLock() + fblk := fs.blks[0] + sblk := fs.blks[1] + fs.mu.RUnlock() + + fblk.mu.RLock() + bytes, rbytes := fblk.bytes, fblk.rbytes + shouldCompact := fblk.shouldCompactInline() + fblk.mu.RUnlock() + // Should have tripped compaction already. + require_Equal(t, bytes, rbytes) + require_False(t, shouldCompact) + + sblk.mu.RLock() + shouldCompact = sblk.shouldCompactInline() + sblk.mu.RUnlock() + require_False(t, shouldCompact) +} + /////////////////////////////////////////////////////////////////////////// // Benchmarks ///////////////////////////////////////////////////////////////////////////
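To make the two compaction thresholds introduced in this patch concrete, the following standalone fragment mirrors their arithmetic with made-up byte counts; the real checks are methods on msgBlock that read the fields under the block lock, so the helpers here are illustrative only:

    // Mirrors the arithmetic of shouldCompactInline and shouldCompactSync:
    // liveBytes corresponds to mb.bytes, rawBytes to mb.rbytes. Both want a
    // 2x potential saving; only the inline path also requires the block to
    // hold at least compactMinimum (2MB) of raw bytes.
    const compactMinimum = 2 * 1024 * 1024

    func wouldCompactInline(liveBytes, rawBytes uint64) bool {
        return rawBytes > compactMinimum && liveBytes*2 < rawBytes
    }

    func wouldCompactSync(liveBytes, rawBytes uint64) bool {
        return liveBytes*2 < rawBytes
    }

For example, a block holding 0.5MB of live data in 1.5MB of raw bytes is skipped by the inline path (below the 2MB floor) but compacted during syncBlocks, while one holding 1MB live in 3MB raw is compacted by either path.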