Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(filter v2): test updates #811

Merged
merged 83 commits into from
Oct 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
83 commits
Select commit Hold shift + click to select a range
d900a6c
test: Test incorrect protocol identifiers
romanzac Oct 5, 2023
3e84253
fix: return errors in FilterSubscribeOption
richard-ramos Oct 6, 2023
de0428c
Merge branch 'fix/filter-subscribe-options' into chore(filterV2)-test…
romanzac Oct 9, 2023
7f528e7
test: Test incorrect push identifier added
romanzac Oct 9, 2023
6614928
test: Test Ping failure after unsubscription
romanzac Oct 10, 2023
5f691a7
test: Test PubSub with single content topic
romanzac Oct 11, 2023
d828b32
test: Simplify test PubSub with single content topic
romanzac Oct 12, 2023
91382a9
test: Test with single pubsub and multiple content topics
romanzac Oct 12, 2023
c729846
test: Test with multiple PubSub and multiple contentTopic
romanzac Oct 12, 2023
6d9de05
test: Test with multiple overlaping contentTopics
romanzac Oct 16, 2023
a55fe3d
test: refactor tests to fix concurrent run errors
romanzac Oct 17, 2023
5d83602
test: Test subscription refresh
romanzac Oct 17, 2023
7094e47
test: Test error handling for subscribe
romanzac Oct 17, 2023
ec2a50c
test: Test subscription to multiple full nodes
romanzac Oct 17, 2023
cdf3571
update test to fix #804
chaitanyaprem Oct 17, 2023
404d690
Merge remote-tracking branch 'origin/master' into chore(filterV2)-tes…
romanzac Oct 18, 2023
5e25d4d
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
be68915
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
e6fd86d
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
9f1ad7f
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
8a0d0c1
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
5cfb269
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
2fae323
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
5a7edfb
Merge remote-tracking branch 'origin/chore(filterV2)-test-updates' in…
romanzac Oct 20, 2023
b5715d6
test: refactor tests with prepareData()
romanzac Oct 20, 2023
8fca6a6
Merge branch 'master' into chore(filterV2)-test-updates
romanzac Oct 20, 2023
3909a88
test: Test incorrect protocol identifiers
romanzac Oct 5, 2023
f789bc0
chore: rebase onto latest master
richard-ramos Oct 6, 2023
9d40a74
test: Test incorrect push identifier added
romanzac Oct 9, 2023
dab139d
test: Test Ping failure after unsubscription
romanzac Oct 10, 2023
55e9e88
test: Test PubSub with single content topic
romanzac Oct 11, 2023
65755c7
test: Simplify test PubSub with single content topic
romanzac Oct 12, 2023
38769e0
test: Test with single pubsub and multiple content topics
romanzac Oct 12, 2023
1768918
test: Test with multiple PubSub and multiple contentTopic
romanzac Oct 12, 2023
9c152d9
test: Test with multiple overlaping contentTopics
romanzac Oct 16, 2023
f1d795b
test: refactor tests to fix concurrent run errors
romanzac Oct 17, 2023
214e94e
test: Test subscription refresh
romanzac Oct 17, 2023
2572a02
test: Test error handling for subscribe
romanzac Oct 17, 2023
b8eab75
test: Test subscription to multiple full nodes
romanzac Oct 17, 2023
cbff2ec
update test to fix #804
chaitanyaprem Oct 17, 2023
9fd0282
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
93fc0c0
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
8d6ed25
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
ccb22c0
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
c7c1ebd
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
f67d6ee
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
d7dcc01
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
69fb9cf
test: refactor tests with prepareData()
romanzac Oct 20, 2023
f8d5ab0
test: Test incorrect protocol identifiers
romanzac Oct 5, 2023
fdb8f0f
fix: return errors in FilterSubscribeOption
richard-ramos Oct 6, 2023
c59142b
test: Test incorrect push identifier added
romanzac Oct 9, 2023
75cf35b
test: Test Ping failure after unsubscription
romanzac Oct 10, 2023
e3880fe
test: Test PubSub with single content topic
romanzac Oct 11, 2023
7f08359
test: Simplify test PubSub with single content topic
romanzac Oct 12, 2023
c113d3b
test: Test with single pubsub and multiple content topics
romanzac Oct 12, 2023
a8fa145
test: Test with multiple PubSub and multiple contentTopic
romanzac Oct 12, 2023
5d4780b
test: Test with multiple overlaping contentTopics
romanzac Oct 16, 2023
7333366
test: refactor tests to fix concurrent run errors
romanzac Oct 17, 2023
a1310cd
test: Test subscription refresh
romanzac Oct 17, 2023
acbdf93
test: Test error handling for subscribe
romanzac Oct 17, 2023
c6b65bf
test: Test subscription to multiple full nodes
romanzac Oct 17, 2023
5ccad70
update test to fix #804
chaitanyaprem Oct 17, 2023
ff00b57
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
5493076
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
948772c
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
74d3e96
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
f71f77b
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
73b9dff
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
a4053ed
Update waku/v2/protocol/filter/filter_test.go
romanzac Oct 20, 2023
85185c5
test: refactor tests with prepareData()
romanzac Oct 20, 2023
33f88d4
Fix error during rebase
romanzac Oct 23, 2023
5e89620
Merge remote-tracking branch 'origin/chore(filterV2)-test-updates' in…
romanzac Oct 23, 2023
077c534
Sync filter tests with latest master
romanzac Oct 23, 2023
dca86dc
Refactor context initialization for test
romanzac Oct 23, 2023
884f068
test: Incorrect Subscribe Identifier refactored with custom subscribe
romanzac Oct 24, 2023
acedc00
test: refactor into multiple files
romanzac Oct 24, 2023
155130c
Merge branch 'master' into chore(filterV2)-test-updates
romanzac Oct 24, 2023
d5c5bd2
Merge branch 'master' into chore(filterV2)-test-updates
romanzac Oct 25, 2023
0baef5b
test: Subscribe with multiple light nodes to one full node
romanzac Oct 25, 2023
e287973
Merge branch 'master' into chore(filterV2)-test-updates
romanzac Oct 26, 2023
7d34737
test: shared mode for full node creation
romanzac Oct 26, 2023
62098a0
test: test Subscribe fullNode to fullNode
romanzac Oct 26, 2023
198925e
Merge branch 'master' into chore(filterV2)-test-updates
romanzac Oct 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions waku/v2/protocol/filter/filter_ping_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package filter

import (
"context"
"net/http"
)

func (s *FilterTestSuite) TestSubscriptionPing() {
err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().Error(err)
filterErr, ok := err.(*FilterError)
s.Require().True(ok)
s.Require().Equal(filterErr.Code, http.StatusNotFound)

contentTopic := "abc"
s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNodeHost.ID())

err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().NoError(err)
}

func (s *FilterTestSuite) TestUnSubscriptionPing() {

s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().NoError(err)

_, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)

err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().Error(err)
}
303 changes: 303 additions & 0 deletions waku/v2/protocol/filter/filter_proto_ident_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
package filter

import (
"context"
"encoding/hex"
"errors"
"fmt"
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
"golang.org/x/exp/slices"
"math"
"net/http"
"strings"
"sync"
"time"

libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)

func (s *FilterTestSuite) TestCreateSubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)

}, s.subDetails[0].C)
}

func (s *FilterTestSuite) TestModifySubscription() {

// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)

}, s.subDetails[0].C)

// Subscribe to another content_topic
newContentTopic := "Topic_modified"
s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())

s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)

}, s.subDetails[0].C)
}

func (s *FilterTestSuite) TestMultipleMessages() {

// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())

s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic)
s.Require().NoError(err)

}, s.subDetails[0].C)

s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic)
s.Require().NoError(err)

}, s.subDetails[0].C)
}

func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {

const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd")

conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_Incorrect1)
if err != nil {
wf.metrics.RecordError(dialFailure)
return err
}
defer conn.Close()

writer := pbio.NewDelimitedWriter(conn)
reader := pbio.NewDelimitedReader(conn, math.MaxInt32)

request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestID),
FilterSubscribeType: reqType,
PubsubTopic: &contentFilter.PubsubTopic,
ContentTopics: contentFilter.ContentTopicsList(),
}

wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request))
err = writer.WriteMsg(request)
if err != nil {
wf.metrics.RecordError(writeRequestFailure)
wf.log.Error("sending FilterSubscribeRequest", zap.Error(err))
return err
}

filterSubscribeResponse := &pb.FilterSubscribeResponse{}
err = reader.ReadMsg(filterSubscribeResponse)
if err != nil {
wf.log.Error("receiving FilterSubscribeResponse", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure)
return err
}
if filterSubscribeResponse.RequestId != request.RequestId {
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
wf.metrics.RecordError(requestIDMismatch)
err := NewFilterError(300, "request_id_mismatch")
return &err
}

if filterSubscribeResponse.StatusCode != http.StatusOK {
wf.metrics.RecordError(errorResponse)
err := NewFilterError(int(filterSubscribeResponse.StatusCode), filterSubscribeResponse.StatusDesc)
return &err
}

return nil
}

func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
return nil, err
}

if len(contentFilter.ContentTopics) == 0 {
return nil, errors.New("at least one content topic is required")
}
if slices.Contains[string](contentFilter.ContentTopicsList(), "") {
return nil, errors.New("one or more content topics specified is empty")
}

if len(contentFilter.ContentTopics) > MaxContentTopicsPerRequest {
return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)
}

params := new(FilterSubscribeParameters)
params.log = wf.log
params.host = wf.h
params.pm = wf.pm

optList := DefaultSubscriptionOptions()
optList = append(optList, opts...)
for _, opt := range optList {
err := opt(params)
if err != nil {
return nil, err
}
}

pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)

if err != nil {
return nil, err
}
failedContentTopics := []string{}
subscriptions := make([]*subscription.SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var selectedPeer peer.ID
//TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic
if params.pm != nil && params.selectedPeer == "" {
selectedPeer, err = wf.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
PubsubTopic: pubSubTopic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
} else {
selectedPeer = params.selectedPeer
}

if selectedPeer == "" {
wf.metrics.RecordError(peerNotFoundFailure)
wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}

var cFilter protocol.ContentFilter
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)

err := wf.incorrectSubscribeRequest(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(selectedPeer, cFilter))
}

if len(failedContentTopics) > 0 {
return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
} else {
return subscriptions, nil
}
}

func (s *FilterTestSuite) TestIncorrectSubscribeIdentifier() {
log := utils.Logger()
s.log = log
s.wg = &sync.WaitGroup{}

// Create test context
s.ctx, s.ctxCancel = context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds

s.testTopic = "/waku/2/go/filter/test"
s.testContentTopic = "TopicA"

s.lightNode = s.makeWakuFilterLightNode(true, true)

s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)

//Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)

// Subscribe with incorrect SubscribeID
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
_, err := s.lightNode.IncorrectSubscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().Error(err)

_, err = s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}

func (wf *WakuFilterLightNode) startWithIncorrectPushProto() error {
const FilterPushID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-push/abcd")

wf.subscriptions = subscription.NewSubscriptionMap(wf.log)
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_Incorrect1)), wf.onRequest(wf.Context()))

wf.log.Info("filter-push incorrect protocol started")
return nil
}

func (s *FilterTestSuite) TestIncorrectPushIdentifier() {
log := utils.Logger()
s.log = log
s.wg = &sync.WaitGroup{}

// Create test context
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
s.ctx = ctx
s.ctxCancel = cancel

s.testTopic = "/waku/2/go/filter/test"
s.testContentTopic = "TopicA"

s.lightNode = s.makeWakuFilterLightNode(false, true)

s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic, false, true)

// Re-start light node with unsupported prefix for match func
s.lightNode.Stop()
err := s.lightNode.CommonService.Start(s.ctx, s.lightNode.startWithIncorrectPushProto)
s.Require().NoError(err)

// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)

// Subscribe
s.contentFilter = protocol.ContentFilter{PubsubTopic: s.testTopic, ContentTopics: protocol.NewContentTopicSet(s.testContentTopic)}
s.subDetails, err = s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)

time.Sleep(1 * time.Second)

// Send message
_, err = s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic)
s.Require().NoError(err)

// Message should never arrive -> exit after timeout
select {
case msg := <-s.subDetails[0].C:
s.log.Info("Light node received a msg")
s.Require().Nil(msg)
case <-time.After(1 * time.Second):
s.Require().True(true)
}

_, err = s.lightNode.UnsubscribeAll(s.ctx)
s.Require().NoError(err)
}
Loading