[FIXED] Cluster: load balancing of queue groups from attached leaf nodes
When receiving a message from a route, if the queue interest consists
only of routes and leaf connections, the server will always favor
the leaf kind. However, if there was more than one leaf connection
for the same queue group, the server would always pick the same
one (the last one). The selection would change if a new leaf were
to connect or disconnect/reconnect, but with a stable set it would
always be the same.

This PR makes this selection somewhat random.
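
To illustrate the change, here is a minimal standalone sketch of the
selection logic under stated assumptions: the kind and qsub types are
hypothetical stand-ins for the server's internal types, and math/rand
replaces the server's internal fastrand helper.

package main

import (
	"fmt"
	"math/rand"
)

// Hypothetical stand-ins for the server's connection kinds and
// queue subscriptions (the real ones live in server/client.go).
type kind int

const (
	ROUTER kind = iota
	LEAF
)

type qsub struct {
	name string
	kind kind
}

// pickRemote mirrors the fixed selection: prefer a LEAF over a
// ROUTER and, when several LEAF candidates exist, flip a coin on
// each additional one so that no single leaf is always chosen.
func pickRemote(qsubs []qsub) *qsub {
	var rsub *qsub
	for i := range qsubs {
		sub := &qsubs[i]
		if rsub == nil || (rsub.kind == ROUTER && sub.kind == LEAF) {
			rsub = sub
		} else if sub.kind == LEAF && rand.Intn(2) == 1 {
			// Coin flip: maybe swap out the previously chosen LEAF.
			rsub = sub
		}
	}
	return rsub
}

func main() {
	qsubs := []qsub{{"route1", ROUTER}, {"leaf1", LEAF}, {"leaf2", LEAF}}
	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		counts[pickRemote(qsubs).name]++
	}
	// With the coin flip, leaf1 and leaf2 each get a share of the
	// selections; before the fix, the last LEAF always won.
	fmt.Println(counts)
}

Note that with more than two leaf candidates the repeated coin flip is
not perfectly uniform (earlier candidates survive with lower
probability), which matches the commit's description of the selection
as somewhat random rather than strictly balanced.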

Resolves #6040

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
kozlovic authored and neilalexander committed Oct 29, 2024
1 parent a6810ba commit 0a99ac9
Showing 2 changed files with 127 additions and 5 deletions.
16 changes: 11 additions & 5 deletions server/client.go
@@ -4608,12 +4608,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
 	ql := _ql[:0]
 	for i := 0; i < len(qsubs); i++ {
 		sub = qsubs[i]
-		if sub.client.kind == LEAF || sub.client.kind == ROUTER {
-			// If we have assigned an rsub already, replace if the destination is a LEAF
-			// since we want to favor that compared to a ROUTER. We could make sure that
-			// we override only if previous was a ROUTE and not a LEAF, but we don't have to.
-			if rsub == nil || sub.client.kind == LEAF {
+		if dst := sub.client.kind; dst == LEAF || dst == ROUTER {
+			// If we have assigned a ROUTER rsub already, replace if
+			// the destination is a LEAF since we want to favor that.
+			if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) {
 				rsub = sub
+			} else if dst == LEAF {
+				// We already have a LEAF and this is another one.
+				// Flip a coin to see if we swap it or not.
+				// See https://github.com/nats-io/nats-server/issues/6040
+				if fastrand.Uint32()%2 == 1 {
+					rsub = sub
+				}
 			}
 		} else {
 			ql = append(ql, sub)
116 changes: 116 additions & 0 deletions server/leafnode_test.go
@@ -4117,6 +4117,122 @@ func TestLeafNodeQueueGroupDistribution(t *testing.T) {
sendAndCheck(2)
}

func TestLeafNodeQueueGroupDistributionVariant(t *testing.T) {
hc := createClusterWithName(t, "HUB", 3)
defer hc.shutdown()

// Now have a cluster of leafnodes with LEAF1 and LEAF2 connecting to HUB1.
c1 := `
server_name: LEAF1
listen: 127.0.0.1:-1
cluster { name: ln22, listen: 127.0.0.1:-1 }
leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
`
lconf1 := createConfFile(t, []byte(fmt.Sprintf(c1, hc.opts[0].LeafNode.Port)))
ln1, lopts1 := RunServerWithConfig(lconf1)
defer ln1.Shutdown()

c2 := `
server_name: LEAF2
listen: 127.0.0.1:-1
cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] }
leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
`
lconf2 := createConfFile(t, []byte(fmt.Sprintf(c2, lopts1.Cluster.Port, hc.opts[0].LeafNode.Port)))
ln2, _ := RunServerWithConfig(lconf2)
defer ln2.Shutdown()

// And LEAF3 to HUB3
c3 := `
server_name: LEAF3
listen: 127.0.0.1:-1
cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] }
leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
`
lconf3 := createConfFile(t, []byte(fmt.Sprintf(c3, lopts1.Cluster.Port, hc.opts[2].LeafNode.Port)))
ln3, _ := RunServerWithConfig(lconf3)
defer ln3.Shutdown()

// Check leaf cluster is formed and all connected to the HUB.
lnServers := []*Server{ln1, ln2, ln3}
checkClusterFormed(t, lnServers...)
for _, s := range lnServers {
checkLeafNodeConnected(t, s)
}
// Check that HUB1 has 2 leaf connections, HUB2 has 0 and HUB3 has 1.
checkLeafNodeConnectedCount(t, hc.servers[0], 2)
checkLeafNodeConnectedCount(t, hc.servers[1], 0)
checkLeafNodeConnectedCount(t, hc.servers[2], 1)

// Create a client and qsub on LEAF1 and LEAF2.
nc1 := natsConnect(t, ln1.ClientURL())
defer nc1.Close()
var qsub1Count atomic.Int32
natsQueueSub(t, nc1, "foo", "queue1", func(_ *nats.Msg) {
qsub1Count.Add(1)
})
natsFlush(t, nc1)

nc2 := natsConnect(t, ln2.ClientURL())
defer nc2.Close()
var qsub2Count atomic.Int32
natsQueueSub(t, nc2, "foo", "queue1", func(_ *nats.Msg) {
qsub2Count.Add(1)
})
natsFlush(t, nc2)

// Make sure that interest propagation has completed before sending.
for i, s := range hc.servers {
gacc := s.GlobalAccount()
var ei int
switch i {
case 0:
ei = 2
default:
ei = 1
}
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if n := gacc.Interest("foo"); n != ei {
return fmt.Errorf("Expected interest for %q to be %d, got %v", "foo", ei, n)
}
return nil
})
}

sendAndCheck := func(idx int) {
t.Helper()
nchub := natsConnect(t, hc.servers[idx].ClientURL())
defer nchub.Close()
total := 1000
for i := 0; i < total; i++ {
natsPub(t, nchub, "foo", []byte("from hub"))
}
checkFor(t, time.Second, 15*time.Millisecond, func() error {
if trecv := int(qsub1Count.Load() + qsub2Count.Load()); trecv != total {
return fmt.Errorf("Expected %v messages, got %v", total, trecv)
}
return nil
})
// Now that all messages have been received, check that qsub1
// and qsub2 each got at least some of them.
if n := int(qsub1Count.Load()); n <= total/10 {
t.Fatalf("Expected qsub1 to get some messages, but got %v (qsub2=%v)", n, qsub2Count.Load())
}
if n := int(qsub2Count.Load()); n <= total/10 {
t.Fatalf("Expected qsub2 to get some messages, but got %v (qsub1=%v)", n, qsub1Count.Load())
}
// Reset the counters.
qsub1Count.Store(0)
qsub2Count.Store(0)
}
// Send from HUB1
sendAndCheck(0)
// Send from HUB2
sendAndCheck(1)
// Send from HUB3
sendAndCheck(2)
}

func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) {
/*
