Skip to content

Commit

Permalink
fix: add timeout param to WaitForTimeout and WaitForMsg
Browse files Browse the repository at this point in the history
- clarify intent for using different shardIDs
- use clusterID other than 0 - to test unsubscribed topic
  • Loading branch information
romanzac committed Apr 22, 2024
1 parent 8d7e31b commit 99e3f33
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
8 changes: 4 additions & 4 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func GenerateRandomSQLInsert(maxLength int) (string, error) {
return query, nil
}

func WaitForMsg(t *testing.T, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
func WaitForMsg(t *testing.T, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
wg.Add(1)
log := utils.Logger()
go func() {
Expand All @@ -398,21 +398,21 @@ func WaitForMsg(t *testing.T, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
case env := <-ch:
msg := env.Message()
log.Info("Received ", zap.String("msg", msg.String()))
case <-time.After(2 * time.Second):
case <-time.After(timeout):
require.Fail(t, "Message timeout")
}
}()
wg.Wait()
}

func WaitForTimeout(t *testing.T, ctx context.Context, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
func WaitForTimeout(t *testing.T, ctx context.Context, timeout time.Duration, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
wg.Add(1)
go func() {
defer wg.Done()
select {
case _, ok := <-ch:
require.False(t, ok, "should not retrieve message")
case <-time.After(1 * time.Second):
case <-time.After(timeout):
// All good
case <-ctx.Done():
require.Fail(t, "test exceeded allocated time")
Expand Down
19 changes: 11 additions & 8 deletions waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,28 +326,31 @@ func TestStaticShardingMultipleTopics(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

nodeNativeClusterID := uint16(20)
shardingClusterID := uint16(21)

// Node1 with Relay
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
wakuNode1, err := New(
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithClusterID(uint16(20)),
WithClusterID(nodeNativeClusterID),
)
require.NoError(t, err)
err = wakuNode1.Start(ctx)
require.NoError(t, err)
defer wakuNode1.Stop()

pubSubTopic1 := protocol.NewStaticShardingPubsubTopic(uint16(21), uint16(0))
pubSubTopic1 := protocol.NewStaticShardingPubsubTopic(shardingClusterID, uint16(0))
pubSubTopic1Str := pubSubTopic1.String()
contentTopic1 := "/test/2/my-app/sharded"

pubSubTopic2 := protocol.NewStaticShardingPubsubTopic(uint16(21), uint16(10))
pubSubTopic2 := protocol.NewStaticShardingPubsubTopic(shardingClusterID, uint16(10))
pubSubTopic2Str := pubSubTopic2.String()
contentTopic2 := "/test/3/my-app/sharded"

require.Equal(t, uint16(20), wakuNode1.ClusterID())
require.Equal(t, nodeNativeClusterID, wakuNode1.ClusterID())

r := wakuNode1.Relay()

Expand Down Expand Up @@ -395,13 +398,13 @@ func TestStaticShardingMultipleTopics(t *testing.T) {
// Send another message to non-subscribed pubsub topic, but subscribed content topic
msg2 := tests.CreateWakuMessage(contentTopic1, utils.GetUnixEpoch(), "test message 2")

_, err = r.Publish(ctx, msg2, relay.WithPubSubTopic("/waku/2/rs/0/321"))
_, err = r.Publish(ctx, msg2, relay.WithPubSubTopic("/waku/2/rs/100/321"))
require.NoError(t, err)

time.Sleep(100 * time.Millisecond)

// No message could be retrieved
tests.WaitForTimeout(t, ctx, &wg, subs1[0].Ch)
tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch)

// Send another message to subscribed pubsub topic, but not subscribed content topic - mix it up
msg3 := tests.CreateWakuMessage(contentTopic2, utils.GetUnixEpoch(), "test message 3")
Expand All @@ -412,7 +415,7 @@ func TestStaticShardingMultipleTopics(t *testing.T) {
time.Sleep(100 * time.Millisecond)

// No message could be retrieved
tests.WaitForTimeout(t, ctx, &wg, subs1[0].Ch)
tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, subs1[0].Ch)

}

Expand Down Expand Up @@ -537,6 +540,6 @@ func TestStaticShardingLimits(t *testing.T) {
var wg sync.WaitGroup

// Retrieve on node2
tests.WaitForMsg(t, &wg, s2.Ch)
tests.WaitForMsg(t, 2*time.Second, &wg, s2.Ch)

}
6 changes: 3 additions & 3 deletions waku/v2/protocol/lightpush/waku_lightpush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestWakuLightPushCornerCases(t *testing.T) {
require.NoError(t, err)

// Wait for the nominal case message at node1
tests.WaitForMsg(t, &wg, sub1.Ch)
tests.WaitForMsg(t, 2*time.Second, &wg, sub1.Ch)

// Test error case with nil message
_, err = client.Publish(ctx, nil, lpOptions...)
Expand Down Expand Up @@ -367,11 +367,11 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) {
// Check that msg publish has led to message deliver for existing topic
_, err = client.Publish(ctx, msg, WithPubSubTopic(pubSubTopic), WithPeer(host2.ID()))
require.NoError(t, err)
tests.WaitForMsg(t, &wg, sub1.Ch)
tests.WaitForMsg(t, 2*time.Second, &wg, sub1.Ch)

// Check that msg2 publish finished without message delivery for unconfigured topic
_, err = client.Publish(ctx, msg2, WithPubSubTopic("/waku/2/rsv/25/0"), WithPeer(host2.ID()))
require.NoError(t, err)
tests.WaitForTimeout(t, ctx, &wg, sub1.Ch)
tests.WaitForTimeout(t, ctx, 1*time.Second, &wg, sub1.Ch)

}
2 changes: 1 addition & 1 deletion waku/v2/protocol/relay/waku_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,6 @@ func TestWakuRelayStaticSharding(t *testing.T) {
var wg sync.WaitGroup

// Msg should get received on host1
tests.WaitForMsg(t, &wg, subs1[0].Ch)
tests.WaitForMsg(t, 2*time.Second, &wg, subs1[0].Ch)

}

0 comments on commit 99e3f33

Please sign in to comment.