diff --git a/server/client.go b/server/client.go index 94fec71db7..fa2a939533 100644 --- a/server/client.go +++ b/server/client.go @@ -4866,19 +4866,32 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, continue } // Remember that leaf in case we don't find any other candidate. + // We already start randomly in lqs slice, so we don't need + // to do a random swap if we already have an rsub like we do + // when src == ROUTER above. if rsub == nil { rsub = sub } continue } else { - // We would be picking a route, but if we had remembered a "hub" leaf, - // then pick that one instead of the route. - if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() { - break + // We want to favor qsubs in our own cluster. If the routed + // qsub has an origin, it means that is on behalf of a leaf. + // We need to treat it differently. + if len(sub.origin) > 0 { + // If we already have an rsub, nothing to do. Also, do + // not pick a routed qsub for a LEAF origin cluster + // that is the same than where the message comes from. + if rsub == nil && (leafOrigin == _EMPTY_ || leafOrigin != bytesToString(sub.origin)) { + rsub = sub + } + continue } + // This is a qsub that is local on the remote server (or + // we are connected to an older server and we don't know). + // Pick this one and be done. rsub = sub + break } - break } // Assume delivery subject is normal subject to this point. diff --git a/server/const.go b/server/const.go index 8b88341f42..b383d25f5e 100644 --- a/server/const.go +++ b/server/const.go @@ -174,6 +174,9 @@ const ( // MAX_HPUB_ARGS Maximum possible number of arguments from HPUB proto. MAX_HPUB_ARGS = 4 + // MAX_RSUB_ARGS Maximum possible number of arguments from a RS+/LS+ proto. + MAX_RSUB_ARGS = 6 + // DEFAULT_MAX_CLOSED_CLIENTS is the maximum number of closed connections we hold onto. DEFAULT_MAX_CLOSED_CLIENTS = 10000 diff --git a/server/gateway.go b/server/gateway.go index 92b7ea5463..1fa89c71d4 100644 --- a/server/gateway.go +++ b/server/gateway.go @@ -1912,7 +1912,7 @@ func (c *client) processGatewayAccountSub(accName string) error { // the sublist if present. // func (c *client) processGatewayRUnsub(arg []byte) error { - accName, subject, queue, err := c.parseUnsubProto(arg) + _, accName, subject, queue, err := c.parseUnsubProto(arg, true, false) if err != nil { return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error()) } diff --git a/server/leafnode.go b/server/leafnode.go index 791ed472fb..a1568288cf 100644 --- a/server/leafnode.go +++ b/server/leafnode.go @@ -2353,6 +2353,42 @@ func keyFromSub(sub *subscription) string { return sb.String() } +const ( + keyRoutedSub = "R" + keyRoutedSubByte = 'R' + keyRoutedLeafSub = "L" + keyRoutedLeafSubByte = 'L' +) + +// Helper function to build the key that prevents collisions between normal +// routed subscriptions and routed subscriptions on behalf of a leafnode. 
+// Keys will look like this: +// "R foo" -> plain routed sub on "foo" +// "R foo bar" -> queue routed sub on "foo", queue "bar" +// "L foo bar" -> plain routed leaf sub on "foo", leaf "bar" +// "L foo bar baz" -> queue routed sub on "foo", queue "bar", leaf "baz" +func keyFromSubWithOrigin(sub *subscription) string { + var sb strings.Builder + sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue)) + leaf := len(sub.origin) > 0 + if leaf { + sb.WriteByte(keyRoutedLeafSubByte) + } else { + sb.WriteByte(keyRoutedSubByte) + } + sb.WriteByte(' ') + sb.Write(sub.subject) + if sub.queue != nil { + sb.WriteByte(' ') + sb.Write(sub.queue) + } + if leaf { + sb.WriteByte(' ') + sb.Write(sub.origin) + } + return sb.String() +} + // Lock should be held. func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) { if key == _EMPTY_ { @@ -2403,12 +2439,21 @@ func (c *client) processLeafSub(argo []byte) (err error) { args := splitArg(arg) sub := &subscription{client: c} + delta := int32(1) switch len(args) { case 1: sub.queue = nil case 3: sub.queue = args[1] sub.qw = int32(parseSize(args[2])) + // TODO: (ik) We should have a non empty queue name and a queue + // weight >= 1. For 2.11, we may want to return an error if that + // is not the case, but for now just overwrite `delta` if queue + // weight is greater than 1 (it is possible after a reconnect/ + // server restart to receive a queue weight > 1 for a new sub). + if sub.qw > 1 { + delta = sub.qw + } default: return fmt.Errorf("processLeafSub Parse Error: '%s'", arg) } @@ -2473,7 +2518,6 @@ func (c *client) processLeafSub(argo []byte) (err error) { key := bytesToString(sub.sid) osub := c.subs[key] updateGWs := false - delta := int32(1) if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -2554,6 +2598,10 @@ func (c *client) processLeafUnsub(arg []byte) error { // We store local subs by account and subject and optionally queue name. // LS- will have the arg exactly as the key. sub, ok := c.subs[string(arg)] + delta := int32(1) + if ok && len(sub.queue) > 0 { + delta = sub.qw + } c.mu.Unlock() if ok { @@ -2563,14 +2611,14 @@ func (c *client) processLeafUnsub(arg []byte) error { if !spoke { // If we are routing subtract from the route map for the associated account. - srv.updateRouteSubscriptionMap(acc, sub, -1) + srv.updateRouteSubscriptionMap(acc, sub, -delta) // Gateways if updateGWs { - srv.gatewayUpdateSubInterest(acc.Name, sub, -1) + srv.gatewayUpdateSubInterest(acc.Name, sub, -delta) } } // Now check on leafnode updates for other leaf nodes. - acc.updateLeafNodes(sub, -1) + acc.updateLeafNodes(sub, -delta) return nil } diff --git a/server/leafnode_test.go b/server/leafnode_test.go index c5a131d277..bb87ebe21e 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -2470,7 +2470,7 @@ func (l *parseRouteLSUnsubLogger) Errorf(format string, v ...any) { func (l *parseRouteLSUnsubLogger) Tracef(format string, v ...any) { trace := fmt.Sprintf(format, v...) - if strings.Contains(trace, "LS- $G foo bar") { + if strings.Contains(trace, "LS- xyz $G foo bar") { l.gotTrace <- struct{}{} } } @@ -4380,7 +4380,7 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { checkLeafNodeConnected(t, a2) checkClusterFormed(t, a1, a2) - // Create out client connections to all servers where we may need to have + // Create our client connections to all servers where we may need to have // queue subscriptions. 
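Aside: to make the queue-weight handling in the processLeafSub / processLeafUnsub hunks above easier to follow, here is a minimal standalone sketch of the delta logic, under the assumption that only the weight-relevant fields matter. The `qsub` type and the two helpers are illustrative only (the server operates on its internal `subscription` type); the point is that a new queue sub arriving with a weight greater than one, or a queue sub removed while it still carries a weight, now propagates that weight instead of a fixed ±1.

```go
package main

import "fmt"

// Hypothetical, simplified view of a queue subscription's weight.
type qsub struct {
	queue string
	qw    int32
}

// deltaOnAdd mirrors the processLeafSub change: default to 1, but if the
// incoming queue weight is greater than 1 (possible after a reconnect or
// a server restart), use it as the delta.
func deltaOnAdd(s qsub) int32 {
	delta := int32(1)
	if s.qw > 1 {
		delta = s.qw
	}
	return delta
}

// deltaOnRemove mirrors the processLeafUnsub change: when removing a
// queue sub, propagate its remembered weight rather than just 1.
func deltaOnRemove(s qsub) int32 {
	delta := int32(1)
	if s.queue != "" {
		delta = s.qw
	}
	return delta
}

func main() {
	// E.g. after a leaf reconnect, a brand new sub may report weight 3.
	fmt.Println(deltaOnAdd(qsub{queue: "q", qw: 3}))    // 3
	fmt.Println(deltaOnAdd(qsub{queue: "q", qw: 1}))    // 1
	// Closing a connection that held 3 queue subs removes them with -3.
	fmt.Println(deltaOnRemove(qsub{queue: "q", qw: 3})) // 3
}
```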
ncD1 := natsConnect(t, d1.ClientURL(), nats.UserInfo("user", "pwd")) defer ncD1.Close() @@ -4421,7 +4421,7 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { } else { nc = ncB2 } - natsPub(t, nc, subj, []byte("hello")) + natsPub(t, nc, subj, []byte(fmt.Sprintf("msg_%d", i+1))) } } @@ -4532,23 +4532,493 @@ func TestLeafNodeQueueGroupDistributionWithDaisyChainAndGateway(t *testing.T) { // Check that appropriate queue subs receive all messages. checkFor(t, 2*time.Second, 10*time.Millisecond, func() error { - // When there is (are) qsub(s) on b, then only A and B should - // get the messages. Otherwise, it should be between A and D - n := aCount.Load() - if test.b1 || test.b2 { - n += bCount.Load() - } else { - n += dCount.Load() - } + n := aCount.Load() + bCount.Load() + dCount.Load() if n == int32(total) { return nil } return fmt.Errorf("Got only %v/%v messages (a=%v b=%v d=%v)", n, total, aCount.Load(), bCount.Load(), dCount.Load()) }) - // For this specific case, make sure that D did not receive any. + // When there is (are) qsub(s) on b, then only B should + // get the messages. Otherwise, it should be between A and D if test.b1 || test.b2 { - require_LessThan(t, dCount.Load(), 1) + require_Equal(t, aCount.Load(), 0) + require_Equal(t, dCount.Load(), 0) + } else { + require_Equal(t, bCount.Load(), 0) + // We should have receive some on A and D + require_True(t, aCount.Load() > 0) + require_True(t, dCount.Load() > 0) + } + }) + } +} + +func TestLeafNodeQueueInterestAndWeightCorrectAfterServerRestartOrConnectionClose(t *testing.T) { + + // Note that this is not what a normal configuration should be. Users should + // configure each leafnode to have the URLs of both B1 and B2 so that when + // a server fails, the leaf can reconnect to the other running server. But + // we force it to be this way to demonstrate what the issue was and see that + // it is now fixed. 
+ // + // B1 <--- route ---> B2 + // | | + // Leaf Leaf + // | | + // A1 <--- route ---> A2 + // + + for _, test := range []struct { + name string + pinnedAccount string + }{ + {"without pinned account", _EMPTY_}, + {"with pinned account", "accounts: [\"A\"]"}, + } { + t.Run(test.name, func(t *testing.T) { + leafBConf := ` + accounts { A { users: [{user:a, password: pwd}] } } + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: HUB + listen: "127.0.0.1:-1" + %s + %s + } + leafnodes { + listen: "127.0.0.1:-1" + no_advertise: true + } + ` + b1Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B1", _EMPTY_, test.pinnedAccount))) + b1, b1Opts := RunServerWithConfig(b1Conf) + defer b1.Shutdown() + + b2Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port), test.pinnedAccount))) + b2, b2Opts := RunServerWithConfig(b2Conf) + defer b2.Shutdown() + + checkClusterFormed(t, b1, b2) + + leafAConf := ` + accounts { A { users: [{user:a, password: pwd}] } } + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: LEAF + listen: "127.0.0.1:-1" + %s + %s + } + leafnodes { + listen: "127.0.0.1:-1" + remotes: [ + { + url: "nats://a:pwd@127.0.0.1:%d" + account: A + } + ] + no_advertise: true + } + ` + a1Conf := createConfFile(t, []byte(fmt.Sprintf(leafAConf, "A1", _EMPTY_, test.pinnedAccount, b1Opts.LeafNode.Port))) + a1, a1Opts := RunServerWithConfig(a1Conf) + defer a1.Shutdown() + + checkLeafNodeConnected(t, b1) + checkLeafNodeConnected(t, a1) + + a2Conf := createConfFile(t, []byte(fmt.Sprintf(leafAConf, "A2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", a1Opts.Cluster.Port), test.pinnedAccount, b2Opts.LeafNode.Port))) + a2, _ := RunServerWithConfig(a2Conf) + defer a2.Shutdown() + + checkLeafNodeConnected(t, b2) + checkLeafNodeConnected(t, a2) + checkClusterFormed(t, a1, a2) + + // Create a client on A2 and 3 queue subs. + ncA2 := natsConnect(t, a2.ClientURL(), nats.UserInfo("a", "pwd")) + defer ncA2.Close() + + var qsubs []*nats.Subscription + for i := 0; i < 3; i++ { + qsubs = append(qsubs, natsQueueSub(t, ncA2, "foo", "queue", func(_ *nats.Msg) {})) } + natsFlush(t, ncA2) + + subj := "foo" + checkInterest := func(expected bool) { + t.Helper() + for _, s := range []*Server{a1, a2, b1, b2} { + acc, err := s.LookupAccount("A") + require_NoError(t, err) + checkFor(t, time.Second, 100*time.Millisecond, func() error { + i := acc.Interest(subj) + if expected && i == 0 { + return fmt.Errorf("Still no interest on %q in server %q", subj, s) + } else if !expected && i > 0 { + return fmt.Errorf("Still interest on %q in server %q", subj, s) + } + return nil + }) + } + } + checkInterest(true) + + // Check that Leafz from A1 (which connects to B1) has the expected sub + // interest on "foo". + checkLeafA1 := func(expected bool) { + t.Helper() + // We will wait a bit before checking Leafz since with the bug, it would + // take a bit of time after the action to reproduce the issue for the + // LS+ to be sent to the wrong cluster, or the interest to not be removed. 
+ time.Sleep(100 * time.Millisecond) + // Now check Leafz + leafsz, err := a1.Leafz(&LeafzOptions{Subscriptions: true}) + require_NoError(t, err) + require_Equal(t, leafsz.NumLeafs, 1) + require_True(t, leafsz.Leafs[0] != nil) + lz := leafsz.Leafs[0] + require_Equal(t, lz.Name, "B1") + require_Equal(t, lz.NumSubs, uint32(len(lz.Subs))) + var ok bool + for _, sub := range lz.Subs { + if sub == "foo" { + if expected { + ok = true + break + } + t.Fatalf("Did not expect to have the %q subscription", sub) + } + } + if expected && !ok { + t.Fatalf("Expected to have the %q subscription", "foo") + } + } + checkLeafA1(false) + + // Now restart server "B1". We need to create a conf file with the ports + // that it used. + restartBConf := createConfFile(t, []byte(fmt.Sprintf(` + accounts { A { users: [{user:a, password: pwd}] } } + server_name: B1 + listen: "127.0.0.1:%d" + cluster { + name: HUB + listen: "127.0.0.1:%d" + %s + } + leafnodes { + listen: "127.0.0.1:%d" + no_advertise: true + } + `, b1Opts.Port, b1Opts.Cluster.Port, test.pinnedAccount, b1Opts.LeafNode.Port))) + b1.Shutdown() + b1, _ = RunServerWithConfig(restartBConf) + defer b1.Shutdown() + + checkLeafNodeConnected(t, b1) + checkLeafNodeConnected(t, a1) + + // Stop one of the queue sub. + qsubs[0].Unsubscribe() + natsFlush(t, ncA2) + + // Check that "foo" does not show up in the subscription list + // for the leaf from A1 to B1. + checkLeafA1(false) + + // Now stop the other 2 and check again. + qsubs[1].Unsubscribe() + qsubs[2].Unsubscribe() + natsFlush(t, ncA2) + checkInterest(false) + + checkLeafA1(false) + + // Now recreate 3 queue subs. + for i := 0; i < 3; i++ { + natsQueueSub(t, ncA2, "foo", "queue", func(_ *nats.Msg) {}) + } + // Check interest is present in all servers + checkInterest(true) + // But A1's leaf to B1 should still not have a sub interest for "foo". + checkLeafA1(false) + + // Now stop the client connection instead of removing queue sub + // one at a time. This will ensure that we properly handle an LS- + // on B2 with an interest with a queue weight more than 1 still + // present at the time of processing. + ncA2.Close() + checkInterest(false) + + checkLeafA1(false) + + // We will now test that if the queue subs are created on B2, + // we have proper interest on A1, but when we close the connection, + // the interest disappears. + ncB2 := natsConnect(t, b2.ClientURL(), nats.UserInfo("a", "pwd")) + defer ncB2.Close() + + for i := 0; i < 3; i++ { + natsQueueSub(t, ncB2, "foo", "queue", func(_ *nats.Msg) {}) + } + checkInterest(true) + checkLeafA1(true) + // Close the connection, so all queue subs should be removed at once. 
+ ncB2.Close() + checkInterest(false) + checkLeafA1(false) + }) + } +} + +func TestLeafNodeQueueWeightCorrectOnRestart(t *testing.T) { + leafBConf := ` + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: HUB + listen: "127.0.0.1:-1" + %s + } + leafnodes { + listen: "127.0.0.1:-1" + no_advertise: true + } + ` + b1Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B1", _EMPTY_))) + b1, b1Opts := RunServerWithConfig(b1Conf) + defer b1.Shutdown() + + b2Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port)))) + b2, b2Opts := RunServerWithConfig(b2Conf) + defer b2.Shutdown() + + checkClusterFormed(t, b1, b2) + + leafAConf := ` + server_name: LEAF + listen: "127.0.0.1:-1" + leafnodes { + remotes: [{url: "nats://127.0.0.1:%d"}] + reconnect: "50ms" + } + ` + aConf := createConfFile(t, []byte(fmt.Sprintf(leafAConf, b2Opts.LeafNode.Port))) + a, _ := RunServerWithConfig(aConf) + defer a.Shutdown() + + checkLeafNodeConnected(t, b2) + checkLeafNodeConnected(t, a) + + nc := natsConnect(t, a.ClientURL()) + defer nc.Close() + + for i := 0; i < 2; i++ { + natsQueueSubSync(t, nc, "foo", "queue") + } + natsFlush(t, nc) + + checkQueueWeight := func() { + for _, s := range []*Server{b1, b2} { + gacc := s.GlobalAccount() + gacc.mu.RLock() + sl := gacc.sl + gacc.mu.RUnlock() + checkFor(t, time.Second, 10*time.Millisecond, func() error { + // For remote queue interest, Match() will expand to queue weight. + // So we should have 1 group and 2 queue subs present. + res := sl.Match("foo") + for _, qsubs := range res.qsubs { + for _, sub := range qsubs { + if string(sub.subject) == "foo" && string(sub.queue) == "queue" && atomic.LoadInt32(&sub.qw) == 2 { + return nil + } + } + } + return fmt.Errorf("Server %q does not have expected queue interest with expected weight", s) + }) + } + } + checkQueueWeight() + + // Now restart server "B2". We need to create a conf file with the ports + // that it used. 
+ restartBConf := createConfFile(t, []byte(fmt.Sprintf(` + server_name: B2 + listen: "127.0.0.1:%d" + cluster { + name: HUB + listen: "127.0.0.1:%d" + %s + } + leafnodes { + listen: "127.0.0.1:%d" + no_advertise: true + } + `, b2Opts.Port, b2Opts.Cluster.Port, fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port), b2Opts.LeafNode.Port))) + b2.Shutdown() + b2, _ = RunServerWithConfig(restartBConf) + defer b2.Shutdown() + + checkLeafNodeConnected(t, b2) + checkLeafNodeConnected(t, a) + checkQueueWeight() +} + +func TestLeafNodeRoutedSubKeyDifferentBetweenLeafSubAndRoutedSub(t *testing.T) { + for _, test := range []struct { + name string + pinnedAccount string + }{ + {"without pinned account", _EMPTY_}, + {"with pinned account", "accounts: [\"XYZ\"]"}, + } { + t.Run(test.name, func(t *testing.T) { + leafBConf := ` + accounts: {XYZ {users:[{user:a, password:pwd}]}} + server_name: %s + listen: "127.0.0.1:-1" + cluster { + name: HUB + listen: "127.0.0.1:-1" + %s + %s + } + leafnodes { + listen: "127.0.0.1:-1" + no_advertise: true + } + ` + b1Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B1", _EMPTY_, test.pinnedAccount))) + b1, b1Opts := RunServerWithConfig(b1Conf) + defer b1.Shutdown() + + b2Conf := createConfFile(t, []byte(fmt.Sprintf(leafBConf, "B2", + fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", b1Opts.Cluster.Port), test.pinnedAccount))) + b2, b2Opts := RunServerWithConfig(b2Conf) + defer b2.Shutdown() + + checkClusterFormed(t, b1, b2) + + // This leaf will have a cluster name that matches an account name. + // The idea is to make sure that hub servers are not using incorrect + // keys to differentiate a routed queue interest on subject "A" with + // queue name "foo" for account "A" in their cluster: "RS+ A A foo" + // with a leafnode plain subscription, which since there is an origin + // would be: "LS+ A A foo", that is, origin is "A", account is "A" + // and subject is "foo". 
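Aside: the collision described in the comment above is easiest to see on the per-account `acc.rm` keys. The sketch below is a standalone illustration with a simplified string-based `sub` type (not the server's `subscription`): with the old subject/queue-only keys, the four subscriptions created in this test collapse to two entries, while the new `R`/`L`-prefixed keys with a trailing leaf origin keep all four distinct.

```go
package main

import "fmt"

// Simplified stand-in for the server's subscription (assumption: string fields).
type sub struct{ origin, subject, queue string }

// oldKey mirrors the previous keyFromSub layout: subject, plus queue if present.
func oldKey(s sub) string {
	if s.queue != "" {
		return s.subject + " " + s.queue
	}
	return s.subject
}

// newKey mirrors keyFromSubWithOrigin: an "R"/"L" sub-type prefix and, for
// leaf-origin subs, the origin cluster appended at the end.
func newKey(s sub) string {
	k := "R"
	if s.origin != "" {
		k = "L"
	}
	k += " " + s.subject
	if s.queue != "" {
		k += " " + s.queue
	}
	if s.origin != "" {
		k += " " + s.origin
	}
	return k
}

func main() {
	// The four subscriptions created in this test.
	subs := []sub{
		{subject: "foo"},                              // plain sub on B2
		{subject: "XYZ", queue: "foo"},                // queue sub on B2
		{origin: "XYZ", subject: "foo"},               // plain sub from the leaf
		{origin: "XYZ", subject: "XYZ", queue: "foo"}, // queue sub from the leaf
	}
	oldKeys, newKeys := map[string]int{}, map[string]int{}
	for _, s := range subs {
		oldKeys[oldKey(s)]++
		newKeys[newKey(s)]++
	}
	fmt.Println(len(oldKeys), "distinct old-style keys") // 2: "foo" and "XYZ foo" collide
	fmt.Println(len(newKeys), "distinct new-style keys") // 4: no collisions
}
```

This is exactly what the checks on `rsubKey`, `rqsubKey`, `rlsubKey` and `rlqsubKey` below verify: each of the four keys must be present in `acc.rm` with a value of 1.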
+ leafAConf := ` + accounts: {XYZ {users:[{user:a, password:pwd}]}} + server_name: LEAF + listen: "127.0.0.1:-1" + cluster { + name: XYZ + listen: "127.0.0.1:-1" + } + leafnodes { + remotes: [ + { + url: "nats://a:pwd@127.0.0.1:%d" + account: "XYZ" + } + ] + } + ` + aConf := createConfFile(t, []byte(fmt.Sprintf(leafAConf, b2Opts.LeafNode.Port))) + a, _ := RunServerWithConfig(aConf) + defer a.Shutdown() + + checkLeafNodeConnected(t, b2) + checkLeafNodeConnected(t, a) + + ncB2 := natsConnect(t, b2.ClientURL(), nats.UserInfo("a", "pwd")) + defer ncB2.Close() + // Create a plain sub on "foo" + natsSubSync(t, ncB2, "foo") + // And a queue sub on "XYZ" with queue name "foo" + natsQueueSubSync(t, ncB2, "XYZ", "foo") + natsFlush(t, ncB2) + + ncA := natsConnect(t, a.ClientURL(), nats.UserInfo("a", "pwd")) + defer ncA.Close() + // From the leafnode, create a plain sub on "foo" + natsSubSync(t, ncA, "foo") + // And a queue sub on "XYZ" with queue name "foo" + natsQueueSubSync(t, ncA, "XYZ", "foo") + natsFlush(t, ncA) + + // Check the acc.rm on B2 + acc, err := b2.LookupAccount("XYZ") + require_NoError(t, err) + + rsubKey := keyFromSubWithOrigin(&subscription{subject: []byte("foo")}) + rqsubKey := keyFromSubWithOrigin(&subscription{subject: []byte("XYZ"), queue: []byte("foo")}) + rlsubKey := keyFromSubWithOrigin(&subscription{origin: []byte("XYZ"), subject: []byte("foo")}) + rlqsubKey := keyFromSubWithOrigin(&subscription{origin: []byte("XYZ"), subject: []byte("XYZ"), queue: []byte("foo")}) + + checkFor(t, time.Second, 10*time.Millisecond, func() error { + acc.mu.RLock() + defer acc.mu.RUnlock() + for _, key := range []string{rsubKey, rqsubKey, rlsubKey, rlqsubKey} { + v, ok := acc.rm[key] + if !ok { + return fmt.Errorf("Did not find key %q for sub: %+v", key, sub) + } + if v != 1 { + return fmt.Errorf("Key %q v=%v for sub: %+v", key, v, sub) + } + } + return nil + }) + + // Now check that on B1, we have 2 distinct subs for the route. + acc, err = b1.LookupAccount("XYZ") + require_NoError(t, err) + + var route *client + + if test.pinnedAccount == _EMPTY_ { + acc.mu.RLock() + rIdx := acc.routePoolIdx + acc.mu.RUnlock() + b1.mu.RLock() + b1.forEachRouteIdx(rIdx, func(r *client) bool { + route = r + return false + }) + b1.mu.RUnlock() + } else { + b1.mu.RLock() + remotes := b1.accRoutes["XYZ"] + for _, r := range remotes { + route = r + break + } + b1.mu.RUnlock() + } + + checkFor(t, time.Second, 10*time.Millisecond, func() error { + // Check that route.subs has 4 entries for the subs we + // created in this test. 
+ var entries []string + route.mu.Lock() + for key := range route.subs { + if strings.Contains(key, "foo") { + entries = append(entries, key) + } + } + route.mu.Unlock() + if len(entries) != 4 { + return fmt.Errorf("Expected 4 entries with %q, got this: %q", "foo", entries) + } + return nil + }) }) } } diff --git a/server/parser.go b/server/parser.go index dabbe25730..c29030eff1 100644 --- a/server/parser.go +++ b/server/parser.go @@ -804,7 +804,8 @@ func (c *client) parse(buf []byte) error { c.traceInOp("LS-", arg) } } - err = c.processRemoteUnsub(arg) + leafUnsub := c.op == 'L' || c.op == 'l' + err = c.processRemoteUnsub(arg, leafUnsub) case GATEWAY: if trace { c.traceInOp("RS-", arg) diff --git a/server/reload.go b/server/reload.go index 6d9af46278..03fb06c911 100644 --- a/server/reload.go +++ b/server/reload.go @@ -2176,15 +2176,22 @@ func (s *Server) reloadClusterPermissions(oldPerms *RoutePermissions) { } deleteRoutedSubs = deleteRoutedSubs[:0] route.mu.Lock() + pa, _, hasSubType := route.getRoutedSubKeyInfo() for key, sub := range route.subs { - if an := strings.Fields(key)[0]; an != accName { - continue + // If this is not a pinned-account route, we need to get the + // account name from the key to see if we collect this sub. + if !pa { + if an := getAccNameFromRoutedSubKey(sub, key, hasSubType); an != accName { + continue + } } // If we can't export, we need to drop the subscriptions that // we have on behalf of this route. + // Need to make a string cast here since canExport call sl.Match() subj := string(sub.subject) if !route.canExport(subj) { - delete(route.subs, string(sub.sid)) + // We can use bytesToString() here. + delete(route.subs, bytesToString(sub.sid)) deleteRoutedSubs = append(deleteRoutedSubs, sub) } } diff --git a/server/route.go b/server/route.go index d46b76b626..7e7c4a82dd 100644 --- a/server/route.go +++ b/server/route.go @@ -58,6 +58,7 @@ type route struct { didSolicit bool retry bool lnoc bool + lnocu bool routeType RouteType url *url.URL authRequired bool @@ -106,6 +107,7 @@ type connectInfo struct { Cluster string `json:"cluster"` Dynamic bool `json:"cluster_dynamic,omitempty"` LNOC bool `json:"lnoc,omitempty"` + LNOCU bool `json:"lnocu,omitempty"` // Support for LS- with origin cluster name Gateway string `json:"gateway,omitempty"` } @@ -766,6 +768,7 @@ func (c *client) processRouteInfo(info *Info) { c.route.gatewayURL = info.GatewayURL c.route.remoteName = info.Name c.route.lnoc = info.LNOC + c.route.lnocu = info.LNOCU c.route.jetstream = info.JetStream // When sent through route INFO, if the field is set, it should be of size 1. @@ -1215,6 +1218,36 @@ type asubs struct { subs []*subscription } +// Returns the account name from the subscription's key. +// This is invoked knowing that the key contains an account name, so for a sub +// that is not from a pinned-account route. +// The `keyHasSubType` boolean indicates that the key starts with the indicator +// for leaf or regular routed subscriptions. +func getAccNameFromRoutedSubKey(sub *subscription, key string, keyHasSubType bool) string { + var accIdx int + if keyHasSubType { + // Start after the sub type indicator. + accIdx = 1 + // But if there is an origin, bump its index. + if len(sub.origin) > 0 { + accIdx = 2 + } + } + return strings.Fields(key)[accIdx] +} + +// Returns if the route is dedicated to an account, its name, and a boolean +// that indicates if this route uses the routed subscription indicator at +// the beginning of the subscription key. +// Lock held on entry. 
+func (c *client) getRoutedSubKeyInfo() (bool, string, bool) { + var accName string + if an := c.route.accName; len(an) > 0 { + accName = string(an) + } + return accName != _EMPTY_, accName, c.route.lnocu +} + // removeRemoteSubs will walk the subs and remove them from the appropriate account. func (c *client) removeRemoteSubs() { // We need to gather these on a per account basis. @@ -1224,14 +1257,18 @@ func (c *client) removeRemoteSubs() { srv := c.srv subs := c.subs c.subs = nil + pa, accountName, hasSubType := c.getRoutedSubKeyInfo() c.mu.Unlock() for key, sub := range subs { c.mu.Lock() sub.max = 0 c.mu.Unlock() - // Grab the account - accountName := strings.Fields(key)[0] + // If not a pinned-account route, we need to find the account + // name from the sub's key. + if !pa { + accountName = getAccNameFromRoutedSubKey(sub, key, hasSubType) + } ase := as[accountName] if ase == nil { if v, ok := srv.accounts.Load(accountName); ok { @@ -1243,10 +1280,14 @@ func (c *client) removeRemoteSubs() { } else { ase.subs = append(ase.subs, sub) } + delta := int32(1) + if len(sub.queue) > 0 { + delta = sub.qw + } if srv.gateway.enabled { - srv.gatewayUpdateSubInterest(accountName, sub, -1) + srv.gatewayUpdateSubInterest(accountName, sub, -delta) } - ase.acc.updateLeafNodes(sub, -1) + ase.acc.updateLeafNodes(sub, -delta) } // Now remove the subs by batch for each account sublist. @@ -1263,8 +1304,9 @@ func (c *client) removeRemoteSubs() { // Lock is held on entry func (c *client) removeRemoteSubsForAcc(name string) []*subscription { var subs []*subscription + _, _, hasSubType := c.getRoutedSubKeyInfo() for key, sub := range c.subs { - an := strings.Fields(key)[0] + an := getAccNameFromRoutedSubKey(sub, key, hasSubType) if an == name { sub.max = 0 subs = append(subs, sub) @@ -1274,46 +1316,69 @@ func (c *client) removeRemoteSubsForAcc(name string) []*subscription { return subs } -func (c *client) parseUnsubProto(arg []byte) (string, []byte, []byte, error) { +func (c *client) parseUnsubProto(arg []byte, accInProto, hasOrigin bool) ([]byte, string, []byte, []byte, error) { // Indicate any activity, so pub and sub or unsubs. c.in.subs++ args := splitArg(arg) - var queue []byte - var accountName string - subjIdx := 1 - c.mu.Lock() - if c.kind == ROUTER && c.route != nil { - if accountName = string(c.route.accName); accountName != _EMPTY_ { - subjIdx = 0 - } + var ( + origin []byte + accountName string + queue []byte + subjIdx int + ) + // If `hasOrigin` is true, then it means this is a LS- with origin in proto. + if hasOrigin { + // We would not be here if there was not at least 1 field. + origin = args[0] + subjIdx = 1 + } + // If there is an account in the protocol, bump the subject index. + if accInProto { + subjIdx++ } - c.mu.Unlock() switch len(args) { case subjIdx + 1: case subjIdx + 2: queue = args[subjIdx+1] default: - return _EMPTY_, nil, nil, fmt.Errorf("parse error: '%s'", arg) + return nil, _EMPTY_, nil, nil, fmt.Errorf("parse error: '%s'", arg) } - if accountName == _EMPTY_ { - accountName = string(args[0]) + if accInProto { + // If there is an account in the protocol, it is before the subject. + accountName = string(args[subjIdx-1]) } - return accountName, args[subjIdx], queue, nil + return origin, accountName, args[subjIdx], queue, nil } // Indicates no more interest in the given account/subject for the remote side. 
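Aside: a standalone sketch of the field layout that the reworked `parseUnsubProto` has to handle. `parseUnsub` is a hypothetical helper, and the argument strings are RS-/LS- protocol arguments with the verb already stripped; it only illustrates how the subject index shifts when an origin and/or an account are present.

```go
package main

import (
	"fmt"
	"strings"
)

// parseUnsub is a simplified illustration of the field layout handled by
// parseUnsubProto: [origin] [account] subject [queue].
func parseUnsub(arg string, accInProto, hasOrigin bool) (origin, account, subject, queue string, err error) {
	args := strings.Fields(arg)
	subjIdx := 0
	if hasOrigin {
		// We assume at least one field is present, as the server does.
		origin = args[0]
		subjIdx = 1
	}
	if accInProto {
		subjIdx++
	}
	switch len(args) {
	case subjIdx + 1:
	case subjIdx + 2:
		queue = args[subjIdx+1]
	default:
		return "", "", "", "", fmt.Errorf("parse error: %q", arg)
	}
	if accInProto {
		// The account, when present, is right before the subject.
		account = args[subjIdx-1]
	}
	return origin, account, args[subjIdx], queue, nil
}

func main() {
	// RS- on a pooled route (account in protocol): "RS- $G foo bar"
	fmt.Println(parseUnsub("$G foo bar", true, false))
	// LS- with origin support and account in protocol: "LS- ln1 $G foo"
	fmt.Println(parseUnsub("ln1 $G foo", true, true))
	// LS- on a pinned-account route (no account in protocol): "LS- ln1 foo bar"
	fmt.Println(parseUnsub("ln1 foo bar", false, true))
}
```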
-func (c *client) processRemoteUnsub(arg []byte) (err error) { +func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) { srv := c.srv if srv == nil { return nil } - accountName, subject, _, err := c.parseUnsubProto(arg) + + var accountName string + // Assume the account will be in the protocol. + accInProto := true + + c.mu.Lock() + originSupport := c.route.lnocu + if c.route != nil && len(c.route.accName) > 0 { + accountName, accInProto = string(c.route.accName), false + } + c.mu.Unlock() + + hasOrigin := leafUnsub && originSupport + _, accNameFromProto, subject, _, err := c.parseUnsubProto(arg, accInProto, hasOrigin) if err != nil { return fmt.Errorf("processRemoteUnsub %s", err.Error()) } + if accInProto { + accountName = accNameFromProto + } // Lookup the account var acc *Account if v, ok := srv.accounts.Load(accountName); ok { @@ -1330,28 +1395,43 @@ func (c *client) processRemoteUnsub(arg []byte) (err error) { } updateGWs := false - // We store local subs by account and subject and optionally queue name. - // RS- will have the arg exactly as the key. + + _keya := [128]byte{} + _key := _keya[:0] + var key string - if c.kind == ROUTER && c.route != nil && len(c.route.accName) > 0 { - key = accountName + " " + bytesToString(arg) - } else { + if !originSupport { + // If it is an LS- or RS-, we use the protocol as-is as the key. key = bytesToString(arg) + } else { + // We need to prefix with the sub type. + if leafUnsub { + _key = append(_key, keyRoutedLeafSubByte) + } else { + _key = append(_key, keyRoutedSubByte) + } + _key = append(_key, ' ') + _key = append(_key, arg...) + key = bytesToString(_key) } + delta := int32(1) sub, ok := c.subs[key] if ok { delete(c.subs, key) acc.sl.Remove(sub) updateGWs = srv.gateway.enabled + if len(sub.queue) > 0 { + delta = sub.qw + } } c.mu.Unlock() if updateGWs { - srv.gatewayUpdateSubInterest(accountName, sub, -1) + srv.gatewayUpdateSubInterest(accountName, sub, -delta) } // Now check on leafnode updates. - acc.updateLeafNodes(sub, -1) + acc.updateLeafNodes(sub, -delta) if c.opts.Verbose { c.sendOK() @@ -1368,35 +1448,78 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { return nil } - // Copy so we do not reference a potentially large buffer - arg := make([]byte, len(argo)) - copy(arg, argo) + // We copy `argo` to not reference the read buffer. However, we will + // prefix with a code that says if the remote sub is for a leaf + // (hasOrigin == true) or not to prevent key collisions. Imagine: + // "RS+ foo bar baz 1\r\n" => "foo bar baz" (a routed queue sub) + // "LS+ foo bar baz\r\n" => "foo bar baz" (a route leaf sub on "baz", + // for account "bar" with origin "foo"). + // + // The sub.sid/key will be set respectively to "R foo bar baz" and + // "L foo bar baz". + // + // We also no longer add the account if it was not present (due to + // pinned-account route) since there is no need really. + // + // For routes to older server, we will still create the "arg" with + // the above layout, but we will create the sub.sid/key as before, + // that is, not including the origin for LS+ because older server + // only send LS- without origin, so we would not be able to find + // the sub in the map. + c.mu.Lock() + accountName := string(c.route.accName) + oldStyle := !c.route.lnocu + c.mu.Unlock() - args := splitArg(arg) + // Indicate if the account name should be in the protocol. It would be the + // case if accountName is empty. 
+ accInProto := accountName == _EMPTY_ + + // Copy so we do not reference a potentially large buffer. + // Add 2 more bytes for the routed sub type. + arg := make([]byte, 0, 2+len(argo)) + if hasOrigin { + arg = append(arg, keyRoutedLeafSubByte) + } else { + arg = append(arg, keyRoutedSubByte) + } + arg = append(arg, ' ') + arg = append(arg, argo...) + + // Now split to get all fields. Unroll splitArgs to avoid runtime/heap issues. + a := [MAX_RSUB_ARGS][]byte{} + args := a[:0] + start := -1 + for i, b := range arg { + switch b { + case ' ', '\t', '\r', '\n': + if start >= 0 { + args = append(args, arg[start:i]) + start = -1 + } + default: + if start < 0 { + start = i + } + } + } + if start >= 0 { + args = append(args, arg[start:]) + } + + delta := int32(1) sub := &subscription{client: c} - // This value indicate what is the mandatory subject offset in the args - // slice. It varies based on the optional presence of origin or account name - // fields (tha latter would not be present for "per-account" routes). - var subjIdx int - // If account is present, this is its "char" position in arg slice. - var accPos int + // There will always be at least a subject, but its location will depend + // on if there is an origin, an account name, etc.. Since we know that + // we have added the sub type indicator as the first field, the subject + // position will be at minimum at index 1. + subjIdx := 1 if hasOrigin { - // Set to 1, will be adjusted if the account is also expected. - subjIdx = 1 - sub.origin = args[0] - // The account would start after the origin and trailing space. - accPos = len(sub.origin) + 1 + subjIdx++ } - c.mu.Lock() - accountName := string(c.route.accName) - c.mu.Unlock() - // If the route is dedicated to an account, accountName will not - // be empty. If it is, then the account must be in the protocol. - var accInProto bool - if accountName == _EMPTY_ { + if accInProto { subjIdx++ - accInProto = true } switch len(args) { case subjIdx + 1: @@ -1404,15 +1527,50 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { case subjIdx + 3: sub.queue = args[subjIdx+1] sub.qw = int32(parseSize(args[subjIdx+2])) + // TODO: (ik) We should have a non empty queue name and a queue + // weight >= 1. For 2.11, we may want to return an error if that + // is not the case, but for now just overwrite `delta` if queue + // weight is greater than 1 (it is possible after a reconnect/ + // server restart to receive a queue weight > 1 for a new sub). + if sub.qw > 1 { + delta = sub.qw + } default: return fmt.Errorf("processRemoteSub Parse Error: '%s'", arg) } + // We know that the number of fields is correct. So we can access args[] based + // on where we expect the fields to be. + + // If there is an origin, it will be at index 1. + if hasOrigin { + sub.origin = args[1] + } + // For subject, use subjIdx. sub.subject = args[subjIdx] - // If the account name is empty (not a "per-account" route), the account - // is at the index prior to the subject. - if accountName == _EMPTY_ { + // If the account name is in the protocol, it will be before the subject. + if accInProto { accountName = bytesToString(args[subjIdx-1]) } + // Now set the sub.sid from the arg slice. However, we will have a different + // one if we use the origin or not. + start = 0 + end := len(arg) + if sub.queue != nil { + // Remove the ' ' from the arg length. + end -= 1 + len(args[subjIdx+2]) + } + if oldStyle { + // We will start at the account (if present) or at the subject. 
+ // We first skip the "R " or "L " + start = 2 + // And if there is an origin skip that. + if hasOrigin { + start += len(sub.origin) + 1 + } + // Here we are pointing at the account (if present), or at the subject. + } + sub.sid = arg[start:end] + // Lookup account while avoiding fetch. // A slow fetch delays subsequent remote messages. It also avoids the expired check (see below). // With all but memory resolver lookup can be delayed or fail. @@ -1470,33 +1628,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { return nil } - // We store local subs by account and subject and optionally queue name. - // If we have a queue it will have a trailing weight which we do not want. - if sub.queue != nil { - // if the account is in the protocol, we can reference directly "arg", - // otherwise, we need to allocate/construct the sid. - if accInProto { - sub.sid = arg[accPos : accPos+len(accountName)+1+len(sub.subject)+1+len(sub.queue)] - } else { - // It is unfortunate that we have to do this, but the gain of not - // having the account name in message protocols outweight the - // penalty of having to do this here for the processing of a - // subscription. - sub.sid = append(sub.sid, accountName...) - sub.sid = append(sub.sid, ' ') - sub.sid = append(sub.sid, sub.subject...) - sub.sid = append(sub.sid, ' ') - sub.sid = append(sub.sid, sub.queue...) - } - } else if accInProto { - sub.sid = arg[accPos:] - } else { - sub.sid = append(sub.sid, accountName...) - sub.sid = append(sub.sid, ' ') - sub.sid = append(sub.sid, sub.subject...) - } - key := bytesToString(sub.sid) - acc.mu.RLock() // For routes (this can be called by leafnodes), check if the account is // transitioning (from pool to dedicated route) and this route is not a @@ -1511,9 +1642,11 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) { } sl := acc.sl acc.mu.RUnlock() + + // We use the sub.sid for the key of the c.subs map. + key := bytesToString(sub.sid) osub := c.subs[key] updateGWs := false - delta := int32(1) if osub == nil { c.subs[key] = sub // Now place into the account sl. @@ -1555,10 +1688,14 @@ func (c *client) addRouteSubOrUnsubProtoToBuf(buf []byte, accName string, sub *s if isSubProto { buf = append(buf, lSubBytes...) buf = append(buf, sub.origin...) + buf = append(buf, ' ') } else { buf = append(buf, lUnsubBytes...) + if c.route.lnocu { + buf = append(buf, sub.origin...) + buf = append(buf, ' ') + } } - buf = append(buf, ' ') } else { if isSubProto { buf = append(buf, rSubBytes...) @@ -1659,18 +1796,27 @@ func (s *Server) sendSubsToRoute(route *client, idx int, account string) { for _, a := range accs { a.mu.RLock() for key, n := range a.rm { - var subj, qn []byte - s := strings.Split(key, " ") - subj = []byte(s[0]) - if len(s) > 1 { - qn = []byte(s[1]) + var origin, qn []byte + s := strings.Fields(key) + // Subject will always be the second field (index 1). + subj := stringToBytes(s[1]) + // Check if the key is for a leaf (will be field 0). + forLeaf := s[0] == keyRoutedLeafSub + // For queue, if not for a leaf, we need 3 fields "R foo bar", + // but if for a leaf, we need 4 fields "L foo bar leaf_origin". + if l := len(s); (!forLeaf && l == 3) || (forLeaf && l == 4) { + qn = stringToBytes(s[2]) + } + if forLeaf { + // The leaf origin will be the last field. + origin = stringToBytes(s[len(s)-1]) } - // s[0] is the subject and already as a string, so use that + // s[1] is the subject and already as a string, so use that // instead of converting back `subj` to a string. 
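Aside: the `acc.rm` key parsing that `sendSubsToRoute` now performs in the surrounding hunk can be summarized with this standalone sketch (`splitRMKey` is a hypothetical helper, not server code): field 0 is the sub-type indicator, field 1 the subject, and the queue and leaf origin follow depending on the type.

```go
package main

import (
	"fmt"
	"strings"
)

// splitRMKey mirrors the parsing done in sendSubsToRoute for keys such as
// "R foo", "R foo bar", "L foo ln1" and "L foo bar ln1".
func splitRMKey(key string) (subject, queue, origin string) {
	s := strings.Fields(key)
	subject = s[1]
	forLeaf := s[0] == "L"
	// A queue is present with 3 fields for a routed sub ("R subj queue")
	// or 4 fields for a leaf sub ("L subj queue origin").
	if l := len(s); (!forLeaf && l == 3) || (forLeaf && l == 4) {
		queue = s[2]
	}
	if forLeaf {
		// The leaf origin is always the last field.
		origin = s[len(s)-1]
	}
	return subject, queue, origin
}

func main() {
	for _, key := range []string{"R foo", "R foo bar", "L foo ln1", "L foo bar ln1"} {
		subj, queue, origin := splitRMKey(key)
		fmt.Printf("%-14q -> subject=%q queue=%q origin=%q\n", key, subj, queue, origin)
	}
}
```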
- if !route.canImport(s[0]) { + if !route.canImport(s[1]) { continue } - sub := subscription{subject: subj, queue: qn, qw: n} + sub := subscription{origin: origin, subject: subj, queue: qn, qw: n} buf = route.addRouteSubOrUnsubProtoToBuf(buf, a.Name, &sub, true) } a.mu.RUnlock() @@ -2333,8 +2479,10 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del return } - // Create the fast key which will use the subject or 'subjectqueue' for queue subscribers. - key := keyFromSub(sub) + // Create the fast key which will use the subject or '[origin]subjectqueue' + // for queue subscribers, where "origin" will be non-empty if it is a sub + // from a leafnode which has a cluster name provided. + key := keyFromSubWithOrigin(sub) // Decide whether we need to send an update out to all the routes. update := isq @@ -2517,6 +2665,7 @@ func (s *Server) startRouteAcceptLoop() { Domain: s.info.Domain, Dynamic: s.isClusterNameDynamic(), LNOC: true, + LNOCU: true, } // For tests that want to simulate old servers, do not set the compression // on the INFO protocol if configured with CompressionNotSupported. @@ -2835,6 +2984,7 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error c.mu.Lock() c.route.remoteID = c.opts.Name c.route.lnoc = proto.LNOC + c.route.lnocu = proto.LNOCU c.setRoutePermissions(perms) c.headers = supportsHeaders && proto.Headers c.mu.Unlock() diff --git a/server/routes_test.go b/server/routes_test.go index fca2d96e8c..6ff1cdb83b 100644 --- a/server/routes_test.go +++ b/server/routes_test.go @@ -1629,9 +1629,11 @@ func TestClusterQueueGroupWeightTrackingLeak(t *testing.T) { check := func(present bool, expected int32) { t.Helper() + sub := subscription{subject: []byte("foo"), queue: []byte("bar")} + key := keyFromSubWithOrigin(&sub) checkFor(t, time.Second, 15*time.Millisecond, func() error { acc.mu.RLock() - v, ok := acc.lqws["foo bar"] + v, ok := acc.lqws[key] acc.mu.RUnlock() if present { if !ok { diff --git a/server/server.go b/server/server.go index 7fa37ce201..242724db16 100644 --- a/server/server.go +++ b/server/server.go @@ -136,6 +136,7 @@ type Info struct { Import *SubjectPermission `json:"import,omitempty"` Export *SubjectPermission `json:"export,omitempty"` LNOC bool `json:"lnoc,omitempty"` + LNOCU bool `json:"lnocu,omitempty"` InfoOnConnect bool `json:"info_on_connect,omitempty"` // When true the server will respond to CONNECT with an INFO ConnectInfo bool `json:"connect_info,omitempty"` // When true this is the server INFO response to CONNECT RoutePoolSize int `json:"route_pool_size,omitempty"` diff --git a/test/new_routes_test.go b/test/new_routes_test.go index 2635f7132a..c4e5b23d1e 100644 --- a/test/new_routes_test.go +++ b/test/new_routes_test.go @@ -58,6 +58,9 @@ func TestNewRouteInfoOnConnect(t *testing.T) { if !info.LNOC { t.Fatalf("Expected to have leafnode origin cluster support") } + if !info.LNOCU { + t.Fatalf("Expected to have leafnode origin cluster in unsub protocol support") + } } func TestNewRouteHeaderSupport(t *testing.T) { @@ -1713,29 +1716,93 @@ func TestNewRouteLeafNodeOriginSupport(t *testing.T) { info.ID = routeID info.Name = "" info.LNOC = true + // Overwrite to false to check that we are getting LS- without origin + // if we are an old server. 
+ info.LNOCU = false b, err := json.Marshal(info) if err != nil { t.Fatalf("Could not marshal test route info: %v", err) } routeSend(fmt.Sprintf("INFO %s\r\n", b)) - routeExpect(rsubRe) + routeExpect(rlsubRe) pingPong() - // Make sure it can process and LS+ - routeSend("LS+ ln1 $G foo\r\n") - pingPong() + sendLSProtosFromRoute := func(lnocu bool) { + t.Helper() - if !gacc.SubscriptionInterest("foo") { - t.Fatalf("Expected interest on \"foo\"") - } + // Make sure it can process and LS+ + routeSend("LS+ ln1 $G foo\r\n") + pingPong() - // This should not have been sent to the leafnode since same origin cluster. - time.Sleep(10 * time.Millisecond) - if lgacc.SubscriptionInterest("foo") { - t.Fatalf("Did not expect interest on \"foo\"") + // Check interest is registered on remote server. + if !gacc.SubscriptionInterest("foo") { + t.Fatalf("Expected interest on \"foo\"") + } + + // This should not have been sent to the leafnode since same origin cluster. + time.Sleep(10 * time.Millisecond) + if lgacc.SubscriptionInterest("foo") { + t.Fatalf("Did not expect interest on \"foo\"") + } + + // Now unsub. Either act as an old server that does not support origin + // in the LS- or as a new server. + if lnocu { + routeSend("LS- ln1 $G foo\r\n") + } else { + routeSend("LS- $G foo\r\n") + } + pingPong() + + // Interest should be gone. + if gacc.SubscriptionInterest("foo") { + t.Fatalf("Expected no interest on \"foo\"") + } + + // Make sure we did not incorrectly send an interest to the leaf. + time.Sleep(10 * time.Millisecond) + if lgacc.SubscriptionInterest("foo") { + t.Fatalf("Did not expect interest on \"foo\"") + } + + // Repeat with a queue. + routeSend("LS+ ln1 $G foo bar 1\r\n") + pingPong() + + if !gacc.SubscriptionInterest("foo") { + t.Fatalf("Expected interest on \"foo\"") + } + + // This should not have been sent to the leafnode since same origin cluster. + time.Sleep(10 * time.Millisecond) + if lgacc.SubscriptionInterest("foo") { + t.Fatalf("Did not expect interest on \"foo\"") + } + + // Now unsub. + if lnocu { + routeSend("LS- ln1 $G foo bar\r\n") + } else { + routeSend("LS- $G foo bar\r\n") + } + pingPong() + + // Subscription should be gone. + if gacc.SubscriptionInterest("foo") { + t.Fatalf("Expected no interest on \"foo\"") + } + + // Make sure we did not incorrectly send an interest to the leaf. + time.Sleep(10 * time.Millisecond) + if lgacc.SubscriptionInterest("foo") { + t.Fatalf("Did not expect interest on \"foo\"") + } } + // Check the LS+/- when not supporting origin in LS- + sendLSProtosFromRoute(false) + // Create a connection on the leafnode server. nc, err := nats.Connect(ln.ClientURL()) if err != nil { @@ -1778,6 +1845,61 @@ func TestNewRouteLeafNodeOriginSupport(t *testing.T) { if n, _, _ := sub.Pending(); n != 0 { t.Fatalf("Should not have received the message on bar") } + + // Now unsubscribe, we should receive an LS- without origin. + sub.Unsubscribe() + routeExpect(lunsubRe) + + // Quick check for queues + sub, _ = nc.QueueSubscribeSync("baz", "bat") + // Let it propagate to the main server + checkFor(t, time.Second, 10*time.Millisecond, func() error { + if !gacc.SubscriptionInterest("baz") { + return fmt.Errorf("No interest") + } + return nil + }) + // For "baz" + routeExpect(rlsubRe) + sub.Unsubscribe() + routeExpect(lunsubRe) + + // Restart our routed server, but this time indicate support + // for LS- with origin cluster. 
+ rc.Close() + rc = createRouteConn(t, opts.Cluster.Host, opts.Cluster.Port) + defer rc.Close() + + routeSend, routeExpect = setupRouteEx(t, rc, opts, routeID) + + info = checkInfoMsg(t, rc) + info.ID = routeID + info.Name = "" + // These should be already set to true since the server that sends the + // INFO has them enabled, but just be explicit. + info.LNOC = true + info.LNOCU = true + b, err = json.Marshal(info) + if err != nil { + t.Fatalf("Could not marshal test route info: %v", err) + } + + routeSend(fmt.Sprintf("INFO %s\r\n", b)) + routeExpect(rlsubRe) + pingPong() + + // Check the LS+/LS- + sendLSProtosFromRoute(true) + + sub, _ = nc.SubscribeSync("bar") + routeExpect(rlsubRe) + sub.Unsubscribe() + routeExpect(rlunsubRe) + + sub, _ = nc.QueueSubscribeSync("baz", "bat") + routeExpect(rlsubRe) + sub.Unsubscribe() + routeExpect(rlunsubRe) } // Check that real duplicate subscription (that is, sent by client with same sid) diff --git a/test/test.go b/test/test.go index 3b49100b4a..52cb873ca1 100644 --- a/test/test.go +++ b/test/test.go @@ -372,9 +372,10 @@ var ( asubRe = regexp.MustCompile(`A\+\s+([^\r\n]+)\r\n`) aunsubRe = regexp.MustCompile(`A\-\s+([^\r\n]+)\r\n`) lsubRe = regexp.MustCompile(`LS\+\s+([^\s]+)\s*([^\s]+)?\s*(\d+)?\r\n`) - lunsubRe = regexp.MustCompile(`LS\-\s+([^\s]+)\s*([^\s]+)?\r\n`) + lunsubRe = regexp.MustCompile(`LS\-\s+([^\s]+)\s*([^\s]+)\s*([^\s]+)?\r\n`) lmsgRe = regexp.MustCompile(`(?:(?:LMSG\s+([^\s]+)\s+(?:([|+]\s+([\w\s]+)|[^\s]+)[^\S\r\n]+)?(\d+)\s*\r\n([^\\r\\n]*?)\r\n)+?)`) rlsubRe = regexp.MustCompile(`LS\+\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s*([^\s]+)?\s*(\d+)?\r\n`) + rlunsubRe = regexp.MustCompile(`LS\-\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s*([^\s]+)?\r\n`) ) const (
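To close, a standalone sketch of the two LS- unsub wire forms that the updated test regexes distinguish. The patterns are copied from test/test.go above; the sample protocol lines ("$G", "ln1", "foo", "bar") are illustrative, not captured server traffic.

```go
package main

import (
	"fmt"
	"regexp"
)

var (
	// Patterns from test/test.go above: LS- without origin (sent by older
	// servers) and LS- that also carries the origin cluster (LNOCU support).
	lunsubRe  = regexp.MustCompile(`LS\-\s+([^\s]+)\s*([^\s]+)\s*([^\s]+)?\r\n`)
	rlunsubRe = regexp.MustCompile(`LS\-\s+([^\s]+)\s+([^\s]+)\s+([^\s]+)\s*([^\s]+)?\r\n`)
)

func main() {
	// Old style: account, subject and optional queue.
	oldStyle := "LS- $G foo bar\r\n"
	// New style: origin cluster first, then account, subject, optional queue.
	withOrigin := "LS- ln1 $G foo bar\r\n"

	fmt.Printf("%q\n", lunsubRe.FindStringSubmatch(oldStyle)[1:])    // account, subject, queue
	fmt.Printf("%q\n", rlunsubRe.FindStringSubmatch(withOrigin)[1:]) // origin, account, subject, queue
	// rlunsubRe requires at least three fields, so a short "LS- $G foo"
	// (no origin, no queue) is not mistaken for the new form.
	fmt.Println(rlunsubRe.MatchString("LS- $G foo\r\n")) // false
}
```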