Skip to content

Commit

Permalink
fix: unsubscribe in case invalid push requests are received #1124
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Jul 13, 2024
1 parent f1b514b commit d7e853c
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
26 changes: 21 additions & 5 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,17 @@ func (wf *WakuFilterLightNode) Stop() {
})
}

func (wf *WakuFilterLightNode) unsubscribeWithoutSubscription(cf protocol.ContentFilter, peerID peer.ID) {
err := wf.request(
wf.Context(),
protocol.GenerateRequestID(),
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
cf, peerID)
if err != nil {
wf.log.Warn("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
}

func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) {
return func(stream network.Stream) {
peerID := stream.Conn().RemotePeer()
Expand All @@ -156,6 +167,9 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
wf.metrics.RecordError(unknownPeerMessagePush)
//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
//This could be happening due to https://github.com/waku-org/go-waku/issues/1124
go wf.unsubscribeWithoutSubscription(protocol.ContentFilter{}, peerID)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
Expand Down Expand Up @@ -199,30 +213,32 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
}

logger = messagePush.WakuMessage.Logger(logger, pubSubTopic)

if !wf.subscriptions.Has(peerID, protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
cf := protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)
if !wf.subscriptions.Has(peerID, cf) {
logger.Warn("received messagepush with invalid subscription parameters")
//Unsubscribe from that peer for the contentTopic, possibly due to https://github.com/waku-org/go-waku/issues/1124
go wf.unsubscribeWithoutSubscription(cf, peerID)
wf.metrics.RecordError(invalidSubscriptionMessage)
return
}

wf.metrics.RecordMessage()

wf.notify(peerID, pubSubTopic, messagePush.WakuMessage)
wf.notify(ctx, peerID, pubSubTopic, messagePush.WakuMessage)

logger.Info("received message push")
}
}

func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
func (wf *WakuFilterLightNode) notify(ctx context.Context, remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic)

if wf.broadcaster != nil {
// Broadcasting message so it's stored
wf.broadcaster.Submit(envelope)
}
// Notify filter subscribers
wf.subscriptions.Notify(remotePeerID, envelope)
wf.subscriptions.Notify(ctx, remotePeerID, envelope)
}

func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
Expand Down
9 changes: 6 additions & 3 deletions waku/v2/protocol/subscription/subscriptions_map.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package subscription

import (
"context"
"errors"
"sync"

Expand Down Expand Up @@ -178,17 +179,17 @@ func (sub *SubscriptionsMap) Clear() {
sub.clear()
}

func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) {
func (sub *SubscriptionsMap) Notify(ctx context.Context, peerID peer.ID, envelope *protocol.Envelope) {
sub.RLock()
defer sub.RUnlock()

subscriptions, ok := sub.items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()]
if ok {
iterateSubscriptionSet(sub.logger, subscriptions, envelope)
iterateSubscriptionSet(ctx, sub.logger, subscriptions, envelope)
}
}

func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) {
func iterateSubscriptionSet(ctx context.Context, logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) {
for _, subscription := range subscriptions {
func(subscription *SubscriptionDetails) {
subscription.RLock()
Expand All @@ -201,6 +202,8 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e

if !subscription.Closed {
select {
case <-ctx.Done():
return
case subscription.C <- envelope:
default:
logger.Warn("can't deliver message to subscription. subscriber too slow")
Expand Down
12 changes: 6 additions & 6 deletions waku/v2/protocol/subscription/subscriptions_map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ func TestSubscriptionsNotify(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
fmap.Notify(p1, envTopic1Ct1)
fmap.Notify(p2, envTopic1Ct1)
fmap.Notify(ctx, p1, envTopic1Ct1)
fmap.Notify(ctx, p2, envTopic1Ct1)
}()

<-successChan
Expand All @@ -177,8 +177,8 @@ func TestSubscriptionsNotify(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
fmap.Notify(p1, envTopic1Ct2)
fmap.Notify(p2, envTopic1Ct2)
fmap.Notify(ctx, p1, envTopic1Ct2)
fmap.Notify(ctx, p2, envTopic1Ct2)
}()

<-successChan
Expand Down Expand Up @@ -207,8 +207,8 @@ func TestSubscriptionsNotify(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
fmap.Notify(p1, envTopic1Ct1_2)
fmap.Notify(p2, envTopic1Ct1_2)
fmap.Notify(ctx, p1, envTopic1Ct1_2)
fmap.Notify(ctx, p2, envTopic1Ct1_2)
}()

<-successChan // One of these successes is for closing the subscription
Expand Down

0 comments on commit d7e853c

Please sign in to comment.