diff --git a/tests/v2/waku_filter_v2/test_waku_filter.nim b/tests/v2/waku_filter_v2/test_waku_filter.nim new file mode 100644 index 0000000000..6d30c22ff2 --- /dev/null +++ b/tests/v2/waku_filter_v2/test_waku_filter.nim @@ -0,0 +1,375 @@ +{.used.} + +import + std/[options,tables], + testutils/unittests, + chronos, + chronicles, + libp2p/peerstore +import + ../../../waku/v2/node/peer_manager, + ../../../waku/v2/protocol/waku_filter_v2, + ../../../waku/v2/protocol/waku_filter_v2/client, + ../../../waku/v2/protocol/waku_filter_v2/rpc, + ../../../waku/v2/protocol/waku_message, + ../testlib/common, + ../testlib/waku2 + +proc newTestWakuFilter(switch: Switch): Future[WakuFilter] {.async.} = + let + peerManager = PeerManager.new(switch) + proto = WakuFilter.new(peerManager) + + await proto.start() + switch.mount(proto) + + return proto + +proc newTestWakuFilterClient(switch: Switch, messagePushHandler: MessagePushHandler): Future[WakuFilterClient] {.async.} = + let + peerManager = PeerManager.new(switch) + proto = WakuFilterClient.new(rng, messagePushHandler, peerManager) + + await proto.start() + switch.mount(proto) + + return proto + +suite "Waku Filter - end to end": + + asyncTest "ping": + # Given + var + voidHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + discard + let + serverSwitch = newStandardSwitch() + clientSwitch = newStandardSwitch() + wakuFilter = await newTestWakuFilter(serverSwitch) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch, voidHandler) + + # When + await allFutures(serverSwitch.start(), clientSwitch.start()) + let response = await wakuFilterClient.ping(serverSwitch.peerInfo.toRemotePeerInfo()) + + # Then + check: + response.isErr() # Not subscribed + response.error().kind == FilterSubscribeErrorKind.NOT_FOUND + + # When + let response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), DefaultPubsubTopic, @[DefaultContentTopic]) + + require response2.isOk() + + let response3 = await wakuFilterClient.ping(serverSwitch.peerInfo.toRemotePeerInfo()) + + # Then + check: + response3.isOk() # Subscribed + + # Teardown + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + + asyncTest "simple subscribe and unsubscribe request": + # Given + var + pushHandlerFuture = newFuture[(string, WakuMessage)]() + messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + pushHandlerFuture.complete((pubsubTopic, message)) + + let + serverSwitch = newStandardSwitch() + clientSwitch = newStandardSwitch() + wakuFilter = await newTestWakuFilter(serverSwitch) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + clientPeerId = clientSwitch.peerInfo.peerId + pubsubTopic = DefaultPubsubTopic + contentTopics = @[DefaultContentTopic] + + # When + await allFutures(serverSwitch.start(), clientSwitch.start()) + let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + # Then + check: + response.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + await wakuFilter.handleMessage(DefaultPubsubTopic, msg1) + + require await pushHandlerFuture.withTimeout(3.seconds) + + # Then + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic == DefaultPubsubTopic + pushedMsg == msg1 + + # When + let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + # Then + check: + response2.isOk() + wakuFilter.subscriptions.len == 0 + + # When + let msg2 = fakeWakuMessage(contentTopic=DefaultContentTopic) + pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future + await wakuFilter.handleMessage(DefaultPubsubTopic, msg2) + + # Then + check: + not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed + + # Teardown + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + + asyncTest "subscribe, unsubscribe multiple content topics": + # Given + var + pushHandlerFuture = newFuture[(string, WakuMessage)]() + messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + pushHandlerFuture.complete((pubsubTopic, message)) + + let + serverSwitch = newStandardSwitch() + clientSwitch = newStandardSwitch() + wakuFilter = await newTestWakuFilter(serverSwitch) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + clientPeerId = clientSwitch.peerInfo.peerId + pubsubTopic = DefaultPubsubTopic + contentTopic2 = ContentTopic("/waku/2/non-default-content/proto") + contentTopics = @[DefaultContentTopic, contentTopic2] + + # When + await allFutures(serverSwitch.start(), clientSwitch.start()) + let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + # Then + check: + response.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + await wakuFilter.handleMessage(DefaultPubsubTopic, msg1) + + require await pushHandlerFuture.withTimeout(3.seconds) + + # Then + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic == DefaultPubsubTopic + pushedMsg == msg1 + + # When + let msg2 = fakeWakuMessage(contentTopic=contentTopic2) + pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future + await wakuFilter.handleMessage(DefaultPubsubTopic, msg2) + + require await pushHandlerFuture.withTimeout(3.seconds) + + # Then + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == DefaultPubsubTopic + pushedMsg2 == msg2 + + # When + let response2 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, @[contentTopic2]) # Unsubscribe only one content topic + + # Then + check: + response2.isOk() + wakuFilter.subscriptions.len == 1 + + # When + let msg3 = fakeWakuMessage(contentTopic=DefaultContentTopic) + pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future + await wakuFilter.handleMessage(DefaultPubsubTopic, msg3) + + require await pushHandlerFuture.withTimeout(3.seconds) + + # Then + let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic3 == DefaultPubsubTopic + pushedMsg3 == msg3 + + # When + let msg4 = fakeWakuMessage(contentTopic=contentTopic2) + pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future + await wakuFilter.handleMessage(DefaultPubsubTopic, msg4) + + # Then + check: + not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed + + asyncTest "subscribe to multiple content topics and unsubscribe all": + # Given + var + pushHandlerFuture = newFuture[(string, WakuMessage)]() + messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + pushHandlerFuture.complete((pubsubTopic, message)) + + let + serverSwitch = newStandardSwitch() + clientSwitch = newStandardSwitch() + wakuFilter = await newTestWakuFilter(serverSwitch) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + clientPeerId = clientSwitch.peerInfo.peerId + pubsubTopic = DefaultPubsubTopic + contentTopic2 = ContentTopic("/waku/2/non-default-content/proto") + contentTopics = @[DefaultContentTopic, contentTopic2] + + # When + await allFutures(serverSwitch.start(), clientSwitch.start()) + let response = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + + # Then + check: + response.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + # When + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + await wakuFilter.handleMessage(DefaultPubsubTopic, msg1) + + require await pushHandlerFuture.withTimeout(3.seconds) + + # Then + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic == DefaultPubsubTopic + pushedMsg == msg1 + + # When + let msg2 = fakeWakuMessage(contentTopic=contentTopic2) + pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future + await wakuFilter.handleMessage(DefaultPubsubTopic, msg2) + + require await pushHandlerFuture.withTimeout(3.seconds) + + # Then + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == DefaultPubsubTopic + pushedMsg2 == msg2 + + # When + let response2 = await wakuFilterClient.unsubscribeAll(serverSwitch.peerInfo.toRemotePeerInfo()) + + # Then + check: + response2.isOk() + wakuFilter.subscriptions.len == 0 + + # When + let + msg3 = fakeWakuMessage(contentTopic=DefaultContentTopic) + msg4 = fakeWakuMessage(contentTopic=contentTopic2) + pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future + await wakuFilter.handleMessage(DefaultPubsubTopic, msg3) + await wakuFilter.handleMessage(DefaultPubsubTopic, msg4) + + # Then + check: + not (await pushHandlerFuture.withTimeout(2.seconds)) # Neither message should be pushed + + # Teardown + await allFutures(wakuFilter.stop(), wakuFilterClient.stop(), serverSwitch.stop(), clientSwitch.stop()) + + asyncTest "subscribe, unsubscribe multiple pubsub topics and content topics": + # Given + var + pushHandlerFuture = newFuture[(string, WakuMessage)]() + messagePushHandler: MessagePushHandler = proc(pubsubTopic: PubsubTopic, message: WakuMessage) = + pushHandlerFuture.complete((pubsubTopic, message)) + + let + serverSwitch = newStandardSwitch() + clientSwitch = newStandardSwitch() + wakuFilter = await newTestWakuFilter(serverSwitch) + wakuFilterClient = await newTestWakuFilterClient(clientSwitch, messagePushHandler) + clientPeerId = clientSwitch.peerInfo.peerId + pubsubTopic = DefaultPubsubTopic + pubsubTopic2 = PubsubTopic("/waku/2/non-default-pubsub/proto") + contentTopic2 = ContentTopic("/waku/2/non-default-content/proto") + contentTopics = @[DefaultContentTopic, contentTopic2] + + ## Step 1: We can subscribe to multiple pubsub topics and content topics + + # When + await allFutures(serverSwitch.start(), clientSwitch.start()) + let + response1 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic, contentTopics) + response2 = await wakuFilterClient.subscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, contentTopics) + + # Then + check: + response1.isOk() + response2.isOk() + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(clientPeerId) + + ## Step 2: We receive messages for multiple subscribed pubsub topics and content topics + + # When + let msg1 = fakeWakuMessage(contentTopic=DefaultContentTopic) + await wakuFilter.handleMessage(DefaultPubsubTopic, msg1) + + require await pushHandlerFuture.withTimeout(3.seconds) + + # Then + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic == DefaultPubsubTopic + pushedMsg == msg1 + + # When + let msg2 = fakeWakuMessage(contentTopic=contentTopic2) + pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic2, msg2) + + require await pushHandlerFuture.withTimeout(3.seconds) + + # Then + let (pushedMsgPubsubTopic2, pushedMsg2) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic2 == pubsubTopic2 + pushedMsg2 == msg2 + + ## Step 3: We can selectively unsubscribe from pubsub topics and content topic(s) + + # When + let response3 = await wakuFilterClient.unsubscribe(serverSwitch.peerInfo.toRemotePeerInfo(), pubsubTopic2, @[contentTopic2]) + require response3.isOk() + + let msg3 = fakeWakuMessage(contentTopic=contentTopic2) + pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future + await wakuFilter.handleMessage(pubsubTopic2, msg3) + + # Then + check: + not (await pushHandlerFuture.withTimeout(2.seconds)) # No message should be pushed + + ## Step 4: We can still receive messages for other subscribed pubsub topics and content topics + + # When + pushHandlerFuture = newFuture[(string, WakuMessage)]() # Clear previous future + await wakuFilter.handleMessage(DefaultPubsubTopic, msg3) + + require await pushHandlerFuture.withTimeout(3.seconds) + + # Then + let (pushedMsgPubsubTopic3, pushedMsg3) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic3 == DefaultPubsubTopic + pushedMsg3 == msg3 diff --git a/tests/v2/waku_filter_v2/test_waku_filter_v2.nim b/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim similarity index 78% rename from tests/v2/waku_filter_v2/test_waku_filter_v2.nim rename to tests/v2/waku_filter_v2/test_waku_filter_protocol.nim index d24601a0a4..90e9b80db4 100644 --- a/tests/v2/waku_filter_v2/test_waku_filter_v2.nim +++ b/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim @@ -4,7 +4,8 @@ import std/[options,sets,strutils,tables], testutils/unittests, chronos, - chronicles + chronicles, + libp2p/peerstore import ../../../waku/v2/node/peer_manager, ../../../waku/v2/protocol/waku_filter_v2, @@ -13,14 +14,11 @@ import ../testlib/common, ../testlib/waku2 -proc newTestWakuFilter(switch: Switch): Future[WakuFilter] {.async.} = +proc newTestWakuFilter(switch: Switch): WakuFilter = let peerManager = PeerManager.new(switch) proto = WakuFilter.new(peerManager) - await proto.start() - switch.mount(proto) - return proto proc generateRequestId(rng: ref HmacDrbgContext): string = @@ -51,7 +49,7 @@ suite "Waku Filter - handling subscribe requests": # Given let switch = newStandardSwitch() - wakuFilter = await newTestWakuFilter(switch) + wakuFilter = newTestWakuFilter(switch) peerId = PeerId.random().get() filterSubscribeRequest = createRequest( filterSubscribeType = FilterSubscribeType.SUBSCRIBE, @@ -89,7 +87,7 @@ suite "Waku Filter - handling subscribe requests": # Given let switch = newStandardSwitch() - wakuFilter = await newTestWakuFilter(switch) + wakuFilter = newTestWakuFilter(switch) peerId = PeerId.random().get() nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto") filterSubscribeRequest = createRequest( @@ -127,7 +125,7 @@ suite "Waku Filter - handling subscribe requests": # Given let switch = newStandardSwitch() - wakuFilter = await newTestWakuFilter(switch) + wakuFilter = newTestWakuFilter(switch) peerId = PeerId.random().get() nonDefaultContentTopic = ContentTopic("/waku/2/non-default-waku/proto") filterSubscribeRequest1 = createRequest( @@ -203,7 +201,7 @@ suite "Waku Filter - handling subscribe requests": # Given let switch = newStandardSwitch() - wakuFilter = await newTestWakuFilter(switch) + wakuFilter = newTestWakuFilter(switch) peerId = PeerId.random().get() pingRequest = createRequest( filterSubscribeType = FilterSubscribeType.SUBSCRIBER_PING @@ -237,3 +235,64 @@ suite "Waku Filter - handling subscribe requests": response3.statusCode == 200 response3.statusDesc.get() == "OK" +suite "Waku Filter - subscription maintenance": + + asyncTest "simple maintenance": + # Given + let + switch = newStandardSwitch() + wakuFilter = newTestWakuFilter(switch) + peerId1 = PeerId.random().get() + peerId2 = PeerId.random().get() + peerId3 = PeerId.random().get() + filterSubscribeRequest = createRequest( + filterSubscribeType = FilterSubscribeType.SUBSCRIBE, + pubsubTopic = some(DefaultPubsubTopic), + contentTopics = @[DefaultContentTopic] + ) + + # When + switch.peerStore[ProtoBook][peerId1] = @[WakuFilterPushCodec] + switch.peerStore[ProtoBook][peerId2] = @[WakuFilterPushCodec] + switch.peerStore[ProtoBook][peerId3] = @[WakuFilterPushCodec] + require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).isOk() + require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).isOk() + require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).isOk() + + # Then + check: + wakuFilter.subscriptions.len == 3 + wakuFilter.subscriptions.hasKey(peerId1) + wakuFilter.subscriptions.hasKey(peerId2) + wakuFilter.subscriptions.hasKey(peerId3) + + # When + # Maintenance loop should leave all peers in peer store intact + wakuFilter.maintainSubscriptions() + + # Then + check: + wakuFilter.subscriptions.len == 3 + wakuFilter.subscriptions.hasKey(peerId1) + wakuFilter.subscriptions.hasKey(peerId2) + wakuFilter.subscriptions.hasKey(peerId3) + + # When + # Remove peerId1 and peerId3 from peer store + switch.peerStore.del(peerId1) + switch.peerStore.del(peerId3) + wakuFilter.maintainSubscriptions() + + # Then + check: + wakuFilter.subscriptions.len == 1 + wakuFilter.subscriptions.hasKey(peerId2) + + # When + # Remove peerId2 from peer store + switch.peerStore.del(peerId2) + wakuFilter.maintainSubscriptions() + + # Then + check: + wakuFilter.subscriptions.len == 0 diff --git a/waku/v2/protocol/waku_filter/client.nim b/waku/v2/protocol/waku_filter/client.nim index 41f4437374..a1fbd676b2 100644 --- a/waku/v2/protocol/waku_filter/client.nim +++ b/waku/v2/protocol/waku_filter/client.nim @@ -3,7 +3,7 @@ when (NimMajor, NimMinor) < (1, 4): else: {.push raises: [].} -import +import std/[options, tables, sequtils], stew/results, chronicles, @@ -79,10 +79,10 @@ type WakuFilterClient* = ref object of LPProtocol proc handleMessagePush(wf: WakuFilterClient, peerId: PeerId, requestId: string, rpc: MessagePush) = for msg in rpc.messages: - let + let pubsubTopic = Defaultstring # TODO: Extend the filter push rpc to provide the pubsub topic. This is a limitation - contentTopic = msg.contentTopic - + contentTopic = msg.contentTopic + wf.subManager.notifySubscriptionHandler(pubsubTopic, contentTopic, msg) @@ -104,10 +104,10 @@ proc initProtocolHandler(wf: WakuFilterClient) = return waku_filter_messages.inc(labelValues = ["MessagePush"]) - + let peerId = conn.peerId - requestId = rpc.requestId + requestId = rpc.requestId push = rpc.push.get() info "received filter message push", peerId=conn.peerId, requestId=requestId @@ -118,8 +118,8 @@ proc initProtocolHandler(wf: WakuFilterClient) = proc new*(T: type WakuFilterClient, peerManager: PeerManager, - rng: ref rand.HmacDrbgContext): T = - + rng: ref rand.HmacDrbgContext): T = + let wf = WakuFilterClient( peerManager: peerManager, rng: rng, @@ -138,9 +138,9 @@ proc sendFilterRpc(wf: WakuFilterClient, rpc: FilterRPC, peer: PeerId|RemotePeer await connection.writeLP(rpc.encode().buffer) return ok() -proc sendFilterRequestRpc(wf: WakuFilterClient, - pubsubTopic: PubsubTopic, - contentTopics: seq[ContentTopic], +proc sendFilterRequestRpc(wf: WakuFilterClient, + pubsubTopic: PubsubTopic, + contentTopics: seq[ContentTopic], subscribe: bool, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} = @@ -150,8 +150,8 @@ proc sendFilterRequestRpc(wf: WakuFilterClient, let rpc = FilterRpc( requestId: requestId, request: some(FilterRequest( - subscribe: subscribe, - pubSubTopic: pubsubTopic, + subscribe: subscribe, + pubSubTopic: pubsubTopic, contentFilters: contentFilters )) ) @@ -160,15 +160,15 @@ proc sendFilterRequestRpc(wf: WakuFilterClient, if sendRes.isErr(): waku_filter_errors.inc(labelValues = [sendRes.error]) return err(sendRes.error) - + return ok() -proc subscribe*(wf: WakuFilterClient, - pubsubTopic: PubsubTopic, - contentTopic: ContentTopic|seq[ContentTopic], +proc subscribe*(wf: WakuFilterClient, + pubsubTopic: PubsubTopic, + contentTopic: ContentTopic|seq[ContentTopic], handler: FilterPushHandler, - peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} = + peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} = var topics: seq[ContentTopic] when contentTopic is seq[ContentTopic]: topics = contentTopic @@ -184,8 +184,8 @@ proc subscribe*(wf: WakuFilterClient, return ok() -proc unsubscribe*(wf: WakuFilterClient, - pubsubTopic: PubsubTopic, +proc unsubscribe*(wf: WakuFilterClient, + pubsubTopic: PubsubTopic, contentTopic: ContentTopic|seq[ContentTopic], peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} = var topics: seq[ContentTopic] @@ -207,4 +207,4 @@ proc clearSubscriptions*(wf: WakuFilterClient) = wf.subManager.clear() proc getSubscriptionsCount*(wf: WakuFilterClient): int = - wf.subManager.getSubscriptionsCount() \ No newline at end of file + wf.subManager.getSubscriptionsCount() diff --git a/waku/v2/protocol/waku_filter_v2/client.nim b/waku/v2/protocol/waku_filter_v2/client.nim new file mode 100644 index 0000000000..310b603f71 --- /dev/null +++ b/waku/v2/protocol/waku_filter_v2/client.nim @@ -0,0 +1,141 @@ +## Waku Filter client for subscribing and receiving filtered messages + +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/options, + chronicles, + chronos, + libp2p/protocols/protocol, + bearssl/rand +import + ../../node/peer_manager, + ../waku_message, + ./common, + ./protocol_metrics, + ./rpc_codec, + ./rpc + +logScope: + topics = "waku filter client" + +type + MessagePushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} + WakuFilterClient* = ref object of LPProtocol + rng: ref HmacDrbgContext + messagePushHandler: MessagePushHandler + peerManager: PeerManager + +func generateRequestId(rng: ref HmacDrbgContext): string = + var bytes: array[10, byte] + hmacDrbgGenerate(rng[], bytes) + return toHex(bytes) + +proc sendSubscribeRequest(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, filterSubscribeRequest: FilterSubscribeRequest): Future[FilterSubscribeResult] {.async.} = + trace "Sending filter subscribe request", servicePeer, filterSubscribeRequest + + let connOpt = await wfc.peerManager.dialPeer(servicePeer, WakuFilterSubscribeCodec) + if connOpt.isNone(): + trace "Failed to dial filter service peer", servicePeer + waku_filter_errors.inc(labelValues = [dialFailure]) + return err(FilterSubscribeError.peerDialFailure($servicePeer)) + + let connection = connOpt.get() + + # TODO: this can raise an exception + await connection.writeLP(filterSubscribeRequest.encode().buffer) + + let respBuf = await connection.readLp(MaxSubscribeResponseSize) + let respDecodeRes = FilterSubscribeResponse.decode(respBuf) + if respDecodeRes.isErr(): + trace "Failed to decode filter subscribe response", servicePeer + waku_filter_errors.inc(labelValues = [decodeRpcFailure]) + return err(FilterSubscribeError.badResponse(decodeRpcFailure)) + + let response = respDecodeRes.get() + + if response.requestId != filterSubscribeRequest.requestId: + trace "Filter subscribe response requestId mismatch", servicePeer, response + waku_filter_errors.inc(labelValues = [requestIdMismatch]) + return err(FilterSubscribeError.badResponse(requestIdMismatch)) + + if response.statusCode != 200: + trace "Filter subscribe error response", servicePeer, response + waku_filter_errors.inc(labelValues = [errorResponse]) + let cause = if response.statusDesc.isSome(): response.statusDesc.get() + else: "filter subscribe error" + return err(FilterSubscribeError.parse(response.statusCode, cause=cause)) + + return ok() + +proc ping*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSubscribeResult] {.async.} = + let requestId = generateRequestId(wfc.rng) + let filterSubscribeRequest = FilterSubscribeRequest.ping(requestId) + + return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) + +proc subscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} = + let requestId = generateRequestId(wfc.rng) + let filterSubscribeRequest = FilterSubscribeRequest.subscribe( + requestId = requestId, + pubsubTopic = pubsubTopic, + contentTopics = contentTopics + ) + + return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) + +proc unsubscribe*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): Future[FilterSubscribeResult] {.async.} = + let requestId = generateRequestId(wfc.rng) + let filterSubscribeRequest = FilterSubscribeRequest.unsubscribe( + requestId = requestId, + pubsubTopic = pubsubTopic, + contentTopics = contentTopics + ) + + return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) + +proc unsubscribeAll*(wfc: WakuFilterClient, servicePeer: RemotePeerInfo): Future[FilterSubscribeResult] {.async.} = + let requestId = generateRequestId(wfc.rng) + let filterSubscribeRequest = FilterSubscribeRequest.unsubscribeAll( + requestId = requestId + ) + + return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest) + +proc initProtocolHandler(wfc: WakuFilterClient) = + + proc handler(conn: Connection, proto: string) {.async.} = + let buf = await conn.readLp(MaxPushSize) + + let decodeRes = MessagePush.decode(buf) + if decodeRes.isErr(): + error "Failed to decode message push", peerId=conn.peerId + waku_filter_errors.inc(labelValues = [decodeRpcFailure]) + return + + let messagePush = decodeRes.value #TODO: toAPI() split here + trace "Received message push", peerId=conn.peerId, messagePush + + wfc.messagePushHandler(messagePush.pubsubTopic, messagePush.wakuMessage) + + # Protocol specifies no response for now + return + + wfc.handler = handler + wfc.codec = WakuFilterPushCodec + +proc new*(T: type WakuFilterClient, + rng: ref HmacDrbgContext, + messagePushHandler: MessagePushHandler, + peerManager: PeerManager): T = + + let wfc = WakuFilterClient( + rng: rng, + messagePushHandler: messagePushHandler, + peerManager: peerManager + ) + wfc.initProtocolHandler() + wfc diff --git a/waku/v2/protocol/waku_filter_v2/common.nim b/waku/v2/protocol/waku_filter_v2/common.nim index f4f4e4a33c..8f2b0d3ff3 100644 --- a/waku/v2/protocol/waku_filter_v2/common.nim +++ b/waku/v2/protocol/waku_filter_v2/common.nim @@ -13,17 +13,33 @@ const type FilterSubscribeErrorKind* {.pure.} = enum UNKNOWN = uint32(000) + PEER_DIAL_FAILURE = uint32(200) # TODO shouldn't this be an error code, e.g. 504 Gateway Timeout? + BAD_RESPONSE = uint32(300) BAD_REQUEST = uint32(400) NOT_FOUND = uint32(404) SERVICE_UNAVAILABLE = uint32(503) FilterSubscribeError* = object - kind*: FilterSubscribeErrorKind - cause*: string + case kind*: FilterSubscribeErrorKind + of PEER_DIAL_FAILURE: + address*: string + of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, SERVICE_UNAVAILABLE: + cause*: string + else: + discard FilterSubscribeResult* = Result[void, FilterSubscribeError] # Convenience functions +proc peerDialFailure*(T: type FilterSubscribeError, address: string): FilterSubscribeError = + FilterSubscribeError( + kind: FilterSubscribeErrorKind.PEER_DIAL_FAILURE, + address: address) + +proc badResponse*(T: type FilterSubscribeError, cause = "bad response"): FilterSubscribeError = + FilterSubscribeError( + kind: FilterSubscribeErrorKind.BAD_RESPONSE, + cause: cause) proc badRequest*(T: type FilterSubscribeError, cause = "bad request"): FilterSubscribeError = FilterSubscribeError( @@ -40,8 +56,34 @@ proc serviceUnavailable*(T: type FilterSubscribeError, cause = "service unavaila kind: FilterSubscribeErrorKind.SERVICE_UNAVAILABLE, cause: cause) +proc parse*(T: type FilterSubscribeErrorKind, kind: uint32): T = + case kind: + of 000, 200, 300, 400, 404, 503: + FilterSubscribeErrorKind(kind) + else: + FilterSubscribeErrorKind.UNKNOWN + +proc parse*(T: type FilterSubscribeError, kind: uint32, cause = "", address = ""): T = + let kind = FilterSubscribeErrorKind.parse(kind) + case kind: + of PEER_DIAL_FAILURE: + FilterSubscribeError( + kind: kind, + address: address) + of BAD_RESPONSE, BAD_REQUEST, NOT_FOUND, SERVICE_UNAVAILABLE: + FilterSubscribeError( + kind: kind, + cause: cause) + else: + FilterSubscribeError( + kind: kind) + proc `$`*(err: FilterSubscribeError): string = case err.kind: + of FilterSubscribeErrorKind.PEER_DIAL_FAILURE: + "PEER_DIAL_FAILURE: " & err.address + of FilterSubscribeErrorKind.BAD_RESPONSE: + "BAD_RESPONSE: " & err.cause of FilterSubscribeErrorKind.BAD_REQUEST: "BAD_REQUEST: " & err.cause of FilterSubscribeErrorKind.NOT_FOUND: diff --git a/waku/v2/protocol/waku_filter_v2/protocol.nim b/waku/v2/protocol/waku_filter_v2/protocol.nim index 150306031b..938802442b 100644 --- a/waku/v2/protocol/waku_filter_v2/protocol.nim +++ b/waku/v2/protocol/waku_filter_v2/protocol.nim @@ -17,21 +17,15 @@ import ./common, ./protocol_metrics, ./rpc_codec, - ./rpc + ./rpc, + ./subscriptions logScope: topics = "waku filter" -const - MaxSubscriptions* = 1000 # TODO make configurable - MaxCriteriaPerSubscription = 1000 - type - FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic - FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria - WakuFilter* = ref object of LPProtocol - subscriptions*: Table[PeerID, FilterCriteria] # a mapping of peer ids to a sequence of filter criteria + subscriptions*: FilterSubscriptions # a mapping of peer ids to a sequence of filter criteria peerManager: PeerManager proc pingSubscriber(wf: WakuFilter, peerId: PeerID): FilterSubscribeResult = @@ -59,7 +53,7 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], peerSubscription.incl(filterCriteria) wf.subscriptions[peerId] = peerSubscription else: - if wf.subscriptions.len() >= MaxSubscriptions: + if wf.subscriptions.len() >= MaxTotalSubscriptions: return err(FilterSubscribeError.serviceUnavailable("node has reached maximum number of subscriptions")) debug "creating new subscription", peerId=peerId wf.subscriptions[peerId] = filterCriteria @@ -125,8 +119,61 @@ proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubs else: return FilterSubscribeResponse.ok(request.requestId) -proc handleMessage*(wf: WakuFilter, message: WakuMessage) = - raiseAssert "Unimplemented" +proc pushToPeer(wf: WakuFilter, peer: PeerId, buffer: seq[byte]) {.async.} = + trace "pushing message to subscribed peer", peer=peer + + if not wf.peerManager.peerStore.hasPeer(peer, WakuFilterPushCodec): + # Check that peer has not been removed from peer store + trace "no addresses for peer", peer=peer + return + + let conn = await wf.peerManager.dialPeer(peer, WakuFilterPushCodec) + if conn.isNone(): + ## We do not remove this peer, but allow the underlying peer manager + ## to do so if it is deemed necessary + trace "no connection to peer", peer=peer + return + + await conn.get().writeLp(buffer) + +proc pushToPeers(wf: WakuFilter, peers: seq[PeerId], messagePush: MessagePush) {.async.} = + trace "pushing message to subscribed peers", peers=peers, messagePush=messagePush + + let bufferToPublish = messagePush.encode().buffer + + var pushFuts: seq[Future[void]] + for peerId in peers: + let pushFut = wf.pushToPeer(peerId, bufferToPublish) + pushFuts.add(pushFut) + + await allFutures(pushFuts) + +proc maintainSubscriptions*(wf: WakuFilter) = + trace "maintaining subscriptions" + + var peersToRemove: seq[PeerId] + for peerId, peerSubscription in wf.subscriptions.pairs(): + ## TODO: currently we only maintain by syncing with peer store. We could + ## consider other metrics, such as subscription age, activity, etc. + if not wf.peerManager.peerStore.hasPeer(peerId, WakuFilterPushCodec): + debug "peer has been removed from peer store, removing subscription", peerId=peerId + peersToRemove.add(peerId) + + wf.subscriptions.removePeers(peersToRemove) + +proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} = + trace "handling message", pubsubTopic=pubsubTopic, message=message + + let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic) + if subscribedPeers.len() == 0: + trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic + return + + let messagePush = MessagePush( + pubsubTopic: pubsubTopic, + wakuMessage: message) + + await wf.pushToPeers(subscribedPeers, messagePush) proc initProtocolHandler(wf: WakuFilter) = @@ -157,3 +204,24 @@ proc new*(T: type WakuFilter, ) wf.initProtocolHandler() wf + +const MaintainSubscriptionsInterval* = 1.minutes + +proc startMaintainingSubscriptions*(wf: WakuFilter, interval: Duration) = + trace "starting to maintain subscriptions" + var maintainSubs: proc(udata: pointer) {.gcsafe, raises: [Defect].} + maintainSubs = proc(udata: pointer) {.gcsafe.} = + maintainSubscriptions(wf) + discard setTimer(Moment.fromNow(interval), maintainSubs) + + discard setTimer(Moment.fromNow(interval), maintainSubs) + +method start*(wf: WakuFilter) {.async.} = + debug "starting filter protocol" + wf.startMaintainingSubscriptions(MaintainSubscriptionsInterval) + + await procCall LPProtocol(wf).start() + +method stop*(wf: WakuFilter) {.async.} = + debug "stopping filter protocol" + await procCall LPProtocol(wf).stop() diff --git a/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim b/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim index db7daad3b3..e64d4253f4 100644 --- a/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim +++ b/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim @@ -12,4 +12,7 @@ declarePublicGauge waku_filter_requests, "number of filter subscribe requests re # Error types (metric label values) const + dialFailure* = "dial_failure" decodeRpcFailure* = "decode_rpc_failure" + requestIdMismatch* = "request_id_mismatch" + errorResponse* = "error_response" diff --git a/waku/v2/protocol/waku_filter_v2/rpc.nim b/waku/v2/protocol/waku_filter_v2/rpc.nim index 904471a244..a158019892 100644 --- a/waku/v2/protocol/waku_filter_v2/rpc.nim +++ b/waku/v2/protocol/waku_filter_v2/rpc.nim @@ -32,10 +32,38 @@ type MessagePush* = object # Message pushed from service node to client wakuMessage*: WakuMessage - pubsubTopic*: Option[string] + pubsubTopic*: string # Convenience functions +proc ping*(T: type FilterSubscribeRequest, requestId: string): T = + FilterSubscribeRequest( + requestId: requestId, + filterSubscribeType: SUBSCRIBER_PING + ) + +proc subscribe*(T: type FilterSubscribeRequest, requestId: string, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): T = + FilterSubscribeRequest( + requestId: requestId, + filterSubscribeType: SUBSCRIBE, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopics + ) + +proc unsubscribe*(T: type FilterSubscribeRequest, requestId: string, pubsubTopic: PubsubTopic, contentTopics: seq[ContentTopic]): T = + FilterSubscribeRequest( + requestId: requestId, + filterSubscribeType: UNSUBSCRIBE, + pubsubTopic: some(pubsubTopic), + contentTopics: contentTopics + ) + +proc unsubscribeAll*(T: type FilterSubscribeRequest, requestId: string): T = + FilterSubscribeRequest( + requestId: requestId, + filterSubscribeType: UNSUBSCRIBE_ALL + ) + proc ok*(T: type FilterSubscribeResponse, requestId: string, desc = "OK"): T = FilterSubscribeResponse( requestId: requestId, diff --git a/waku/v2/protocol/waku_filter_v2/rpc_codec.nim b/waku/v2/protocol/waku_filter_v2/rpc_codec.nim index f66b4f8af1..404c5128d0 100644 --- a/waku/v2/protocol/waku_filter_v2/rpc_codec.nim +++ b/waku/v2/protocol/waku_filter_v2/rpc_codec.nim @@ -12,6 +12,7 @@ import const MaxSubscribeSize* = 10 * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + MaxSubscribeResponseSize* = 64*1024 # Responses are small. 64kB safety buffer. MaxPushSize* = 10 * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead proc encode*(rpc: FilterSubscribeRequest): ProtoBuffer = @@ -69,9 +70,33 @@ proc decode*(T: type FilterSubscribeResponse, buffer: seq[byte]): ProtobufResult if not ?pb.getField(2, rpc.statusCode): return err(ProtobufError.missingRequiredField("status_code")) - if not ?pb.getField(3, rpc.statusDesc): + var statusDesc: string + if not ?pb.getField(3, statusDesc): rpc.statusDesc = none(string) else: - rpc.statusDesc = some(rpc.statusDesc.get()) + rpc.statusDesc = some(statusDesc) + + ok(rpc) + +proc encode*(rpc: MessagePush): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, rpc.wakuMessage.encode()) + pb.write3(2, rpc.pubsubTopic) + + pb + +proc decode*(T: type MessagePush, buffer: seq[byte]): ProtobufResult[T] = + let pb = initProtoBuffer(buffer) + var rpc = MessagePush() + + var message: seq[byte] + if not ?pb.getField(1, message): + return err(ProtobufError.missingRequiredField("message")) + else: + rpc.wakuMessage = ?WakuMessage.decode(message) + + if not ?pb.getField(2, rpc.pubsubTopic): + return err(ProtobufError.missingRequiredField("pubsub_topic")) ok(rpc) diff --git a/waku/v2/protocol/waku_filter_v2/subscriptions.nim b/waku/v2/protocol/waku_filter_v2/subscriptions.nim new file mode 100644 index 0000000000..5c5cab42a8 --- /dev/null +++ b/waku/v2/protocol/waku_filter_v2/subscriptions.nim @@ -0,0 +1,45 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[sets,tables], + chronicles, + libp2p/peerid +import + ../waku_message + +logScope: + topics = "waku filter subscriptions" + +const + MaxTotalSubscriptions* = 1000 # TODO make configurable + MaxCriteriaPerSubscription* = 1000 + +type + FilterCriterion* = (PubsubTopic, ContentTopic) # a single filter criterion is fully defined by a pubsub topic and content topic + FilterCriteria* = HashSet[FilterCriterion] # a sequence of filter criteria + FilterSubscriptions* = Table[PeerID, FilterCriteria] # a mapping of peer ids to a sequence of filter criteria + +proc findSubscribedPeers*(subscriptions: FilterSubscriptions, pubsubTopic: PubsubTopic, contentTopic: ContentTopic): seq[PeerID] = + ## Find all peers subscribed to a given topic and content topic + let filterCriterion = (pubsubTopic, contentTopic) + + var subscribedPeers: seq[PeerID] + + # TODO: for large maps, this can be optimized using a reverse index + for (peerId, criteria) in subscriptions.pairs(): + if filterCriterion in criteria: + subscribedPeers.add(peerId) + + subscribedPeers + +proc removePeer*(subscriptions: var FilterSubscriptions, peerId: PeerID) = + ## Remove all subscriptions for a given peer + subscriptions.del(peerId) + +proc removePeers*(subscriptions: var FilterSubscriptions, peerIds: seq[PeerID]) = + ## Remove all subscriptions for a given list of peers + for peerId in peerIds: + subscriptions.removePeer(peerId)