From ce8e271b888e2214277992ebcbddb900f33e3453 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Fri, 2 Aug 2024 22:03:38 -0600 Subject: [PATCH 1/2] (2.11) [IMPROVED] Routing: reduce chances of duplicate implicit routes The PR#5602 solved the issue for typical seed setups, but it was found that the test `TestStressChainedSolicitWorks` would sometimes fail. This is a situation where we have S4->S3->S2->S1 and all servers start at the same time. This is not a typical setup, but still the regular gossip would allow the creation of the full mesh, so this PR helps in those situations. Related to #5602 Signed-off-by: Ivan Kozlovic --- server/route.go | 158 +++++++++++++++++++++++++++-------------------- server/server.go | 2 +- 2 files changed, 92 insertions(+), 68 deletions(-) diff --git a/server/route.go b/server/route.go index ca4d2bbb0ae..3570d727904 100644 --- a/server/route.go +++ b/server/route.go @@ -82,11 +82,18 @@ type route struct { // Selected compression mode, which may be different from the // server configured mode. compression string - // Transient value used to set the Info.NoGossip when initiating + // Transient value used to set the Info.GossipMode when initiating // an implicit route and sending to the remote. - noGossip bool + gossipMode byte } +// Do not change the values/order since they are exchanged between servers. +const ( + gossipDefault = byte(iota) + gossipDisabled + gossipOverride +) + type connectInfo struct { Echo bool `json:"echo"` Verbose bool `json:"verbose"` @@ -818,14 +825,14 @@ func (c *client) processRouteInfo(info *Info) { accName := string(c.route.accName) // Capture the noGossip value and reset it here. - noGossip := c.route.noGossip - c.route.noGossip = false + gossipMode := c.route.gossipMode + c.route.gossipMode = 0 // Check to see if we have this remote already registered. // This can happen when both servers have routes to each other. 
c.mu.Unlock() - if added := s.addRoute(c, didSolicit, sendDelayedInfo, noGossip, info, accName); added { + if added := s.addRoute(c, didSolicit, sendDelayedInfo, gossipMode, info, accName); added { if accName != _EMPTY_ { c.Debugf("Registering remote route %q for account %q", info.ID, accName) } else { @@ -859,7 +866,7 @@ func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName, if needsCompression(cm) { // Generate an INFO with the chosen compression mode. s.mu.Lock() - infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0, false) + infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0, 0) s.mu.Unlock() // If we solicited, then send this INFO protocol BEFORE switching @@ -1026,7 +1033,7 @@ func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { if info.AuthRequired { r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password) } - s.startGoRoutine(func() { s.connectToRoute(r, Implicit, true, info.NoGossip, info.RouteAccount) }) + s.startGoRoutine(func() { s.connectToRoute(r, Implicit, true, info.GossipMode, info.RouteAccount) }) // If we are processing an implicit route from a route that does not // support pooling/pinned-accounts, we won't receive an INFO for each of // the pinned-accounts that we would normally receive. In that case, just @@ -1036,7 +1043,7 @@ func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) { rURL := r for _, an := range opts.Cluster.PinnedAccounts { accName := an - s.startGoRoutine(func() { s.connectToRoute(rURL, Implicit, true, info.NoGossip, accName) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, Implicit, true, info.GossipMode, accName) }) } } } @@ -1075,34 +1082,70 @@ func (s *Server) hasThisRouteConfigured(info *Info) bool { return false } -// forwardNewRouteInfoToKnownServers sends the INFO protocol of the new route -// to all routes known by this server. In turn, each server will contact this -// new route. 
+// forwardNewRouteInfoToKnownServers possibly sends the INFO protocol of the +// new route to all routes known by this server. In turn, each server will +// contact this new route. // Server lock held on entry. -func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { +func (s *Server) forwardNewRouteInfoToKnownServers(info *Info, rtype RouteType, didSolicit bool, localGossipMode byte) { + // Determine if this connection is resulting from a gossip notification. + fromGossip := didSolicit && rtype == Implicit + // If from gossip (but we are not overriding it) or if the remote disabled gossip, bail out. + if (fromGossip && localGossipMode != gossipOverride) || info.GossipMode == gossipDisabled { + return + } + // Note: nonce is not used in routes. // That being said, the info we get is the initial INFO which // contains a nonce, but we now forward this to existing routes, // so clear it now. info.Nonce = _EMPTY_ - b, _ := json.Marshal(info) - infoJSON := []byte(fmt.Sprintf(InfoProto, b)) + var ( + infoGMDefault []byte + infoGMDisabled []byte + infoGMOverride []byte + ) + + generateJSON := func(gm byte) []byte { + info.GossipMode = gm + b, _ := json.Marshal(info) + return []byte(fmt.Sprintf(InfoProto, b)) + } + + getJSON := func(r *client) []byte { + if (!didSolicit && r.route.routeType == Explicit) || (didSolicit && rtype == Explicit) { + if infoGMOverride == nil { + infoGMOverride = generateJSON(gossipOverride) + } + return infoGMOverride + } else if !didSolicit { + if infoGMDisabled == nil { + infoGMDisabled = generateJSON(gossipDisabled) + } + return infoGMDisabled + } + if infoGMDefault == nil { + infoGMDefault = generateJSON(0) + } + return infoGMDefault + } + + var accRemotes map[string]*client + pinnedAccount := info.RouteAccount != _EMPTY_ // If this is for a pinned account, we will try to send the gossip // through our pinned account routes, but fall back to the other // routes in case we don't have one for a given remote. 
- accRemotes := map[string]struct{}{} - if info.RouteAccount != _EMPTY_ { - if remotes, ok := s.accRoutes[info.RouteAccount]; ok { - for remoteID, r := range remotes { + if pinnedAccount { + var ok bool + if accRemotes, ok = s.accRoutes[info.RouteAccount]; ok { + for remoteID, r := range accRemotes { if r == nil { continue } - accRemotes[remoteID] = struct{}{} r.mu.Lock() // Do not send to a remote that does not support pooling/pinned-accounts. if remoteID != info.ID && !r.route.noPool { - r.enqueueProto(infoJSON) + r.enqueueProto(getJSON(r)) } r.mu.Unlock() } @@ -1112,7 +1155,7 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { s.forEachRemote(func(r *client) { r.mu.Lock() remoteID := r.route.remoteID - if info.RouteAccount != _EMPTY_ { + if pinnedAccount { if _, processed := accRemotes[remoteID]; processed { r.mu.Unlock() return @@ -1120,9 +1163,8 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) { } // If this is a new route for a given account, do not send to a server // that does not support pooling/pinned-accounts. - if remoteID != info.ID && - (info.RouteAccount == _EMPTY_ || (info.RouteAccount != _EMPTY_ && !r.route.noPool)) { - r.enqueueProto(infoJSON) + if remoteID != info.ID && (!pinnedAccount || !r.route.noPool) { + r.enqueueProto(getJSON(r)) } r.mu.Unlock() }) @@ -1699,12 +1741,12 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra c.enqueueProto(buf) } -func (s *Server) createRoute(conn net.Conn, rURL *url.URL, rtype RouteType, noGossip bool, accName string) *client { +func (s *Server) createRoute(conn net.Conn, rURL *url.URL, rtype RouteType, gossipMode byte, accName string) *client { // Snapshot server options. 
opts := s.getOpts() didSolicit := rURL != nil - r := &route{routeType: rtype, didSolicit: didSolicit, poolIdx: -1, noGossip: noGossip} + r := &route{routeType: rtype, didSolicit: didSolicit, poolIdx: -1, gossipMode: gossipMode} c := &client{srv: s, nc: conn, opts: ClientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r, start: time.Now()} @@ -1721,7 +1763,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, rtype RouteType, noGo // the incoming INFO from the remote. Also delay if configured for compression. delayInfo := didSolicit && (compressionConfigured || routeShouldDelayInfo(accName, opts)) if !delayInfo { - infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0, noGossip) + infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0, gossipMode) } authRequired := s.routeInfo.AuthRequired tlsRequired := s.routeInfo.TLSRequired @@ -1854,7 +1896,7 @@ func routeShouldDelayInfo(accName string, opts *Options) bool { // To be used only when a route is created (to send the initial INFO protocol). // // Server lock held on entry. 
-func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int, noGossip bool) []byte { +func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int, gossipMode byte) []byte { // New proto wants a nonce (although not used in routes, that is, not signed in CONNECT) var raw [nonceLen]byte nonce := raw[:] @@ -1864,11 +1906,11 @@ func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolI if s.getOpts().Cluster.Compression.Mode == CompressionS2Auto { compression = CompressionS2Auto } - ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.NoGossip = string(nonce), accName, poolIdx, compression, noGossip + ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.GossipMode = string(nonce), accName, poolIdx, compression, gossipMode infoJSON := generateInfoJSON(&s.routeInfo) // Clear now that it has been serialized. Will prevent nonce to be included in async INFO that we may send. // Same for some other fields. 
- ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.NoGossip = _EMPTY_, _EMPTY_, 0, _EMPTY_, false + ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.GossipMode = _EMPTY_, _EMPTY_, 0, _EMPTY_, 0 return infoJSON } @@ -1877,7 +1919,7 @@ const ( _EMPTY_ = "" ) -func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, info *Info, accName string) bool { +func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMode byte, info *Info, accName string) bool { id := info.ID var acc *Account @@ -1963,9 +2005,10 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, c.mu.Lock() idHash := c.route.idHash cid := c.cid + rtype := c.route.routeType if sendDelayedInfo { cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression) - c.enqueueProto(s.generateRouteInitialInfoJSON(accName, cm, 0, noGossip)) + c.enqueueProto(s.generateRouteInitialInfoJSON(accName, cm, 0, gossipMode)) } if c.last.IsZero() { c.last = time.Now() @@ -1973,9 +2016,6 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, if acc != nil { c.acc = acc } - // This will be true if this is a route that was initiated from the - // gossip protocol (basically invoked from processImplicitRoute). - fromGossip := didSolicit && c.route.routeType == Implicit c.mu.Unlock() // Store this route with key being the route id hash + account name @@ -1984,19 +2024,9 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, // Now that we have registered the route, we can remove from the temp map. s.removeFromTempClients(cid) - // We will not gossip if we are an implicit route created due to - // gossip, or if the remote instructed us not to gossip. - if !fromGossip && !info.NoGossip { - if !didSolicit { - // If the connection was accepted, instruct the neighbors to - // set Info.NoGossip to true also when sending their own INFO - // protocol. 
In normal situations, any implicit route would - // set their Info.NoGossip to true, but we do this to solve - // a very specific situation. For some background, see test - // TestRouteImplicitJoinsSeparateGroups. - info.NoGossip = true - } - s.forwardNewRouteInfoToKnownServers(info) + // We don't need to send if the only route is the one we just accepted. + if len(conns) > 1 { + s.forwardNewRouteInfoToKnownServers(info, rtype, didSolicit, gossipMode) } // Send subscription interest @@ -2080,7 +2110,7 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, url := c.route.url if sendDelayedInfo { cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression) - c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, cm, idx, noGossip)) + c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, cm, idx, gossipMode)) } if c.last.IsZero() { c.last = time.Now() @@ -2117,15 +2147,9 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, s.sendAsyncGatewayInfo() } - // we don't need to send if the only route is the one we just accepted. - // For other checks, see other call to forwardNewRouteInfoToKnownServers - // in the handling of pinned account above. - fromGossip := didSolicit && rtype == Implicit - if len(s.routes) > 1 && !fromGossip && !info.NoGossip { - if !didSolicit { - info.NoGossip = true - } - s.forwardNewRouteInfoToKnownServers(info) + // We don't need to send if the only route is the one we just accepted. + if len(s.routes) > 1 { + s.forwardNewRouteInfoToKnownServers(info, rtype, didSolicit, gossipMode) } // Send info about the known gateways to this route. 
@@ -2160,7 +2184,7 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, s.grWG.Done() return } - s.connectToRoute(url, rtype, true, noGossip, _EMPTY_) + s.connectToRoute(url, rtype, true, gossipMode, _EMPTY_) }) } } @@ -2562,7 +2586,7 @@ func (s *Server) startRouteAcceptLoop() { } // Start the accept loop in a different go routine. - go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, Implicit, false, _EMPTY_) }, nil) + go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, Implicit, 0, _EMPTY_) }, nil) // Solicit Routes if applicable. This will not block. s.solicitRoutes(opts.Routes, opts.Cluster.PinnedAccounts) @@ -2619,7 +2643,7 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType, accName string s.grWG.Done() return } - s.connectToRoute(rURL, rtype, false, false, accName) + s.connectToRoute(rURL, rtype, false, 0, accName) } // Checks to make sure the route is still valid. @@ -2632,7 +2656,7 @@ func (s *Server) routeStillValid(rURL *url.URL) bool { return false } -func (s *Server) connectToRoute(rURL *url.URL, rtype RouteType, firstConnect, noGossip bool, accName string) { +func (s *Server) connectToRoute(rURL *url.URL, rtype RouteType, firstConnect bool, gossipMode byte, accName string) { defer s.grWG.Done() if rURL == nil { return @@ -2705,7 +2729,7 @@ func (s *Server) connectToRoute(rURL *url.URL, rtype RouteType, firstConnect, no // We have a route connection here. // Go ahead and create it and exit this func. 
- s.createRoute(conn, rURL, rtype, noGossip, accName) + s.createRoute(conn, rURL, rtype, gossipMode, accName) return } } @@ -2734,13 +2758,13 @@ func (s *Server) solicitRoutes(routes []*url.URL, accounts []string) { s.saveRouteTLSName(routes) for _, r := range routes { route := r - s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, false, _EMPTY_) }) + s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, 0, _EMPTY_) }) } // Now go over possible per-account routes and create them. for _, an := range accounts { for _, r := range routes { route, accName := r, an - s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, false, accName) }) + s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, 0, accName) }) } } } @@ -2944,12 +2968,12 @@ func (s *Server) removeRoute(c *client) { // this remote was a "no pool" route, attempt to reconnect. if noPool { if s.routesPoolSize > 1 { - s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, false, _EMPTY_) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, 0, _EMPTY_) }) } if len(opts.Cluster.PinnedAccounts) > 0 { for _, an := range opts.Cluster.PinnedAccounts { accName := an - s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, false, accName) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, 0, accName) }) } } } diff --git a/server/server.go b/server/server.go index 7c37d38b757..fb40f43f15e 100644 --- a/server/server.go +++ b/server/server.go @@ -142,7 +142,7 @@ type Info struct { RoutePoolIdx int `json:"route_pool_idx,omitempty"` RouteAccount string `json:"route_account,omitempty"` RouteAccReqID string `json:"route_acc_add_reqid,omitempty"` - NoGossip bool `json:"no_gossip,omitempty"` + GossipMode byte `json:"gossip_mode,omitempty"` // Gateways Specific Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO) From 3ef8d3041e1ff0df31273b4181b7bac630b3934f Mon Sep 17 00:00:00 2001 From: 
Ivan Kozlovic Date: Sat, 3 Aug 2024 09:57:29 -0600 Subject: [PATCH 2/2] Use gossipDefault instead of 0 Signed-off-by: Ivan Kozlovic --- server/route.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/server/route.go b/server/route.go index 3570d727904..61f3c6dbb49 100644 --- a/server/route.go +++ b/server/route.go @@ -866,7 +866,7 @@ func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName, if needsCompression(cm) { // Generate an INFO with the chosen compression mode. s.mu.Lock() - infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0, 0) + infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0, gossipDefault) s.mu.Unlock() // If we solicited, then send this INFO protocol BEFORE switching @@ -2586,7 +2586,7 @@ func (s *Server) startRouteAcceptLoop() { } // Start the accept loop in a different go routine. - go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, Implicit, 0, _EMPTY_) }, nil) + go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, Implicit, gossipDefault, _EMPTY_) }, nil) // Solicit Routes if applicable. This will not block. s.solicitRoutes(opts.Routes, opts.Cluster.PinnedAccounts) @@ -2643,7 +2643,7 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType, accName string s.grWG.Done() return } - s.connectToRoute(rURL, rtype, false, 0, accName) + s.connectToRoute(rURL, rtype, false, gossipDefault, accName) } // Checks to make sure the route is still valid. @@ -2758,13 +2758,13 @@ func (s *Server) solicitRoutes(routes []*url.URL, accounts []string) { s.saveRouteTLSName(routes) for _, r := range routes { route := r - s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, 0, _EMPTY_) }) + s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, gossipDefault, _EMPTY_) }) } // Now go over possible per-account routes and create them. 
for _, an := range accounts { for _, r := range routes { route, accName := r, an - s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, 0, accName) }) + s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, gossipDefault, accName) }) } } } @@ -2968,12 +2968,12 @@ func (s *Server) removeRoute(c *client) { // this remote was a "no pool" route, attempt to reconnect. if noPool { if s.routesPoolSize > 1 { - s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, 0, _EMPTY_) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, gossipDefault, _EMPTY_) }) } if len(opts.Cluster.PinnedAccounts) > 0 { for _, an := range opts.Cluster.PinnedAccounts { accName := an - s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, 0, accName) }) + s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, gossipDefault, accName) }) } } }