[FIXED] LeafNode: propagation of (no)interest issues
There were multiple issues, but fundamentally the fact that we did
not store routed subscriptions with the origin of the LEAF they came
from made the server unable to differentiate them from "local" routed
subscriptions, which in some cases (such as a server restart and the
resend of subscriptions) could lead to servers incorrectly sending
subscription interest to leaf connections.

We now store the subscriptions with a sub type indicator and, for
leaf subscriptions, the origin as part of the key. This allows the
server to differentiate "regular" routed subs from those made on
behalf of a leafnode.
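
The key format can be sketched in isolation as follows. This is a
minimal, hedged illustration: `subSketch` and `keySketch` are made-up
names, not the server's types; the actual implementation is
`keyFromSubWithOrigin` in server/leafnode.go below.

package main

import (
	"fmt"
	"strings"
)

// Simplified stand-in for the server's subscription fields.
type subSketch struct {
	subject, queue, origin string
}

// keySketch mirrors the key format described above: "R <subject> [queue]"
// for regular routed subs, "L <subject> [queue] <origin>" for routed subs
// on behalf of a leafnode.
func keySketch(sub subSketch) string {
	var sb strings.Builder
	if sub.origin != "" {
		sb.WriteByte('L')
	} else {
		sb.WriteByte('R')
	}
	sb.WriteByte(' ')
	sb.WriteString(sub.subject)
	if sub.queue != "" {
		sb.WriteByte(' ')
		sb.WriteString(sub.queue)
	}
	if sub.origin != "" {
		sb.WriteByte(' ')
		sb.WriteString(sub.origin)
	}
	return sb.String()
}

func main() {
	fmt.Println(keySketch(subSketch{subject: "foo"}))                                // R foo
	fmt.Println(keySketch(subSketch{subject: "foo", queue: "bar"}))                  // R foo bar
	fmt.Println(keySketch(subSketch{subject: "foo", origin: "leaf1"}))               // L foo leaf1
	fmt.Println(keySketch(subSketch{subject: "foo", queue: "bar", origin: "leaf1"})) // L foo bar leaf1
}
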
An INFO boolean `LNOCU` is added to indicate support for the origin
in the `LS-` protocol, which is required to properly handle the
removal. Therefore, if a route does not advertise `LNOCU`, the server
behaves like an old server and stores the subscription with a key
that does not contain the origin, so that it can be removed when
receiving an `LS-` without the origin. Note that with a mix of
servers in the same cluster, some of the issues this PR fixes will
still be present (since the server will essentially behave like a
server without the fix).
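
Continuing the sketch above, the fallback for routes without `LNOCU`
could look like this (hedged; illustrative names only, not the
server's API): the subscription is stored under the old-style key so
that an `LS-` received without an origin can still find and remove it.

// storageKeySketch picks the key format depending on whether the
// route advertised LNOCU support in its INFO.
func storageKeySketch(routeSupportsLNOCU bool, sub subSketch) string {
	if routeSupportsLNOCU {
		// New format with type indicator and origin, e.g. "L foo bar leaf1".
		return keySketch(sub)
	}
	// Old format: subject and optional queue only, e.g. "foo bar",
	// matching what an origin-less LS- will carry.
	if sub.queue != "" {
		return sub.subject + " " + sub.queue
	}
	return sub.subject
}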

Having distinct routed subs for leaf connections allows us to revisit
fix #5982, which was done for issue #5972 and aimed at a fairer queue
distribution to a cluster of leaf connections. That fix may have
altered the long-standing behavior of always favoring queue
subscriptions in the cluster where the message is produced. With this
PR, the server can now tell whether a remote queue sub is "local" on
the remote server or on behalf of a leaf, and will therefore not
favor that route over a leaf subscription it may have directly
attached.
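
A simplified, hedged illustration of that selection rule (reusing
`subSketch` from above; the real logic is in processMsgResults in
server/client.go below): a routed queue sub without an origin is
local to our own cluster (on the remote server) and is favored right
away, while one carrying a leaf origin is only kept as a fallback,
and is skipped entirely when its origin matches the leaf cluster the
message came from.

// pickRoutedQsubSketch returns the routed queue sub candidate to use,
// or nil if none qualifies.
func pickRoutedQsubSketch(candidates []subSketch, leafOrigin string) *subSketch {
	var fallback *subSketch
	for i := range candidates {
		sub := &candidates[i]
		if sub.origin == "" {
			// Local on the remote server: favor it and stop looking.
			return sub
		}
		// On behalf of a leaf: remember it only as a fallback, and never
		// if it comes from the same leaf cluster as the message.
		if fallback == nil && (leafOrigin == "" || leafOrigin != sub.origin) {
			fallback = sub
		}
	}
	return fallback
}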

Resolves #5972
Resolves #6148

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic committed Nov 22, 2024
1 parent f66c73d commit d81310c
Showing 12 changed files with 951 additions and 133 deletions.
23 changes: 18 additions & 5 deletions server/client.go
@@ -4866,19 +4866,32 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
// Remember that leaf in case we don't find any other candidate.
// We already start randomly in lqs slice, so we don't need
// to do a random swap if we already have an rsub like we do
// when src == ROUTER above.
if rsub == nil {
rsub = sub
}
continue
} else {
// We would be picking a route, but if we had remembered a "hub" leaf,
// then pick that one instead of the route.
if rsub != nil && rsub.client.kind == LEAF && rsub.client.isHubLeafNode() {
break
// We want to favor qsubs in our own cluster. If the routed
// qsub has an origin, it means that it is on behalf of a leaf.
// We need to treat it differently.
if len(sub.origin) > 0 {
// If we already have an rsub, nothing to do. Also, do
// not pick a routed qsub for a LEAF origin cluster
// that is the same as where the message comes from.
if rsub == nil && (leafOrigin == _EMPTY_ || leafOrigin != bytesToString(sub.origin)) {
rsub = sub
}
continue
}
// This is a qsub that is local on the remote server (or
// we are connected to an older server and we don't know).
// Pick this one and be done.
rsub = sub
break
}
break
}

// Assume delivery subject is normal subject to this point.
3 changes: 3 additions & 0 deletions server/const.go
@@ -174,6 +174,9 @@ const (
// MAX_HPUB_ARGS Maximum possible number of arguments from HPUB proto.
MAX_HPUB_ARGS = 4

// MAX_RSUB_ARGS Maximum possible number of arguments from a RS+/LS+ proto.
MAX_RSUB_ARGS = 6

// DEFAULT_MAX_CLOSED_CLIENTS is the maximum number of closed connections we hold onto.
DEFAULT_MAX_CLOSED_CLIENTS = 10000

2 changes: 1 addition & 1 deletion server/gateway.go
@@ -1912,7 +1912,7 @@ func (c *client) processGatewayAccountSub(accName string) error {
// the sublist if present.
// <Invoked from outbound connection's readLoop>
func (c *client) processGatewayRUnsub(arg []byte) error {
accName, subject, queue, err := c.parseUnsubProto(arg)
_, accName, subject, queue, err := c.parseUnsubProto(arg, true, false)
if err != nil {
return fmt.Errorf("processGatewaySubjectUnsub %s", err.Error())
}
56 changes: 52 additions & 4 deletions server/leafnode.go
@@ -2353,6 +2353,42 @@ func keyFromSub(sub *subscription) string {
return sb.String()
}

const (
keyRoutedSub = "R"
keyRoutedSubByte = 'R'
keyRoutedLeafSub = "L"
keyRoutedLeafSubByte = 'L'
)

// Helper function to build the key that prevents collisions between normal
// routed subscriptions and routed subscriptions on behalf of a leafnode.
// Keys will look like this:
// "R foo" -> plain routed sub on "foo"
// "R foo bar" -> queue routed sub on "foo", queue "bar"
// "L foo bar" -> plain routed leaf sub on "foo", leaf "bar"
// "L foo bar baz" -> queue routed sub on "foo", queue "bar", leaf "baz"
func keyFromSubWithOrigin(sub *subscription) string {
var sb strings.Builder
sb.Grow(2 + len(sub.origin) + 1 + len(sub.subject) + 1 + len(sub.queue))
leaf := len(sub.origin) > 0
if leaf {
sb.WriteByte(keyRoutedLeafSubByte)
} else {
sb.WriteByte(keyRoutedSubByte)
}
sb.WriteByte(' ')
sb.Write(sub.subject)
if sub.queue != nil {
sb.WriteByte(' ')
sb.Write(sub.queue)
}
if leaf {
sb.WriteByte(' ')
sb.Write(sub.origin)
}
return sb.String()
}

// Lock should be held.
func (c *client) writeLeafSub(w *bytes.Buffer, key string, n int32) {
if key == _EMPTY_ {
@@ -2403,12 +2439,21 @@ func (c *client) processLeafSub(argo []byte) (err error) {
args := splitArg(arg)
sub := &subscription{client: c}

delta := int32(1)
switch len(args) {
case 1:
sub.queue = nil
case 3:
sub.queue = args[1]
sub.qw = int32(parseSize(args[2]))
// TODO: (ik) We should have a non empty queue name and a queue
// weight >= 1. For 2.11, we may want to return an error if that
// is not the case, but for now just overwrite `delta` if queue
// weight is greater than 1 (it is possible after a reconnect/
// server restart to receive a queue weight > 1 for a new sub).
if sub.qw > 1 {
delta = sub.qw
}
default:
return fmt.Errorf("processLeafSub Parse Error: '%s'", arg)
}
@@ -2473,7 +2518,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
key := bytesToString(sub.sid)
osub := c.subs[key]
updateGWs := false
delta := int32(1)
if osub == nil {
c.subs[key] = sub
// Now place into the account sl.
@@ -2554,6 +2598,10 @@ func (c *client) processLeafUnsub(arg []byte) error {
// We store local subs by account and subject and optionally queue name.
// LS- will have the arg exactly as the key.
sub, ok := c.subs[string(arg)]
delta := int32(1)
if ok && len(sub.queue) > 0 {
delta = sub.qw
}
c.mu.Unlock()

if ok {
@@ -2563,14 +2611,14 @@

if !spoke {
// If we are routing subtract from the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, -1)
srv.updateRouteSubscriptionMap(acc, sub, -delta)
// Gateways
if updateGWs {
srv.gatewayUpdateSubInterest(acc.Name, sub, -1)
srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
}
}
// Now check on leafnode updates for other leaf nodes.
acc.updateLeafNodes(sub, -1)
acc.updateLeafNodes(sub, -delta)
return nil
}
