Skip to content

Commit

Permalink
test: static sharding limits
Browse files Browse the repository at this point in the history
  • Loading branch information
romanzac committed Apr 3, 2024
1 parent c80456d commit 873caa8
Showing 1 changed file with 73 additions and 0 deletions.
73 changes: 73 additions & 0 deletions waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package node
import (
"bytes"
"context"
"fmt"
"math/big"
"net"
"sync"
Expand Down Expand Up @@ -98,6 +99,7 @@ func TestUpAndDown(t *testing.T) {
WithWakuRelay(),
WithDiscoveryV5(0, bootnodes, true),
)

require.NoError(t, err)

for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -412,3 +414,74 @@ func TestStaticShardingMultipleTopics(t *testing.T) {
tests.WaitForTimeout(t, ctx, &wg, subs1[0].Ch)

}

func TestStaticShardingLimits(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 140*time.Second)
defer cancel()

// Node1 with Relay
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
discv5UDPPort1, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
require.NoError(t, err)
wakuNode1, err := New(
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithClusterID(uint16(21)),
WithDiscoveryV5(uint(discv5UDPPort1), nil, true),
)
require.NoError(t, err)
err = wakuNode1.Start(ctx)
require.NoError(t, err)
defer wakuNode1.Stop()

// Node2 with Relay
hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
discv5UDPPort2, err := tests.FindFreeUDPPort(t, "0.0.0.0", 3)
require.NoError(t, err)
wakuNode2, err := New(
WithHostAddress(hostAddr2),
WithWakuRelay(),
WithClusterID(uint16(21)),
WithDiscoveryV5(uint(discv5UDPPort2), []*enode.Node{wakuNode1.localNode.Node()}, true),
)
require.NoError(t, err)
err = wakuNode2.Start(ctx)
require.NoError(t, err)
defer wakuNode2.Stop()

err = wakuNode1.DiscV5().Start(ctx)
require.NoError(t, err)
err = wakuNode2.DiscV5().Start(ctx)
require.NoError(t, err)

// Wait for discovery
time.Sleep(3 * time.Second)

contentTopic1 := "/test/2/my-app/sharded"

r1 := wakuNode1.Relay()

var shardedPubSubTopics, nonShardedPubSubTopics []string
for i := 0; i < 1024; i++ {
nonShardedPubSubTopics = append(nonShardedPubSubTopics, fmt.Sprintf("/waku/2/my-app/21/%d", i))
shardedPubSubTopics = append(shardedPubSubTopics, fmt.Sprintf("/waku/2/rs/21/%d", i))
}

// Subscribe topics not related to static sharding
for i := 0; i < 1024; i++ {
_, err = r1.Subscribe(ctx, protocol.NewContentFilter(nonShardedPubSubTopics[i], contentTopic1))
require.NoError(t, err)
time.Sleep(10 * time.Millisecond)
}

// Subscribe topics related to static sharding
for i := 0; i < 1024; i++ {
_, err = r1.Subscribe(ctx, protocol.NewContentFilter(shardedPubSubTopics[i], contentTopic1))
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
}

time.Sleep(1 * time.Second)
}

0 comments on commit 873caa8

Please sign in to comment.