Skip to content

Commit

Permalink
rename peer manager funcs
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Oct 3, 2023
1 parent 330cf92 commit edf5a85
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 23 deletions.
10 changes: 5 additions & 5 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,12 @@ func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic
return pm.SelectPeer(PeerSelectionCriteria{PubsubTopic: pubsubTopic, Proto: proto, SpecificPeers: specificPeers})
}

// SelectPeer is used to return a random peer that supports a given protocol.
// SelectRandomPeer is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
// if pubSubTopic is specified, peer is selected from list that support the pubSubTopic
func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) {
func (pm *PeerManager) SelectRandomPeer(criteria PeerSelectionCriteria) (peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
Expand Down Expand Up @@ -508,13 +508,13 @@ type PeerSelectionCriteria struct {
Ctx context.Context
}

// HandlePeerSelection selects a peer based on selectionType specified.
// SelectPeer selects a peer based on selectionType specified.
// Context is required only in case of selectionType set to LowestRTT
func (pm *PeerManager) HandlePeerSelection(criteria PeerSelectionCriteria) (peer.ID, error) {
func (pm *PeerManager) SelectPeer(criteria PeerSelectionCriteria) (peer.ID, error) {

switch criteria.SelectionType {
case Automatic:
return pm.SelectPeer(criteria)
return pm.SelectRandomPeer(criteria)
case LowestRTT:
if criteria.Ctx == nil {
criteria.Ctx = context.Background()
Expand Down
28 changes: 14 additions & 14 deletions waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func getAddr(h host.Host) multiaddr.Multiaddr {
}

func initTest(t *testing.T) (context.Context, *PeerManager, func()) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
// hosts
h1, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestServiceSlots(t *testing.T) {
///////////////

// select peer from pm, currently only h2 is set in pm
peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: protocol})
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
require.Equal(t, peerID, h2.ID())

Expand All @@ -82,7 +82,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)

// check that returned peer is h2 or h3 peer
peerID, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol})
peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)
if peerID == h2.ID() || peerID == h3.ID() {
//Test success
Expand All @@ -98,15 +98,15 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)
defer h4.Close()

_, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol1})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
require.Error(t, err, ErrNoPeersAvailable)

// add h4 peer for protocol1
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
require.NoError(t, err)

//Test peer selection for protocol1
peerID, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol1})
peerID, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol1})
require.NoError(t, err)
require.Equal(t, peerID, h4.ID())

Expand Down Expand Up @@ -134,21 +134,21 @@ func TestPeerSelection(t *testing.T) {
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol})
require.NoError(t, err)

peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/2"})
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/2"})
require.NoError(t, err)
require.Equal(t, h2.ID(), peerID)

_, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/3"})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/3"})
require.Error(t, ErrNoPeersAvailable, err)

_, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/1"})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol, PubsubTopic: "/waku/rs/2/1"})
require.NoError(t, err)

//Test for selectWithLowestRTT
_, err = pm.SelectPeerWithLowestRTT(PeerSelectionCriteria{Proto: protocol, PubsubTopic: "/waku/rs/2/1"})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: LowestRTT, Proto: protocol, PubsubTopic: "/waku/rs/2/1"})
require.NoError(t, err)
}

Expand All @@ -159,7 +159,7 @@ func TestDefaultProtocol(t *testing.T) {
// check peer for default protocol
///////////////
//Test empty peer selection for relay protocol
_, err := pm.SelectPeer(PeerSelectionCriteria{Proto: relay.WakuRelayID_v200})
_, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
require.Error(t, err, ErrNoPeersAvailable)

///////////////
Expand All @@ -174,7 +174,7 @@ func TestDefaultProtocol(t *testing.T) {
require.NoError(t, err)

// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: relay.WakuRelayID_v200})
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: relay.WakuRelayID_v200})
require.NoError(t, err)
require.Equal(t, peerID, h5.ID())
}
Expand All @@ -194,12 +194,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
require.NoError(t, err)

peerID, err := pm.SelectPeer(PeerSelectionCriteria{Proto: protocol2})
peerID, err := pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
require.NoError(t, err)
require.Equal(t, peerID, h6.ID())

pm.RemovePeer(peerID)
_, err = pm.SelectPeer(PeerSelectionCriteria{Proto: protocol2})
_, err = pm.SelectPeer(PeerSelectionCriteria{SelectionType: Automatic, Proto: protocol2})
require.Error(t, err, ErrNoPeersAvailable)
}

Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot

//TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic
if params.pm != nil {
params.selectedPeer, err = wf.pm.HandlePeerSelection(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
params.selectedPeer, err = wf.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
Proto: FilterSubscribeID_v20beta1, PubsubTopic: pubSubTopic, SpecificPeers: params.preferredPeers, Ctx: ctx})
}

Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/legacy_filter/waku_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
opt(params)
}
if wf.pm != nil {
params.selectedPeer, _ = wf.pm.HandlePeerSelection(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
params.selectedPeer, _ = wf.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
Proto: FilterID_v20beta1, PubsubTopic: filter.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx})
}
if params.selectedPeer == "" {
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
}

if params.pm != nil {
params.selectedPeer, err = wakuLP.pm.HandlePeerSelection(
params.selectedPeer, err = wakuLP.pm.SelectPeer(
peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
Proto: LightPushID_v20beta1, PubsubTopic: params.pubsubTopic,
SpecificPeers: params.preferredPeers, Ctx: ctx})
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/store/waku_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
}

if store.pm != nil {
params.selectedPeer, _ = store.pm.HandlePeerSelection(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
params.selectedPeer, _ = store.pm.SelectPeer(peermanager.PeerSelectionCriteria{SelectionType: params.peerSelectionType,
Proto: StoreID_v20beta4, PubsubTopic: query.Topic, SpecificPeers: params.preferredPeers, Ctx: ctx})
}

Expand Down

0 comments on commit edf5a85

Please sign in to comment.