diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index c1e4271bc..b8f4e5f6b 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -63,9 +63,9 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. } if options.Filter.Enable { - cf := filter.ContentFilter{ + cf := protocol.ContentFilter{ PubsubTopic: relay.DefaultWakuTopic, - ContentTopics: filter.NewContentTopicSet(options.ContentTopic), + ContentTopics: protocol.NewContentTopicSet(options.ContentTopic), } var filterOpt filter.FilterSubscribeOption peerID, err := options.Filter.NodePeerID() @@ -269,7 +269,7 @@ func (c *Chat) SendMessage(line string) { err := c.publish(tCtx, line) if err != nil { if err.Error() == "validation failed" { - err = errors.New("message rate violation!") + err = errors.New("message rate violation") } c.ui.ErrorMessage(err) } @@ -524,7 +524,7 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) { ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second) defer cancel() - err = c.node.DialPeerWithInfo(ctx, n) + err = c.node.DialPeerWithInfo(ctx, info) if err != nil { c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.Pretty(), err)) diff --git a/examples/filter2/main.go b/examples/filter2/main.go index c35584ea9..ec00f95ba 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -97,8 +97,8 @@ func main() { } // Send FilterRequest from light node to full node - cf := filter.ContentFilter{ - ContentTopics: filter.NewContentTopicSet(contentTopic), + cf := protocol.ContentFilter{ + ContentTopics: protocol.NewContentTopicSet(contentTopic), } theFilter, err := lightNode.FilterLightnode().Subscribe(ctx, cf) diff --git a/library/c/README.md b/library/c/README.md index 227bb54e3..4d23aa684 100644 --- a/library/c/README.md +++ b/library/c/README.md @@ -1083,7 +1083,7 @@ Publish a message using Waku Lightpush. 1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type). 2. `char* pubsubTopic`: pubsub topic on which to publish the message. - If `NULL`, it uses the default pubsub topic. + If `NULL`, it derives the pubsub topic from content-topic based on autosharding. 3. `char* peerID`: Peer ID supporting the lightpush protocol. The peer must be already known. It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) diff --git a/library/c/api_lightpush.go b/library/c/api_lightpush.go index 2243f4f1e..83ca57603 100644 --- a/library/c/api_lightpush.go +++ b/library/c/api_lightpush.go @@ -6,7 +6,7 @@ package main import "C" import "github.com/waku-org/go-waku/library" -// Publish a message using waku lightpush. Use NULL for topic to use the default pubsub topic.. +// Publish a message using waku lightpush. Use NULL for topic to derive the pubsub topic from the contentTopic. // peerID should contain the ID of a peer supporting the lightpush protocol. Use NULL to automatically select a node // If ms is greater than 0, the broadcast of the message must happen before the timeout // (in milliseconds) is reached, or an error will be returned diff --git a/library/filter.go b/library/filter.go index 009167653..687eb6054 100644 --- a/library/filter.go +++ b/library/filter.go @@ -7,7 +7,9 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" ) type filterArgument struct { @@ -15,22 +17,22 @@ type filterArgument struct { ContentTopics []string `json:"contentTopics,omitempty"` } -func toContentFilter(filterJSON string) (filter.ContentFilter, error) { +func toContentFilter(filterJSON string) (protocol.ContentFilter, error) { var f filterArgument err := json.Unmarshal([]byte(filterJSON), &f) if err != nil { - return filter.ContentFilter{}, err + return protocol.ContentFilter{}, err } - return filter.ContentFilter{ + return protocol.ContentFilter{ PubsubTopic: f.PubsubTopic, - ContentTopics: filter.NewContentTopicSet(f.ContentTopics...), + ContentTopics: protocol.NewContentTopicSet(f.ContentTopics...), }, nil } type subscribeResult struct { - Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"` - Error string `json:"error,omitempty"` + Subscriptions []*subscription.SubscriptionDetails `json:"subscriptions"` + Error string `json:"error,omitempty"` } // FilterSubscribe is used to create a subscription to a filter node to receive messages @@ -71,7 +73,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { } for _, subscriptionDetails := range subscriptions { - go func(subscriptionDetails *filter.SubscriptionDetails) { + go func(subscriptionDetails *subscription.SubscriptionDetails) { for envelope := range subscriptionDetails.C { send("message", toSubscriptionMessage(envelope)) } diff --git a/library/lightpush.go b/library/lightpush.go index 633e6dc9c..def0d47cd 100644 --- a/library/lightpush.go +++ b/library/lightpush.go @@ -37,16 +37,20 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms lpOptions = append(lpOptions, lightpush.WithAutomaticPeerSelection()) } - hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, pubsubTopic, lpOptions...) + if pubsubTopic != "" { + lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic)) + } + + hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, lpOptions...) return hexutil.Encode(hash), err } // LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol -func LightpushPublish(messageJSON string, topic string, peerID string, ms int) (string, error) { +func LightpushPublish(messageJSON string, pubsubTopic string, peerID string, ms int) (string, error) { msg, err := wakuMessage(messageJSON) if err != nil { return "", err } - return lightpushPublish(msg, getTopic(topic), peerID, ms) + return lightpushPublish(msg, getTopic(pubsubTopic), peerID, ms) } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 1a3b18c21..1035ed9fc 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -847,11 +847,21 @@ func (w *WakuNode) Peers() ([]*Peer, error) { return peers, nil } -func (w *WakuNode) PeersByShard(cluster uint16, shard uint16) peer.IDSlice { +// PeersByShard filters peers based on shard information following static sharding +func (w *WakuNode) PeersByStaticShard(cluster uint16, shard uint16) peer.IDSlice { pTopic := wakuprotocol.NewStaticShardingPubsubTopic(cluster, shard).String() return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic) } +// PeersByContentTopics filters peers based on contentTopic +func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice { + pTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(contentTopic) + if err != nil { + return nil + } + return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic) +} + func (w *WakuNode) findRelayNodes(ctx context.Context) { defer w.wg.Done() diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 042d3de99..6ded3fa27 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -127,7 +127,7 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) { } // GroupPeersByDirection returns all the connected peers in peer store grouped by Inbound or outBound direction -func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) { +func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) { if len(specificPeers) == 0 { specificPeers = pm.host.Network().Peers() } @@ -150,9 +150,9 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers p // getRelayPeers - Returns list of in and out peers supporting WakuRelayProtocol within specifiedPeers. // If specifiedPeers is empty, it checks within all peers in peerStore. -func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) { +func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) { //Group peers by their connected direction inbound or outbound. - inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers) + inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers...) if err != nil { return } @@ -206,7 +206,7 @@ func (pm *PeerManager) connectToRelayPeers() { //Check for out peer connections and connect to more peers. pm.ensureMinRelayConnsPerTopic() - inRelayPeers, outRelayPeers := pm.getRelayPeers(nil) + inRelayPeers, outRelayPeers := pm.getRelayPeers() pm.logger.Info("number of relay peers connected", zap.Int("in", inRelayPeers.Len()), zap.Int("out", outRelayPeers.Len())) @@ -417,22 +417,32 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) { pm.serviceSlots.getPeers(proto).add(peerID) } +// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic. +// If a list of specific peers is passed, the peer will be chosen from that list assuming +// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot. +// If a peer cannot be found in the service slot, a peer will be selected from node peerstore +func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) { + pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) + if err != nil { + return "", err + } + return pm.SelectPeer(proto, pubsubTopic, specificPeers...) +} + // SelectPeer is used to return a random peer that supports a given protocol. // If a list of specific peers is passed, the peer will be chosen from that list assuming // it supports the chosen protocol, otherwise it will chose a peer from the service slot. // If a peer cannot be found in the service slot, a peer will be selected from node peerstore -func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID) (peer.ID, error) { +// if pubSubTopic is specified, peer is selected from list that support the pubSubTopic +func (pm *PeerManager) SelectPeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) { // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // Ideally depending on the query and our set of peers we take a subset of ideal peers. // This will require us to check for various factors such as: // - which topics they track // - latency? - //Try to fetch from serviceSlot - if slot := pm.serviceSlots.getPeers(proto); slot != nil { - if peerID, err := slot.getRandom(); err == nil { - return peerID, nil - } + if peerID := pm.selectServicePeer(proto, pubSubTopic, specificPeers...); peerID != nil { + return *peerID, nil } // if not found in serviceSlots or proto == WakuRelayIDv200 @@ -440,6 +450,36 @@ func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID) (p if err != nil { return "", err } - + if pubSubTopic != "" { + filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, filteredPeers...) + } return utils.SelectRandomPeer(filteredPeers, pm.logger) } + +func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peerIDPtr *peer.ID) { + peerIDPtr = nil + + //Try to fetch from serviceSlot + if slot := pm.serviceSlots.getPeers(proto); slot != nil { + if pubSubTopic == "" { + if peerID, err := slot.getRandom(); err == nil { + peerIDPtr = &peerID + } else { + pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err)) + } + } else { //PubsubTopic based selection + keys := make([]peer.ID, 0, len(slot.m)) + for i := range slot.m { + keys = append(keys, i) + } + selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...) + peerID, err := utils.SelectRandomPeer(selectedPeers, pm.logger) + if err == nil { + peerIDPtr = &peerID + } else { + pm.logger.Debug("could not select random peer", zap.Error(err)) + } + } + } + return +} diff --git a/waku/v2/peermanager/peer_manager_test.go b/waku/v2/peermanager/peer_manager_test.go index 259b92f46..ce42ab432 100644 --- a/waku/v2/peermanager/peer_manager_test.go +++ b/waku/v2/peermanager/peer_manager_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" wps "github.com/waku-org/go-waku/waku/v2/peerstore" + wakuproto "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -65,7 +66,7 @@ func TestServiceSlots(t *testing.T) { /////////////// // select peer from pm, currently only h2 is set in pm - peerID, err := pm.SelectPeer(protocol, nil) + peerID, err := pm.SelectPeer(protocol, "") require.NoError(t, err) require.Equal(t, peerID, h2.ID()) @@ -74,7 +75,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) // check that returned peer is h2 or h3 peer - peerID, err = pm.SelectPeer(protocol, nil) + peerID, err = pm.SelectPeer(protocol, "") require.NoError(t, err) if peerID == h2.ID() || peerID == h3.ID() { //Test success @@ -90,7 +91,7 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) defer h4.Close() - _, err = pm.SelectPeer(protocol1, nil) + _, err = pm.SelectPeer(protocol1, "") require.Error(t, err, utils.ErrNoPeersAvailable) // add h4 peer for protocol1 @@ -98,10 +99,47 @@ func TestServiceSlots(t *testing.T) { require.NoError(t, err) //Test peer selection for protocol1 - peerID, err = pm.SelectPeer(protocol1, nil) + peerID, err = pm.SelectPeer(protocol1, "") require.NoError(t, err) require.Equal(t, peerID, h4.ID()) + _, err = pm.SelectPeerByContentTopic(protocol1, "") + require.Error(t, wakuproto.ErrInvalidFormat, err) + +} + +func TestPeerSelection(t *testing.T) { + ctx, pm, deferFn := initTest(t) + defer deferFn() + + h2, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h2.Close() + + h3, err := tests.MakeHost(ctx, 0, rand.Reader) + require.NoError(t, err) + defer h3.Close() + + protocol := libp2pProtocol.ID("test/protocol") + _, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/rs/2/1", "/waku/rs/2/2"}, libp2pProtocol.ID(protocol)) + require.NoError(t, err) + + _, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol)) + require.NoError(t, err) + + _, err = pm.SelectPeer(protocol, "") + require.NoError(t, err) + + peerID, err := pm.SelectPeer(protocol, "/waku/rs/2/2") + require.NoError(t, err) + require.Equal(t, h2.ID(), peerID) + + _, err = pm.SelectPeer(protocol, "/waku/rs/2/3") + require.Error(t, utils.ErrNoPeersAvailable, err) + + _, err = pm.SelectPeer(protocol, "/waku/rs/2/1") + require.NoError(t, err) + } func TestDefaultProtocol(t *testing.T) { @@ -111,7 +149,7 @@ func TestDefaultProtocol(t *testing.T) { // check peer for default protocol /////////////// //Test empty peer selection for relay protocol - _, err := pm.SelectPeer(relay.WakuRelayID_v200, nil) + _, err := pm.SelectPeer(relay.WakuRelayID_v200, "") require.Error(t, err, utils.ErrNoPeersAvailable) /////////////// @@ -126,7 +164,7 @@ func TestDefaultProtocol(t *testing.T) { require.NoError(t, err) // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. - peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, nil) + peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, "") require.NoError(t, err) require.Equal(t, peerID, h5.ID()) } @@ -146,12 +184,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) { _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) require.NoError(t, err) - peerID, err := pm.SelectPeer(protocol2, nil) + peerID, err := pm.SelectPeer(protocol2, "") require.NoError(t, err) require.Equal(t, peerID, h6.ID()) pm.RemovePeer(peerID) - _, err = pm.SelectPeer(protocol2, nil) + _, err = pm.SelectPeer(protocol2, "") require.Error(t, err, utils.ErrNoPeersAvailable) } diff --git a/waku/v2/peerstore/waku_peer_store.go b/waku/v2/peerstore/waku_peer_store.go index 5d2f28e06..b1b1ac4f1 100644 --- a/waku/v2/peerstore/waku_peer_store.go +++ b/waku/v2/peerstore/waku_peer_store.go @@ -59,7 +59,7 @@ type WakuPeerstore interface { RemovePubSubTopic(p peer.ID, topic string) error PubSubTopics(p peer.ID) ([]string, error) SetPubSubTopics(p peer.ID, topics []string) error - PeersByPubSubTopic(pubSubTopic string) peer.IDSlice + PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice } // NewWakuPeerstore creates a new WakuPeerStore object @@ -208,9 +208,13 @@ func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) { } // PeersByPubSubTopic Returns list of peers by pubSubTopic -func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string) peer.IDSlice { +// If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore +func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice { + if specificPeers == nil { + specificPeers = ps.Peers() + } var result peer.IDSlice - for _, p := range ps.Peers() { + for _, p := range specificPeers { topics, err := ps.PubSubTopics(p) if err == nil { for _, topic := range topics { diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go new file mode 100644 index 000000000..c230f6f32 --- /dev/null +++ b/waku/v2/protocol/content_filter.go @@ -0,0 +1,30 @@ +package protocol + +import "golang.org/x/exp/maps" + +type ContentTopicSet map[string]struct{} + +func NewContentTopicSet(contentTopics ...string) ContentTopicSet { + s := make(ContentTopicSet, len(contentTopics)) + for _, ct := range contentTopics { + s[ct] = struct{}{} + } + return s +} + +// ContentFilter is used to specify the filter to be applied for a FilterNode. +// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding. +// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding) +// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm +type ContentFilter struct { + PubsubTopic string + ContentTopics ContentTopicSet +} + +func (cf ContentFilter) ContentTopicsList() []string { + return maps.Keys(cf.ContentTopics) +} + +func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter { + return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)} +} diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 916dea5a2..b36f05671 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -21,9 +21,9 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" - "golang.org/x/exp/maps" ) // FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow @@ -41,27 +41,10 @@ type WakuFilterLightNode struct { timesource timesource.Timesource metrics Metrics log *zap.Logger - subscriptions *SubscriptionsMap + subscriptions *subscription.SubscriptionsMap pm *peermanager.PeerManager } -// ContentFilter is used to specify the filter to be applied for a FilterNode. -// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding. -// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding) -// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm -type ContentFilter struct { - PubsubTopic string - ContentTopics ContentTopicSet -} - -func (cf ContentFilter) ContentTopicsList() []string { - return maps.Keys(cf.ContentTopics) -} - -func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter { - return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)} -} - type WakuFilterPushResult struct { Err error PeerID peer.ID @@ -95,7 +78,7 @@ func (wf *WakuFilterLightNode) Start(ctx context.Context) error { } func (wf *WakuFilterLightNode) start() error { - wf.subscriptions = NewSubscriptionMap(wf.log) + wf.subscriptions = subscription.NewSubscriptionMap(wf.log) wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(wf.Context())) wf.log.Info("filter-push protocol started") @@ -144,7 +127,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str pubSubTopic := "" //For now returning failure, this will get addressed with autosharding changes for filter. if messagePush.PubsubTopic == nil { - pubSubTopic, err = getPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic) + pubSubTopic, err = protocol.GetPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic) if err != nil { logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err)) wf.metrics.RecordError(decodeRPCFailure) @@ -153,7 +136,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str } else { pubSubTopic = *messagePush.PubsubTopic } - if !wf.subscriptions.Has(s.Conn().RemotePeer(), NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) { + if !wf.subscriptions.Has(s.Conn().RemotePeer(), protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) { logger.Warn("received messagepush with invalid subscription parameters", logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic), zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) @@ -181,7 +164,7 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, } func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, - reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { + reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error { conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) if err != nil { wf.metrics.RecordError(dialFailure) @@ -230,18 +213,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return nil } -func getPubSubTopicFromContentTopic(cTopicString string) (string, error) { - cTopic, err := protocol.StringToContentTopic(cTopicString) - if err != nil { - return "", fmt.Errorf("%s : %s", err.Error(), cTopicString) - } - pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount) - - return pTopic.String(), nil -} - // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics -func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) { +func contentFilterToPubSubTopicMap(contentFilter protocol.ContentFilter) (map[string][]string, error) { pubSubTopicMap := make(map[string][]string) if contentFilter.PubsubTopic != "" { @@ -249,7 +222,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st } else { //Parse the content-Topics to figure out shards. for _, cTopicString := range contentFilter.ContentTopicsList() { - pTopicStr, err := getPubSubTopicFromContentTopic(cTopicString) + pTopicStr, err := protocol.GetPubSubTopicFromContentTopic(cTopicString) if err != nil { return nil, err } @@ -267,7 +240,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st // If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. // This may change if Filterv2 protocol is updated to handle such a scenario in a single request. // Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics. -func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -303,11 +276,11 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont return nil, err } failedContentTopics := []string{} - subscriptions := make([]*SubscriptionDetails, 0) + subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { - var cFilter ContentFilter + var cFilter protocol.ContentFilter cFilter.PubsubTopic = pubSubTopic - cFilter.ContentTopics = NewContentTopicSet(cTopics...) + cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...) err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) if err != nil { wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), @@ -325,7 +298,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont } // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol -func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) (*subscription.SubscriptionDetails, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -361,10 +334,10 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error { ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: protocol.GenerateRequestID()}, pb.FilterSubscribeRequest_SUBSCRIBER_PING, - ContentFilter{}) + protocol.ContentFilter{}) } -func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { +func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *subscription.SubscriptionDetails) error { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -374,7 +347,7 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip return wf.Ping(ctx, subscription.PeerID) } -func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { +func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -384,10 +357,10 @@ func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { wf.subscriptions.RLock() defer wf.subscriptions.RUnlock() - var output []*SubscriptionDetails + var output []*subscription.SubscriptionDetails - for _, peerSubscription := range wf.subscriptions.items { - for _, subscriptions := range peerSubscription.subsPerPubsubTopic { + for _, peerSubscription := range wf.subscriptions.Items { + for _, subscriptions := range peerSubscription.SubsPerPubsubTopic { for _, subscriptionDetail := range subscriptions { output = append(output, subscriptionDetail) } @@ -397,16 +370,16 @@ func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { return output } -func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) { +func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter protocol.ContentFilter) { wf.subscriptions.Lock() defer wf.subscriptions.Unlock() - peerSubscription, ok := wf.subscriptions.items[peerID] + peerSubscription, ok := wf.subscriptions.Items[peerID] if !ok { return } - subscriptionDetailList, ok := peerSubscription.subsPerPubsubTopic[contentFilter.PubsubTopic] + subscriptionDetailList, ok := peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic] if !ok { return } @@ -415,18 +388,18 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte subscriptionDetail.Remove(contentFilter.ContentTopicsList()...) if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 { delete(subscriptionDetailList, subscriptionDetailID) - subscriptionDetail.closeC() + subscriptionDetail.CloseC() } } if len(subscriptionDetailList) == 0 { - delete(wf.subscriptions.items[peerID].subsPerPubsubTopic, contentFilter.PubsubTopic) + delete(wf.subscriptions.Items[peerID].SubsPerPubsubTopic, contentFilter.PubsubTopic) } } // Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -450,22 +423,22 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co if err != nil { return nil, err } - resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) + resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items)) for pTopic, cTopics := range pubSubTopicMap { - cFilter := NewContentFilter(pTopic, cTopics...) - for peerID := range wf.subscriptions.items { + cFilter := protocol.NewContentFilter(pTopic, cTopics...) + for peerID := range wf.subscriptions.Items { if params.selectedPeer != "" && peerID != params.selectedPeer { continue } - subscriptions, ok := wf.subscriptions.items[peerID] + subscriptions, ok := wf.subscriptions.Items[peerID] if !ok || subscriptions == nil { continue } wf.cleanupSubscriptions(peerID, cFilter) - if len(subscriptions.subsPerPubsubTopic) == 0 { - delete(wf.subscriptions.items, peerID) + if len(subscriptions.SubsPerPubsubTopic) == 0 { + delete(wf.subscriptions.Items, peerID) } if params.wg != nil { @@ -501,7 +474,8 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co // UnsubscribeWithSubscription is used to close a particular subscription // If there are no more subscriptions matching the passed [peer, contentFilter] pair, // server unsubscribe is also performed -func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails, + opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -531,7 +505,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, } -func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter ContentFilter) error { +func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter protocol.ContentFilter) error { err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter) if err != nil { ferr, ok := err.(*FilterError) @@ -554,14 +528,14 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte wf.subscriptions.Lock() defer wf.subscriptions.Unlock() - resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) + resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items)) - for peerID := range wf.subscriptions.items { + for peerID := range wf.subscriptions.Items { if params.selectedPeer != "" && peerID != params.selectedPeer { continue } - delete(wf.subscriptions.items, peerID) + delete(wf.subscriptions.Items, peerID) if params.wg != nil { params.wg.Add(1) @@ -578,7 +552,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL, - ContentFilter{}) + protocol.ContentFilter{}) if err != nil { wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index eff63b836..685a15ea4 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -17,6 +17,7 @@ import ( "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -40,8 +41,8 @@ type FilterTestSuite struct { fullNode *WakuFilterFullNode fullNodeHost host.Host wg *sync.WaitGroup - contentFilter ContentFilter - subDetails []*SubscriptionDetails + contentFilter protocol.ContentFilter + subDetails []*subscription.SubscriptionDetails log *zap.Logger } @@ -145,8 +146,8 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) s.wg.Wait() } -func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*SubscriptionDetails { - s.contentFilter = ContentFilter{pubsubTopic, NewContentTopicSet(contentTopic)} +func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails { + s.contentFilter = protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)} subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) s.Require().NoError(err) @@ -378,7 +379,7 @@ func (s *FilterTestSuite) TestRunningGuard() { s.lightNode.Stop() - contentFilter := ContentFilter{"test", NewContentTopicSet("test")} + contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")} _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) @@ -394,7 +395,7 @@ func (s *FilterTestSuite) TestRunningGuard() { func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { - contentFilter := ContentFilter{"test", NewContentTopicSet("test")} + contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")} _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) s.Require().NoError(err) @@ -508,9 +509,9 @@ func (s *FilterTestSuite) TestAutoShard() { s.Require().NoError(err) }, s.subDetails[0].C) - _, err = s.lightNode.Unsubscribe(s.ctx, ContentFilter{ + _, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{ PubsubTopic: s.testTopic, - ContentTopics: NewContentTopicSet(newContentTopic), + ContentTopics: protocol.NewContentTopicSet(newContentTopic), }) s.Require().NoError(err) diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 9b23db1d7..036165fbd 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -60,7 +60,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption if params.pm == nil { p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log) } else { - p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, fromThesePeers) + p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, "", fromThesePeers...) } if err == nil { params.selectedPeer = p diff --git a/waku/v2/protocol/filter/subscribers_map.go b/waku/v2/protocol/filter/subscribers_map.go index b77d1332d..80a364e11 100644 --- a/waku/v2/protocol/filter/subscribers_map.go +++ b/waku/v2/protocol/filter/subscribers_map.go @@ -8,23 +8,14 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" ) -var ErrNotFound = errors.New("not found") - -type ContentTopicSet map[string]struct{} - -func NewContentTopicSet(contentTopics ...string) ContentTopicSet { - s := make(ContentTopicSet, len(contentTopics)) - for _, ct := range contentTopics { - s[ct] = struct{}{} - } - return s -} - type PeerSet map[peer.ID]struct{} -type PubsubTopics map[string]ContentTopicSet // pubsubTopic => contentTopics +type PubsubTopics map[string]protocol.ContentTopicSet // pubsubTopic => contentTopics + +var errNotFound = errors.New("not found") type SubscribersMap struct { sync.RWMutex @@ -65,7 +56,7 @@ func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics contentTopicsMap, ok := pubsubTopicMap[pubsubTopic] if !ok { - contentTopicsMap = make(ContentTopicSet) + contentTopicsMap = make(protocol.ContentTopicSet) } for _, c := range contentTopics { @@ -106,12 +97,12 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop pubsubTopicMap, ok := sub.items[peerID] if !ok { - return ErrNotFound + return errNotFound } contentTopicsMap, ok := pubsubTopicMap[pubsubTopic] if !ok { - return ErrNotFound + return errNotFound } // Removing content topics individually @@ -140,7 +131,7 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop func (sub *SubscribersMap) deleteAll(peerID peer.ID) error { pubsubTopicMap, ok := sub.items[peerID] if !ok { - return ErrNotFound + return errNotFound } for pubsubTopic, contentTopicsMap := range pubsubTopicMap { diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 3c5e7a275..8570ce818 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -144,15 +144,10 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea } } -func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...Option) (*pb.PushResponse, error) { - params := new(lightPushParameters) - params.host = wakuLP.h - params.log = wakuLP.log - params.pm = wakuLP.pm - - optList := append(DefaultOptions(wakuLP.h), opts...) - for _, opt := range optList { - opt(params) +// request sends a message via lightPush protocol to either a specified peer or peer that is selected. +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushParameters) (*pb.PushResponse, error) { + if params == nil { + return nil, errors.New("lightpush params are mandatory") } if params.selectedPeer == "" { @@ -215,23 +210,40 @@ func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol -func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, topic string, opts ...Option) ([]byte, error) { +// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol +// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding. +func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } + params := new(lightPushParameters) + params.host = wakuLP.h + params.log = wakuLP.log + params.pm = wakuLP.pm + + optList := append(DefaultOptions(wakuLP.h), opts...) + for _, opt := range optList { + opt(params) + } + if params.pubsubTopic == "" { + var err error + params.pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(message.ContentTopic) + if err != nil { + return nil, err + } + } req := new(pb.PushRequest) req.Message = message - req.PubsubTopic = topic + req.PubsubTopic = params.pubsubTopic - response, err := wakuLP.request(ctx, req, opts...) + response, err := wakuLP.request(ctx, req, params) if err != nil { return nil, err } if response.IsSuccess { - hash := message.Hash(topic) + hash := message.Hash(params.pubsubTopic) wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash)) return hash, nil } @@ -239,7 +251,8 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa return nil, errors.New(response.Info) } -// Publish is used to broadcast a WakuMessage to the default waku pubsub topic via lightpush protocol +// Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the contentTopic) via lightpush protocol +// If auto-sharding is not to be used, then PublishToTopic API should be used func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { - return wakuLP.PublishToTopic(ctx, message, relay.DefaultWakuTopic, opts...) + return wakuLP.PublishToTopic(ctx, message, opts...) } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 183988897..f0d496d62 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -17,6 +17,7 @@ type lightPushParameters struct { requestID []byte pm *peermanager.PeerManager log *zap.Logger + pubsubTopic string } // Option is the type of options accepted when performing LightPush protocol requests @@ -40,7 +41,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { if params.pm == nil { p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log) } else { - p, err = params.pm.SelectPeer(LightPushID_v20beta1, fromThesePeers) + p, err = params.pm.SelectPeer(LightPushID_v20beta1, "", fromThesePeers...) } if err == nil { params.selectedPeer = p @@ -50,6 +51,12 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { } } +func WithPubSubTopic(pubsubTopic string) Option { + return func(params *lightPushParameters) { + params.pubsubTopic = pubsubTopic + } +} + // WithFastestPeerSelection is an option used to select a peer from the peer store // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 69168fed5..5a9191949 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -13,13 +13,12 @@ import ( "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" ) -func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscription, host.Host) { +func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Subscription, host.Host) { port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -34,7 +33,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri err = relay.Start(context.Background()) require.NoError(t, err) - sub, err := relay.SubscribeToTopic(context.Background(), topic) + sub, err := relay.SubscribeToTopic(context.Background(), pusubTopic) require.NoError(t, err) return relay, sub, host @@ -87,13 +86,8 @@ func TestWakuLightPush(t *testing.T) { err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1) require.NoError(t, err) - msg1 := tests.CreateWakuMessage("test1", utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("test2", utils.GetUnixEpoch()) - req := new(pb.PushRequest) - req.Message = msg1 - req.PubsubTopic = string(testTopic) - // Wait for the mesh connection to happen between node1 and node2 time.Sleep(2 * time.Second) var wg sync.WaitGroup @@ -102,23 +96,19 @@ func TestWakuLightPush(t *testing.T) { go func() { defer wg.Done() <-sub1.Ch - <-sub1.Ch }() wg.Add(1) go func() { defer wg.Done() <-sub2.Ch - <-sub2.Ch }() - // Verifying successful request - resp, err := client.request(ctx, req) - require.NoError(t, err) - require.True(t, resp.IsSuccess) + var lpOptions []Option + lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) // Checking that msg hash is correct - hash, err := client.PublishToTopic(ctx, msg2, testTopic) + hash, err := client.PublishToTopic(ctx, msg2, lpOptions...) require.NoError(t, err) require.Equal(t, protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), string(testTopic)).Hash(), hash) wg.Wait() @@ -147,6 +137,90 @@ func TestWakuLightPushNoPeers(t *testing.T) { require.NoError(t, err) client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) client.SetHost(clientHost) - _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), testTopic) + var lpOptions []Option + lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) + + _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...) require.Errorf(t, err, "no suitable remote peers") } + +// Node1: Relay +// Node2: Relay+Lightpush +// Client that will lightpush a message +// +// Node1 and Node 2 are peers +// Client and Node 2 are peers +// Client will use lightpush request, sending the message to Node2 +// +// Client send a successful message using lightpush +// Node2 receive the message and broadcast it +// Node1 receive the message + +func TestWakuLightPushAutoSharding(t *testing.T) { + contentTopic := "0/test/1/testTopic/proto" + cTopic1, err := protocol.StringToContentTopic(contentTopic) + require.NoError(t, err) + //Computing pubSubTopic only for filterFullNode. + pubSubTopicInst := protocol.GetShardFromContentTopic(cTopic1, protocol.GenerationZeroShardsCount) + pubSubTopic := pubSubTopicInst.String() + node1, sub1, host1 := makeWakuRelay(t, pubSubTopic) + defer node1.Stop() + defer sub1.Unsubscribe() + + node2, sub2, host2 := makeWakuRelay(t, pubSubTopic) + defer node2.Stop() + defer sub2.Unsubscribe() + + ctx := context.Background() + lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger()) + lightPushNode2.SetHost(host2) + err = lightPushNode2.Start(ctx) + require.NoError(t, err) + defer lightPushNode2.Stop() + + port, err := tests.FindFreePort(t, "", 5) + require.NoError(t, err) + + clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) + require.NoError(t, err) + client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) + client.SetHost(clientHost) + + host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host2.Peerstore().AddProtocols(host1.ID(), relay.WakuRelayID_v200) + require.NoError(t, err) + + err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host1.ID())) + require.NoError(t, err) + + clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1) + require.NoError(t, err) + + msg1 := tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch()) + + // Wait for the mesh connection to happen between node1 and node2 + time.Sleep(2 * time.Second) + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + <-sub1.Ch + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-sub2.Ch + + }() + + // Verifying successful request + hash1, err := client.Publish(ctx, msg1) + require.NoError(t, err) + require.Equal(t, protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), string(pubSubTopic)).Hash(), hash1) + + wg.Wait() + +} diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index 289c3280e..704e0d21b 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -37,7 +37,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { if params.pm == nil { p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log) } else { - p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, fromThesePeers) + p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, "", fromThesePeers...) } if err == nil { params.selectedPeer = p diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index 3f6994207..aeeb32cb2 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -230,3 +230,13 @@ func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticSharding return NewStaticShardingPubsubTopic(ClusterIndex, uint16(shard)) } + +func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) { + cTopic, err := StringToContentTopic(cTopicString) + if err != nil { + return "", fmt.Errorf("%s : %s", err.Error(), cTopicString) + } + pTopic := GetShardFromContentTopic(cTopic, GenerationZeroShardsCount) + + return pTopic.String(), nil +} diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index a1440a18a..3855ff6a9 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -111,7 +111,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption if params.s.pm == nil { p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log) } else { - p, err = params.s.pm.SelectPeer(StoreID_v20beta4, fromThesePeers) + p, err = params.s.pm.SelectPeer(StoreID_v20beta4, "", fromThesePeers...) } if err == nil { params.selectedPeer = p diff --git a/waku/v2/protocol/filter/subscriptions_map.go b/waku/v2/protocol/subscription/subscriptions_map.go similarity index 73% rename from waku/v2/protocol/filter/subscriptions_map.go rename to waku/v2/protocol/subscription/subscriptions_map.go index 35ccc5aff..6239e18b4 100644 --- a/waku/v2/protocol/filter/subscriptions_map.go +++ b/waku/v2/protocol/subscription/subscriptions_map.go @@ -1,7 +1,8 @@ -package filter +package subscription import ( "encoding/json" + "errors" "sync" "github.com/google/uuid" @@ -20,7 +21,7 @@ type SubscriptionDetails struct { once sync.Once PeerID peer.ID - ContentFilter ContentFilter + ContentFilter protocol.ContentFilter C chan *protocol.Envelope } @@ -28,39 +29,41 @@ type SubscriptionDetails struct { type SubscriptionSet map[string]*SubscriptionDetails type PeerSubscription struct { - peerID peer.ID - subsPerPubsubTopic map[string]SubscriptionSet + PeerID peer.ID + SubsPerPubsubTopic map[string]SubscriptionSet } type SubscriptionsMap struct { sync.RWMutex logger *zap.Logger - items map[peer.ID]*PeerSubscription + Items map[peer.ID]*PeerSubscription } +var ErrNotFound = errors.New("not found") + func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap { return &SubscriptionsMap{ logger: logger.Named("subscription-map"), - items: make(map[peer.ID]*PeerSubscription), + Items: make(map[peer.ID]*PeerSubscription), } } -func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf ContentFilter) *SubscriptionDetails { +func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.ContentFilter) *SubscriptionDetails { sub.Lock() defer sub.Unlock() - peerSubscription, ok := sub.items[peerID] + peerSubscription, ok := sub.Items[peerID] if !ok { peerSubscription = &PeerSubscription{ - peerID: peerID, - subsPerPubsubTopic: make(map[string]SubscriptionSet), + PeerID: peerID, + SubsPerPubsubTopic: make(map[string]SubscriptionSet), } - sub.items[peerID] = peerSubscription + sub.Items[peerID] = peerSubscription } - _, ok = peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] + _, ok = peerSubscription.SubsPerPubsubTopic[cf.PubsubTopic] if !ok { - peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] = make(SubscriptionSet) + peerSubscription.SubsPerPubsubTopic[cf.PubsubTopic] = make(SubscriptionSet) } details := &SubscriptionDetails{ @@ -68,10 +71,10 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf ContentFilter) * mapRef: sub, PeerID: peerID, C: make(chan *protocol.Envelope, 1024), - ContentFilter: ContentFilter{cf.PubsubTopic, maps.Clone(cf.ContentTopics)}, + ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)}, } - sub.items[peerID].subsPerPubsubTopic[cf.PubsubTopic][details.ID] = details + sub.Items[peerID].SubsPerPubsubTopic[cf.PubsubTopic][details.ID] = details return details } @@ -80,23 +83,23 @@ func (sub *SubscriptionsMap) IsSubscribedTo(peerID peer.ID) bool { sub.RLock() defer sub.RUnlock() - _, ok := sub.items[peerID] + _, ok := sub.Items[peerID] return ok } // Check if we have subscriptions for all (pubsubTopic, contentTopics[i]) pairs provided -func (sub *SubscriptionsMap) Has(peerID peer.ID, cf ContentFilter) bool { +func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool { sub.RLock() defer sub.RUnlock() // Check if peer exits - peerSubscription, ok := sub.items[peerID] + peerSubscription, ok := sub.Items[peerID] if !ok { return false } //TODO: Handle pubsubTopic as null // Check if pubsub topic exists - subscriptions, ok := peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] + subscriptions, ok := peerSubscription.SubsPerPubsubTopic[cf.PubsubTopic] if !ok { return false } @@ -122,12 +125,12 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { sub.Lock() defer sub.Unlock() - peerSubscription, ok := sub.items[subscription.PeerID] + peerSubscription, ok := sub.Items[subscription.PeerID] if !ok { return ErrNotFound } - delete(peerSubscription.subsPerPubsubTopic[subscription.ContentFilter.PubsubTopic], subscription.ID) + delete(peerSubscription.SubsPerPubsubTopic[subscription.ContentFilter.PubsubTopic], subscription.ID) return nil } @@ -150,7 +153,7 @@ func (s *SubscriptionDetails) Remove(contentTopics ...string) { } } -func (s *SubscriptionDetails) closeC() { +func (s *SubscriptionDetails) CloseC() { s.once.Do(func() { s.Lock() defer s.Unlock() @@ -161,7 +164,7 @@ func (s *SubscriptionDetails) closeC() { } func (s *SubscriptionDetails) Close() error { - s.closeC() + s.CloseC() return s.mapRef.Delete(s) } @@ -174,7 +177,7 @@ func (s *SubscriptionDetails) Clone() *SubscriptionDetails { mapRef: s.mapRef, Closed: false, PeerID: s.PeerID, - ContentFilter: ContentFilter{s.ContentFilter.PubsubTopic, maps.Clone(s.ContentFilter.ContentTopics)}, + ContentFilter: protocol.ContentFilter{PubsubTopic: s.ContentFilter.PubsubTopic, ContentTopics: maps.Clone(s.ContentFilter.ContentTopics)}, C: make(chan *protocol.Envelope), } @@ -182,15 +185,15 @@ func (s *SubscriptionDetails) Clone() *SubscriptionDetails { } func (sub *SubscriptionsMap) clear() { - for _, peerSubscription := range sub.items { - for _, subscriptionSet := range peerSubscription.subsPerPubsubTopic { + for _, peerSubscription := range sub.Items { + for _, subscriptionSet := range peerSubscription.SubsPerPubsubTopic { for _, subscription := range subscriptionSet { - subscription.closeC() + subscription.CloseC() } } } - sub.items = make(map[peer.ID]*PeerSubscription) + sub.Items = make(map[peer.ID]*PeerSubscription) } func (sub *SubscriptionsMap) Clear() { @@ -203,7 +206,7 @@ func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) sub.RLock() defer sub.RUnlock() - subscriptions, ok := sub.items[peerID].subsPerPubsubTopic[envelope.PubsubTopic()] + subscriptions, ok := sub.Items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()] if ok { iterateSubscriptionSet(sub.logger, subscriptions, envelope) } diff --git a/waku/v2/protocol/filter/subscriptions_map_test.go b/waku/v2/protocol/subscription/subscriptions_map_test.go similarity index 75% rename from waku/v2/protocol/filter/subscriptions_map_test.go rename to waku/v2/protocol/subscription/subscriptions_map_test.go index 382280bb8..80190dbbb 100644 --- a/waku/v2/protocol/filter/subscriptions_map_test.go +++ b/waku/v2/protocol/subscription/subscriptions_map_test.go @@ -1,4 +1,4 @@ -package filter +package subscription import ( "context" @@ -6,18 +6,29 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/test" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/utils" ) +const PUBSUB_TOPIC = "/test/topic" + +func createPeerID(t *testing.T) peer.ID { + peerId, err := test.RandPeerID() + assert.NoError(t, err) + return peerId +} + func TestSubscriptionMapAppend(t *testing.T) { fmap := NewSubscriptionMap(utils.Logger()) peerID := createPeerID(t) - contentTopics := NewContentTopicSet("ct1", "ct2") + contentTopics := protocol.NewContentTopicSet("ct1", "ct2") - sub := fmap.NewSubscription(peerID, ContentFilter{PUBSUB_TOPIC, contentTopics}) + sub := fmap.NewSubscription(peerID, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC, ContentTopics: contentTopics}) _, found := sub.ContentFilter.ContentTopics["ct1"] require.True(t, found) _, found = sub.ContentFilter.ContentTopics["ct2"] @@ -44,12 +55,12 @@ func TestSubscriptionMapAppend(t *testing.T) { func TestSubscriptionClear(t *testing.T) { fmap := NewSubscriptionMap(utils.Logger()) - contentTopics := NewContentTopicSet("ct1", "ct2") + contentTopics := protocol.NewContentTopicSet("ct1", "ct2") var subscriptions = []*SubscriptionDetails{ - fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "1", contentTopics}), - fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "2", contentTopics}), - fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "3", contentTopics}), + fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: contentTopics}), + fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "2", ContentTopics: contentTopics}), + fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "3", ContentTopics: contentTopics}), } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) @@ -84,9 +95,9 @@ func TestSubscriptionsNotify(t *testing.T) { p1 := createPeerID(t) p2 := createPeerID(t) var subscriptions = []*SubscriptionDetails{ - fmap.NewSubscription(p1, ContentFilter{PUBSUB_TOPIC + "1", NewContentTopicSet("ct1", "ct2")}), - fmap.NewSubscription(p2, ContentFilter{PUBSUB_TOPIC + "1", NewContentTopicSet("ct1")}), - fmap.NewSubscription(p1, ContentFilter{PUBSUB_TOPIC + "2", NewContentTopicSet("ct1", "ct2")}), + fmap.NewSubscription(p1, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: protocol.NewContentTopicSet("ct1", "ct2")}), + fmap.NewSubscription(p2, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: protocol.NewContentTopicSet("ct1")}), + fmap.NewSubscription(p1, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "2", ContentTopics: protocol.NewContentTopicSet("ct1", "ct2")}), } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)