diff --git a/server/client.go b/server/client.go index 87fc410a32e..b308389f7b6 100644 --- a/server/client.go +++ b/server/client.go @@ -4790,12 +4790,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver, ql := _ql[:0] for i := 0; i < len(qsubs); i++ { sub = qsubs[i] - if sub.client.kind == LEAF || sub.client.kind == ROUTER { - // If we have assigned an rsub already, replace if the destination is a LEAF - // since we want to favor that compared to a ROUTER. We could make sure that - // we override only if previous was a ROUTE and not a LEAF, but we don't have to. - if rsub == nil || sub.client.kind == LEAF { + if dst := sub.client.kind; dst == LEAF || dst == ROUTER { + // If we have assigned an ROUTER rsub already, replace if + // the destination is a LEAF since we want to favor that. + if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) { rsub = sub + } else if dst == LEAF { + // We already have a LEAF and this is another one. + // Flip a coin to see if we swap it or not. + // See https://github.com/nats-io/nats-server/issues/6040 + if fastrand.Uint32()%2 == 1 { + rsub = sub + } } } else { ql = append(ql, sub) diff --git a/server/leafnode_test.go b/server/leafnode_test.go index 698066c03db..0a068ccf665 100644 --- a/server/leafnode_test.go +++ b/server/leafnode_test.go @@ -4117,6 +4117,122 @@ func TestLeafNodeQueueGroupDistribution(t *testing.T) { sendAndCheck(2) } +func TestLeafNodeQueueGroupDistributionVariant(t *testing.T) { + hc := createClusterWithName(t, "HUB", 3) + defer hc.shutdown() + + // Now have a cluster of leafnodes with LEAF1 and LEAF2 connecting to HUB1. + c1 := ` + server_name: LEAF1 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1 } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf1 := createConfFile(t, []byte(fmt.Sprintf(c1, hc.opts[0].LeafNode.Port))) + ln1, lopts1 := RunServerWithConfig(lconf1) + defer ln1.Shutdown() + + c2 := ` + server_name: LEAF2 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf2 := createConfFile(t, []byte(fmt.Sprintf(c2, lopts1.Cluster.Port, hc.opts[0].LeafNode.Port))) + ln2, _ := RunServerWithConfig(lconf2) + defer ln2.Shutdown() + + // And LEAF3 to HUB3 + c3 := ` + server_name: LEAF3 + listen: 127.0.0.1:-1 + cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] } + leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] } + ` + lconf3 := createConfFile(t, []byte(fmt.Sprintf(c3, lopts1.Cluster.Port, hc.opts[2].LeafNode.Port))) + ln3, _ := RunServerWithConfig(lconf3) + defer ln3.Shutdown() + + // Check leaf cluster is formed and all connected to the HUB. + lnServers := []*Server{ln1, ln2, ln3} + checkClusterFormed(t, lnServers...) + for _, s := range lnServers { + checkLeafNodeConnected(t, s) + } + // Check that HUB1 has 2 leaf connections, HUB2 has 0 and HUB3 has 1. + checkLeafNodeConnectedCount(t, hc.servers[0], 2) + checkLeafNodeConnectedCount(t, hc.servers[1], 0) + checkLeafNodeConnectedCount(t, hc.servers[2], 1) + + // Create a client and qsub on LEAF1 and LEAF2. + nc1 := natsConnect(t, ln1.ClientURL()) + defer nc1.Close() + var qsub1Count atomic.Int32 + natsQueueSub(t, nc1, "foo", "queue1", func(_ *nats.Msg) { + qsub1Count.Add(1) + }) + natsFlush(t, nc1) + + nc2 := natsConnect(t, ln2.ClientURL()) + defer nc2.Close() + var qsub2Count atomic.Int32 + natsQueueSub(t, nc2, "foo", "queue1", func(_ *nats.Msg) { + qsub2Count.Add(1) + }) + natsFlush(t, nc2) + + // Make sure that the propagation interest is done before sending. + for i, s := range hc.servers { + gacc := s.GlobalAccount() + var ei int + switch i { + case 0: + ei = 2 + default: + ei = 1 + } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if n := gacc.Interest("foo"); n != ei { + return fmt.Errorf("Expected interest for %q to be %d, got %v", "foo", ei, n) + } + return nil + }) + } + + sendAndCheck := func(idx int) { + t.Helper() + nchub := natsConnect(t, hc.servers[idx].ClientURL()) + defer nchub.Close() + total := 1000 + for i := 0; i < total; i++ { + natsPub(t, nchub, "foo", []byte("from hub")) + } + checkFor(t, time.Second, 15*time.Millisecond, func() error { + if trecv := int(qsub1Count.Load() + qsub2Count.Load()); trecv != total { + return fmt.Errorf("Expected %v messages, got %v", total, trecv) + } + return nil + }) + // Now that we have made sure that all messages were received, + // check that qsub1 and qsub2 are getting at least some. + if n := int(qsub1Count.Load()); n <= total/10 { + t.Fatalf("Expected qsub1 to get some messages, but got %v (qsub2=%v)", n, qsub2Count.Load()) + } + if n := int(qsub2Count.Load()); n <= total/10 { + t.Fatalf("Expected qsub2 to get some messages, but got %v (qsub1=%v)", n, qsub1Count.Load()) + } + // Reset the counters. + qsub1Count.Store(0) + qsub2Count.Store(0) + } + // Send from HUB1 + sendAndCheck(0) + // Send from HUB2 + sendAndCheck(1) + // Send from HUB3 + sendAndCheck(2) +} + func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) { /*