Skip to content

Commit

Permalink
feat: update lightpush API for autosharding (#774)
Browse files Browse the repository at this point in the history
* feat: update lightpush API to make pubSubTopic optional as per autosharding

* Extract contentFilter and subscriptions out of filter to reuse in relay (#779)

* chore: extract contentFilter outside filter package

* chore: move subscription outside of filter so that it can be modified and reused for relay

* Feat: filter select peer for sharding (#783)

* update selectPeer to support pubsubTopic based selection
  • Loading branch information
chaitanyaprem authored Sep 29, 2023
1 parent dfd104d commit 47c961d
Show file tree
Hide file tree
Showing 23 changed files with 417 additions and 205 deletions.
8 changes: 4 additions & 4 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
}

if options.Filter.Enable {
cf := filter.ContentFilter{
cf := protocol.ContentFilter{
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: filter.NewContentTopicSet(options.ContentTopic),
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.FilterSubscribeOption
peerID, err := options.Filter.NodePeerID()
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *Chat) SendMessage(line string) {
err := c.publish(tCtx, line)
if err != nil {
if err.Error() == "validation failed" {
err = errors.New("message rate violation!")
err = errors.New("message rate violation")
}
c.ui.ErrorMessage(err)
}
Expand Down Expand Up @@ -524,7 +524,7 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {

ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second)
defer cancel()
err = c.node.DialPeerWithInfo(ctx, n)
err = c.node.DialPeerWithInfo(ctx, info)
if err != nil {

c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.Pretty(), err))
Expand Down
4 changes: 2 additions & 2 deletions examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func main() {
}

// Send FilterRequest from light node to full node
cf := filter.ContentFilter{
ContentTopics: filter.NewContentTopicSet(contentTopic),
cf := protocol.ContentFilter{
ContentTopics: protocol.NewContentTopicSet(contentTopic),
}

theFilter, err := lightNode.FilterLightnode().Subscribe(ctx, cf)
Expand Down
2 changes: 1 addition & 1 deletion library/c/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ Publish a message using Waku Lightpush.

1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type).
2. `char* pubsubTopic`: pubsub topic on which to publish the message.
If `NULL`, it uses the default pubsub topic.
If `NULL`, it derives the pubsub topic from content-topic based on autosharding.
3. `char* peerID`: Peer ID supporting the lightpush protocol.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
Expand Down
2 changes: 1 addition & 1 deletion library/c/api_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package main
import "C"
import "github.com/waku-org/go-waku/library"

// Publish a message using waku lightpush. Use NULL for topic to use the default pubsub topic..
// Publish a message using waku lightpush. Use NULL for topic to derive the pubsub topic from the contentTopic.
// peerID should contain the ID of a peer supporting the lightpush protocol. Use NULL to automatically select a node
// If ms is greater than 0, the broadcast of the message must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
Expand Down
16 changes: 9 additions & 7 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

type filterArgument struct {
PubsubTopic string `json:"pubsubTopic,omitempty"`
ContentTopics []string `json:"contentTopics,omitempty"`
}

func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
func toContentFilter(filterJSON string) (protocol.ContentFilter, error) {
var f filterArgument
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return filter.ContentFilter{}, err
return protocol.ContentFilter{}, err
}

return filter.ContentFilter{
return protocol.ContentFilter{
PubsubTopic: f.PubsubTopic,
ContentTopics: filter.NewContentTopicSet(f.ContentTopics...),
ContentTopics: protocol.NewContentTopicSet(f.ContentTopics...),
}, nil
}

type subscribeResult struct {
Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitempty"`
Subscriptions []*subscription.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitempty"`
}

// FilterSubscribe is used to create a subscription to a filter node to receive messages
Expand Down Expand Up @@ -71,7 +73,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
}

for _, subscriptionDetails := range subscriptions {
go func(subscriptionDetails *filter.SubscriptionDetails) {
go func(subscriptionDetails *subscription.SubscriptionDetails) {
for envelope := range subscriptionDetails.C {
send("message", toSubscriptionMessage(envelope))
}
Expand Down
10 changes: 7 additions & 3 deletions library/lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms
lpOptions = append(lpOptions, lightpush.WithAutomaticPeerSelection())
}

hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, pubsubTopic, lpOptions...)
if pubsubTopic != "" {
lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic))
}

hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, lpOptions...)
return hexutil.Encode(hash), err
}

// LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol
func LightpushPublish(messageJSON string, topic string, peerID string, ms int) (string, error) {
func LightpushPublish(messageJSON string, pubsubTopic string, peerID string, ms int) (string, error) {
msg, err := wakuMessage(messageJSON)
if err != nil {
return "", err
}

return lightpushPublish(msg, getTopic(topic), peerID, ms)
return lightpushPublish(msg, getTopic(pubsubTopic), peerID, ms)
}
12 changes: 11 additions & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,11 +847,21 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
return peers, nil
}

func (w *WakuNode) PeersByShard(cluster uint16, shard uint16) peer.IDSlice {
// PeersByShard filters peers based on shard information following static sharding
func (w *WakuNode) PeersByStaticShard(cluster uint16, shard uint16) peer.IDSlice {
pTopic := wakuprotocol.NewStaticShardingPubsubTopic(cluster, shard).String()
return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic)
}

// PeersByContentTopics filters peers based on contentTopic
func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice {
pTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return nil
}
return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic)
}

func (w *WakuNode) findRelayNodes(ctx context.Context) {
defer w.wg.Done()

Expand Down
62 changes: 51 additions & 11 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) {
}

// GroupPeersByDirection returns all the connected peers in peer store grouped by Inbound or outBound direction
func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
if len(specificPeers) == 0 {
specificPeers = pm.host.Network().Peers()
}
Expand All @@ -150,9 +150,9 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers p

// getRelayPeers - Returns list of in and out peers supporting WakuRelayProtocol within specifiedPeers.
// If specifiedPeers is empty, it checks within all peers in peerStore.
func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
//Group peers by their connected direction inbound or outbound.
inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers)
inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers...)
if err != nil {
return
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (pm *PeerManager) connectToRelayPeers() {
//Check for out peer connections and connect to more peers.
pm.ensureMinRelayConnsPerTopic()

inRelayPeers, outRelayPeers := pm.getRelayPeers(nil)
inRelayPeers, outRelayPeers := pm.getRelayPeers()
pm.logger.Info("number of relay peers connected",
zap.Int("in", inRelayPeers.Len()),
zap.Int("out", outRelayPeers.Len()))
Expand Down Expand Up @@ -417,29 +417,69 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
pm.serviceSlots.getPeers(proto).add(peerID)
}

// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return "", err
}
return pm.SelectPeer(proto, pubsubTopic, specificPeers...)
}

// SelectPeer is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID) (peer.ID, error) {
// if pubSubTopic is specified, peer is selected from list that support the pubSubTopic
func (pm *PeerManager) SelectPeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?

//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if peerID, err := slot.getRandom(); err == nil {
return peerID, nil
}
if peerID := pm.selectServicePeer(proto, pubSubTopic, specificPeers...); peerID != nil {
return *peerID, nil
}

// if not found in serviceSlots or proto == WakuRelayIDv200
filteredPeers, err := utils.FilterPeersByProto(pm.host, specificPeers, proto)
if err != nil {
return "", err
}

if pubSubTopic != "" {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, filteredPeers...)
}
return utils.SelectRandomPeer(filteredPeers, pm.logger)
}

func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peerIDPtr *peer.ID) {
peerIDPtr = nil

//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if pubSubTopic == "" {
if peerID, err := slot.getRandom(); err == nil {
peerIDPtr = &peerID
} else {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
}
} else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
keys = append(keys, i)
}
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...)
peerID, err := utils.SelectRandomPeer(selectedPeers, pm.logger)
if err == nil {
peerIDPtr = &peerID
} else {
pm.logger.Debug("could not select random peer", zap.Error(err))
}
}
}
return
}
54 changes: 46 additions & 8 deletions waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestServiceSlots(t *testing.T) {
///////////////

// select peer from pm, currently only h2 is set in pm
peerID, err := pm.SelectPeer(protocol, nil)
peerID, err := pm.SelectPeer(protocol, "")
require.NoError(t, err)
require.Equal(t, peerID, h2.ID())

Expand All @@ -74,7 +75,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)

// check that returned peer is h2 or h3 peer
peerID, err = pm.SelectPeer(protocol, nil)
peerID, err = pm.SelectPeer(protocol, "")
require.NoError(t, err)
if peerID == h2.ID() || peerID == h3.ID() {
//Test success
Expand All @@ -90,18 +91,55 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)
defer h4.Close()

_, err = pm.SelectPeer(protocol1, nil)
_, err = pm.SelectPeer(protocol1, "")
require.Error(t, err, utils.ErrNoPeersAvailable)

// add h4 peer for protocol1
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
require.NoError(t, err)

//Test peer selection for protocol1
peerID, err = pm.SelectPeer(protocol1, nil)
peerID, err = pm.SelectPeer(protocol1, "")
require.NoError(t, err)
require.Equal(t, peerID, h4.ID())

_, err = pm.SelectPeerByContentTopic(protocol1, "")
require.Error(t, wakuproto.ErrInvalidFormat, err)

}

func TestPeerSelection(t *testing.T) {
ctx, pm, deferFn := initTest(t)
defer deferFn()

h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()

h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()

protocol := libp2pProtocol.ID("test/protocol")
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/rs/2/1", "/waku/rs/2/2"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.SelectPeer(protocol, "")
require.NoError(t, err)

peerID, err := pm.SelectPeer(protocol, "/waku/rs/2/2")
require.NoError(t, err)
require.Equal(t, h2.ID(), peerID)

_, err = pm.SelectPeer(protocol, "/waku/rs/2/3")
require.Error(t, utils.ErrNoPeersAvailable, err)

_, err = pm.SelectPeer(protocol, "/waku/rs/2/1")
require.NoError(t, err)

}

func TestDefaultProtocol(t *testing.T) {
Expand All @@ -111,7 +149,7 @@ func TestDefaultProtocol(t *testing.T) {
// check peer for default protocol
///////////////
//Test empty peer selection for relay protocol
_, err := pm.SelectPeer(relay.WakuRelayID_v200, nil)
_, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
require.Error(t, err, utils.ErrNoPeersAvailable)

///////////////
Expand All @@ -126,7 +164,7 @@ func TestDefaultProtocol(t *testing.T) {
require.NoError(t, err)

// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, nil)
peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
require.NoError(t, err)
require.Equal(t, peerID, h5.ID())
}
Expand All @@ -146,12 +184,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
require.NoError(t, err)

peerID, err := pm.SelectPeer(protocol2, nil)
peerID, err := pm.SelectPeer(protocol2, "")
require.NoError(t, err)
require.Equal(t, peerID, h6.ID())

pm.RemovePeer(peerID)
_, err = pm.SelectPeer(protocol2, nil)
_, err = pm.SelectPeer(protocol2, "")
require.Error(t, err, utils.ErrNoPeersAvailable)
}

Expand Down
Loading

0 comments on commit 47c961d

Please sign in to comment.