Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(subscription-map): uniform operations and encapsulation #853

Merged
merged 5 commits into from
Nov 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions cmd/waku/server/rest/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
}

// unsubscribe on filter
errCh, err := s.node.FilterLightnode().Unsubscribe(
result, err := s.node.FilterLightnode().Unsubscribe(
req.Context(),
protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...),
filter.WithRequestID(message.RequestId),
Expand All @@ -190,14 +190,17 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
// on success
writeResponse(w, filterSubscriptionResponse{
RequestId: message.RequestId,
StatusDesc: s.unsubscribeGetMessage(errCh),
StatusDesc: s.unsubscribeGetMessage(result),
}, http.StatusOK)
}

func (s *FilterService) unsubscribeGetMessage(ch <-chan filter.WakuFilterPushResult) string {
func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResult) string {
if result == nil {
return http.StatusText(http.StatusOK)
}
var peerIds string
ind := 0
for entry := range ch {
for _, entry := range result.Errors() {
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
Expand Down
20 changes: 11 additions & 9 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,16 @@ func FilterUnsubscribe(filterJSON string, peerID string, ms int) error {
return errors.New("peerID is required")
}

pushResult, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...)
result, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...)
if err != nil {
return err
}

result := <-pushResult

return result.Err
errs := result.Errors()
if len(errs) == 0 {
return nil
}
return errs[0].Err
}

type unsubscribeAllResult struct {
Expand Down Expand Up @@ -192,19 +194,19 @@ func FilterUnsubscribeAll(peerID string, ms int) (string, error) {
fOptions = append(fOptions, filter.UnsubscribeAll())
}

pushResult, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...)
result, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...)
if err != nil {
return "", err
}

var unsubscribeResult []unsubscribeAllResult

for result := range pushResult {
for _, err := range result.Errors() {
ur := unsubscribeAllResult{
PeerID: result.PeerID.Pretty(),
PeerID: err.PeerID.Pretty(),
}
if result.Err != nil {
ur.Error = result.Err.Error()
if err.Err != nil {
ur.Error = err.Err.Error()
}
unsubscribeResult = append(unsubscribeResult, ur)
}
Expand Down
6 changes: 5 additions & 1 deletion waku/v2/protocol/content_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
return s
}

func (cf ContentTopicSet) ToList() []string {
return maps.Keys(cf)
}

// ContentFilter is used to specify the filter to be applied for a FilterNode.
// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding.
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
Expand All @@ -25,7 +29,7 @@ type ContentFilter struct {
}

func (cf ContentFilter) ContentTopicsList() []string {
return maps.Keys(cf.ContentTopics)
return cf.ContentTopics.ToList()
}

func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
Expand Down
172 changes: 61 additions & 111 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"net/http"
"strings"
"sync"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -46,11 +47,27 @@ type WakuFilterLightNode struct {
pm *peermanager.PeerManager
}

type WakuFilterPushResult struct {
type WakuFilterPushError struct {
Err error
PeerID peer.ID
}

type WakuFilterPushResult struct {
errs []WakuFilterPushError
sync.RWMutex
}

func (arr *WakuFilterPushResult) Add(err WakuFilterPushError) {
arr.Lock()
defer arr.Unlock()
arr.errs = append(arr.errs, err)
}
func (arr *WakuFilterPushResult) Errors() []WakuFilterPushError {
arr.RLock()
defer arr.RUnlock()
return arr.errs
harsh-98 marked this conversation as resolved.
Show resolved Hide resolved
}

// NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
// Note that broadcaster is optional.
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
Expand Down Expand Up @@ -95,7 +112,7 @@ func (wf *WakuFilterLightNode) Stop() {
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
}

for r := range res {
for _, r := range res.Errors() {
if r.Err != nil {
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
}
Expand Down Expand Up @@ -395,59 +412,8 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip
return wf.Ping(ctx, subscription.PeerID)
}

func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
return nil
}

wf.subscriptions.RLock()
defer wf.subscriptions.RUnlock()

var output []*subscription.SubscriptionDetails

for _, peerSubscription := range wf.subscriptions.Items {
for _, subscriptions := range peerSubscription.SubsPerPubsubTopic {
for _, subscriptionDetail := range subscriptions {
output = append(output, subscriptionDetail)
}
}
}

return output
}

func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter protocol.ContentFilter) {
wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()

peerSubscription, ok := wf.subscriptions.Items[peerID]
if !ok {
return
}

subscriptionDetailList, ok := peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic]
if !ok {
return
}

for subscriptionDetailID, subscriptionDetail := range subscriptionDetailList {
subscriptionDetail.Remove(contentFilter.ContentTopicsList()...)
if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 {
delete(subscriptionDetailList, subscriptionDetailID)
subscriptionDetail.CloseC()
}
}

if len(subscriptionDetailList) == 0 {
delete(wf.subscriptions.Items[peerID].SubsPerPubsubTopic, contentFilter.PubsubTopic)
}

}

// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -475,28 +441,18 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
if err != nil {
return nil, err
}
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items))
result := &WakuFilterPushResult{}
for pTopic, cTopics := range pubSubTopicMap {
cFilter := protocol.NewContentFilter(pTopic, cTopics...)
for peerID := range wf.subscriptions.Items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
}

subscriptions, ok := wf.subscriptions.Items[peerID]
if !ok || subscriptions == nil {
continue
}

wf.cleanupSubscriptions(peerID, cFilter)
if len(subscriptions.SubsPerPubsubTopic) == 0 {
delete(wf.subscriptions.Items, peerID)
}

if params.wg != nil {
params.wg.Add(1)
}

peers, subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter)
for _, sub := range subs {
sub.Remove(cTopics...)
}
if params.wg != nil {
params.wg.Add(len(peers))
}
// send unsubscribe request to all the peers
for _, peerID := range peers {
go func(peerID peer.ID) {
defer func() {
if params.wg != nil {
Expand All @@ -506,10 +462,10 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
err := wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, cFilter)

if params.wg != nil {
resultChan <- WakuFilterPushResult{
result.Add(WakuFilterPushError{
Err: err,
PeerID: peerID,
}
})
}
}(peerID)
}
Expand All @@ -518,16 +474,19 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
params.wg.Wait()
}

close(resultChan)
return result, nil
}

return resultChan, nil
func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails {
_, subs := wf.subscriptions.GetSubscription("", protocol.ContentFilter{})
return subs
}

// UnsubscribeWithSubscription is used to close a particular subscription
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
// server unsubscribe is also performed
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails,
opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand All @@ -542,20 +501,18 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
// Close this sub
sub.Close()

resultChan := make(chan WakuFilterPushResult, 1)
result := &WakuFilterPushResult{}

if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) {
// Last sub for this [peer, contentFilter] pair
paramsCopy := params.Copy()
paramsCopy.selectedPeer = sub.PeerID
err = wf.unsubscribeFromServer(ctx, paramsCopy, sub.ContentFilter)
resultChan <- WakuFilterPushResult{
params.selectedPeer = sub.PeerID
err = wf.unsubscribeFromServer(ctx, params, sub.ContentFilter)
result.Add(WakuFilterPushError{
Err: err,
PeerID: sub.PeerID,
}
})
}
close(resultChan)
return resultChan, err
return result, err

}

Expand All @@ -573,28 +530,23 @@ func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params
return err
}

func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
// close all subscribe for selectedPeer or if selectedPeer == "", then all peers
// send the unsubscribeAll request to the peers
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return nil, err
}

wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()

resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items))

for peerID := range wf.subscriptions.Items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
}

delete(wf.subscriptions.Items, peerID)

if params.wg != nil {
params.wg.Add(1)
}

peerIds, subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{})
for _, sub := range subs {
sub.Close()
}
result := &WakuFilterPushResult{}
if params.wg != nil {
params.wg.Add(len(peerIds))
}
for _, peerId := range peerIds {
go func(peerID peer.ID) {
defer func() {
if params.wg != nil {
Expand All @@ -613,25 +565,23 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
if params.wg != nil {
resultChan <- WakuFilterPushResult{
result.Add(WakuFilterPushError{
Err: err,
PeerID: peerID,
}
})
}
}(peerID)
}(peerId)
}

if params.wg != nil {
params.wg.Wait()
}

close(resultChan)

return resultChan, nil
return result, nil
}

// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down
Loading