[FIXED] Cluster: load balancing of queue groups from attached leaf nodes #6043

Merged 1 commit on Oct 25, 2024
16 changes: 11 additions & 5 deletions server/client.go
@@ -4790,12 +4790,18 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
 	ql := _ql[:0]
 	for i := 0; i < len(qsubs); i++ {
 		sub = qsubs[i]
-		if sub.client.kind == LEAF || sub.client.kind == ROUTER {
-			// If we have assigned an rsub already, replace if the destination is a LEAF
-			// since we want to favor that compared to a ROUTER. We could make sure that
-			// we override only if previous was a ROUTE and not a LEAF, but we don't have to.
-			if rsub == nil || sub.client.kind == LEAF {
+		if dst := sub.client.kind; dst == LEAF || dst == ROUTER {
+			// If we have assigned a ROUTER rsub already, replace if
+			// the destination is a LEAF since we want to favor that.
+			if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) {
 				rsub = sub
+			} else if dst == LEAF {
+				// We already have a LEAF and this is another one.
+				// Flip a coin to see if we swap it or not.
+				// See https://github.com/nats-io/nats-server/issues/6040
+				if fastrand.Uint32()%2 == 1 {
+					rsub = sub
+				}
 			}
 		} else {
 			ql = append(ql, sub)
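For intuition: the swap logic above keeps the first LEAF candidate and then replaces it with probability 1/2 for every later LEAF candidate. With exactly two leaf destinations, as in issue 6040, that gives an even 50/50 split; with three or more, later candidates would be favored (unlike uniform reservoir sampling, which would replace the i-th candidate with probability 1/i). The following is a minimal standalone sketch of that selection, not server code: pickOne and the candidate names are hypothetical, and math/rand stands in for the server's internal fastrand.

	package main

	import (
		"fmt"
		"math/rand"
	)

	// pickOne mirrors the swap logic above for LEAF candidates only:
	// keep the first candidate seen, then replace it with probability
	// 1/2 for each later one. pickOne is a hypothetical helper, not a
	// nats-server function.
	func pickOne(candidates []string) string {
		var chosen string
		for _, c := range candidates {
			if chosen == "" || rand.Uint32()%2 == 1 {
				chosen = c
			}
		}
		return chosen
	}

	func main() {
		counts := make(map[string]int)
		for i := 0; i < 100000; i++ {
			counts[pickOne([]string{"LEAF1", "LEAF2"})]++
		}
		// With two candidates, expect roughly a 50/50 split.
		fmt.Println(counts)
	}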
116 changes: 116 additions & 0 deletions server/leafnode_test.go
@@ -4117,6 +4117,122 @@ func TestLeafNodeQueueGroupDistribution(t *testing.T) {
	sendAndCheck(2)
}

func TestLeafNodeQueueGroupDistributionVariant(t *testing.T) {
	hc := createClusterWithName(t, "HUB", 3)
	defer hc.shutdown()

	// Now have a cluster of leafnodes with LEAF1 and LEAF2 connecting to HUB1.
	c1 := `
	server_name: LEAF1
	listen: 127.0.0.1:-1
	cluster { name: ln22, listen: 127.0.0.1:-1 }
	leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
	`
	lconf1 := createConfFile(t, []byte(fmt.Sprintf(c1, hc.opts[0].LeafNode.Port)))
	ln1, lopts1 := RunServerWithConfig(lconf1)
	defer ln1.Shutdown()

	c2 := `
	server_name: LEAF2
	listen: 127.0.0.1:-1
	cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] }
	leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
	`
	lconf2 := createConfFile(t, []byte(fmt.Sprintf(c2, lopts1.Cluster.Port, hc.opts[0].LeafNode.Port)))
	ln2, _ := RunServerWithConfig(lconf2)
	defer ln2.Shutdown()

	// And LEAF3 to HUB3.
	c3 := `
	server_name: LEAF3
	listen: 127.0.0.1:-1
	cluster { name: ln22, listen: 127.0.0.1:-1, routes = [ nats-route://127.0.0.1:%d] }
	leafnodes { remotes = [{ url: nats-leaf://127.0.0.1:%d }] }
	`
	lconf3 := createConfFile(t, []byte(fmt.Sprintf(c3, lopts1.Cluster.Port, hc.opts[2].LeafNode.Port)))
	ln3, _ := RunServerWithConfig(lconf3)
	defer ln3.Shutdown()

	// Check that the leaf cluster is formed and all servers are connected to the HUB.
	lnServers := []*Server{ln1, ln2, ln3}
	checkClusterFormed(t, lnServers...)
	for _, s := range lnServers {
		checkLeafNodeConnected(t, s)
	}
	// Check that HUB1 has 2 leaf connections, HUB2 has 0 and HUB3 has 1.
	checkLeafNodeConnectedCount(t, hc.servers[0], 2)
	checkLeafNodeConnectedCount(t, hc.servers[1], 0)
	checkLeafNodeConnectedCount(t, hc.servers[2], 1)

	// Create a client and qsub on LEAF1 and LEAF2.
	nc1 := natsConnect(t, ln1.ClientURL())
	defer nc1.Close()
	var qsub1Count atomic.Int32
	natsQueueSub(t, nc1, "foo", "queue1", func(_ *nats.Msg) {
		qsub1Count.Add(1)
	})
	natsFlush(t, nc1)

	nc2 := natsConnect(t, ln2.ClientURL())
	defer nc2.Close()
	var qsub2Count atomic.Int32
	natsQueueSub(t, nc2, "foo", "queue1", func(_ *nats.Msg) {
		qsub2Count.Add(1)
	})
	natsFlush(t, nc2)

	// Make sure that the interest propagation is done before sending.
	for i, s := range hc.servers {
		gacc := s.GlobalAccount()
		var ei int
		switch i {
		case 0:
			ei = 2
		default:
			ei = 1
		}
		checkFor(t, time.Second, 15*time.Millisecond, func() error {
			if n := gacc.Interest("foo"); n != ei {
				return fmt.Errorf("Expected interest for %q to be %d, got %v", "foo", ei, n)
			}
			return nil
		})
	}

	sendAndCheck := func(idx int) {
		t.Helper()
		nchub := natsConnect(t, hc.servers[idx].ClientURL())
		defer nchub.Close()
		total := 1000
		for i := 0; i < total; i++ {
			natsPub(t, nchub, "foo", []byte("from hub"))
		}
		checkFor(t, time.Second, 15*time.Millisecond, func() error {
			if trecv := int(qsub1Count.Load() + qsub2Count.Load()); trecv != total {
				return fmt.Errorf("Expected %v messages, got %v", total, trecv)
			}
			return nil
		})
		// Now that we have made sure that all messages were received,
		// check that qsub1 and qsub2 each got at least some.
		if n := int(qsub1Count.Load()); n <= total/10 {
			t.Fatalf("Expected qsub1 to get some messages, but got %v (qsub2=%v)", n, qsub2Count.Load())
		}
		if n := int(qsub2Count.Load()); n <= total/10 {
			t.Fatalf("Expected qsub2 to get some messages, but got %v (qsub1=%v)", n, qsub1Count.Load())
		}
		// Reset the counters.
		qsub1Count.Store(0)
		qsub2Count.Store(0)
	}
	// Send from HUB1.
	sendAndCheck(0)
	// Send from HUB2.
	sendAndCheck(1)
	// Send from HUB3.
	sendAndCheck(2)
}
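As a usage note (not part of the diff): the total/10 threshold in sendAndCheck is deliberately loose, so an honest random split of 1000 messages between the two qsubs essentially never trips it. Assuming a local checkout of nats-io/nats-server, the new test can be run on its own with the standard Go tooling:

	go test ./server -run TestLeafNodeQueueGroupDistributionVariant -count=1 -v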

func TestLeafNodeQueueGroupWithLateLNJoin(t *testing.T) {
/*
