Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: further filter improvements #1617

Merged
merged 7 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions tests/v2/waku_filter_v2/test_waku_filter_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,9 @@ suite "Waku Filter - subscription maintenance":
switch.peerStore[ProtoBook][peerId1] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId2] = @[WakuFilterPushCodec]
switch.peerStore[ProtoBook][peerId3] = @[WakuFilterPushCodec]
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).isOk()
require wakuFilter.handleSubscribeRequest(peerId1, filterSubscribeRequest).statusCode == 200
require wakuFilter.handleSubscribeRequest(peerId2, filterSubscribeRequest).statusCode == 200
require wakuFilter.handleSubscribeRequest(peerId3, filterSubscribeRequest).statusCode == 200

# Then
check:
Expand Down
64 changes: 47 additions & 17 deletions waku/v2/protocol/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import
logScope:
topics = "waku filter"

const
MaxContentTopicsPerRequest = 30

type
WakuFilter* = ref object of LPProtocol
subscriptions*: FilterSubscriptions # a mapping of peer ids to a sequence of filter criteria
Expand All @@ -41,6 +44,9 @@ proc subscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic],
if pubsubTopic.isNone() or contentTopics.len() == 0:
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))

if contentTopics.len() > MaxContentTopicsPerRequest:
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))

let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))

trace "subscribing peer to filter criteria", peerId=peerId, filterCriteria=filterCriteria
Expand All @@ -64,6 +70,9 @@ proc unsubscribe(wf: WakuFilter, peerId: PeerID, pubsubTopic: Option[PubsubTopic
if pubsubTopic.isNone() or contentTopics.len() == 0:
return err(FilterSubscribeError.badRequest("pubsubTopic and contentTopics must be specified"))

if contentTopics.len() > MaxContentTopicsPerRequest:
return err(FilterSubscribeError.badRequest("exceeds maximum content topics: " & $MaxContentTopicsPerRequest))

let filterCriteria = toHashSet(contentTopics.mapIt((pubsubTopic.get(), it)))

trace "unsubscribing peer from filter criteria", peerId=peerId, filterCriteria=filterCriteria
Expand Down Expand Up @@ -100,15 +109,24 @@ proc handleSubscribeRequest*(wf: WakuFilter, peerId: PeerId, request: FilterSubs

var subscribeResult: FilterSubscribeResult

case request.filterSubscribeType
of FilterSubscribeType.SUBSCRIBER_PING:
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE:
subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
subscribeResult = wf.unsubscribeAll(peerId)
let requestStartTime = Moment.now()

block:
## Handle subscribe request
case request.filterSubscribeType
of FilterSubscribeType.SUBSCRIBER_PING:
subscribeResult = wf.pingSubscriber(peerId)
of FilterSubscribeType.SUBSCRIBE:
subscribeResult = wf.subscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE:
subscribeResult = wf.unsubscribe(peerId, request.pubsubTopic, request.contentTopics)
of FilterSubscribeType.UNSUBSCRIBE_ALL:
subscribeResult = wf.unsubscribeAll(peerId)

let
requestDuration = Moment.now() - requestStartTime
requestDurationSec = requestDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point
waku_filter_request_duration_seconds.observe(requestDurationSec, labelValues = [$request.filterSubscribeType])

if subscribeResult.isErr():
return FilterSubscribeResponse(
Expand Down Expand Up @@ -161,19 +179,31 @@ proc maintainSubscriptions*(wf: WakuFilter) =

wf.subscriptions.removePeers(peersToRemove)

const MessagePushTimeout = 20.seconds
proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.} =
trace "handling message", pubsubTopic=pubsubTopic, message=message

let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len() == 0:
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
return
let handleMessageStartTime = Moment.now()

block:
## Find subscribers and push message to them
let subscribedPeers = wf.subscriptions.findSubscribedPeers(pubsubTopic, message.contentTopic)
if subscribedPeers.len() == 0:
trace "no subscribed peers found", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
return

let messagePush = MessagePush(
pubsubTopic: pubsubTopic,
wakuMessage: message)

let messagePush = MessagePush(
pubsubTopic: pubsubTopic,
wakuMessage: message)
if not await wf.pushToPeers(subscribedPeers, messagePush).withTimeout(MessagePushTimeout):
debug "timed out pushing message to peers", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
waku_filter_errors.inc(labelValues = [pushTimeoutFailure])

await wf.pushToPeers(subscribedPeers, messagePush)
let
handleMessageDuration = Moment.now() - handleMessageStartTime
handleMessageDurationSec = handleMessageDuration.milliseconds.float / 1000 # Duration in seconds with millisecond precision floating point
waku_filter_handle_message_duration_seconds.observe(handleMessageDurationSec)

proc initProtocolHandler(wf: WakuFilter) =

Expand Down
3 changes: 3 additions & 0 deletions waku/v2/protocol/waku_filter_v2/protocol_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ export metrics

declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
declarePublicGauge waku_filter_requests, "number of filter subscribe requests received", ["type"]
declarePublicHistogram waku_filter_request_duration_seconds, "duration of filter subscribe requests", ["type"]
declarePublicHistogram waku_filter_handle_message_duration_seconds, "duration to push message to filter subscribers"

# Error types (metric label values)
const
dialFailure* = "dial_failure"
decodeRpcFailure* = "decode_rpc_failure"
requestIdMismatch* = "request_id_mismatch"
errorResponse* = "error_response"
pushTimeoutFailure* = "push_timeout_failure"