Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(2.11) [IMPROVED] Routing: reduce chances of duplicate implicit routes #5746

Merged
merged 2 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 91 additions & 67 deletions server/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,18 @@ type route struct {
// Selected compression mode, which may be different from the
// server configured mode.
compression string
// Transient value used to set the Info.NoGossip when initiating
// Transient value used to set the Info.GossipMode when initiating
// an implicit route and sending to the remote.
noGossip bool
gossipMode byte
}

// Do not change the values/order since they are exchanged between servers.
const (
gossipDefault = byte(iota)
gossipDisabled
gossipOverride
)

type connectInfo struct {
Echo bool `json:"echo"`
Verbose bool `json:"verbose"`
Expand Down Expand Up @@ -818,14 +825,14 @@ func (c *client) processRouteInfo(info *Info) {
accName := string(c.route.accName)

// Capture the noGossip value and reset it here.
noGossip := c.route.noGossip
c.route.noGossip = false
gossipMode := c.route.gossipMode
c.route.gossipMode = 0

// Check to see if we have this remote already registered.
// This can happen when both servers have routes to each other.
c.mu.Unlock()

if added := s.addRoute(c, didSolicit, sendDelayedInfo, noGossip, info, accName); added {
if added := s.addRoute(c, didSolicit, sendDelayedInfo, gossipMode, info, accName); added {
if accName != _EMPTY_ {
c.Debugf("Registering remote route %q for account %q", info.ID, accName)
} else {
Expand Down Expand Up @@ -859,7 +866,7 @@ func (s *Server) negotiateRouteCompression(c *client, didSolicit bool, accName,
if needsCompression(cm) {
// Generate an INFO with the chosen compression mode.
s.mu.Lock()
infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0, false)
infoProto := s.generateRouteInitialInfoJSON(accName, cm, 0, gossipDefault)
s.mu.Unlock()

// If we solicited, then send this INFO protocol BEFORE switching
Expand Down Expand Up @@ -1026,7 +1033,7 @@ func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) {
if info.AuthRequired {
r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password)
}
s.startGoRoutine(func() { s.connectToRoute(r, Implicit, true, info.NoGossip, info.RouteAccount) })
s.startGoRoutine(func() { s.connectToRoute(r, Implicit, true, info.GossipMode, info.RouteAccount) })
// If we are processing an implicit route from a route that does not
// support pooling/pinned-accounts, we won't receive an INFO for each of
// the pinned-accounts that we would normally receive. In that case, just
Expand All @@ -1036,7 +1043,7 @@ func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) {
rURL := r
for _, an := range opts.Cluster.PinnedAccounts {
accName := an
s.startGoRoutine(func() { s.connectToRoute(rURL, Implicit, true, info.NoGossip, accName) })
s.startGoRoutine(func() { s.connectToRoute(rURL, Implicit, true, info.GossipMode, accName) })
}
}
}
Expand Down Expand Up @@ -1075,34 +1082,70 @@ func (s *Server) hasThisRouteConfigured(info *Info) bool {
return false
}

// forwardNewRouteInfoToKnownServers sends the INFO protocol of the new route
// to all routes known by this server. In turn, each server will contact this
// new route.
// forwardNewRouteInfoToKnownServers possibly sends the INFO protocol of the
// new route to all routes known by this server. In turn, each server will
// contact this new route.
// Server lock held on entry.
func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
func (s *Server) forwardNewRouteInfoToKnownServers(info *Info, rtype RouteType, didSolicit bool, localGossipMode byte) {
// Determine if this connection is resulting from a gossip notification.
fromGossip := didSolicit && rtype == Implicit
// If from gossip (but we are not overriding it) or if the remote disabled gossip, bail out.
if (fromGossip && localGossipMode != gossipOverride) || info.GossipMode == gossipDisabled {
return
}

// Note: nonce is not used in routes.
// That being said, the info we get is the initial INFO which
// contains a nonce, but we now forward this to existing routes,
// so clear it now.
info.Nonce = _EMPTY_
b, _ := json.Marshal(info)
infoJSON := []byte(fmt.Sprintf(InfoProto, b))

var (
infoGMDefault []byte
infoGMDisabled []byte
infoGMOverride []byte
)

generateJSON := func(gm byte) []byte {
info.GossipMode = gm
b, _ := json.Marshal(info)
return []byte(fmt.Sprintf(InfoProto, b))
}

getJSON := func(r *client) []byte {
if (!didSolicit && r.route.routeType == Explicit) || (didSolicit && rtype == Explicit) {
if infoGMOverride == nil {
infoGMOverride = generateJSON(gossipOverride)
}
return infoGMOverride
} else if !didSolicit {
if infoGMDisabled == nil {
infoGMDisabled = generateJSON(gossipDisabled)
}
return infoGMDisabled
}
if infoGMDefault == nil {
infoGMDefault = generateJSON(0)
}
return infoGMDefault
}

var accRemotes map[string]*client
pinnedAccount := info.RouteAccount != _EMPTY_
// If this is for a pinned account, we will try to send the gossip
// through our pinned account routes, but fall back to the other
// routes in case we don't have one for a given remote.
accRemotes := map[string]struct{}{}
if info.RouteAccount != _EMPTY_ {
if remotes, ok := s.accRoutes[info.RouteAccount]; ok {
for remoteID, r := range remotes {
if pinnedAccount {
var ok bool
if accRemotes, ok = s.accRoutes[info.RouteAccount]; ok {
for remoteID, r := range accRemotes {
if r == nil {
continue
}
accRemotes[remoteID] = struct{}{}
r.mu.Lock()
// Do not send to a remote that does not support pooling/pinned-accounts.
if remoteID != info.ID && !r.route.noPool {
r.enqueueProto(infoJSON)
r.enqueueProto(getJSON(r))
}
r.mu.Unlock()
}
Expand All @@ -1112,17 +1155,16 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
s.forEachRemote(func(r *client) {
r.mu.Lock()
remoteID := r.route.remoteID
if info.RouteAccount != _EMPTY_ {
if pinnedAccount {
if _, processed := accRemotes[remoteID]; processed {
r.mu.Unlock()
return
}
}
// If this is a new route for a given account, do not send to a server
// that does not support pooling/pinned-accounts.
if remoteID != info.ID &&
(info.RouteAccount == _EMPTY_ || (info.RouteAccount != _EMPTY_ && !r.route.noPool)) {
r.enqueueProto(infoJSON)
if remoteID != info.ID && (!pinnedAccount || !r.route.noPool) {
r.enqueueProto(getJSON(r))
}
r.mu.Unlock()
})
Expand Down Expand Up @@ -1699,12 +1741,12 @@ func (c *client) sendRouteSubOrUnSubProtos(subs []*subscription, isSubProto, tra
c.enqueueProto(buf)
}

func (s *Server) createRoute(conn net.Conn, rURL *url.URL, rtype RouteType, noGossip bool, accName string) *client {
func (s *Server) createRoute(conn net.Conn, rURL *url.URL, rtype RouteType, gossipMode byte, accName string) *client {
// Snapshot server options.
opts := s.getOpts()

didSolicit := rURL != nil
r := &route{routeType: rtype, didSolicit: didSolicit, poolIdx: -1, noGossip: noGossip}
r := &route{routeType: rtype, didSolicit: didSolicit, poolIdx: -1, gossipMode: gossipMode}

c := &client{srv: s, nc: conn, opts: ClientOpts{}, kind: ROUTER, msubs: -1, mpay: -1, route: r, start: time.Now()}

Expand All @@ -1721,7 +1763,7 @@ func (s *Server) createRoute(conn net.Conn, rURL *url.URL, rtype RouteType, noGo
// the incoming INFO from the remote. Also delay if configured for compression.
delayInfo := didSolicit && (compressionConfigured || routeShouldDelayInfo(accName, opts))
if !delayInfo {
infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0, noGossip)
infoJSON = s.generateRouteInitialInfoJSON(accName, opts.Cluster.Compression.Mode, 0, gossipMode)
}
authRequired := s.routeInfo.AuthRequired
tlsRequired := s.routeInfo.TLSRequired
Expand Down Expand Up @@ -1854,7 +1896,7 @@ func routeShouldDelayInfo(accName string, opts *Options) bool {
// To be used only when a route is created (to send the initial INFO protocol).
//
// Server lock held on entry.
func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int, noGossip bool) []byte {
func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolIdx int, gossipMode byte) []byte {
// New proto wants a nonce (although not used in routes, that is, not signed in CONNECT)
var raw [nonceLen]byte
nonce := raw[:]
Expand All @@ -1864,11 +1906,11 @@ func (s *Server) generateRouteInitialInfoJSON(accName, compression string, poolI
if s.getOpts().Cluster.Compression.Mode == CompressionS2Auto {
compression = CompressionS2Auto
}
ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.NoGossip = string(nonce), accName, poolIdx, compression, noGossip
ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.GossipMode = string(nonce), accName, poolIdx, compression, gossipMode
infoJSON := generateInfoJSON(&s.routeInfo)
// Clear now that it has been serialized. Will prevent nonce to be included in async INFO that we may send.
// Same for some other fields.
ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.NoGossip = _EMPTY_, _EMPTY_, 0, _EMPTY_, false
ri.Nonce, ri.RouteAccount, ri.RoutePoolIdx, ri.Compression, ri.GossipMode = _EMPTY_, _EMPTY_, 0, _EMPTY_, 0
return infoJSON
}

Expand All @@ -1877,7 +1919,7 @@ const (
_EMPTY_ = ""
)

func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool, info *Info, accName string) bool {
func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo bool, gossipMode byte, info *Info, accName string) bool {
id := info.ID

var acc *Account
Expand Down Expand Up @@ -1963,19 +2005,17 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool,
c.mu.Lock()
idHash := c.route.idHash
cid := c.cid
rtype := c.route.routeType
if sendDelayedInfo {
cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression)
c.enqueueProto(s.generateRouteInitialInfoJSON(accName, cm, 0, noGossip))
c.enqueueProto(s.generateRouteInitialInfoJSON(accName, cm, 0, gossipMode))
}
if c.last.IsZero() {
c.last = time.Now()
}
if acc != nil {
c.acc = acc
}
// This will be true if this is a route that was initiated from the
// gossip protocol (basically invoked from processImplicitRoute).
fromGossip := didSolicit && c.route.routeType == Implicit
c.mu.Unlock()

// Store this route with key being the route id hash + account name
Expand All @@ -1984,19 +2024,9 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool,
// Now that we have registered the route, we can remove from the temp map.
s.removeFromTempClients(cid)

// We will not gossip if we are an implicit route created due to
// gossip, or if the remote instructed us not to gossip.
if !fromGossip && !info.NoGossip {
if !didSolicit {
// If the connection was accepted, instruct the neighbors to
// set Info.NoGossip to true also when sending their own INFO
// protocol. In normal situations, any implicit route would
// set their Info.NoGossip to true, but we do this to solve
// a very specific situation. For some background, see test
// TestRouteImplicitJoinsSeparateGroups.
info.NoGossip = true
}
s.forwardNewRouteInfoToKnownServers(info)
// We don't need to send if the only route is the one we just accepted.
if len(conns) > 1 {
s.forwardNewRouteInfoToKnownServers(info, rtype, didSolicit, gossipMode)
}

// Send subscription interest
Expand Down Expand Up @@ -2080,7 +2110,7 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool,
url := c.route.url
if sendDelayedInfo {
cm := compressionModeForInfoProtocol(&opts.Cluster.Compression, c.route.compression)
c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, cm, idx, noGossip))
c.enqueueProto(s.generateRouteInitialInfoJSON(_EMPTY_, cm, idx, gossipMode))
}
if c.last.IsZero() {
c.last = time.Now()
Expand Down Expand Up @@ -2117,15 +2147,9 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool,
s.sendAsyncGatewayInfo()
}

// we don't need to send if the only route is the one we just accepted.
// For other checks, see other call to forwardNewRouteInfoToKnownServers
// in the handling of pinned account above.
fromGossip := didSolicit && rtype == Implicit
if len(s.routes) > 1 && !fromGossip && !info.NoGossip {
if !didSolicit {
info.NoGossip = true
}
s.forwardNewRouteInfoToKnownServers(info)
// We don't need to send if the only route is the one we just accepted.
if len(s.routes) > 1 {
s.forwardNewRouteInfoToKnownServers(info, rtype, didSolicit, gossipMode)
}

// Send info about the known gateways to this route.
Expand Down Expand Up @@ -2160,7 +2184,7 @@ func (s *Server) addRoute(c *client, didSolicit, sendDelayedInfo, noGossip bool,
s.grWG.Done()
return
}
s.connectToRoute(url, rtype, true, noGossip, _EMPTY_)
s.connectToRoute(url, rtype, true, gossipMode, _EMPTY_)
})
}
}
Expand Down Expand Up @@ -2562,7 +2586,7 @@ func (s *Server) startRouteAcceptLoop() {
}

// Start the accept loop in a different go routine.
go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, Implicit, false, _EMPTY_) }, nil)
go s.acceptConnections(l, "Route", func(conn net.Conn) { s.createRoute(conn, nil, Implicit, gossipDefault, _EMPTY_) }, nil)

// Solicit Routes if applicable. This will not block.
s.solicitRoutes(opts.Routes, opts.Cluster.PinnedAccounts)
Expand Down Expand Up @@ -2619,7 +2643,7 @@ func (s *Server) reConnectToRoute(rURL *url.URL, rtype RouteType, accName string
s.grWG.Done()
return
}
s.connectToRoute(rURL, rtype, false, false, accName)
s.connectToRoute(rURL, rtype, false, gossipDefault, accName)
}

// Checks to make sure the route is still valid.
Expand All @@ -2632,7 +2656,7 @@ func (s *Server) routeStillValid(rURL *url.URL) bool {
return false
}

func (s *Server) connectToRoute(rURL *url.URL, rtype RouteType, firstConnect, noGossip bool, accName string) {
func (s *Server) connectToRoute(rURL *url.URL, rtype RouteType, firstConnect bool, gossipMode byte, accName string) {
defer s.grWG.Done()
if rURL == nil {
return
Expand Down Expand Up @@ -2705,7 +2729,7 @@ func (s *Server) connectToRoute(rURL *url.URL, rtype RouteType, firstConnect, no

// We have a route connection here.
// Go ahead and create it and exit this func.
s.createRoute(conn, rURL, rtype, noGossip, accName)
s.createRoute(conn, rURL, rtype, gossipMode, accName)
return
}
}
Expand Down Expand Up @@ -2734,13 +2758,13 @@ func (s *Server) solicitRoutes(routes []*url.URL, accounts []string) {
s.saveRouteTLSName(routes)
for _, r := range routes {
route := r
s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, false, _EMPTY_) })
s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, gossipDefault, _EMPTY_) })
}
// Now go over possible per-account routes and create them.
for _, an := range accounts {
for _, r := range routes {
route, accName := r, an
s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, false, accName) })
s.startGoRoutine(func() { s.connectToRoute(route, Explicit, true, gossipDefault, accName) })
}
}
}
Expand Down Expand Up @@ -2944,12 +2968,12 @@ func (s *Server) removeRoute(c *client) {
// this remote was a "no pool" route, attempt to reconnect.
if noPool {
if s.routesPoolSize > 1 {
s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, false, _EMPTY_) })
s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, gossipDefault, _EMPTY_) })
}
if len(opts.Cluster.PinnedAccounts) > 0 {
for _, an := range opts.Cluster.PinnedAccounts {
accName := an
s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, false, accName) })
s.startGoRoutine(func() { s.connectToRoute(rURL, rtype, true, gossipDefault, accName) })
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ type Info struct {
RoutePoolIdx int `json:"route_pool_idx,omitempty"`
RouteAccount string `json:"route_account,omitempty"`
RouteAccReqID string `json:"route_acc_add_reqid,omitempty"`
NoGossip bool `json:"no_gossip,omitempty"`
GossipMode byte `json:"gossip_mode,omitempty"`

// Gateways Specific
Gateway string `json:"gateway,omitempty"` // Name of the origin Gateway (sent by gateway's INFO)
Expand Down