diff --git a/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim b/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim index 90e9b80db4..2377d41dec 100644 --- a/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim +++ b/tests/v2/waku_filter_v2/test_waku_filter_protocol.nim @@ -255,9 +255,9 @@ suite "Waku Filter - subscription maintenance": switch.peerStore[ProtoBook][peerId1] = @[WakuFilterPushCodec] switch.peerStore[ProtoBook][peerId2] = @[WakuFilterPushCodec] switch.peerStore[ProtoBook][peerId3] = @[WakuFilterPushCodec] - require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).isOk() - require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).isOk() - require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).isOk() + require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).statusCode == 200 + require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).statusCode == 200 + require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).statusCode == 200 # Then check: diff --git a/waku/v2/protocol/waku_filter_v2/protocol.nim b/waku/v2/protocol/waku_filter_v2/protocol.nim index 938802442b..bc73ed6587 100644 --- a/waku/v2/protocol/waku_filter_v2/protocol.nim +++ b/waku/v2/protocol/waku_filter_v2/protocol.nim @@ -23,6 +23,9 @@ import logScope: topics = "waku filter" +const + MaxContentTopicsPerRequest = 30 + type WakuFilter* = ref object of LPProtocol subscriptions*: FilterSubscriptions # a mapping of peer ids to a sequence of filter criteria @@ -41,6 +44,9 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic], if pubsubTopic.isNone() or contentTopics.len() == 0: return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")) + if contentTopics.len() > MaxContentTopicsPerRequest: + return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest)) + let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria @@ -64,6 +70,9 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic if pubsubTopic.isNone() or contentTopics.len() == 0: return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified")) + if contentTopics.len() > MaxContentTopicsPerRequest: + return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest)) + let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it))) trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria @@ -100,15 +109,24 @@ proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubs var subscribeResult: FilterSubscribeResult - case request.filterSubscribeType - of FilterSubscribeType.SUBSCRIBER_PING: - subscribeResult = wf.pingSubscriber(peerId) - of FilterSubscribeType.SUBSCRIBE: - subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics) - of FilterSubscribeType.UNSUBSCRIBE: - subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics) - of FilterSubscribeType.UNSUBSCRIBE_ALL: - subscribeResult = wf.unsubscribeAll(peerId) + let requestStartTime = Moment.now() + + block: + ## Handle subscribe request + case request.filterSubscribeType + of FilterSubscribeType.SUBSCRIBER_PING: + subscribeResult = wf.pingSubscriber(peerId) + of FilterSubscribeType.SUBSCRIBE: + subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics) + of FilterSubscribeType.UNSUBSCRIBE: + subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics) + of FilterSubscribeType.UNSUBSCRIBE_ALL: + subscribeResult = wf.unsubscribeAll(peerId) + + let + requestDuration = Moment.now() - requestStartTime + requestDurationSec = requestDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point + waku_filter_request_duration_seconds.observe(requestDurationSec, labelValues = [$request.filterSubscribeType]) if subscribeResult.isErr(): return FilterSubscribeResponse( @@ -161,19 +179,31 @@ proc maintainSubscriptions*(wf: WakuFilter) = wf.subscriptions.removePeers(peersToRemove) +const MessagePushTimeout = 20.seconds proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} = trace "handling message", pubsubTopic=pubsubTopic, message=message - let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic) - if subscribedPeers.len() == 0: - trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic - return + let handleMessageStartTime = Moment.now() + + block: + ## Find subscribers and push message to them + let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic) + if subscribedPeers.len() == 0: + trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic + return + + let messagePush = MessagePush( + pubsubTopic: pubsubTopic, + wakuMessage: message) - let messagePush = MessagePush( - pubsubTopic: pubsubTopic, - wakuMessage: message) + if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout): + debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic + waku_filter_errors.inc(labelValues = [pushTimeoutFailure]) - await wf.pushToPeers(subscribedPeers, messagePush) + let + handleMessageDuration = Moment.now() - handleMessageStartTime + handleMessageDurationSec = handleMessageDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point + waku_filter_handle_message_duration_seconds.observe(handleMessageDurationSec) proc initProtocolHandler(wf: WakuFilter) = diff --git a/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim b/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim index e64d4253f4..16a4c9bba2 100644 --- a/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim +++ b/waku/v2/protocol/waku_filter_v2/protocol_metrics.nim @@ -9,6 +9,8 @@ export metrics declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] declarePublicGauge waku_filter_requests, "number of filter subscribe requests received", ["type"] +declarePublicHistogram waku_filter_request_duration_seconds, "duration of filter subscribe requests", ["type"] +declarePublicHistogram waku_filter_handle_message_duration_seconds, "duration to push message to filter subscribers" # Error types (metric label values) const @@ -16,3 +18,4 @@ const decodeRpcFailure* = "decode_rpc_failure" requestIdMismatch* = "request_id_mismatch" errorResponse* = "error_response" + pushTimeoutFailure* = "push_timeout_failure"