Skip to content

Commit

Permalink
[FIXED] LeafNodes: Queue interest may be lost in super cluster
Browse files Browse the repository at this point in the history
If a cluster has leafnode connections and each have the same queue
group, the loss of a leafnode connection could cause the server
in the "hub" cluster to drop interest across a gateway for this
queue group.

The issue is fixed by properly accounting for all queue sub and
unsub in the server gateway interest map.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Jan 16, 2025
1 parent 3616832 commit af6541d
Show file tree
Hide file tree
Showing 3 changed files with 361 additions and 22 deletions.
20 changes: 9 additions & 11 deletions server/leafnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2525,7 +2525,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
}
key := bytesToString(sub.sid)
osub := c.subs[key]
updateGWs := false
if osub == nil {
c.subs[key] = sub
// Now place into the account sl.
Expand All @@ -2536,7 +2535,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
c.sendErr("Invalid Subscription")
return nil
}
updateGWs = srv.gateway.enabled
} else if sub.queue != nil {
// For a queue we need to update the weight.
delta = sub.qw - atomic.LoadInt32(&osub.qw)
Expand All @@ -2559,7 +2557,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
if !spoke {
// If we are routing add to the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, delta)
if updateGWs {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
}
}
Expand Down Expand Up @@ -2601,27 +2599,27 @@ func (c *client) processLeafUnsub(arg []byte) error {
return nil
}

updateGWs := false
spoke := c.isSpokeLeafNode()
// We store local subs by account and subject and optionally queue name.
// LS- will have the arg exactly as the key.
sub, ok := c.subs[string(arg)]
if !ok {
// If not found, don't try to update routes/gws/leaf nodes.
c.mu.Unlock()
return nil
}
delta := int32(1)
if ok && len(sub.queue) > 0 {
if len(sub.queue) > 0 {
delta = sub.qw
}
c.mu.Unlock()

if ok {
c.unsubscribe(acc, sub, true, true)
updateGWs = srv.gateway.enabled
}

c.unsubscribe(acc, sub, true, true)
if !spoke {
// If we are routing subtract from the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, -delta)
// Gateways
if updateGWs {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
}
}
Expand Down
Loading

0 comments on commit af6541d

Please sign in to comment.