From 4e32600e2041a6a4e755b48bc4b06c4199de21db Mon Sep 17 00:00:00 2001
From: Ivan Kozlovic
Date: Fri, 25 Oct 2024 12:20:36 -0600
Subject: [PATCH] [FIXED] Cluster: load balancing of queue groups from
 attached leaf nodes

When receiving a message from a route, if the queue interest is only
routes and leaf connections, the server will always favor the leaf kind.
However, if there was more than one leaf connection for the same queue
group, the server would always pick the same one (the last one). It
would change if a new leaf were to connect or disconnect/reconnect, but
with a stable set, it would always be the same.

This PR makes this selection somewhat random.

Resolves #6040

Signed-off-by: Ivan Kozlovic
---
 server/client.go        |  16 ++++-
 server/leafnode_test.go | 116 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 127 insertions(+), 5 deletions(-)

diff --git a/server/client.go b/server/client.go
index f9ddbebc2d..584c90aedc 100644
--- a/server/client.go
+++ b/server/client.go
@@ -4608,12 +4608,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
 			ql := _ql[:0]
 			for i := 0; i < len(qsubs); i++ {
 				sub = qsubs[i]
-				if sub.client.kind == LEAF || sub.client.kind == ROUTER {
-					// If we have assigned an rsub already, replace if the destination is a LEAF
-					// since we want to favor that compared to a ROUTER. We could make sure that
-					// we override only if previous was a ROUTE and not a LEAF, but we don't have to.
-					if rsub == nil || sub.client.kind == LEAF {
+				if dst := sub.client.kind; dst == LEAF || dst == ROUTER {
+					// If we have assigned a ROUTER rsub already, replace if
+					// the destination is a LEAF since we want to favor that.
+					if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) {
 						rsub = sub
+					} else if dst == LEAF {
+						// We already have a LEAF and this is another one.
+						// Flip a coin to see if we swap it or not.
+						// See https://github.com/nats-io/nats-server/issues/6040
+						if fastrand.Uint32()%2 == 1 {
+							rsub = sub
+						}
 					}
 				} else {
 					ql = append(ql, sub)
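A note on the hunk above: the selection is now a single-pass, randomized
pick among the eligible remote destinations. The sketch below is a minimal
standalone illustration of that coin-flip technique, not the server code
itself: the kind, qsub, and pickRemote names are hypothetical stand-ins for
the server's internal structures, and math/rand replaces the internal
fastrand helper.

// Standalone sketch of the coin-flip selection; names are illustrative
// stand-ins, not the actual nats-server types.
package main

import (
	"fmt"
	"math/rand"
)

type kind int

const (
	ROUTER kind = iota
	LEAF
)

type qsub struct {
	kind kind
	name string
}

// pickRemote mirrors the updated loop: a LEAF always displaces a ROUTER,
// and each additional LEAF candidate has a 1/2 chance to displace the
// currently selected one.
func pickRemote(qsubs []qsub) *qsub {
	var rsub *qsub
	for i := range qsubs {
		s := &qsubs[i]
		if rsub == nil || (rsub.kind == ROUTER && s.kind == LEAF) {
			rsub = s
		} else if s.kind == LEAF {
			// Flip a coin to decide whether to swap, as in the patch.
			if rand.Uint32()%2 == 1 {
				rsub = s
			}
		}
	}
	return rsub
}

func main() {
	candidates := []qsub{{ROUTER, "R1"}, {LEAF, "L1"}, {LEAF, "L2"}, {LEAF, "L3"}}
	counts := map[string]int{}
	for i := 0; i < 100000; i++ {
		counts[pickRemote(candidates).name]++
	}
	// Expect roughly L1=25%, L2=25%, L3=50%, R1=0%: LEAF always beats
	// ROUTER, and the flips favor later LEAF candidates.
	fmt.Println(counts)
}

One property worth noting: the flips are cheap but not uniform. With LEAF
candidates L1..Ln in iteration order, Ln wins with probability 1/2, Ln-1
with 1/4, and so on, with the first two tied at 1/2^(n-1). That is the
"somewhat random" of the commit message: it trades perfect uniformity for
a single pass with no extra bookkeeping.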
diff --git a/server/leafnode_test.go b/server/leafnode_test.go
index 0757748785..3ca25b484d 100644
--- a/server/leafnode_test.go
+++ b/server/leafnode_test.go
@@ -4117,6 +4117,122 @@ func TestLeafNodeQueueGroupDistribution(t *testing.T) {
 	sendAndCheck(2)
 }
 
+func TestLeafNodeQueueGroupDistributionVariant(t *testing.T) {
+	hc := createClusterWithName(t, "HUB", 3)
+	defer hc.shutdown()
+
+	// Now have a cluster of leafnodes with LEAF1 and LEAF2 connecting to HUB1.
+	c1 := `
+	server_name: LEAF1
+	listen: 127.0.0.1:-1
+	cluster { name: ln22, listen: 127.0.0.1:-1 }
+	leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
+	`
+	lconf1 := createConfFile(t, []byte(fmt.Sprintf(c1, hc.opts[0].LeafNode.Port)))
+	ln1, lopts1 := RunServerWithConfig(lconf1)
+	defer ln1.Shutdown()
+
+	c2 := `
+	server_name: LEAF2
+	listen: 127.0.0.1:-1
+	cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] }
+	leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
+	`
+	lconf2 := createConfFile(t, []byte(fmt.Sprintf(c2, lopts1.Cluster.Port, hc.opts[0].LeafNode.Port)))
+	ln2, _ := RunServerWithConfig(lconf2)
+	defer ln2.Shutdown()
+
+	// And LEAF3 to HUB3
+	c3 := `
+	server_name: LEAF3
+	listen: 127.0.0.1:-1
+	cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] }
+	leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
+	`
+	lconf3 := createConfFile(t, []byte(fmt.Sprintf(c3, lopts1.Cluster.Port, hc.opts[2].LeafNode.Port)))
+	ln3, _ := RunServerWithConfig(lconf3)
+	defer ln3.Shutdown()
+
+	// Check leaf cluster is formed and all connected to the HUB.
+	lnServers := []*Server{ln1, ln2, ln3}
+	checkClusterFormed(t, lnServers...)
+	for _, s := range lnServers {
+		checkLeafNodeConnected(t, s)
+	}
+	// Check that HUB1 has 2 leaf connections, HUB2 has 0 and HUB3 has 1.
+	checkLeafNodeConnectedCount(t, hc.servers[0], 2)
+	checkLeafNodeConnectedCount(t, hc.servers[1], 0)
+	checkLeafNodeConnectedCount(t, hc.servers[2], 1)
+
+	// Create a client and qsub on LEAF1 and LEAF2.
+	nc1 := natsConnect(t, ln1.ClientURL())
+	defer nc1.Close()
+	var qsub1Count atomic.Int32
+	natsQueueSub(t, nc1, "foo", "queue1", func(_ *nats.Msg) {
+		qsub1Count.Add(1)
+	})
+	natsFlush(t, nc1)
+
+	nc2 := natsConnect(t, ln2.ClientURL())
+	defer nc2.Close()
+	var qsub2Count atomic.Int32
+	natsQueueSub(t, nc2, "foo", "queue1", func(_ *nats.Msg) {
+		qsub2Count.Add(1)
+	})
+	natsFlush(t, nc2)
+
+	// Make sure that the interest propagation is done before sending.
+	for i, s := range hc.servers {
+		gacc := s.GlobalAccount()
+		var ei int
+		switch i {
+		case 0:
+			ei = 2
+		default:
+			ei = 1
+		}
+		checkFor(t, time.Second, 15*time.Millisecond, func() error {
+			if n := gacc.Interest("foo"); n != ei {
+				return fmt.Errorf("Expected interest for %q to be %d, got %v", "foo", ei, n)
+			}
+			return nil
+		})
+	}
+
+	sendAndCheck := func(idx int) {
+		t.Helper()
+		nchub := natsConnect(t, hc.servers[idx].ClientURL())
+		defer nchub.Close()
+		total := 1000
+		for i := 0; i < total; i++ {
+			natsPub(t, nchub, "foo", []byte("from hub"))
+		}
+		checkFor(t, time.Second, 15*time.Millisecond, func() error {
+			if trecv := int(qsub1Count.Load() + qsub2Count.Load()); trecv != total {
+				return fmt.Errorf("Expected %v messages, got %v", total, trecv)
+			}
+			return nil
+		})
+		// Now that we have made sure that all messages were received,
+		// check that qsub1 and qsub2 each got at least some.
+		if n := int(qsub1Count.Load()); n <= total/10 {
+			t.Fatalf("Expected qsub1 to get some messages, but got %v (qsub2=%v)", n, qsub2Count.Load())
+		}
+		if n := int(qsub2Count.Load()); n <= total/10 {
+			t.Fatalf("Expected qsub2 to get some messages, but got %v (qsub1=%v)", n, qsub1Count.Load())
+		}
+		// Reset the counters.
+		qsub1Count.Store(0)
+		qsub2Count.Store(0)
+	}
+	// Send from HUB1
+	sendAndCheck(0)
+	// Send from HUB2
+	sendAndCheck(1)
+	// Send from HUB3
+	sendAndCheck(2)
+}
+
 func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) {
 	/*