diff --git a/waku/waku_core.nim b/waku/waku_core.nim index 8ffcf489a3..ac68e4df0d 100644 --- a/waku/waku_core.nim +++ b/waku/waku_core.nim @@ -2,10 +2,12 @@ import ./waku_core/topics, ./waku_core/time, ./waku_core/message, - ./waku_core/peers + ./waku_core/peers, + ./waku_core/subscription export topics, time, message, - peers + peers, + subscription diff --git a/waku/waku_core/subscription.nim b/waku/waku_core/subscription.nim new file mode 100644 index 0000000000..4de1fc7539 --- /dev/null +++ b/waku/waku_core/subscription.nim @@ -0,0 +1,7 @@ +import + ./subscription/subscription_manager, + ./subscription/push_handler + +export + subscription_manager, + push_handler \ No newline at end of file diff --git a/waku/waku_core/subscription/push_handler.nim b/waku/waku_core/subscription/push_handler.nim new file mode 100644 index 0000000000..490e2c56b7 --- /dev/null +++ b/waku/waku_core/subscription/push_handler.nim @@ -0,0 +1,13 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + chronos + +import + ../topics, + ../message + +type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} diff --git a/waku/waku_core/subscription/subscription_manager.nim b/waku/waku_core/subscription/subscription_manager.nim new file mode 100644 index 0000000000..36293ae8a6 --- /dev/null +++ b/waku/waku_core/subscription/subscription_manager.nim @@ -0,0 +1,48 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/tables, + stew/results, + chronicles, + chronos + +import + ./push_handler, + ../topics, + ../message + +## Subscription manager +type SubscriptionManager* = object + subscriptions: TableRef[(string, ContentTopic), FilterPushHandler] + +proc init*(T: type SubscriptionManager): T = + SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]()) + +proc clear*(m: var SubscriptionManager) = + m.subscriptions.clear() + +proc registerSubscription*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler) = + try: + # TODO: Handle over subscription surprises + m.subscriptions[(pubsubTopic, contentTopic)]= handler + except CatchableError: + error "failed to register filter subscription", error=getCurrentExceptionMsg() + +proc removeSubscription*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic) = + m.subscriptions.del((pubsubTopic, contentTopic)) + +proc notifySubscriptionHandler*(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage) = + if not m.subscriptions.hasKey((pubsubTopic, contentTopic)): + return + + try: + let handler = m.subscriptions[(pubsubTopic, contentTopic)] + asyncSpawn handler(pubsubTopic, message) + except CatchableError: + discard + +proc getSubscriptionsCount*(m: SubscriptionManager): int = + m.subscriptions.len() diff --git a/waku/waku_filter/client.nim b/waku/waku_filter/client.nim index cad3b7ada8..ae7b46b665 100644 --- a/waku/waku_filter/client.nim +++ b/waku/waku_filter/client.nim @@ -27,50 +27,7 @@ logScope: const Defaultstring = "/waku/2/default-waku/proto" - -### Client, filter subscripton manager - -type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} - - -## Subscription manager - -type SubscriptionManager = object - subscriptions: TableRef[(string, ContentTopic), FilterPushHandler] - -proc init(T: type SubscriptionManager): T = - SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]()) - -proc clear(m: var SubscriptionManager) = - m.subscriptions.clear() - -proc registerSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, handler: FilterPushHandler) = - try: - m.subscriptions[(pubsubTopic, contentTopic)]= handler - except: # TODO: Fix "BareExcept" warning - error "failed to register filter subscription", error=getCurrentExceptionMsg() - -proc removeSubscription(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic) = - m.subscriptions.del((pubsubTopic, contentTopic)) - -proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: PubsubTopic, contentTopic: ContentTopic, message: WakuMessage) = - if not m.subscriptions.hasKey((pubsubTopic, contentTopic)): - return - - try: - let handler = m.subscriptions[(pubsubTopic, contentTopic)] - asyncSpawn handler(pubsubTopic, message) - except: # TODO: Fix "BareExcept" warning - discard - -proc getSubscriptionsCount(m: SubscriptionManager): int = - m.subscriptions.len() - - ## Client - -type MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} - type WakuFilterClientLegacy* = ref object of LPProtocol rng: ref rand.HmacDrbgContext peerManager: PeerManager