Skip to content

Commit

Permalink
chore: address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 12, 2023
1 parent e9fff20 commit 4890984
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 21 deletions.
8 changes: 2 additions & 6 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package peermanager
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
Expand Down Expand Up @@ -51,8 +50,7 @@ type PeerManager struct {
type PeerSelection int

const (
Unknown PeerSelection = iota
Automatic
Automatic PeerSelection = iota
LowestRTT
)

Expand Down Expand Up @@ -570,7 +568,7 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
rtt: result.RTT,
}
} else {
fmt.Println("Error in Ping", result)
pm.logger.Debug("could not ping", logging.HostID("peer", p), zap.Error(result.Error))
}
}(p)
}
Expand All @@ -583,7 +581,6 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
case <-waitCh:
var min *pingResult
for p := range pingCh {
fmt.Println("ping result", p)
if min == nil {
min = &p
} else {
Expand All @@ -593,7 +590,6 @@ func (pm *PeerManager) SelectPeerWithLowestRTT(criteria PeerSelectionCriteria) (
}
}
if min == nil {
pm.logger.Info("Could not find min")
return "", ErrNoPeersAvailable
}

Expand Down
11 changes: 9 additions & 2 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,15 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot

//TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic
if params.pm != nil {
params.selectedPeer, err = wf.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1, PubsubTopic: pubSubTopic, SpecificPeers: params.preferredPeers, Ctx: ctx})
params.selectedPeer, err = wf.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1,
PubsubTopic: pubSubTopic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
}

if params.selectedPeer == "" {
Expand Down
11 changes: 9 additions & 2 deletions waku/v2/protocol/legacy_filter/waku_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,15 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
opt(params)
}
if wf.pm != nil {
params.selectedPeer, _ = wf.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
Proto: FilterID_v20beta1, PubsubTopic: filter.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx})
params.selectedPeer, _ = wf.pm.SelectPeer(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterID_v20beta1,
PubsubTopic: filter.Topic,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
}
if params.selectedPeer == "" {
wf.metrics.RecordError(peerNotFoundFailure)
Expand Down
20 changes: 12 additions & 8 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package peer_exchange

import (
"context"
"errors"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -16,12 +17,13 @@ type PeerExchangeParameters struct {
log *zap.Logger
}

type PeerExchangeOption func(*PeerExchangeParameters)
type PeerExchangeOption func(*PeerExchangeParameters) error

// WithPeer is an option used to specify the peerID to push a waku message to
func WithPeer(p peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
return func(params *PeerExchangeParameters) error {
params.selectedPeer = p
return nil
}
}

Expand All @@ -31,17 +33,18 @@ func WithPeer(p peer.ID) PeerExchangeOption {
// from the node peerstore
// Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
return func(params *PeerExchangeParameters) error {
if params.pm == nil {
params.log.Info("automatic selection is not avaiable since peerManager is not initialized")
return errors.New("automatic selection is not avaiable since peerManager is not initialized")
} else {
p, err := params.pm.SelectPeer(peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1,
SpecificPeers: fromThesePeers})
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
return err
}
return nil
}
}
}
Expand All @@ -51,18 +54,19 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
// from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) PeerExchangeOption {
return func(params *PeerExchangeParameters) {
return func(params *PeerExchangeParameters) error {
if params.pm == nil {
params.log.Info("automatic selection is not avaiable since peerManager is not initialized")
return errors.New("automatic selection is not avaiable since peerManager is not initialized")
} else {
p, err := params.pm.SelectPeerWithLowestRTT(
peermanager.PeerSelectionCriteria{Proto: PeerExchangeID_v20alpha1,
SpecificPeers: fromThesePeers, Ctx: ctx})
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
return err
}
return nil
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions waku/v2/protocol/store/waku_store_protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
ContentTopics: []string{topic1},
}

var hrOptions []HistoryRequestOption
hrOptions = append(hrOptions, WithPeer(host1.ID()))
response, err := s2.Query(ctx, q, hrOptions...)
response, err := s2.Query(ctx, q, WithPeer(host1.ID()))

require.NoError(t, err)
require.Len(t, response.Messages, 1)
Expand Down

0 comments on commit 4890984

Please sign in to comment.