Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/autoshard filter #723

Merged
merged 29 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
ae89971
fix: updated filterv2 protocol as per rfc, make pubsub topic optional
chaitanyaprem Sep 8, 2023
717df70
chore: make broadcaster optional in filter client
chaitanyaprem Sep 9, 2023
3e5ea69
Merge branch 'master' into fix/filterv2
chaitanyaprem Sep 9, 2023
763c8f6
feat: update filter client to support autosharding
chaitanyaprem Sep 9, 2023
5bb8a39
reverting optional pubSub topic in Filter.MessagePush
chaitanyaprem Sep 10, 2023
e214652
chore: add filter tests for autoshard
chaitanyaprem Sep 10, 2023
b94ab32
chore:update examples
chaitanyaprem Sep 11, 2023
9980103
chore:update filter example for autosharding
chaitanyaprem Sep 11, 2023
30f7191
chore: update example README
chaitanyaprem Sep 11, 2023
ced8c0c
chore:increase test-time for autoshard tests
chaitanyaprem Sep 11, 2023
c868f31
chore:update filter API docs for autosharding
chaitanyaprem Sep 11, 2023
6442198
Merge branch 'fix/filterv2' into feat/autoshard-filter
chaitanyaprem Sep 11, 2023
b6d36c7
fix: revert pubSubTopic as optional and add higher level validation
chaitanyaprem Sep 12, 2023
c1514a8
Merge branch 'fix/filterv2' into feat/autoshard-filter
chaitanyaprem Sep 12, 2023
c525692
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 12, 2023
c127322
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 13, 2023
5a37761
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 18, 2023
58ab07c
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 18, 2023
8b1fcdd
chore: derive pubSubTopic in filter messagePush
chaitanyaprem Sep 18, 2023
6eab8fa
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 18, 2023
a91d6e7
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 18, 2023
fcae89c
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 19, 2023
ba856a0
chore: address review comments
chaitanyaprem Sep 19, 2023
1d7dceb
chore: docs changes to indicate sharding impact on pubSubTopic
chaitanyaprem Sep 19, 2023
f053b8f
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 19, 2023
6457022
fix: handle partial errors durin subscribe and return failed content-…
chaitanyaprem Sep 19, 2023
36ca48d
chore: fix json lint error
chaitanyaprem Sep 19, 2023
f43d6ac
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 19, 2023
ecd355c
Merge branch 'master' into feat/autoshard-filter
chaitanyaprem Sep 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,11 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
filterOpt = filter.WithPeer(peerID)
chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID))
}
theFilter, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt)
theFilters, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt)
if err != nil {
chat.ui.ErrorMessage(err)
} else {
chat.C = theFilter.C
chat.C = theFilters[0].C //Picking first subscription since there is only 1 contentTopic specified.
}
} else {

Expand Down
42 changes: 17 additions & 25 deletions examples/filter2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,20 @@ The app will run 2 nodes ("full" node and "light" node), with light node subscri

## Flow description

1. Light node submits a FilterRequest through WakuNode.SubscribeFilter. This request is submitted to a particular peer.
Filter is stored in WakuNode.filters map. That's it.
DONE
2. Full node: we read incoming messages in WakuFilter.onRequest(). It is set as a stream handler on wakunode.Host for WakuFilterProtocolId.
3. In WakuFilter.onRequest():
3.1. We check whether it's a MessagePush or FilterRequest.
3.2. If it's a MessagePush, then we're on a light node. Invoke pushHandler coming from WakuNode.mountFilter()
3.3. If it's a FilterRequest, add a subscriber.
4. WakuNode.Subscribe has a message loop extracting WakuMessages from a wakurelay.Subscription object.
It denotes a pubsub topic subscription.
All envelopes are then submitted to node.broadcaster.


## Nim code flow
1. Light node: WakuFilter.subscribe(). Find a peer, wrileLP(FilterRequest). Store requestId in WakuNode.filters along with a ContentFilterHandler proc.
2. Full node: WakuFilter inherits LPProtocol. LPProtocol.handler invokes readLP() to read FilterRPC messages
3. this handler function has a signature (conn: Connection, proto: string).
3.1. it checks whether a MessagePush or FilterRequest is received.
3.2. (light node) if it's a MessagePush, then we're on a light node. Invoke pushHandler of MessagePushHandler type. This pushHandler comes from WakuNode.mountFilter(). It iterates through all registered WakuNode.filters (stored in step 1) and invokes their ContentFilterHandler proc.
3.3. (full node) if it's a FilterRequest, create a Subscriber and add to WakuFilter.subscribers seq
4. (full node) Each time a message is received through GossipSub in wakunode.subscribe.defaultHandler(), we iterate through subscriptions.
5. (full node) One of these subscriptions is a filter subscription added by WakuNode.mountFilter(), which in turn is returned from WakuFilter.subscription()
6. (full node) This subscription iterates through subscribers added by WakuFilter.handler() fn (subscribers being light nodes)
7. (full node) Once subscriber peer is found, a message is pushed directly to the peer (go to step 3.2)

### Light Node
1. A light node is created with option WithWakuFilterLightNode.
2. Starting this node sets stream handler on wakunode.Host for WakuFilterProtocolId.
3. Light node submits a FilterSubscribeRequest through WakuFilterLightNode.Subscribe. This request is submitted to a particular peer.
Filter is stored in WakuFilterLightNode.subscriptions map. That's it.
4. Now we wait on WakuFilterLightNode.onRequest to process any further messages.
5. On receiving a message check and notify all subscribers on relevant channel (which is part of subscription obbject).
6. If a broadcaster is specified,
WakuNode.Subscribe has a message loop extracting WakuMessages from a wakurelay.Subscription object.It denotes a pubsub topic subscription.All envelopes are then submitted to node.broadcaster.
### Full Node
1. Full node is created with option WithWakuFilterFullNode.
2. We read incoming messages in WithWakuFilterFullNode.onRequest(). It is set as a stream handler on wakunode.Host for WakuFilterProtocolId.
3. In WakuFilter.onRequest
* We check the type of FilterRequest and handle accordingly.
* If it's a FilterRequest for subscribe, add a subscriber.
* If it is a SubscriberPing request, check if subscriptions exists or not and respond accordingly.
* If it is an unsubscribe/unsubscribeAll request, check and remove relevant subscriptions.
5 changes: 2 additions & 3 deletions examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var log = logging.Logger("filter2")

var pubSubTopic = protocol.DefaultPubsubTopic()

const contentTopic = "test"
const contentTopic = "/filter2test/1/testTopic/proto"

func main() {
hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:60000")
Expand Down Expand Up @@ -98,7 +98,6 @@ func main() {

// Send FilterRequest from light node to full node
cf := filter.ContentFilter{
PubsubTopic: pubSubTopic.String(),
ContentTopics: filter.NewContentTopicSet(contentTopic),
}

Expand All @@ -108,7 +107,7 @@ func main() {
}

go func() {
for env := range theFilter.C {
for env := range theFilter[0].C { //Safely picking first subscriptions since only 1 contentTopic is subscribed
log.Info("Light node received msg, ", string(env.Message().Payload))
}
log.Info("Message channel closed!")
Expand Down
16 changes: 11 additions & 5 deletions library/c/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ The criteria to create subscription to a filter full node in JSON Format:
Fields:

- `contentTopics`: Array of content topics.
- `topic`: pubsub topic.
- `topic`: Optional pubsub topic when using contentTopics as per Autosharding. In case of named or static-sharding, pubSub topic is mandatory.


### `LegacyFilterSubscription` type
Expand Down Expand Up @@ -884,15 +884,21 @@ Creates a subscription to a filter full node matching a content filter..

A status code. Refer to the [`Status codes`](#status-codes) section for possible values.

If the function is executed succesfully, `onOkCb` will receive the subscription details.
If the function is executed succesfully, `onOkCb` will receive the following subscription details along with any partial errors.

For example:

```json
{
"peerID": "....",
"pubsubTopic": "...",
"contentTopics": [...]
"subscriptions" : [
{
"ID": "<subscriptionID>",
"peerID": "....",
"pubsubTopic": "...",
"contentTopics": [...]
}
],
"error" : "subscriptions failed for contentTopics:<topicA>,.." // Empty if all subscriptions are succesful
}
```

Expand Down
6 changes: 3 additions & 3 deletions library/c/api_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import "github.com/waku-org/go-waku/library"
// filterJSON must contain a JSON with this format:
//
// {
// "pubsubTopic": "the pubsub topic" // mandatory
// "pubsubTopic": "the pubsub topic" // optional if using autosharding, mandatory if using static or named sharding.
// "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10
// }
//
// peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
// It returns a json object containing the peerID to which we are subscribed to and the details of the subscription
// It returns a json object containing the details of the subscriptions along with any errors in case of partial failures
//
//export waku_filter_subscribe
func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, onOkCb C.WakuCallBack, onErrCb C.WakuCallBack) C.int {
Expand All @@ -42,7 +42,7 @@ func waku_filter_ping(peerID *C.char, ms C.int, onErrCb C.WakuCallBack) C.int {
// criteria
//
// {
// "pubsubTopic": "the pubsub topic" // mandatory
// "pubsubTopic": "the pubsub topic" // optional if using autosharding, mandatory if using static or named sharding.
// "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10
// }
//
Expand Down
27 changes: 18 additions & 9 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
}, nil
}

type subscribeResult struct {
Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitempty"`
}

// FilterSubscribe is used to create a subscription to a filter node to receive messages
func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
cf, err := toContentFilter(filterJSON)
Expand Down Expand Up @@ -60,18 +65,22 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
fOptions = append(fOptions, filter.WithAutomaticPeerSelection())
}

subscriptionDetails, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
if err != nil {
subscriptions, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
if err != nil && subscriptions == nil {
return "", err
}

go func(subscriptionDetails *filter.SubscriptionDetails) {
for envelope := range subscriptionDetails.C {
send("message", toSubscriptionMessage(envelope))
}
}(subscriptionDetails)

return marshalJSON(subscriptionDetails)
for _, subscriptionDetails := range subscriptions {
go func(subscriptionDetails *filter.SubscriptionDetails) {
for envelope := range subscriptionDetails.C {
send("message", toSubscriptionMessage(envelope))
}
}(subscriptionDetails)
}
var subResult subscribeResult
subResult.Subscriptions = subscriptions
subResult.Error = err.Error()
return marshalJSON(subResult)
}

// FilterPing is used to determine if a peer has an active subscription
Expand Down
Loading