Skip to content

Commit

Permalink
Added in another DQ test across leafnodes.
Browse files Browse the repository at this point in the history
This test has multiple leafnode connections to different accounts and to a shared account to make sure behavior is correct.

Signed-off-by: Derek Collison <derek@nats.io>
  • Loading branch information
derekcollison committed Jun 16, 2023
1 parent 4a1b281 commit b3f9132
Showing 1 changed file with 268 additions and 0 deletions.
268 changes: 268 additions & 0 deletions server/leafnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5066,3 +5066,271 @@ func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithSeparateAccounts(t *tes
})
require_True(t, r2.Load() > r1.Load())
}

func TestLeafNodeWithWeightedDQRequestsToSuperClusterWithStreamImportAccounts(t *testing.T) {
var tmpl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
leaf { listen: 127.0.0.1:-1 }
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts {
EFG {
users = [ { user: "efg", pass: "p" } ]
jetstream: enabled
imports [
{ stream: { account: STL, subject: "REQUEST"} }
{ stream: { account: KSC, subject: "REQUEST"} }
]
}
STL {
users = [ { user: "stl", pass: "p" } ]
exports [ { stream: "REQUEST" } ]
}
KSC {
users = [ { user: "ksc", pass: "p" } ]
exports [ { stream: "REQUEST" } ]
}
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}`

sc := createJetStreamSuperClusterWithTemplate(t, tmpl, 3, 2)
defer sc.shutdown()

// Now create a leafnode cluster that has 2 LNs, one to each cluster but on separate accounts, ONE and TWO.
var lnTmpl = `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
{{leaf}}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts { $SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }}
`

var leafFrag = `
leaf {
listen: 127.0.0.1:-1
remotes [
{ urls: [ %s ] }
{ urls: [ %s ] }
{ urls: [ %s ] ; deny_export: REQUEST}
]
}`

// We want to have two leaf node connections that join to the same local account on the leafnode servers,
// but connect to different accounts in different clusters.
c1 := sc.clusters[0] // Will connect to account KSC
c2 := sc.clusters[1] // Will connect to account STL

genLeafTmpl := func(tmpl string) string {
t.Helper()

var ln1, ln2, ln3 []string
for _, s := range c1.servers {
if s.ClusterName() != c1.name {
continue
}
ln := s.getOpts().LeafNode
ln1 = append(ln1, fmt.Sprintf("nats://ksc:p@%s:%d", ln.Host, ln.Port))
}

for _, s := range c2.servers {
if s.ClusterName() != c2.name {
continue
}
ln := s.getOpts().LeafNode
ln2 = append(ln2, fmt.Sprintf("nats://stl:p@%s:%d", ln.Host, ln.Port))
ln3 = append(ln3, fmt.Sprintf("nats://efg:p@%s:%d", ln.Host, ln.Port))
}
return strings.Replace(tmpl, "{{leaf}}", fmt.Sprintf(leafFrag, strings.Join(ln1, ", "), strings.Join(ln2, ", "), strings.Join(ln3, ", ")), 1)
}

tmpl = strings.Replace(lnTmpl, "store_dir:", fmt.Sprintf(`domain: "%s", store_dir:`, "SA"), 1)
tmpl = genLeafTmpl(tmpl)

ln := createJetStreamCluster(t, tmpl, "SA", "SA-", 3, 22280, false)
ln.waitOnClusterReady()
defer ln.shutdown()

for _, s := range ln.servers {
checkLeafNodeConnectedCount(t, s, 3)
}

// Now connect DQ subscribers to each cluster but to the global account.

// Create 5 clients for each cluster / account
var c1c, c2c []*nats.Conn
for i := 0; i < 5; i++ {
nc1, _ := jsClientConnect(t, c1.randomServer(), nats.UserInfo("efg", "p"))
defer nc1.Close()
c1c = append(c1c, nc1)
nc2, _ := jsClientConnect(t, c2.randomServer(), nats.UserInfo("efg", "p"))
defer nc2.Close()
c2c = append(c2c, nc2)
}

createSubs := func(num int, conns []*nats.Conn) (subs []*nats.Subscription) {
for i := 0; i < num; i++ {
nc := conns[rand.Intn(len(conns))]
sub, err := nc.QueueSubscribeSync("REQUEST", "MC")
require_NoError(t, err)
subs = append(subs, sub)
nc.Flush()
}
// Let subs propagate.
time.Sleep(100 * time.Millisecond)
return subs
}
closeSubs := func(subs []*nats.Subscription) {
for _, sub := range subs {
sub.Unsubscribe()
}
}

// Simple test first.
subs1 := createSubs(1, c1c)
defer closeSubs(subs1)
subs2 := createSubs(1, c2c)
defer closeSubs(subs2)

sendRequests := func(num int) {
// Now connect to the leaf cluster and send some requests.
nc, _ := jsClientConnect(t, ln.randomServer())
defer nc.Close()

for i := 0; i < num; i++ {
require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
}
nc.Flush()
}

pending := func(subs []*nats.Subscription) (total int) {
for _, sub := range subs {
n, _, err := sub.Pending()
require_NoError(t, err)
total += n
}
return total
}

num := 1000
checkAllReceived := func() error {
total := pending(subs1) + pending(subs2)
if total == num {
return nil
}
return fmt.Errorf("Not all received: %d vs %d", total, num)
}

checkBalanced := func(total, pc1, pc2 int) {
tf := float64(total)
e1 := tf * (float64(pc1) / 100.00)
e2 := tf * (float64(pc2) / 100.00)
delta := tf / 10
p1 := float64(pending(subs1))
if p1 < e1-delta || p1 > e1+delta {
t.Fatalf("Value out of range for subs1, expected %v got %v", e1, p1)
}
p2 := float64(pending(subs2))
if p2 < e2-delta || p2 > e2+delta {
t.Fatalf("Value out of range for subs2, expected %v got %v", e2, p2)
}
}

// Now connect to the leaf cluster and send some requests.

// Simple 50/50
sendRequests(num)
checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
checkBalanced(num, 50, 50)

closeSubs(subs1)
closeSubs(subs2)

// Now test unbalanced. 10/90
subs1 = createSubs(1, c1c)
defer closeSubs(subs1)
subs2 = createSubs(9, c2c)
defer closeSubs(subs2)

sendRequests(num)
checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
checkBalanced(num, 10, 90)

closeSubs(subs1)
closeSubs(subs2)

// Now test unbalanced. 80/20
subs1 = createSubs(80, c1c)
defer closeSubs(subs1)
subs2 = createSubs(20, c2c)
defer closeSubs(subs2)

sendRequests(num)
checkFor(t, time.Second, 200*time.Millisecond, checkAllReceived)
checkBalanced(num, 80, 20)

// Now test draining the subs as we are sending from an initial balanced situation simulating a draining of a cluster.

closeSubs(subs1)
closeSubs(subs2)
subs1, subs2 = nil, nil

// These subs slightly different.
var r1, r2 atomic.Uint64
for i := 0; i < 20; i++ {
nc := c1c[rand.Intn(len(c1c))]
sub, err := nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r1.Add(1) })
require_NoError(t, err)
subs1 = append(subs1, sub)
nc.Flush()

nc = c2c[rand.Intn(len(c2c))]
sub, err = nc.QueueSubscribe("REQUEST", "MC", func(m *nats.Msg) { r2.Add(1) })
require_NoError(t, err)
subs2 = append(subs2, sub)
nc.Flush()
}
defer closeSubs(subs1)
defer closeSubs(subs2)

nc, _ := jsClientConnect(t, ln.randomServer())
defer nc.Close()

for i, dindex := 0, 1; i < num; i++ {
require_NoError(t, nc.Publish("REQUEST", []byte("HELP")))
// Check if we have more to simulate draining.
// Will drain within first ~100 requests using 20% rand test below.
// Will leave 1 behind.
if dindex < len(subs1)-1 && rand.Intn(6) > 4 {
sub := subs1[dindex]
dindex++
sub.Drain()
}
}
nc.Flush()

checkFor(t, time.Second, 200*time.Millisecond, func() error {
total := int(r1.Load() + r2.Load())
if total == num {
return nil
}
return fmt.Errorf("Not all received: %d vs %d", total, num)
})
require_True(t, r2.Load() > r1.Load())
}

0 comments on commit b3f9132

Please sign in to comment.