Skip to content

Commit

Permalink
Merge branch 'fix/filterv2' into feat/autoshard-filter
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Sep 12, 2023
2 parents 6442198 + b6d36c7 commit c1514a8
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 14 deletions.
13 changes: 9 additions & 4 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,18 +163,23 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
wf.metrics.RecordError(decodeRPCFailure)
return
}

if !wf.subscriptions.Has(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) {
//For now returning failure, this will get addressed with autosharding changes for filter.
if messagePush.PubsubTopic == nil {
logger.Error("empty pubSub Topic", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure)
return
}
if !wf.subscriptions.Has(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) {
logger.Warn("received messagepush with invalid subscription parameters",
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", messagePush.PubsubTopic),
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *messagePush.PubsubTopic),
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
wf.metrics.RecordError(invalidSubscriptionMessage)
return
}

wf.metrics.RecordMessage()

wf.notify(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage)
wf.notify(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage)

logger.Info("received message push")
}
Expand Down
16 changes: 9 additions & 7 deletions waku/v2/protocol/filter/pb/waku_filter_v2.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/pb/waku_filter_v2.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ message FilterSubscribeResponse {
// Protocol identifier: /vac/waku/filter-push/2.0.0-beta1
message MessagePushV2 {
WakuMessage waku_message = 1;
string pubsub_topic = 2;
optional string pubsub_topic = 2;
}
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, e
)
pubSubTopic := env.PubsubTopic()
messagePush := &pb.MessagePushV2{
PubsubTopic: pubSubTopic,
PubsubTopic: &pubSubTopic,
WakuMessage: env.Message(),
}

Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/subscriptions_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, topic string, contentTopics ...
if !ok {
return false
}

//TODO: Handle pubsubTopic as null
// Check if pubsub topic exists
subscriptions, ok := peerSubscription.subscriptionsPerTopic[topic]
if !ok {
Expand Down

0 comments on commit c1514a8

Please sign in to comment.