Skip to content

Commit

Permalink
Feat: filter select peer for sharding (#783)
Browse files Browse the repository at this point in the history
* update selectPeer to support pubsubTopic based selection
  • Loading branch information
chaitanyaprem authored Sep 29, 2023
1 parent e95418d commit 89809a0
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 27 deletions.
12 changes: 11 additions & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,11 +847,21 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
return peers, nil
}

func (w *WakuNode) PeersByShard(cluster uint16, shard uint16) peer.IDSlice {
// PeersByShard filters peers based on shard information following static sharding
func (w *WakuNode) PeersByStaticShard(cluster uint16, shard uint16) peer.IDSlice {
pTopic := wakuprotocol.NewStaticShardingPubsubTopic(cluster, shard).String()
return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic)
}

// PeersByContentTopics filters peers based on contentTopic
func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice {
pTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return nil
}
return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic)
}

func (w *WakuNode) findRelayNodes(ctx context.Context) {
defer w.wg.Done()

Expand Down
62 changes: 51 additions & 11 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) {
}

// GroupPeersByDirection returns all the connected peers in peer store grouped by Inbound or outBound direction
func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
if len(specificPeers) == 0 {
specificPeers = pm.host.Network().Peers()
}
Expand All @@ -150,9 +150,9 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers p

// getRelayPeers - Returns list of in and out peers supporting WakuRelayProtocol within specifiedPeers.
// If specifiedPeers is empty, it checks within all peers in peerStore.
func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
//Group peers by their connected direction inbound or outbound.
inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers)
inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers...)
if err != nil {
return
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (pm *PeerManager) connectToRelayPeers() {
//Check for out peer connections and connect to more peers.
pm.ensureMinRelayConnsPerTopic()

inRelayPeers, outRelayPeers := pm.getRelayPeers(nil)
inRelayPeers, outRelayPeers := pm.getRelayPeers()
pm.logger.Info("number of relay peers connected",
zap.Int("in", inRelayPeers.Len()),
zap.Int("out", outRelayPeers.Len()))
Expand Down Expand Up @@ -417,29 +417,69 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
pm.serviceSlots.getPeers(proto).add(peerID)
}

// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return "", err
}
return pm.SelectPeer(proto, pubsubTopic, specificPeers...)
}

// SelectPeer is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID) (peer.ID, error) {
// if pubSubTopic is specified, peer is selected from list that support the pubSubTopic
func (pm *PeerManager) SelectPeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?

//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if peerID, err := slot.getRandom(); err == nil {
return peerID, nil
}
if peerID := pm.selectServicePeer(proto, pubSubTopic, specificPeers...); peerID != nil {
return *peerID, nil
}

// if not found in serviceSlots or proto == WakuRelayIDv200
filteredPeers, err := utils.FilterPeersByProto(pm.host, specificPeers, proto)
if err != nil {
return "", err
}

if pubSubTopic != "" {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, filteredPeers...)
}
return utils.SelectRandomPeer(filteredPeers, pm.logger)
}

func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peerIDPtr *peer.ID) {
peerIDPtr = nil

//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if pubSubTopic == "" {
if peerID, err := slot.getRandom(); err == nil {
peerIDPtr = &peerID
} else {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
}
} else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
keys = append(keys, i)
}
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...)
peerID, err := utils.SelectRandomPeer(selectedPeers, pm.logger)
if err == nil {
peerIDPtr = &peerID
} else {
pm.logger.Debug("could not select random peer", zap.Error(err))
}
}
}
return
}
54 changes: 46 additions & 8 deletions waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestServiceSlots(t *testing.T) {
///////////////

// select peer from pm, currently only h2 is set in pm
peerID, err := pm.SelectPeer(protocol, nil)
peerID, err := pm.SelectPeer(protocol, "")
require.NoError(t, err)
require.Equal(t, peerID, h2.ID())

Expand All @@ -74,7 +75,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)

// check that returned peer is h2 or h3 peer
peerID, err = pm.SelectPeer(protocol, nil)
peerID, err = pm.SelectPeer(protocol, "")
require.NoError(t, err)
if peerID == h2.ID() || peerID == h3.ID() {
//Test success
Expand All @@ -90,18 +91,55 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)
defer h4.Close()

_, err = pm.SelectPeer(protocol1, nil)
_, err = pm.SelectPeer(protocol1, "")
require.Error(t, err, utils.ErrNoPeersAvailable)

// add h4 peer for protocol1
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
require.NoError(t, err)

//Test peer selection for protocol1
peerID, err = pm.SelectPeer(protocol1, nil)
peerID, err = pm.SelectPeer(protocol1, "")
require.NoError(t, err)
require.Equal(t, peerID, h4.ID())

_, err = pm.SelectPeerByContentTopic(protocol1, "")
require.Error(t, wakuproto.ErrInvalidFormat, err)

}

func TestPeerSelection(t *testing.T) {
ctx, pm, deferFn := initTest(t)
defer deferFn()

h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()

h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()

protocol := libp2pProtocol.ID("test/protocol")
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/rs/2/1", "/waku/rs/2/2"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.SelectPeer(protocol, "")
require.NoError(t, err)

peerID, err := pm.SelectPeer(protocol, "/waku/rs/2/2")
require.NoError(t, err)
require.Equal(t, h2.ID(), peerID)

_, err = pm.SelectPeer(protocol, "/waku/rs/2/3")
require.Error(t, utils.ErrNoPeersAvailable, err)

_, err = pm.SelectPeer(protocol, "/waku/rs/2/1")
require.NoError(t, err)

}

func TestDefaultProtocol(t *testing.T) {
Expand All @@ -111,7 +149,7 @@ func TestDefaultProtocol(t *testing.T) {
// check peer for default protocol
///////////////
//Test empty peer selection for relay protocol
_, err := pm.SelectPeer(relay.WakuRelayID_v200, nil)
_, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
require.Error(t, err, utils.ErrNoPeersAvailable)

///////////////
Expand All @@ -126,7 +164,7 @@ func TestDefaultProtocol(t *testing.T) {
require.NoError(t, err)

// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, nil)
peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
require.NoError(t, err)
require.Equal(t, peerID, h5.ID())
}
Expand All @@ -146,12 +184,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
require.NoError(t, err)

peerID, err := pm.SelectPeer(protocol2, nil)
peerID, err := pm.SelectPeer(protocol2, "")
require.NoError(t, err)
require.Equal(t, peerID, h6.ID())

pm.RemovePeer(peerID)
_, err = pm.SelectPeer(protocol2, nil)
_, err = pm.SelectPeer(protocol2, "")
require.Error(t, err, utils.ErrNoPeersAvailable)
}

Expand Down
10 changes: 7 additions & 3 deletions waku/v2/peerstore/waku_peer_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type WakuPeerstore interface {
RemovePubSubTopic(p peer.ID, topic string) error
PubSubTopics(p peer.ID) ([]string, error)
SetPubSubTopics(p peer.ID, topics []string) error
PeersByPubSubTopic(pubSubTopic string) peer.IDSlice
PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice
}

// NewWakuPeerstore creates a new WakuPeerStore object
Expand Down Expand Up @@ -208,9 +208,13 @@ func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) {
}

// PeersByPubSubTopic Returns list of peers by pubSubTopic
func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string) peer.IDSlice {
// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore
func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice {
if specificPeers == nil {
specificPeers = ps.Peers()
}
var result peer.IDSlice
for _, p := range ps.Peers() {
for _, p := range specificPeers {
topics, err := ps.PubSubTopics(p)
if err == nil {
for _, topic := range topics {
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
if params.pm == nil {
p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, fromThesePeers)
p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, "", fromThesePeers...)
}
if err == nil {
params.selectedPeer = p
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/lightpush/waku_lightpush_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
if params.pm == nil {
p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(LightPushID_v20beta1, fromThesePeers)
p, err = params.pm.SelectPeer(LightPushID_v20beta1, "", fromThesePeers...)
}
if err == nil {
params.selectedPeer = p
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
if params.pm == nil {
p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log)
} else {
p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, fromThesePeers)
p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, "", fromThesePeers...)
}
if err == nil {
params.selectedPeer = p
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/store/waku_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption
if params.s.pm == nil {
p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log)
} else {
p, err = params.s.pm.SelectPeer(StoreID_v20beta4, fromThesePeers)
p, err = params.s.pm.SelectPeer(StoreID_v20beta4, "", fromThesePeers...)
}
if err == nil {
params.selectedPeer = p
Expand Down

0 comments on commit 89809a0

Please sign in to comment.