Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update lightpush API for autosharding #774

Merged
merged 9 commits into from
Sep 29, 2023
8 changes: 4 additions & 4 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
}

if options.Filter.Enable {
cf := filter.ContentFilter{
cf := protocol.ContentFilter{
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: filter.NewContentTopicSet(options.ContentTopic),
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.FilterSubscribeOption
peerID, err := options.Filter.NodePeerID()
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *Chat) SendMessage(line string) {
err := c.publish(tCtx, line)
if err != nil {
if err.Error() == "validation failed" {
err = errors.New("message rate violation!")
err = errors.New("message rate violation")
}
c.ui.ErrorMessage(err)
}
Expand Down Expand Up @@ -524,7 +524,7 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {

ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second)
defer cancel()
err = c.node.DialPeerWithInfo(ctx, n)
err = c.node.DialPeerWithInfo(ctx, info)
if err != nil {

c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.Pretty(), err))
Expand Down
4 changes: 2 additions & 2 deletions examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func main() {
}

// Send FilterRequest from light node to full node
cf := filter.ContentFilter{
ContentTopics: filter.NewContentTopicSet(contentTopic),
cf := protocol.ContentFilter{
ContentTopics: protocol.NewContentTopicSet(contentTopic),
}

theFilter, err := lightNode.FilterLightnode().Subscribe(ctx, cf)
Expand Down
2 changes: 1 addition & 1 deletion library/c/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1083,7 +1083,7 @@ Publish a message using Waku Lightpush.

1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type).
2. `char* pubsubTopic`: pubsub topic on which to publish the message.
If `NULL`, it uses the default pubsub topic.
If `NULL`, it derives the pubsub topic from content-topic based on autosharding.
3. `char* peerID`: Peer ID supporting the lightpush protocol.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
Expand Down
2 changes: 1 addition & 1 deletion library/c/api_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package main
import "C"
import "github.com/waku-org/go-waku/library"

// Publish a message using waku lightpush. Use NULL for topic to use the default pubsub topic..
// Publish a message using waku lightpush. Use NULL for topic to derive the pubsub topic from the contentTopic.
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
// peerID should contain the ID of a peer supporting the lightpush protocol. Use NULL to automatically select a node
// If ms is greater than 0, the broadcast of the message must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
Expand Down
16 changes: 9 additions & 7 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

type filterArgument struct {
PubsubTopic string `json:"pubsubTopic,omitempty"`
ContentTopics []string `json:"contentTopics,omitempty"`
}

func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
func toContentFilter(filterJSON string) (protocol.ContentFilter, error) {
var f filterArgument
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return filter.ContentFilter{}, err
return protocol.ContentFilter{}, err
}

return filter.ContentFilter{
return protocol.ContentFilter{
PubsubTopic: f.PubsubTopic,
ContentTopics: filter.NewContentTopicSet(f.ContentTopics...),
ContentTopics: protocol.NewContentTopicSet(f.ContentTopics...),
}, nil
}

type subscribeResult struct {
Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitempty"`
Subscriptions []*subscription.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitempty"`
}

// FilterSubscribe is used to create a subscription to a filter node to receive messages
Expand Down Expand Up @@ -71,7 +73,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
}

for _, subscriptionDetails := range subscriptions {
go func(subscriptionDetails *filter.SubscriptionDetails) {
go func(subscriptionDetails *subscription.SubscriptionDetails) {
for envelope := range subscriptionDetails.C {
send("message", toSubscriptionMessage(envelope))
}
Expand Down
10 changes: 7 additions & 3 deletions library/lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,20 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms
lpOptions = append(lpOptions, lightpush.WithAutomaticPeerSelection())
}

hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, pubsubTopic, lpOptions...)
if pubsubTopic != "" {
lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic))
}

hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, lpOptions...)
return hexutil.Encode(hash), err
}

// LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol
func LightpushPublish(messageJSON string, topic string, peerID string, ms int) (string, error) {
func LightpushPublish(messageJSON string, pubsubTopic string, peerID string, ms int) (string, error) {
msg, err := wakuMessage(messageJSON)
if err != nil {
return "", err
}

return lightpushPublish(msg, getTopic(topic), peerID, ms)
return lightpushPublish(msg, getTopic(pubsubTopic), peerID, ms)
}
12 changes: 11 additions & 1 deletion waku/v2/node/wakunode2.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,11 +847,21 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
return peers, nil
}

func (w *WakuNode) PeersByShard(cluster uint16, shard uint16) peer.IDSlice {
// PeersByShard filters peers based on shard information following static sharding
func (w *WakuNode) PeersByStaticShard(cluster uint16, shard uint16) peer.IDSlice {
pTopic := wakuprotocol.NewStaticShardingPubsubTopic(cluster, shard).String()
return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic)
}

// PeersByContentTopics filters peers based on contentTopic
func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice {
pTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return nil
}
return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic)
}

func (w *WakuNode) findRelayNodes(ctx context.Context) {
defer w.wg.Done()

Expand Down
62 changes: 51 additions & 11 deletions waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) {
}

// GroupPeersByDirection returns all the connected peers in peer store grouped by Inbound or outBound direction
func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
if len(specificPeers) == 0 {
specificPeers = pm.host.Network().Peers()
}
Expand All @@ -150,9 +150,9 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers p

// getRelayPeers - Returns list of in and out peers supporting WakuRelayProtocol within specifiedPeers.
// If specifiedPeers is empty, it checks within all peers in peerStore.
func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
//Group peers by their connected direction inbound or outbound.
inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers)
inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers...)
if err != nil {
return
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (pm *PeerManager) connectToRelayPeers() {
//Check for out peer connections and connect to more peers.
pm.ensureMinRelayConnsPerTopic()

inRelayPeers, outRelayPeers := pm.getRelayPeers(nil)
inRelayPeers, outRelayPeers := pm.getRelayPeers()
pm.logger.Info("number of relay peers connected",
zap.Int("in", inRelayPeers.Len()),
zap.Int("out", outRelayPeers.Len()))
Expand Down Expand Up @@ -417,29 +417,69 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
pm.serviceSlots.getPeers(proto).add(peerID)
}

// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return "", err
}
return pm.SelectPeer(proto, pubsubTopic, specificPeers...)
}

// SelectPeer is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID) (peer.ID, error) {
// if pubSubTopic is specified, peer is selected from list that support the pubSubTopic
func (pm *PeerManager) SelectPeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as:
// - which topics they track
// - latency?

//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if peerID, err := slot.getRandom(); err == nil {
return peerID, nil
}
if peerID := pm.selectServicePeer(proto, pubSubTopic, specificPeers...); peerID != nil {
return *peerID, nil
}

// if not found in serviceSlots or proto == WakuRelayIDv200
filteredPeers, err := utils.FilterPeersByProto(pm.host, specificPeers, proto)
if err != nil {
return "", err
}

if pubSubTopic != "" {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, filteredPeers...)
}
return utils.SelectRandomPeer(filteredPeers, pm.logger)
}

func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peerIDPtr *peer.ID) {
peerIDPtr = nil

//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if pubSubTopic == "" {
if peerID, err := slot.getRandom(); err == nil {
peerIDPtr = &peerID
} else {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
}
} else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
keys = append(keys, i)
}
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...)
peerID, err := utils.SelectRandomPeer(selectedPeers, pm.logger)
if err == nil {
peerIDPtr = &peerID
} else {
pm.logger.Debug("could not select random peer", zap.Error(err))
}
}
}
return
}
54 changes: 46 additions & 8 deletions waku/v2/peermanager/peer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestServiceSlots(t *testing.T) {
///////////////

// select peer from pm, currently only h2 is set in pm
peerID, err := pm.SelectPeer(protocol, nil)
peerID, err := pm.SelectPeer(protocol, "")
require.NoError(t, err)
require.Equal(t, peerID, h2.ID())

Expand All @@ -74,7 +75,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)

// check that returned peer is h2 or h3 peer
peerID, err = pm.SelectPeer(protocol, nil)
peerID, err = pm.SelectPeer(protocol, "")
require.NoError(t, err)
if peerID == h2.ID() || peerID == h3.ID() {
//Test success
Expand All @@ -90,18 +91,55 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err)
defer h4.Close()

_, err = pm.SelectPeer(protocol1, nil)
_, err = pm.SelectPeer(protocol1, "")
require.Error(t, err, utils.ErrNoPeersAvailable)

// add h4 peer for protocol1
_, err = pm.AddPeer(getAddr(h4), wps.Static, []string{""}, libp2pProtocol.ID(protocol1))
require.NoError(t, err)

//Test peer selection for protocol1
peerID, err = pm.SelectPeer(protocol1, nil)
peerID, err = pm.SelectPeer(protocol1, "")
require.NoError(t, err)
require.Equal(t, peerID, h4.ID())

_, err = pm.SelectPeerByContentTopic(protocol1, "")
require.Error(t, wakuproto.ErrInvalidFormat, err)

}

func TestPeerSelection(t *testing.T) {
ctx, pm, deferFn := initTest(t)
defer deferFn()

h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()

h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()

protocol := libp2pProtocol.ID("test/protocol")
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/rs/2/1", "/waku/rs/2/2"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)

_, err = pm.SelectPeer(protocol, "")
require.NoError(t, err)

peerID, err := pm.SelectPeer(protocol, "/waku/rs/2/2")
require.NoError(t, err)
require.Equal(t, h2.ID(), peerID)

_, err = pm.SelectPeer(protocol, "/waku/rs/2/3")
require.Error(t, utils.ErrNoPeersAvailable, err)

_, err = pm.SelectPeer(protocol, "/waku/rs/2/1")
require.NoError(t, err)

}

func TestDefaultProtocol(t *testing.T) {
Expand All @@ -111,7 +149,7 @@ func TestDefaultProtocol(t *testing.T) {
// check peer for default protocol
///////////////
//Test empty peer selection for relay protocol
_, err := pm.SelectPeer(relay.WakuRelayID_v200, nil)
_, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
require.Error(t, err, utils.ErrNoPeersAvailable)

///////////////
Expand All @@ -126,7 +164,7 @@ func TestDefaultProtocol(t *testing.T) {
require.NoError(t, err)

// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, nil)
peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
require.NoError(t, err)
require.Equal(t, peerID, h5.ID())
}
Expand All @@ -146,12 +184,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
require.NoError(t, err)

peerID, err := pm.SelectPeer(protocol2, nil)
peerID, err := pm.SelectPeer(protocol2, "")
require.NoError(t, err)
require.Equal(t, peerID, h6.ID())

pm.RemovePeer(peerID)
_, err = pm.SelectPeer(protocol2, nil)
_, err = pm.SelectPeer(protocol2, "")
require.Error(t, err, utils.ErrNoPeersAvailable)
}

Expand Down
Loading