Skip to content

Commit

Permalink
fix: address few issues with test and not doing selection if preferre…
Browse files Browse the repository at this point in the history
…dPeer is already specified
  • Loading branch information
chaitanyaprem committed Oct 13, 2023
1 parent 268c41e commit 89da415
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 67 deletions.
6 changes: 4 additions & 2 deletions waku/v2/node/wakunode2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
Expand Down Expand Up @@ -303,9 +304,10 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode3.Stop()

err = wakuNode3.DialPeerWithMultiAddress(ctx, wakuNode2.ListenAddresses()[0])
//err = wakuNode3.DialPeerWithMultiAddress(ctx, wakuNode2.ListenAddresses()[0])
_, err = wakuNode3.AddPeer(wakuNode2.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, store.StoreID_v20beta4)
require.NoError(t, err)

time.Sleep(2 * time.Second)
// NODE2 should have returned the message received via filter
result, err := wakuNode3.Store().Query(ctx, store.Query{})
require.NoError(t, err)
Expand Down
12 changes: 7 additions & 5 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
failedContentTopics := []string{}
subscriptions := make([]*subscription.SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {

var selectedPeer peer.ID
//TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic
if params.pm != nil {
params.selectedPeer, err = wf.pm.SelectPeer(
if params.pm != nil && params.selectedPeer == "" {
selectedPeer, err = wf.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
Expand All @@ -288,9 +288,11 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
Ctx: ctx,
},
)
} else {
selectedPeer = params.selectedPeer
}

if params.selectedPeer == "" {
if selectedPeer == "" {
wf.metrics.RecordError(peerNotFoundFailure)
wf.log.Error("selecting peer", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
zap.Error(err))
Expand All @@ -309,7 +311,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
failedContentTopics = append(failedContentTopics, cTopics...)
continue
}
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter))
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(selectedPeer, cFilter))
}

if len(failedContentTopics) > 0 {
Expand Down
3 changes: 1 addition & 2 deletions waku/v2/protocol/filter/options.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package filter

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -67,7 +66,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
// with the lowest ping If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a
// peer from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
params.peerSelectionType = peermanager.LowestRTT
return nil
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestFilterOption(t *testing.T) {
options := []FilterSubscribeOption{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(),
WithFastestPeerSelection(context.Background()),
WithFastestPeerSelection(),
}

params := new(FilterSubscribeParameters)
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/legacy_filter/waku_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
for _, opt := range optList {
opt(params)
}
if wf.pm != nil {
if wf.pm != nil && params.selectedPeer == "" {
params.selectedPeer, _ = wf.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Expand Down
3 changes: 1 addition & 2 deletions waku/v2/protocol/legacy_filter/waku_filter_option.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package legacy_filter

import (
"context"
"time"

"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -61,7 +60,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
// with the lowest ping If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a
// peer from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.peerSelectionType = peermanager.LowestRTT
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/legacy_filter/waku_filter_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestFilterOption(t *testing.T) {
options := []FilterSubscribeOption{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(),
WithFastestPeerSelection(context.Background()),
WithFastestPeerSelection(),
}

params := new(FilterSubscribeParameters)
Expand Down
13 changes: 9 additions & 4 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,16 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
}
}

if params.pm != nil {
if params.pm != nil && params.selectedPeer == "" {
params.selectedPeer, err = wakuLP.pm.SelectPeer(
peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
Proto: LightPushID_v20beta1, PubsubTopic: params.pubsubTopic,
SpecificPeers: params.preferredPeers, Ctx: ctx})
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: LightPushID_v20beta1,
PubsubTopic: params.pubsubTopic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
}
if params.selectedPeer == "" {
if err != nil {
Expand Down
4 changes: 1 addition & 3 deletions waku/v2/protocol/lightpush/waku_lightpush_option.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package lightpush

import (
"context"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
Expand Down Expand Up @@ -52,7 +50,7 @@ func WithPubSubTopic(pubsubTopic string) Option {
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) Option {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option {
return func(params *lightPushParameters) {
params.peerSelectionType = peermanager.LowestRTT
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/lightpush/waku_lightpush_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestLightPushOption(t *testing.T) {
options := []Option{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(),
WithFastestPeerSelection(context.Background()),
WithFastestPeerSelection(),
WithRequestID([]byte("requestID")),
WithAutomaticRequestID(),
}
Expand Down
14 changes: 12 additions & 2 deletions waku/v2/protocol/peer_exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,22 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
optList := DefaultOptions(wakuPX.h)
optList = append(optList, opts...)
for _, opt := range optList {
err := opt(params)
opt(params)
}
if params.pm != nil && params.selectedPeer == "" {
var err error
params.selectedPeer, err = wakuPX.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: PeerExchangeID_v20alpha1,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
if err != nil {
return err
}
}

if params.selectedPeer == "" {
wakuPX.metrics.RecordError(dialFailure)
return ErrNoPeersAvailable
Expand Down
53 changes: 15 additions & 38 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
package peer_exchange

import (
"context"
"errors"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"go.uber.org/zap"
)

type PeerExchangeParameters struct {
host host.Host
selectedPeer peer.ID
pm *peermanager.PeerManager
log *zap.Logger
host host.Host
selectedPeer peer.ID
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
pm *peermanager.PeerManager
log *zap.Logger
}

type PeerExchangeOption func(*PeerExchangeParameters) error
type PeerExchangeOption func(*PeerExchangeParameters)

// WithPeer is an option used to specify the peerID to push a waku message to
func WithPeer(p peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
return func(params *PeerExchangeParameters) {
params.selectedPeer = p
return nil
}
}

Expand All @@ -33,41 +31,20 @@ func WithPeer(p peer.ID) PeerExchangeOption {
// from the node peerstore
// Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
if params.pm == nil {
return errors.New("automatic selection is not avaiable since peerManager is not initialized")
} else {
p, err := params.pm.SelectPeer(peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1,
SpecificPeers: fromThesePeers})
if err == nil {
params.selectedPeer = p
} else {
return err
}
return nil
}
return func(params *PeerExchangeParameters) {
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
}
}

// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) error {
if params.pm == nil {
return errors.New("automatic selection is not avaiable since peerManager is not initialized")
} else {
p, err := params.pm.SelectPeerWithLowestRTT(
peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1,
SpecificPeers: fromThesePeers, Ctx: ctx})
if err == nil {
params.selectedPeer = p
} else {
return err
}
return nil
}
func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
params.peerSelectionType = peermanager.LowestRTT
params.preferredPeers = fromThesePeers
}
}

Expand Down
20 changes: 15 additions & 5 deletions waku/v2/protocol/store/waku_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
// Note: This option is avaiable only with peerManager
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) HistoryRequestOption {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.peerSelectionType = peermanager.LowestRTT
}
Expand Down Expand Up @@ -264,10 +264,20 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
for _, opt := range optList {
opt(params)
}

if store.pm != nil {
params.selectedPeer, _ = store.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
Proto: StoreID_v20beta4, PubsubTopic: query.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx})
if store.pm != nil && params.selectedPeer == "" {
var err error
params.selectedPeer, err = store.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: StoreID_v20beta4,
PubsubTopic: query.Topic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
if err != nil {
return nil, err
}
}

if !params.localQuery && params.selectedPeer == "" {
Expand Down

0 comments on commit 89da415

Please sign in to comment.