Skip to content

Commit

Permalink
chore: derive pubSubTopic in filter messagePush
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Sep 18, 2023
1 parent 58ab07c commit 8b1fcdd
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,23 +131,27 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
wf.metrics.RecordError(decodeRPCFailure)
return
}
pubSubTopic := messagePush.PubsubTopic
//For now returning failure, this will get addressed with autosharding changes for filter.
if messagePush.PubsubTopic == nil {
logger.Error("empty pubsub topic")
wf.metrics.RecordError(decodeRPCFailure)
return
*pubSubTopic, err = getPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic)
if err != nil {
logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure)
return
}
}
if !wf.subscriptions.Has(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) {
if !wf.subscriptions.Has(s.Conn().RemotePeer(), *pubSubTopic, messagePush.WakuMessage.ContentTopic) {
logger.Warn("received messagepush with invalid subscription parameters",
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *messagePush.PubsubTopic),
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *pubSubTopic),
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
wf.metrics.RecordError(invalidSubscriptionMessage)
return
}

wf.metrics.RecordMessage()

wf.notify(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage)
wf.notify(s.Conn().RemotePeer(), *pubSubTopic, messagePush.WakuMessage)

logger.Info("received message push")
}
Expand Down Expand Up @@ -214,6 +218,16 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
return nil
}

func getPubSubTopicFromContentTopic(cTopicString string) (string, error) {
cTopic, err := protocol.StringToContentTopic(cTopicString)
if err != nil {
return "", errors.New(err.Error() + " : " + cTopicString)
}
pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount)

return pTopic.String(), nil
}

// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) {
pubSubTopicMap := make(map[string][]string)
Expand All @@ -224,16 +238,15 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st
} else {
//Parse the content-Topics to figure out shards.
for _, cTopicString := range contentFilter.ContentTopics {
cTopic, err := protocol.StringToContentTopic(cTopicString)
pTopicStr, err := getPubSubTopicFromContentTopic(cTopicString)
if err != nil {
return nil, errors.New(err.Error() + " : " + cTopicString)
return nil, err
}
pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount)
_, ok := pubSubTopicMap[pTopic.String()]
_, ok := pubSubTopicMap[pTopicStr]
if !ok {
pubSubTopicMap[pTopic.String()] = make([]string, 1)
pubSubTopicMap[pTopicStr] = make([]string, 1)
}
pubSubTopicMap[pTopic.String()] = append(pubSubTopicMap[pTopic.String()], cTopicString)
pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString)
}
}
return pubSubTopicMap, nil
Expand Down

0 comments on commit 8b1fcdd

Please sign in to comment.