feat(filter-v2): new filter protocol increment - message handling and clients #1600
Conversation
@@ -3,7 +3,7 @@ when (NimMajor, NimMinor) < (1, 4):
else:
The changes in this file are purely whitespace. Please ignore.
lgtm! just some minor comments.
check:
  not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed

asyncTest "subscribe to multiple content topics and unsubscribe all":
Perhaps a test case is missing for subscribing to multiple gossipsub topics and multiple content topics?
Good point! Added in 9105f88 (I want to significantly extend test cases, but at least cover the most basic happy paths for now, so this was indeed an oversight).
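For illustration only, a rough sketch of what such a test could look like. The request type, its field names, and the wakuFilter/peerId1 fixtures are assumptions based on the RFC and the surrounding diff, not the actual test added in 9105f88:

# Hypothetical sketch (names/fields assumed): one peer subscribes to two
# pubsub topics, each with two content topics, and all four
# (pubsub topic, content topic) pairs should end up tracked for that peer.
asyncTest "subscribe to multiple pubsub topics and multiple content topics":
  # Given
  let
    requestA = FilterSubscribeRequest(
      requestId: "request-a",
      filterSubscribeType: FilterSubscribeType.SUBSCRIBE,
      pubsubTopic: some("/waku/2/pubsub-topic-a"),
      contentTopics: @["/app/1/content-topic-1/proto", "/app/1/content-topic-2/proto"])
    requestB = FilterSubscribeRequest(
      requestId: "request-b",
      filterSubscribeType: FilterSubscribeType.SUBSCRIBE,
      pubsubTopic: some("/waku/2/pubsub-topic-b"),
      contentTopics: @["/app/1/content-topic-3/proto", "/app/1/content-topic-4/proto"])

  # When
  require wakuFilter.handleSubscribeRequest(peerId1, requestA).isOk
  require wakuFilter.handleSubscribeRequest(peerId1, requestB).isOk

  # Then
  check:
    wakuFilter.subscriptions[peerId1].len == 4  # assumed internal representation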
switch.peerStore[ProtoBook][peerId1] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId2] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId3] = @[WakuFilterPushCodec]
discard wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest)
Maybe a nitpick:
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).isOk
Agree. Done in 9105f88

# When
# Remove peerId1 and peerId3 from peer store
discard switch.peerStore[ProtoBook].del(peerId1)
switch.peerStore.del(peerId)
since afaik peerStore[ProtoBook].del just removes the ProtoBook entry for that peerId (see other places as well)
Thanks! 9105f88
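For clarity, the difference the comment points at, assuming the nim-libp2p peer store API used elsewhere in this diff:

# Deletes only this peer's entry in the ProtoBook; the peer is still known
# to the other books (addresses, keys, ...), so it is not really "removed
# from the peer store". The book-level del returns whether an entry was
# removed, hence the discard.
discard switch.peerStore[ProtoBook].del(peerId1)

# Deletes the peer from every book of the peer store, which is what
# "remove from peer store" is meant to simulate in this test.
switch.peerStore.del(peerId1)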
trace "pushing message to subscribed peer", peer=peer | ||
|
||
if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec): | ||
# Check that peer has not been removed from peer store |
Beyond this PR, but this makes me think that the peermanager should have knowledge of subscriptions/filter to not remove these peers from the peerstore.
I agree! My current thinking is that we may implement some kind of policy which could be set by clients of the peer store. Needs more thinking, though.
> the peermanager should have knowledge of subscriptions/filter to not remove these peers from the peerstore.

It makes sense, but... How would you implement that without coupling the peer management code to the filter protocol code?
> It makes sense, but... How would you implement that without coupling the peer management code to the filter protocol code?

mm interesting. I would say it's not possible without coupling it. I mean, the cleanest thing imho would be to have a new field in the Peer (subscribedToFilter) that can be set or unset via a proc used by filter. Then the peer manager will never? drop this connection.
I wouldn't want any direct coupling between the filter protocol and peer manager, if that can at all be avoided. My idea would be more that the app setting up its peer manager does so with a certain policy, e.g. maintaining a set number of slots for service clients. The issue here is less about maintaining the connection (it can always be recreated); it's simply about storing the connection information for the clients and maintaining a predictable number of such client peers in the peer store.
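Purely to illustrate that direction (none of this exists in the codebase; all names are hypothetical), a policy hook registered by protocols with a peer store wrapper could look roughly like this, keeping the peer manager agnostic of the filter protocol:

import libp2p/peerid

type
  # A "keep" predicate a protocol can register, e.g. filter registering
  # "keep peers with active subscriptions".
  KeepPeerPolicy* = proc(peerId: PeerId): bool {.gcsafe, raises: [].}

  PeerStorePolicies* = ref object
    keepPolicies: seq[KeepPeerPolicy]

proc addKeepPolicy*(p: PeerStorePolicies, policy: KeepPeerPolicy) =
  # Called by a protocol (or the app wiring it up), not by the peer manager
  p.keepPolicies.add(policy)

proc canPrune*(p: PeerStorePolicies, peerId: PeerId): bool =
  # Consulted by the peer manager before dropping a peer from the store,
  # without it knowing anything about filter subscriptions
  for policy in p.keepPolicies:
    if policy(peerId):
      return false
  true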

const MaintainSubscriptionsInterval* = 1.minutes

proc startMaintainingSubscriptions*(wf: WakuFilter, interval: Duration) =
Beyond this PR, not blocking: since we have a few of these timers, I would suggest creating a heartbeat so that we can reuse code, and then just do:

heartbeat "sometext", interval:
  await functionToRun()

Of course it depends on whether:
- we want to run it exactly every x time, or
- wait x time between the last completed run.
lgtm!
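For reference, a minimal chronos-based sketch of such a heartbeat, implementing the "wait x time between completed runs" variant; the template name and the maintenance proc it calls are assumptions, not existing nwaku utilities:

import chronos

# Minimal sketch: run `body` forever, sleeping `interval` between the end of
# one run and the start of the next (i.e. not a fixed-rate timer). `name` is
# kept so a fuller version can add logging/metrics per heartbeat.
template heartbeat(name: string, interval: Duration, body: untyped) =
  while true:
    body
    await sleepAsync(interval)

proc startMaintenanceLoop(wf: WakuFilter, interval: Duration) {.async.} =
  heartbeat "filter_subscriptions", interval:
    wf.maintainSubscriptions()  # assumed: the maintenance step this PR runs on a timer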
9105f88 to 3fb24bd
LGTM ✅
proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} =
  trace "pushing message to subscribed peers", peers=peers, messagePush=messagePush

  let bufferToPublish = messagePush.encode().buffer

  var pushFuts: seq[Future[void]]
  for peerId in peers:
    let pushFut = wf.pushToPeer(peerId, bufferToPublish)
    pushFuts.add(pushFut)

  await allFutures(pushFuts)
I think that we need a timeout when pushing the messages to the subscribers. I don't know if it should be per push, per peer, or a unique timeout for the allFutures call.
Good point! The individual peer dials already have timeouts, but not the stream writes. Since these should be spawned concurrently, I think a global timeout for allFutures will be enough safety for now.
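A rough sketch of that global-timeout variant with chronos; the 10-second constant is an arbitrary assumption, not a value from the PR, and the surrounding types (WakuFilter, MessagePush, pushToPeer) are taken from the diff above:

import chronos, chronicles

const MessagePushTimeout = 10.seconds  # assumed bound

proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} =
  let bufferToPublish = messagePush.encode().buffer

  var pushFuts: seq[Future[void]]
  for peerId in peers:
    pushFuts.add(wf.pushToPeer(peerId, bufferToPublish))

  # One timeout covering all concurrent pushes: withTimeout completes with
  # false if the deadline expires before every push future has finished.
  if not (await allFutures(pushFuts).withTimeout(MessagePushTimeout)):
    trace "timed out pushing message to some subscribed peers", peers=peers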
Background
This is a further increment towards the implementation of the filter protocol changes proposed in vacp2p/rfc#562, with the wire protocol defined in waku-org/waku-proto#16.
It builds on #1584 and adds:
Note that this is still an incomplete feature and isolated in code.
There are also some comments on the previous PR which I will address separately (addressing them here would have bloated this one even more).
Next steps