From 2e8703d61c667d762c88b8b4e3ae6093774a9db4 Mon Sep 17 00:00:00 2001
From: Ivan Folgueira Bande
Date: Fri, 28 Jun 2024 09:18:09 +0200
Subject: [PATCH 1/9] reliability with store-v3

- Better structure of the delivery monitor and related modules
- store protocol: simple cleanup of an unused const
- use of the observer/observable pattern to inform delivery_monitor about
  subscription state
- waku_store resume does not need to get an explicit peerId as a query param
- waku_sync protocol needs some peerId, as peerId is optional in the query proc
- send_monitor becomes a publish observer of lightpush and relay
- delivery monitor: add more protection against possible crashes, plus better logs
---
 .../00001_addNotDeliveredMessagesTable.up.sql |   9 +
 waku/factory/external_config.nim              |   9 +
 waku/factory/waku.nim                         |  25 ++-
 .../delivery_monitor/delivery_callback.nim    |  17 ++
 .../delivery_monitor/delivery_monitor.nim     |  43 ++++
 .../not_delivered_storage/migrations.nim      |  26 +++
 .../not_delivered_storage.nim                 |  38 ++++
 .../delivery_monitor/publish_observer.nim     |   9 +
 waku/node/delivery_monitor/recv_monitor.nim   | 195 ++++++++++++++++
 waku/node/delivery_monitor/send_monitor.nim   | 212 ++++++++++++++++++
 .../subscriptions_observer.nim                |  13 ++
 waku/node/waku_node.nim                       |   4 +-
 waku/waku_api/rest/store/handlers.nim         |  10 +-
 waku/waku_filter_v2/client.nim                |  21 +-
 waku/waku_lightpush/client.nim                |  29 ++-
 waku/waku_relay/protocol.nim                  |  14 +-
 waku/waku_store/client.nim                    |  13 +-
 waku/waku_store/protocol.nim                  |   3 -
 waku/waku_store/resume.nim                    |  21 +-
 waku/waku_store_legacy/protocol.nim           |   3 -
 waku/waku_sync/protocol.nim                   |   2 +-
 21 files changed, 682 insertions(+), 34 deletions(-)
 create mode 100644 migrations/sent_msgs/00001_addNotDeliveredMessagesTable.up.sql
 create mode 100644 waku/node/delivery_monitor/delivery_callback.nim
 create mode 100644 waku/node/delivery_monitor/delivery_monitor.nim
 create mode 100644 waku/node/delivery_monitor/not_delivered_storage/migrations.nim
 create mode 100644 waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim
 create mode 100644 waku/node/delivery_monitor/publish_observer.nim
 create mode 100644 waku/node/delivery_monitor/recv_monitor.nim
 create mode 100644 waku/node/delivery_monitor/send_monitor.nim
 create mode 100644 waku/node/delivery_monitor/subscriptions_observer.nim

diff --git a/migrations/sent_msgs/00001_addNotDeliveredMessagesTable.up.sql b/migrations/sent_msgs/00001_addNotDeliveredMessagesTable.up.sql
new file mode 100644
index 0000000000..2c0a13b485
--- /dev/null
+++ b/migrations/sent_msgs/00001_addNotDeliveredMessagesTable.up.sql
@@ -0,0 +1,9 @@
+CREATE TABLE IF NOT EXISTS NotDeliveredMessages(
+  messageHash BLOB PRIMARY KEY,
+  timestamp INTEGER NOT NULL,
+  contentTopic BLOB NOT NULL,
+  pubsubTopic BLOB NOT NULL,
+  payload BLOB,
+  meta BLOB,
+  version INTEGER NOT NULL
+);
\ No newline at end of file
diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim
index 6995a9a8ea..a5eeec7574 100644
--- a/waku/factory/external_config.nim
+++ b/waku/factory/external_config.nim
@@ -463,6 +463,15 @@ type WakuNodeConf* = object
     name: "lightpushnode"
   .}: string

+  ## Reliability config
+  reliabilityEnabled* {.
+    desc:
+      """Adds an extra effort in the delivery/reception of messages by leveraging store-v3 requests,
+with the drawback of consuming some more bandwidth.""",
+    defaultValue: false,
+    name: "reliability"
+  .}: bool
+
   ## REST HTTP config
   rest* {.
     desc: "Enable Waku REST HTTP server: true|false", defaultValue: true, name: "rest"
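
Note: with this patch applied, the new option is exercised from the CLI, for
example (a sketch: the binary name and the multiaddr placeholder are
illustrative, not part of the patch; --storenode must be set because
reliability mode requires a store service peer, see waku.nim below):

  ./wakunode2 --reliability=true --storenode=<store-service-peer-multiaddr>
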
desc: "Enable Waku REST HTTP server: true|false", defaultValue: true, name: "rest" diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index c77ea52373..28a6326b98 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -20,6 +20,7 @@ import ../waku_node, ../node/peer_manager, ../node/health_monitor, + ../node/delivery_monitor/delivery_monitor, ../waku_api/message_cache, ../waku_api/rest/server, ../waku_archive, @@ -51,6 +52,8 @@ type Waku* = object node*: WakuNode + deliveryMonitor: DeliveryMonitor + restServer*: WakuRestServerRef metricsServer*: MetricsHttpServerRef @@ -147,13 +150,29 @@ proc init*(T: type Waku, conf: WakuNodeConf): Result[Waku, string] = error "Failed setting up node", error = nodeRes.error return err("Failed setting up node: " & nodeRes.error) + let node = nodeRes.get() + + var deliveryMonitor: DeliveryMonitor + if conf.reliabilityEnabled: + if conf.storenode == "": + return err("A storenode should be set when reliability mode is on") + + let deliveryMonitorRes = DeliveryMonitor.new( + node.wakuStoreClient, node.wakuRelay, node.wakuLightpushClient, + node.wakuFilterClient, + ) + if deliveryMonitorRes.isErr(): + return err("could not create delivery monitor: " & $deliveryMonitorRes.error) + deliveryMonitor = deliveryMonitorRes.get() + var waku = Waku( version: git_version, conf: confCopy, rng: rng, key: confCopy.nodekey.get(), - node: nodeRes.get(), + node: node, dynamicBootstrapNodes: dynamicBootstrapNodesRes.get(), + deliveryMonitor: deliveryMonitor, ) ok(waku) @@ -237,6 +256,10 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: (await waku.wakuDiscV5.start()).isOkOr: return err("failed to start waku discovery v5: " & $error) + ## Reliability + if not waku[].deliveryMonitor.isNil(): + waku[].deliveryMonitor.startDeliveryMonitor() + return ok() # Waku shutdown diff --git a/waku/node/delivery_monitor/delivery_callback.nim b/waku/node/delivery_monitor/delivery_callback.nim new file mode 100644 index 0000000000..c996bc7b0a --- /dev/null +++ b/waku/node/delivery_monitor/delivery_callback.nim @@ -0,0 +1,17 @@ +import ../../waku_core + +type DeliveryDirection* {.pure.} = enum + PUBLISHING + RECEIVING + +type DeliverySuccess* {.pure.} = enum + SUCCESSFUL + UNSUCCESSFUL + +type DeliveryFeedbackCallback* = proc( + success: DeliverySuccess, + dir: DeliveryDirection, + comment: string, + msgHash: WakuMessageHash, + msg: WakuMessage, +) {.gcsafe, raises: [].} diff --git a/waku/node/delivery_monitor/delivery_monitor.nim b/waku/node/delivery_monitor/delivery_monitor.nim new file mode 100644 index 0000000000..28f9e2507a --- /dev/null +++ b/waku/node/delivery_monitor/delivery_monitor.nim @@ -0,0 +1,43 @@ +## This module helps to ensure the correct transmission and reception of messages + +import results +import chronos +import + ./recv_monitor, + ./send_monitor, + ./delivery_callback, + ../../waku_core, + ../../waku_store/client, + ../../waku_relay/protocol, + ../../waku_lightpush/client, + ../../waku_filter_v2/client + +type DeliveryMonitor* = ref object + sendMonitor: SendMonitor + recvMonitor: RecvMonitor + +proc new*( + T: type DeliveryMonitor, + storeClient: WakuStoreClient, + wakuRelay: protocol.WakuRelay, + wakuLightpushClient: WakuLightPushClient, + wakuFilterClient: WakuFilterClient, +): Result[T, string] = + ## storeClient is needed to give store visitility to DeliveryMonitor + ## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendMonitor to re-publish + let sendMonitor = ?SendMonitor.new(storeClient, 
diff --git a/waku/node/delivery_monitor/not_delivered_storage/migrations.nim b/waku/node/delivery_monitor/not_delivered_storage/migrations.nim
new file mode 100644
index 0000000000..66fb8587c7
--- /dev/null
+++ b/waku/node/delivery_monitor/not_delivered_storage/migrations.nim
@@ -0,0 +1,26 @@
+{.push raises: [].}
+
+import std/[tables, strutils, os], results, chronicles
+import ../../../common/databases/db_sqlite, ../../../common/databases/common
+
+logScope:
+  topics = "waku node delivery_monitor"
+
+const TargetSchemaVersion* = 1
+  # increase this when there is an update in the database schema
+
+template projectRoot(): string =
+  currentSourcePath.rsplit(DirSep, 1)[0] / ".." / ".." / ".." / ".."
+
+const PeerStoreMigrationPath: string = projectRoot / "migrations" / "sent_msgs"
+
+proc migrate*(db: SqliteDatabase): DatabaseResult[void] =
+  debug "starting peer store's sqlite database migration for sent messages"
+
+  let migrationRes =
+    migrate(db, TargetSchemaVersion, migrationsScriptsDir = PeerStoreMigrationPath)
+  if migrationRes.isErr():
+    return err("failed to execute migration scripts: " & migrationRes.error)
+
+  debug "finished peer store's sqlite database migration for sent messages"
+  ok()
diff --git a/waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim b/waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim
new file mode 100644
index 0000000000..85611310bb
--- /dev/null
+++ b/waku/node/delivery_monitor/not_delivered_storage/not_delivered_storage.nim
@@ -0,0 +1,38 @@
+## This module is aimed at keeping track of the sent/published messages that are
+## considered not to have been properly delivered.
+##
+## The archiving of such messages will happen in a local sqlite database.
+##
+## In the very first approach, we consider that a message is sent properly if it has been
+## received by any store node.
+##
+
+import results
+import
+  ../../../common/databases/db_sqlite,
+  ../../../waku_core/message/message,
+  ../../../node/delivery_monitor/not_delivered_storage/migrations
+
+const NotDeliveredMessagesDbUrl = "not-delivered-messages.db"
+
+type NotDeliveredStorage* = ref object
+  database: SqliteDatabase
+
+type TrackedWakuMessage = object
+  msg: WakuMessage
+  numTrials: uint
+    ## for statistics purposes. Counts the number of times the node has tried to publish it
+
+proc new*(T: type NotDeliveredStorage): Result[T, string] =
+  let db = ?SqliteDatabase.new(NotDeliveredMessagesDbUrl)
+
+  ?migrate(db)
+
+  return ok(NotDeliveredStorage(database: db))
+
+proc archiveMessage*(
+    self: NotDeliveredStorage, msg: WakuMessage
+): Result[void, string] =
+  ## Archives a waku message so that we can keep track of it
+  ## even when the app restarts
+  return ok()
diff --git a/waku/node/delivery_monitor/publish_observer.nim b/waku/node/delivery_monitor/publish_observer.nim
new file mode 100644
index 0000000000..1f517f8bde
--- /dev/null
+++ b/waku/node/delivery_monitor/publish_observer.nim
@@ -0,0 +1,9 @@
+import chronicles
+import ../../waku_core/message/message
+
+type PublishObserver* = ref object of RootObj
+
+method onMessagePublished*(
+    self: PublishObserver, pubsubTopic: string, message: WakuMessage
+) {.base, gcsafe, raises: [].} =
+  error "onMessagePublished not implemented"
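
Note: a sketch of a concrete observer (not part of the patch; import paths are
assumed), showing the override pattern that SendMonitor relies on when it
registers itself through addPublishObserver further below:

  import chronicles
  import waku/waku_core/message/message
  import waku/node/delivery_monitor/publish_observer

  type LoggingPublishObserver = ref object of PublishObserver

  method onMessagePublished(
      self: LoggingPublishObserver, pubsubTopic: string, message: WakuMessage
  ) {.gcsafe, raises: [].} =
    ## Called after every relay/lightpush publication
    debug "message published", pubsubTopic, payloadLen = message.payload.len
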
diff --git a/waku/node/delivery_monitor/recv_monitor.nim b/waku/node/delivery_monitor/recv_monitor.nim
new file mode 100644
index 0000000000..f0aa180133
--- /dev/null
+++ b/waku/node/delivery_monitor/recv_monitor.nim
@@ -0,0 +1,195 @@
+## This module is in charge of taking care of the messages that this node is expecting to
+## receive and is backed by store-v3 requests to get an additional degree of certainty
+##
+
+import std/[tables, sequtils, sets, options]
+import chronos, chronicles, libp2p/utility
+import
+  ../../waku_core,
+  ./delivery_callback,
+  ./subscriptions_observer,
+  ../../waku_store/[client, common],
+  ../../waku_filter_v2/client,
+  ../../waku_core/topics
+
+const StoreCheckPeriod = chronos.minutes(5) ## How often to perform store queries
+
+const MaxMessageLife = chronos.minutes(7) ## Max time we will keep track of rx messages
+
+const PruneOldMsgsPeriod = chronos.minutes(1)
+
+const DelayExtra* = chronos.seconds(5)
+  ## Additional safety time so that the missing-messages queries overlap
+
+type TupleHashAndMsg = tuple[hash: WakuMessageHash, msg: WakuMessage]
+
+type RecvMessage = object
+  msgHash: WakuMessageHash
+  rxTime: Timestamp
+    ## timestamp of the rx message. We will not keep the rx messages forever
+
+type RecvMonitor* = ref object of SubscriptionObserver
+  topicsInterest: Table[PubsubTopic, seq[ContentTopic]]
+    ## Tracks message verification requests and when each pubsub topic was last
+    ## verified for missing messages. The key contains pubsub-topics
+
+  storeClient: WakuStoreClient
+  deliveryCb: DeliveryFeedbackCallback
+
+  recentReceivedMsgs: seq[RecvMessage]
+
+  msgCheckerHandler: Future[void] ## allows stopping the msgChecker async task
+  msgPrunerHandler: Future[void] ## removes too-old messages
+
+  startTimeToCheck: Timestamp
+  endTimeToCheck: Timestamp
+
+proc getMissingMsgsFromStore(
+    self: RecvMonitor, msgHashes: seq[WakuMessageHash]
+): Future[Result[seq[TupleHashAndMsg], string]] {.async.} =
+  let storeResp: StoreQueryResponse = (
+    await self.storeClient.query(
+      StoreQueryRequest(includeData: true, messageHashes: msgHashes)
+    )
+  ).valueOr:
+    return err("getMissingMsgsFromStore: " & $error)
+
+  let otheriseMsg = WakuMessage() ## message to be returned if the Option message is none
+  return ok(
+    storeResp.messages.mapIt((hash: it.messageHash, msg: it.message.get(otheriseMsg)))
+  )
+
+proc performDeliveryFeedback(
+    self: RecvMonitor,
+    success: DeliverySuccess,
+    dir: DeliveryDirection,
+    comment: string,
+    msgHash: WakuMessageHash,
+    msg: WakuMessage,
+) {.gcsafe, raises: [].} =
+  ## This proc brings delivery feedback to the API client.
+  ## It requires a 'deliveryCb' to be registered beforehand.
+  if self.deliveryCb.isNil():
+    error "deliveryCb is nil in performDeliveryFeedback",
+      success, dir, comment, msg_hash
+    return
+
+  debug "recv monitor performDeliveryFeedback",
+    success, dir, comment, msg_hash = shortLog(msgHash)
+  self.deliveryCb(success, dir, comment, msgHash, msg)
+
+proc msgChecker(self: RecvMonitor) {.async.} =
+  ## Continuously checks if a message has been received
+  while true:
+    await sleepAsync(StoreCheckPeriod)
+
+    self.endTimeToCheck = getNowInNanosecondTime()
+
+    var msgHashesInStore = newSeq[WakuMessageHash](0)
+    for pubsubTopic, cTopics in self.topicsInterest.pairs:
+      let storeResp: StoreQueryResponse = (
+        await self.storeClient.query(
+          StoreQueryRequest(
+            includeData: false,
+            pubsubTopic: some(PubsubTopic(pubsubTopic)),
+            contentTopics: cTopics,
+            startTime: some(self.startTimeToCheck - DelayExtra.nanos),
+            endTime: some(self.endTimeToCheck + DelayExtra.nanos),
+          )
+        )
+      ).valueOr:
+        error "msgChecker failed to get remote msgHashes",
+          pubsubTopic, cTopics, error = $error
+        continue
+
+      msgHashesInStore.add(storeResp.messages.mapIt(it.messageHash))
+
+    ## compare the msgHashes seen from the store vs the ones received directly
+    let rxMsgHashes = self.recentReceivedMsgs.mapIt(it.msgHash)
+    let missedHashes: seq[WakuMessageHash] =
+      msgHashesInStore.filterIt(not rxMsgHashes.contains(it))
+
+    ## Now retrieve the missed WakuMessages
+    let missingMsgsRet = await self.getMissingMsgsFromStore(missedHashes)
+    if missingMsgsRet.isOk():
+      ## Give feedback so that the api client can perform any action with the missed messages
+      for msgTuple in missingMsgsRet.get():
+        self.performDeliveryFeedback(
+          DeliverySuccess.UNSUCCESSFUL, RECEIVING, "Missed message", msgTuple.hash,
+          msgTuple.msg,
+        )
+    else:
+      error "failed to retrieve missing messages: ", error = $missingMsgsRet.error
+
+    ## update next check times
+    self.startTimeToCheck = self.endTimeToCheck
"onSubscribe", pubsubTopic, contentTopics + self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): + contentTopicsOfInterest[].add(contentTopics) + do: + self.topicsInterest[pubsubTopic] = contentTopics + +method onUnsubscribe( + self: RecvMonitor, pubsubTopic: string, contentTopics: seq[string] +) {.gcsafe, raises: [].} = + debug "onUnsubscribe", pubsubTopic, contentTopics + + self.topicsInterest.withValue(pubsubTopic, contentTopicsOfInterest): + let remainingCTopics = + contentTopicsOfInterest[].filterIt(not contentTopics.contains(it)) + contentTopicsOfInterest[] = remainingCTopics + + if remainingCTopics.len == 0: + self.topicsInterest.del(pubsubTopic) + do: + error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics + +proc new*( + T: type RecvMonitor, + storeClient: WakuStoreClient, + wakuFilterClient: WakuFilterClient, +): T = + ## The storeClient will help to acquire any possible missed messages + + let now = getNowInNanosecondTime() + var recvMonitor = RecvMonitor(storeClient: storeClient, startTimeToCheck: now) + + if not wakuFilterClient.isNil(): + wakuFilterClient.addSubscrObserver(recvMonitor) + + let filterPushHandler = proc( + pubsubTopic: PubsubTopic, message: WakuMessage + ) {.async, closure.} = + ## Captures all the messages recived through filter + + let msgHash = computeMessageHash(pubSubTopic, message) + let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) + recvMonitor.recentReceivedMsgs.add(rxMsg) + + wakuFilterClient.registerPushHandler(filterPushHandler) + + return recvMonitor + +proc loopPruneOldMessages(self: RecvMonitor) {.async.} = + while true: + let oldestAllowedTime = getNowInNanosecondTime() - MaxMessageLife.nanos + self.recentReceivedMsgs.keepItIf(it.rxTime > oldestAllowedTime) + await sleepAsync(PruneOldMsgsPeriod) + +proc startRecvMonitor*(self: RecvMonitor) = + self.msgCheckerHandler = self.msgChecker() + self.msgPrunerHandler = self.loopPruneOldMessages() + +proc stopRecvMonitor*(self: RecvMonitor) {.async.} = + if not self.msgCheckerHandler.isNil(): + await self.msgCheckerHandler.cancelAndWait() + if not self.msgPrunerHandler.isNil(): + await self.msgPrunerHandler.cancelAndWait() + +proc setDeliveryCallback*(self: RecvMonitor, deliveryCb: DeliveryFeedbackCallback) = + self.deliveryCb = deliveryCb diff --git a/waku/node/delivery_monitor/send_monitor.nim b/waku/node/delivery_monitor/send_monitor.nim new file mode 100644 index 0000000000..8a90d9ab49 --- /dev/null +++ b/waku/node/delivery_monitor/send_monitor.nim @@ -0,0 +1,212 @@ +## This module reinforces the publish operation with regular store-v3 requests. 
diff --git a/waku/node/delivery_monitor/send_monitor.nim b/waku/node/delivery_monitor/send_monitor.nim
new file mode 100644
index 0000000000..8a90d9ab49
--- /dev/null
+++ b/waku/node/delivery_monitor/send_monitor.nim
@@ -0,0 +1,212 @@
+## This module reinforces the publish operation with regular store-v3 requests.
+##
+
+import std/[sets, sequtils]
+import chronos, chronicles, libp2p/utility
+import
+  ./delivery_callback,
+  ./publish_observer,
+  ../../waku_core,
+  ./not_delivered_storage/not_delivered_storage,
+  ../../waku_store/[client, common],
+  ../../waku_archive/archive,
+  ../../waku_relay/protocol,
+  ../../waku_lightpush/client
+
+const MaxTimeInCache* = chronos.minutes(1)
+  ## Messages older than this time will get completely forgotten on publication and a
+  ## feedback will be given when that happens
+
+const SendCheckInterval* = chronos.seconds(3)
+  ## Interval at which we check that messages have been properly received by a store node
+
+const MaxMessagesToCheckAtOnce = 100
+  ## Max number of messages to check if they were properly archived by a store node
+
+const ArchiveTime = chronos.seconds(3)
+  ## Estimation of the time we wait until we start confirming that a message has been properly
+  ## received and archived by a store node
+
+type DeliveryInfo = object
+  pubsubTopic: string
+  msg: WakuMessage
+
+type SendMonitor* = ref object of PublishObserver
+  publishedMessages: Table[WakuMessageHash, DeliveryInfo]
+    ## Cache that contains the delivery info per pubsub-topic.
+    ## This is needed to make sure the published messages are properly published
+
+  msgStoredCheckerHandle: Future[void] ## handle that allows to stop the async task
+
+  notDeliveredStorage: NotDeliveredStorage
+    ## NOTE: this is not fully used because that might be tackled by higher abstraction layers
+
+  storeClient: WakuStoreClient
+  deliveryCb: DeliveryFeedbackCallback
+
+  wakuRelay: protocol.WakuRelay
+  wakuLightpushClient: WakuLightPushClient
+
+proc new*(
+    T: type SendMonitor,
+    storeClient: WakuStoreClient,
+    wakuRelay: protocol.WakuRelay,
+    wakuLightpushClient: WakuLightPushClient,
+): Result[T, string] =
+  if wakuRelay.isNil() and wakuLightpushClient.isNil():
+    return err(
+      "Could not create SendMonitor. wakuRelay or wakuLightpushClient should be set"
+    )
+
+  let notDeliveredStorage = ?NotDeliveredStorage.new()
+
+  let sendMonitor = SendMonitor(
+    notDeliveredStorage: notDeliveredStorage,
+    storeClient: storeClient,
+    wakuRelay: wakuRelay,
+    wakuLightpushClient: wakuLightPushClient,
+  )
+
+  if not wakuRelay.isNil():
+    wakuRelay.addPublishObserver(sendMonitor)
+
+  if not wakuLightpushClient.isNil():
+    wakuLightpushClient.addPublishObserver(sendMonitor)
+
+  return ok(sendMonitor)
+proc performFeedbackAndCleanup(
+    self: SendMonitor,
+    msgsToDiscard: Table[WakuMessageHash, DeliveryInfo],
+    success: DeliverySuccess,
+    dir: DeliveryDirection,
+    comment: string,
+) =
+  ## This proc brings delivery feedback to the API client.
+  ## It requires a 'deliveryCb' to be registered beforehand.
+  if self.deliveryCb.isNil():
+    error "deliveryCb is nil in performFeedbackAndCleanup",
+      success, dir, comment, hashes = toSeq(msgsToDiscard.keys).mapIt(shortLog(it))
+    return
+
+  for hash, deliveryInfo in msgsToDiscard:
+    debug "send monitor performFeedbackAndCleanup",
+      success, dir, comment, msg_hash = shortLog(hash)
+
+    self.deliveryCb(success, dir, comment, hash, deliveryInfo.msg)
+    self.publishedMessages.del(hash)
+
+proc checkMsgsInStore(
+    self: SendMonitor, msgsToValidate: Table[WakuMessageHash, DeliveryInfo]
+): Future[
+    Result[
+      tuple[
+        publishedCorrectly: Table[WakuMessageHash, DeliveryInfo],
+        notYetPublished: Table[WakuMessageHash, DeliveryInfo],
+      ],
+      void,
+    ]
+] {.async.} =
+  let hashesToValidate = toSeq(msgsToValidate.keys)
+
+  let storeResp: StoreQueryResponse = (
+    await self.storeClient.query(
+      StoreQueryRequest(includeData: false, messageHashes: hashesToValidate)
+    )
+  ).valueOr:
+    error "checkMsgsInStore failed to get remote msgHashes",
+      hashes = hashesToValidate.mapIt(shortLog(it)), error = $error
+    return err()
+
+  let publishedHashes = storeResp.messages.mapIt(it.messageHash)
+
+  var notYetPublished: Table[WakuMessageHash, DeliveryInfo]
+  var publishedCorrectly: Table[WakuMessageHash, DeliveryInfo]
+
+  for msgHash, deliveryInfo in msgsToValidate.pairs:
+    if publishedHashes.contains(msgHash):
+      publishedCorrectly[msgHash] = deliveryInfo
+      self.publishedMessages.del(msgHash) ## we will no longer track that message
+    else:
+      notYetPublished[msgHash] = deliveryInfo
+
+  return ok((publishedCorrectly: publishedCorrectly, notYetPublished: notYetPublished))
+
+proc processMessages(self: SendMonitor) {.async.} =
+  var msgsToValidate: Table[WakuMessageHash, DeliveryInfo]
+  var msgsToDiscard: Table[WakuMessageHash, DeliveryInfo]
+
+  let now = getNowInNanosecondTime()
+  let timeToCheckThreshold = now - ArchiveTime.nanos
+  let maxLifeTime = now - MaxTimeInCache.nanos
+
+  for hash, deliveryInfo in self.publishedMessages.pairs:
+    if deliveryInfo.msg.timestamp < maxLifeTime:
+      ## message is too old
+      msgsToDiscard[hash] = deliveryInfo
+
+    if deliveryInfo.msg.timestamp < timeToCheckThreshold:
+      msgsToValidate[hash] = deliveryInfo
+
+  ## Discard the messages that are too old
+  self.performFeedbackAndCleanup(
+    msgsToDiscard, DeliverySuccess.UNSUCCESSFUL, DeliveryDirection.PUBLISHING,
+    "Could not publish messages. Please try again.",
+  )
Please try again.", + ) + + let (publishedCorrectly, notYetPublished) = ( + await self.checkMsgsInStore(msgsToValidate) + ).valueOr: + return ## the error log is printed in checkMsgsInStore + + ## Give positive feedback for the correctly published messages + self.performFeedbackAndCleanup( + publishedCorrectly, DeliverySuccess.SUCCESSFUL, DeliveryDirection.PUBLISHING, + "messages published correctly", + ) + + ## Try to publish again + for msgHash, deliveryInfo in notYetPublished.pairs: + let pubsubTopic = deliveryInfo.pubsubTopic + let msg = deliveryInfo.msg + if not self.wakuRelay.isNil(): + debug "trying to publish again with wakuRelay", msgHash, pubsubTopic + let ret = await self.wakuRelay.publish(pubsubTopic, msg) + if ret == 0: + error "could not publish with wakuRelay.publish", msgHash, pubsubTopic + continue + + if not self.wakuLightpushClient.isNil(): + debug "trying to publish again with wakuLightpushClient", msgHash, pubsubTopic + (await self.wakuLightpushClient.publishToAny(pubsubTopic, msg)).isOkOr: + error "could not publish with publishToAny", error = $error + continue + +proc checkIfMessagesStored(self: SendMonitor) {.async.} = + ## Continuously monitors that the sent messages have been received by a store node + while true: + await self.processMessages() + await sleepAsync(SendCheckInterval) + +method onMessagePublished( + self: SendMonitor, pubsubTopic: string, msg: WakuMessage +) {.gcsafe, raises: [].} = + ## Implementation of the PublishObserver interface. + ## + ## When publishing a message either through relay or lightpush, we want to add some extra effort + ## to make sure it is received to one store node. Hence, keep track of those published messages. + + debug "onMessagePublished" + let msgHash = computeMessageHash(pubSubTopic, msg) + + if not self.publishedMessages.hasKey(msgHash): + self.publishedMessages[msgHash] = DeliveryInfo(pubsubTopic: pubsubTopic, msg: msg) + +proc startSendMonitor*(self: SendMonitor) = + self.msgStoredCheckerHandle = self.checkIfMessagesStored() + +proc stopSendMonitor*(self: SendMonitor) = + self.msgStoredCheckerHandle.cancel() + +proc setDeliveryCallback*(self: SendMonitor, deliveryCb: DeliveryFeedbackCallback) = + self.deliveryCb = deliveryCb diff --git a/waku/node/delivery_monitor/subscriptions_observer.nim b/waku/node/delivery_monitor/subscriptions_observer.nim new file mode 100644 index 0000000000..0c5d552210 --- /dev/null +++ b/waku/node/delivery_monitor/subscriptions_observer.nim @@ -0,0 +1,13 @@ +import chronicles + +type SubscriptionObserver* = ref object of RootObj + +method onSubscribe*( + self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string] +) {.base, gcsafe, raises: [].} = + error "onSubscribe not implemented" + +method onUnsubscribe*( + self: SubscriptionObserver, pubsubTopic: string, contentTopics: seq[string] +) {.gcsafe, raises: [].} = + error "onUnsubscribe not implemented" diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3895f52312..e30fdf0f1f 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -979,7 +979,7 @@ proc mountStoreClient*(node: WakuNode) = node.wakuStoreClient = store_client.WakuStoreClient.new(node.peerManager, node.rng) proc query*( - node: WakuNode, request: store_common.StoreQueryRequest, peer: RemotePeerInfo + node: WakuNode, request: store_common.StoreQueryRequest ): Future[store_common.WakuStoreResult[store_common.StoreQueryResponse]] {. 
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index 3895f52312..e30fdf0f1f 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -979,7 +979,7 @@ proc mountStoreClient*(node: WakuNode) =
   node.wakuStoreClient = store_client.WakuStoreClient.new(node.peerManager, node.rng)

 proc query*(
-    node: WakuNode, request: store_common.StoreQueryRequest, peer: RemotePeerInfo
+    node: WakuNode, request: store_common.StoreQueryRequest
 ): Future[store_common.WakuStoreResult[store_common.StoreQueryResponse]] {.
     async, gcsafe
 .} =
@@ -987,7 +987,7 @@ proc query*(
   if node.wakuStoreClient.isNil():
     return err("waku store v3 client is nil")

-  let response = (await node.wakuStoreClient.query(request, peer)).valueOr:
+  let response = (await node.wakuStoreClient.query(request)).valueOr:
     var res = StoreQueryResponse()
     res.statusCode = uint32(error.kind)
     res.statusDesc = $error
diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim
index b835645b0d..cd1479e339 100644
--- a/waku/waku_api/rest/store/handlers.nim
+++ b/waku/waku_api/rest/store/handlers.nim
@@ -26,9 +26,9 @@ const NoPeerNoDiscError* =
 # Queries the store-node with the query parameters and
 # returns a RestApiResponse that is sent back to the api client.
 proc performStoreQuery(
-    selfNode: WakuNode, storeQuery: StoreQueryRequest, storePeer: RemotePeerInfo
+    selfNode: WakuNode, storeQuery: StoreQueryRequest
 ): Future[RestApiResponse] {.async.} =
-  let queryFut = selfNode.query(storeQuery, storePeer)
+  let queryFut = selfNode.query(storeQuery)

   if not await queryFut.withTimeout(futTimeout):
     const msg = "No history response received (timeout)"
@@ -45,7 +45,7 @@ proc performStoreQuery(
   let res = futRes.get()

   if res.statusCode == uint32(ErrorCode.TOO_MANY_REQUESTS):
-    debug "Request rate limit reached on peer ", storePeer
+    debug "Request rate limit reached on peer "
     return RestApiResponse.tooManyRequests("Request rate limit reached")

   let resp = RestApiResponse.jsonResponse(res, status = Http200).valueOr:
@@ -222,7 +222,7 @@ proc installStoreApiHandlers*(
       let parsedPeerAddr = parseUrlPeerAddr(peer).valueOr:
         return RestApiResponse.badRequest(error)

-      let peerInfo = parsedPeerAddr.valueOr:
+      let _ = parsedPeerAddr.valueOr:
         node.peerManager.selectPeer(WakuStoreCodec).valueOr:
           let handler = discHandler.valueOr:
             return NoPeerNoDiscError
@@ -235,4 +235,4 @@
           "No suitable service peer & none discovered"
         )

-      return await node.performStoreQuery(storeQuery, peerInfo)
+      return await node.performStoreQuery(storeQuery)
diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim
index 07b67a8b2e..769deed571 100644
--- a/waku/waku_filter_v2/client.nim
+++ b/waku/waku_filter_v2/client.nim
@@ -4,7 +4,13 @@
 import std/options, chronicles, chronos, libp2p/protocols/protocol, bearssl/rand
 import
-  ../node/peer_manager, ../waku_core, ./common, ./protocol_metrics, ./rpc_codec, ./rpc
+  ../node/peer_manager,
+  ../node/delivery_monitor/subscriptions_observer,
+  ../waku_core,
+  ./common,
+  ./protocol_metrics,
+  ./rpc_codec,
+  ./rpc

 logScope:
   topics = "waku filter client"
@@ -13,12 +19,16 @@ type WakuFilterClient* = ref object of LPProtocol
   rng: ref HmacDrbgContext
   peerManager: PeerManager
   pushHandlers: seq[FilterPushHandler]
+  subscrObservers: seq[SubscriptionObserver]

 func generateRequestId(rng: ref HmacDrbgContext): string =
   var bytes: array[10, byte]
   hmacDrbgGenerate(rng[], bytes)
   return toHex(bytes)

+proc addSubscrObserver*(wfc: WakuFilterClient, obs: SubscriptionObserver) =
+  wfc.subscrObservers.add(obs)
+
 proc sendSubscribeRequest(
     wfc: WakuFilterClient,
     servicePeer: RemotePeerInfo,
@@ -113,7 +123,10 @@ proc subscribe*(
     requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq
   )

-  return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
+  ?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
+
+  for obs in wfc.subscrObservers:
+    obs.onSubscribe(pubSubTopic, contentTopicSeq)

 proc unsubscribe*(
     wfc: WakuFilterClient,
     servicePeer: RemotePeerInfo,
@@ -132,7 +145,9 @@ proc unsubscribe*(
     requestId = requestId, pubsubTopic = pubsubTopic, contentTopics = contentTopicSeq
   )

-  return await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
+  ?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
+  for obs in wfc.subscrObservers:
+    obs.onUnsubscribe(pubSubTopic, contentTopicSeq)

 proc unsubscribeAll*(
     wfc: WakuFilterClient, servicePeer: RemotePeerInfo
diff --git a/waku/waku_lightpush/client.nim b/waku/waku_lightpush/client.nim
index da502d4562..4f516dec5d 100644
--- a/waku/waku_lightpush/client.nim
+++ b/waku/waku_lightpush/client.nim
@@ -3,6 +3,7 @@
 import std/options, results, chronicles, chronos, metrics, bearssl/rand
 import
   ../node/peer_manager,
+  ../node/delivery_monitor/publish_observer,
   ../utils/requests,
   ../waku_core,
   ./common,
@@ -16,12 +17,16 @@ logScope:
 type WakuLightPushClient* = ref object
   peerManager*: PeerManager
   rng*: ref rand.HmacDrbgContext
+  publishObservers: seq[PublishObserver]

 proc new*(
     T: type WakuLightPushClient, peerManager: PeerManager, rng: ref rand.HmacDrbgContext
 ): T =
   WakuLightPushClient(peerManager: peerManager, rng: rng)

+proc addPublishObserver*(wl: WakuLightPushClient, obs: PublishObserver) =
+  wl.publishObservers.add(obs)
+
 proc sendPushRequest(
     wl: WakuLightPushClient, req: PushRequest, peer: PeerId | RemotePeerInfo
 ): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
@@ -67,4 +72,26 @@ proc publish*(
     peer: PeerId | RemotePeerInfo,
 ): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
   let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
-  return await wl.sendPushRequest(pushRequest, peer)
+  ?await wl.sendPushRequest(pushRequest, peer)
+
+  for obs in wl.publishObservers:
+    obs.onMessagePublished(pubSubTopic, message)
+
+  return ok()
+
+proc publishToAny*(
+    wl: WakuLightPushClient, pubSubTopic: PubsubTopic, message: WakuMessage
+): Future[WakuLightPushResult[void]] {.async, gcsafe.} =
+  ## This proc is similar to the publish one, but in this case
+  ## we don't specify a particular peer; instead, we get one from the peer manager
+
+  let peer = wl.peerManager.selectPeer(WakuLightPushCodec).valueOr:
+    return err("could not retrieve a peer supporting WakuLightPushCodec")
+
+  let pushRequest = PushRequest(pubSubTopic: pubSubTopic, message: message)
+  ?await wl.sendPushRequest(pushRequest, peer)
+
+  for obs in wl.publishObservers:
+    obs.onMessagePublished(pubSubTopic, message)
+
+  return ok()
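
Note: a usage sketch of the new proc (not part of the patch; assumes a mounted
client and an in-scope pubsubTopic/message). Callers that don't care which
lightpush service peer handles the request can now write:

  let res = await wakuLightpushClient.publishToAny(pubsubTopic, message)
  if res.isErr():
    error "lightpush republish failed", error = res.error
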
diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim
index b922f69d3b..d84d512b58 100644
--- a/waku/waku_relay/protocol.nim
+++ b/waku/waku_relay/protocol.nim
@@ -18,7 +18,7 @@ import
   libp2p/protocols/pubsub/rpc/messages,
   libp2p/stream/connection,
   libp2p/switch
-import ../waku_core, ./message_id
+import ../waku_core, ./message_id, ../node/delivery_monitor/publish_observer

 logScope:
   topics = "waku relay"
@@ -128,6 +128,7 @@ type
     wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
     # a map of validators to error messages to return when validation fails
     validatorInserted: Table[PubsubTopic, bool]
+    publishObservers: seq[PublishObserver]

 proc initProtocolHandler(w: WakuRelay) =
   proc handler(conn: Connection, proto: string) {.async.} =
@@ -254,7 +255,14 @@ proc addValidator*(
 ) {.gcsafe.} =
   w.wakuValidators.add((handler, errorMessage))

+proc addPublishObserver*(w: WakuRelay, obs: PublishObserver) =
+  ## Observer when whe api client performed a publish operation. This
+  ## is initially aimed for bringing an additional layer of delivery reliability thanks
+  ## to store
+  w.publishObservers.add(obs)
+
 proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
+  ## Observes when a message is sent/received from the GossipSub PoV
   procCall GossipSub(w).addObserver(observer)

 method start*(w: WakuRelay) {.async, base.} =
@@ -391,4 +399,8 @@ proc publish*(

   let relayedPeerCount = await procCall GossipSub(w).publish(pubsubTopic, data)

+  if relayedPeerCount > 0:
+    for obs in w.publishObservers:
+      obs.onMessagePublished(pubSubTopic, message)
+
   return relayedPeerCount
diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim
index d6d53a9c7c..14c642c89a 100644
--- a/waku/waku_store/client.nim
+++ b/waku/waku_store/client.nim
@@ -48,11 +48,20 @@ proc sendStoreRequest(
   return ok(res)

 proc query*(
-    self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo | PeerId
-): Future[StoreQueryResult] {.async, gcsafe.} =
+    self: WakuStoreClient, request: StoreQueryRequest, peerId = none(PeerId)
+): Future[StoreQueryResult] {.async.} =
   if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
     return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))

+  let peer: PeerId =
+    if peerId.isSome():
+      peerId.get()
+    else:
+      let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
+        return
+          err(StoreError(kind: BAD_RESPONSE, cause: "no service store peer connected"))
+      peer.peerId
+
   let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
     waku_store_errors.inc(labelValues = [dialFailure])

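
Note: with the optional peerId parameter above, both call styles coexist
(sketch, not part of the patch; req is an in-scope StoreQueryRequest and
peerId a known store service peer):

  let viaAnyPeer = await storeClient.query(req)                # peer-manager-selected
  let viaOnePeer = await storeClient.query(req, some(peerId))  # pinned service peer
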
diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim
index a4e5467a23..2f47cc6c8c 100644
--- a/waku/waku_store/protocol.nim
+++ b/waku/waku_store/protocol.nim
@@ -25,9 +25,6 @@ import
 logScope:
   topics = "waku store"

-const MaxMessageTimestampVariance* = getNanoSecondTime(20)
-  # 20 seconds maximum allowable sender timestamp "drift"
-
 type StoreQueryRequestHandler* =
   proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.}

diff --git a/waku/waku_store/resume.nim b/waku/waku_store/resume.nim
index 208ba0aa66..c7e930748a 100644
--- a/waku/waku_store/resume.nim
+++ b/waku/waku_store/resume.nim
@@ -28,9 +28,9 @@ const ResumeRangeLimit = 6 # hours

 type
-  TransferCallback* = proc(
-    timestamp: Timestamp, peer: RemotePeerInfo
-  ): Future[Result[void, string]] {.async: (raises: []), closure.}
+  TransferCallback* = proc(timestamp: Timestamp): Future[Result[void, string]] {.
+    async: (raises: []), closure
+  .}

   StoreResume* = ref object
     handle: Future[void]
@@ -80,7 +80,7 @@ proc initTransferHandler(
   # tying archive, store client and resume into one callback and saving it for later
   self.transferCallBack = some(
     proc(
-        timestamp: Timestamp, peer: RemotePeerInfo
+        timestamp: Timestamp
     ): Future[Result[void, string]] {.async: (raises: []), closure.} =
       var req = StoreQueryRequest()
       req.includeData = true
@@ -90,7 +90,7 @@ proc initTransferHandler(

       while true:
         let catchable = catch:
-          await wakuStoreClient.query(req, peer)
+          await wakuStoreClient.query(req)

         if catchable.isErr():
           return err("store client error: " & catchable.error.msg)
@@ -156,16 +156,16 @@ proc setLastOnlineTimestamp*(
   return ok()

 proc startStoreResume*(
-    self: StoreResume, time: Timestamp, peer: RemotePeerInfo
+    self: StoreResume, time: Timestamp
 ): Future[Result[void, string]] {.async.} =
-  info "starting store resume", lastOnline = $time, peer = $peer
+  info "starting store resume", lastOnline = $time

   # get the callback we saved if possible
   let callback = self.transferCallBack.valueOr:
     return err("transfer callback uninitialised")

   # run the callback
-  (await callback(time, peer)).isOkOr:
+  (await callback(time)).isOkOr:
     return err("transfer callback failed: " & $error)

   info "store resume completed"
@@ -173,9 +173,6 @@ proc startStoreResume*(
   return ok()

 proc autoStoreResume*(self: StoreResume): Future[Result[void, string]] {.async.} =
-  let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
-    return err("no suitable peer found for store resume")
-
   let lastOnlineTs = self.getLastOnlineTimestamp().valueOr:
     return err("failed to get last online timestamp: " & $error)

@@ -184,7 +181,7 @@ proc autoStoreResume*(self: StoreResume): Future[Result[void, string]] {.async.}
   let maxTime = now - (ResumeRangeLimit * 3600 * 1_000_000_000)
   let ts = max(lastOnlineTs, maxTime)

-  return await self.startStoreResume(ts, peer)
+  return await self.startStoreResume(ts)

 proc periodicSetLastOnline(self: StoreResume) {.async.} =
   ## Save a timestamp periodically
diff --git a/waku/waku_store_legacy/protocol.nim b/waku/waku_store_legacy/protocol.nim
index 6f158394e2..a4e5c92468 100644
--- a/waku/waku_store_legacy/protocol.nim
+++ b/waku/waku_store_legacy/protocol.nim
@@ -26,9 +26,6 @@ import
 logScope:
   topics = "waku legacy store"

-const MaxMessageTimestampVariance* = getNanoSecondTime(20)
-  # 20 seconds maximum allowable sender timestamp "drift"
-
 type HistoryQueryHandler* =
   proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.}

diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim
index 0a5e6e49d8..5f356f830c 100644
--- a/waku/waku_sync/protocol.nim
+++ b/waku/waku_sync/protocol.nim
@@ -276,7 +276,7 @@ proc createTransferCallback(

       while true:
         let catchable = catch:
-          await wakuStoreClient.query(query, peerId)
+          await wakuStoreClient.query(query, some(peerId))

         if catchable.isErr():
           return err("store client error: " & catchable.error.msg)

From 805eb3884097da0a3171b37a1084fb64e3688b48 Mon Sep 17 00:00:00 2001
From: Ivan Folgueira Bande
Date: Thu, 15 Aug 2024 19:40:05 +0200
Subject: [PATCH 2/9] chat2 compilation issue

---
 apps/chat2/chat2.nim | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim
index dd2694cf77..9a9c36c9f6 100644
--- a/apps/chat2/chat2.nim
+++ b/apps/chat2/chat2.nim
@@ -490,9 +490,8 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
         echo &"{chatLine}"

     info "Hit store handler"
-    let queryRes = await node.query(
-      StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
-    )
+    let queryRes =
+      await node.query(StoreQueryRequest(contentTopics: @[chat.contentTopic]))

     if queryRes.isOk():
       storeHandler(queryRes.value)

From 371a58c796b83d5a3d39e648f8e463eeec27235c Mon Sep 17 00:00:00 2001
From: Ivan Folgueira Bande
Date: Thu, 15 Aug 2024 20:30:25 +0200
Subject: [PATCH 3/9] creating a separate proc in store client for delivery
 monitor

---
 waku/node/delivery_monitor/recv_monitor.nim |  4 ++--
 waku/node/delivery_monitor/send_monitor.nim |  2 +-
 waku/node/waku_node.nim                     |  4 ++--
 waku/waku_api/rest/store/handlers.nim       | 10 ++++----
 waku/waku_store/client.nim                  | 26 ++++++++++++++-------
 waku/waku_store/resume.nim                  | 21 ++++++++++-------
 waku/waku_sync/protocol.nim                 |  2 +-
 7 files changed, 41 insertions(+), 28 deletions(-)

diff --git a/waku/node/delivery_monitor/recv_monitor.nim b/waku/node/delivery_monitor/recv_monitor.nim
index f0aa180133..ef66f0c6e6 100644
--- a/waku/node/delivery_monitor/recv_monitor.nim
+++ b/waku/node/delivery_monitor/recv_monitor.nim
@@ -49,7 +49,7 @@ proc getMissingMsgsFromStore(
     self: RecvMonitor, msgHashes: seq[WakuMessageHash]
 ): Future[Result[seq[TupleHashAndMsg], string]] {.async.} =
   let storeResp: StoreQueryResponse = (
-    await self.storeClient.query(
+    await self.storeClient.queryToAny(
       StoreQueryRequest(includeData: true, messageHashes: msgHashes)
     )
   ).valueOr:
@@ -89,7 +89,7 @@ proc msgChecker(self: RecvMonitor) {.async.} =
     var msgHashesInStore = newSeq[WakuMessageHash](0)
     for pubsubTopic, cTopics in self.topicsInterest.pairs:
       let storeResp: StoreQueryResponse = (
-        await self.storeClient.query(
+        await self.storeClient.queryToAny(
           StoreQueryRequest(
             includeData: false,
             pubsubTopic: some(PubsubTopic(pubsubTopic)),
diff --git a/waku/node/delivery_monitor/send_monitor.nim b/waku/node/delivery_monitor/send_monitor.nim
index 8a90d9ab49..b0b73e6b84 100644
--- a/waku/node/delivery_monitor/send_monitor.nim
+++ b/waku/node/delivery_monitor/send_monitor.nim
@@ -110,7 +110,7 @@ proc checkMsgsInStore(
   let hashesToValidate = toSeq(msgsToValidate.keys)

   let storeResp: StoreQueryResponse = (
-    await self.storeClient.query(
+    await self.storeClient.queryToAny(
       StoreQueryRequest(includeData: false, messageHashes: hashesToValidate)
     )
   ).valueOr:
diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim
index e30fdf0f1f..3895f52312 100644
--- a/waku/node/waku_node.nim
+++ b/waku/node/waku_node.nim
@@ -979,7 +979,7 @@ proc mountStoreClient*(node: WakuNode) =
   node.wakuStoreClient = store_client.WakuStoreClient.new(node.peerManager, node.rng)

 proc query*(
-    node: WakuNode, request: store_common.StoreQueryRequest
+    node: WakuNode, request: store_common.StoreQueryRequest, peer: RemotePeerInfo
 ): Future[store_common.WakuStoreResult[store_common.StoreQueryResponse]] {.
     async, gcsafe
 .} =
@@ -987,7 +987,7 @@ proc query*(
   if node.wakuStoreClient.isNil():
     return err("waku store v3 client is nil")

-  let response = (await node.wakuStoreClient.query(request)).valueOr:
+  let response = (await node.wakuStoreClient.query(request, peer)).valueOr:
     var res = StoreQueryResponse()
     res.statusCode = uint32(error.kind)
     res.statusDesc = $error
diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim
index cd1479e339..b835645b0d 100644
--- a/waku/waku_api/rest/store/handlers.nim
+++ b/waku/waku_api/rest/store/handlers.nim
@@ -26,9 +26,9 @@ const NoPeerNoDiscError* =
 # Queries the store-node with the query parameters and
 # returns a RestApiResponse that is sent back to the api client.
 proc performStoreQuery(
-    selfNode: WakuNode, storeQuery: StoreQueryRequest
+    selfNode: WakuNode, storeQuery: StoreQueryRequest, storePeer: RemotePeerInfo
 ): Future[RestApiResponse] {.async.} =
-  let queryFut = selfNode.query(storeQuery)
+  let queryFut = selfNode.query(storeQuery, storePeer)

   if not await queryFut.withTimeout(futTimeout):
     const msg = "No history response received (timeout)"
@@ -45,7 +45,7 @@ proc performStoreQuery(
   let res = futRes.get()

   if res.statusCode == uint32(ErrorCode.TOO_MANY_REQUESTS):
-    debug "Request rate limit reached on peer "
+    debug "Request rate limit reached on peer ", storePeer
     return RestApiResponse.tooManyRequests("Request rate limit reached")

   let resp = RestApiResponse.jsonResponse(res, status = Http200).valueOr:
@@ -222,7 +222,7 @@ proc installStoreApiHandlers*(
       let parsedPeerAddr = parseUrlPeerAddr(peer).valueOr:
         return RestApiResponse.badRequest(error)

-      let _ = parsedPeerAddr.valueOr:
+      let peerInfo = parsedPeerAddr.valueOr:
         node.peerManager.selectPeer(WakuStoreCodec).valueOr:
           let handler = discHandler.valueOr:
             return NoPeerNoDiscError
@@ -235,4 +235,4 @@
           "No suitable service peer & none discovered"
         )

-      return await node.performStoreQuery(storeQuery)
+      return await node.performStoreQuery(storeQuery, peerInfo)
diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim
index 14c642c89a..4b05249e38 100644
--- a/waku/waku_store/client.nim
+++ b/waku/waku_store/client.nim
@@ -48,19 +48,29 @@ proc sendStoreRequest(
   return ok(res)

 proc query*(
+    self: WakuStoreClient, request: StoreQueryRequest, peer: RemotePeerInfo | PeerId
+): Future[StoreQueryResult] {.async, gcsafe.} =
+  if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
+    return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))
+
+  let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
+    waku_store_errors.inc(labelValues = [dialFailure])
+
+    return err(StoreError(kind: ErrorCode.PEER_DIAL_FAILURE, address: $peer))
+
+  return await self.sendStoreRequest(request, connection)
+
+proc queryToAny*(
     self: WakuStoreClient, request: StoreQueryRequest, peerId = none(PeerId)
 ): Future[StoreQueryResult] {.async.} =
+  ## This proc is similar to the query one, but in this case
+  ## we don't specify a particular peer; instead, we get one from the peer manager
+
   if request.paginationCursor.isSome() and request.paginationCursor.get() == EmptyCursor:
     return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: "invalid cursor"))

-  let peer: PeerId =
-    if peerId.isSome():
-      peerId.get()
-    else:
-      let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
-        return
-          err(StoreError(kind: BAD_RESPONSE, cause: "no service store peer connected"))
-      peer.peerId
+  let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
+    return err(StoreError(kind: BAD_RESPONSE, cause: "no service store peer connected"))

   let connection = (await self.peerManager.dialPeer(peer, WakuStoreCodec)).valueOr:
     waku_store_errors.inc(labelValues = [dialFailure])

diff --git a/waku/waku_store/resume.nim b/waku/waku_store/resume.nim
index c7e930748a..208ba0aa66 100644
--- a/waku/waku_store/resume.nim
+++ b/waku/waku_store/resume.nim
@@ -28,9 +28,9 @@ const ResumeRangeLimit = 6 # hours

 type
-  TransferCallback* = proc(timestamp: Timestamp): Future[Result[void, string]] {.
-    async: (raises: []), closure
-  .}
+  TransferCallback* = proc(
+    timestamp: Timestamp, peer: RemotePeerInfo
+  ): Future[Result[void, string]] {.async: (raises: []), closure.}

   StoreResume* = ref object
     handle: Future[void]
@@ -80,7 +80,7 @@ proc initTransferHandler(
   # tying archive, store client and resume into one callback and saving it for later
   self.transferCallBack = some(
     proc(
-        timestamp: Timestamp
+        timestamp: Timestamp, peer: RemotePeerInfo
     ): Future[Result[void, string]] {.async: (raises: []), closure.} =
       var req = StoreQueryRequest()
       req.includeData = true
@@ -90,7 +90,7 @@ proc initTransferHandler(

       while true:
         let catchable = catch:
-          await wakuStoreClient.query(req)
+          await wakuStoreClient.query(req, peer)

         if catchable.isErr():
           return err("store client error: " & catchable.error.msg)
@@ -156,16 +156,16 @@ proc setLastOnlineTimestamp*(
   return ok()

 proc startStoreResume*(
-    self: StoreResume, time: Timestamp
+    self: StoreResume, time: Timestamp, peer: RemotePeerInfo
 ): Future[Result[void, string]] {.async.} =
-  info "starting store resume", lastOnline = $time
+  info "starting store resume", lastOnline = $time, peer = $peer

   # get the callback we saved if possible
   let callback = self.transferCallBack.valueOr:
     return err("transfer callback uninitialised")

   # run the callback
-  (await callback(time)).isOkOr:
+  (await callback(time, peer)).isOkOr:
     return err("transfer callback failed: " & $error)

   info "store resume completed"
@@ -173,6 +173,9 @@ proc startStoreResume*(
   return ok()

 proc autoStoreResume*(self: StoreResume): Future[Result[void, string]] {.async.} =
+  let peer = self.peerManager.selectPeer(WakuStoreCodec).valueOr:
+    return err("no suitable peer found for store resume")
+
   let lastOnlineTs = self.getLastOnlineTimestamp().valueOr:
     return err("failed to get last online timestamp: " & $error)

@@ -181,7 +184,7 @@ proc autoStoreResume*(self: StoreResume): Future[Result[void, string]] {.async.}
   let maxTime = now - (ResumeRangeLimit * 3600 * 1_000_000_000)
   let ts = max(lastOnlineTs, maxTime)

-  return await self.startStoreResume(ts)
+  return await self.startStoreResume(ts, peer)

 proc periodicSetLastOnline(self: StoreResume) {.async.} =
   ## Save a timestamp periodically
diff --git a/waku/waku_sync/protocol.nim b/waku/waku_sync/protocol.nim
index 5f356f830c..0a5e6e49d8 100644
--- a/waku/waku_sync/protocol.nim
+++ b/waku/waku_sync/protocol.nim
@@ -276,7 +276,7 @@ proc createTransferCallback(

       while true:
         let catchable = catch:
-          await wakuStoreClient.query(query, some(peerId))
+          await wakuStoreClient.query(query, peerId)

         if catchable.isErr():
           return err("store client error: " & catchable.error.msg)
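
Note: after this patch the two behaviours live in separate procs, so the REST
handler and resume keep their explicit-peer semantics while the delivery
monitor uses the peer-manager fallback. A sketch of both call styles (not part
of the patch; req is an in-scope StoreQueryRequest):

  let pinned = await storeClient.query(req, remotePeerInfo)  # explicit service peer
  let anyPeer = await storeClient.queryToAny(req)            # any connected store peer
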

From 7432ea0e19ddb3163d5a56410b6905f38bf512fe Mon Sep 17 00:00:00 2001
From: Ivan Folgueira Bande
Date: Thu, 15 Aug 2024 21:00:10 +0200
Subject: [PATCH 4/9] chat2 undo

---
 apps/chat2/chat2.nim | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim
index 9a9c36c9f6..dd2694cf77 100644
--- a/apps/chat2/chat2.nim
+++ b/apps/chat2/chat2.nim
@@ -490,8 +490,9 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
         echo &"{chatLine}"

     info "Hit store handler"
-    let queryRes =
-      await node.query(StoreQueryRequest(contentTopics: @[chat.contentTopic]))
+    let queryRes = await node.query(
+      StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
+    )

     if queryRes.isOk():
       storeHandler(queryRes.value)

From 7b372b0b09257dbfdc120233a03d77a59c57123d Mon Sep 17 00:00:00 2001
From: Ivan Folgueira Bande
Date: Thu, 15 Aug 2024 21:33:24 +0200
Subject: [PATCH 5/9] client filter return ok

---
 waku/waku_filter_v2/client.nim | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/waku/waku_filter_v2/client.nim b/waku/waku_filter_v2/client.nim
index 769deed571..617648aff8 100644
--- a/waku/waku_filter_v2/client.nim
+++ b/waku/waku_filter_v2/client.nim
@@ -128,6 +128,8 @@ proc subscribe*(
   for obs in wfc.subscrObservers:
     obs.onSubscribe(pubSubTopic, contentTopicSeq)

+  return ok()
+
 proc unsubscribe*(
     wfc: WakuFilterClient,
     servicePeer: RemotePeerInfo,
@@ -146,9 +148,12 @@ proc unsubscribe*(
   )

   ?await wfc.sendSubscribeRequest(servicePeer, filterSubscribeRequest)
+
   for obs in wfc.subscrObservers:
     obs.onUnsubscribe(pubSubTopic, contentTopicSeq)

+  return ok()
+
 proc unsubscribeAll*(
     wfc: WakuFilterClient, servicePeer: RemotePeerInfo
 ): Future[FilterSubscribeResult] {.async: (raises: []).} =

From 8fe4d9004f5c175c7ccd9a69f4a2ff45ed1fb166 Mon Sep 17 00:00:00 2001
From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Date: Fri, 23 Aug 2024 17:03:22 +0200
Subject: [PATCH 6/9] Update waku/waku_relay/protocol.nim

Co-authored-by: gabrielmer <101006718+gabrielmer@users.noreply.github.com>
---
 waku/waku_relay/protocol.nim | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim
index d84d512b58..218dcf3d23 100644
--- a/waku/waku_relay/protocol.nim
+++ b/waku/waku_relay/protocol.nim
@@ -256,7 +256,7 @@ proc addValidator*(
   w.wakuValidators.add((handler, errorMessage))

 proc addPublishObserver*(w: WakuRelay, obs: PublishObserver) =
-  ## Observer when whe api client performed a publish operation. This
+  ## Observer when the api client performed a publish operation. This
   ## is initially aimed for bringing an additional layer of delivery reliability thanks
   ## to store
   w.publishObservers.add(obs)

From 89cf03ce9d044fbe7e63a0d3f6a6eabaea5c7479 Mon Sep 17 00:00:00 2001
From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Date: Fri, 23 Aug 2024 17:09:40 +0200
Subject: [PATCH 7/9] update comment

---
 waku/node/delivery_monitor/send_monitor.nim | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/waku/node/delivery_monitor/send_monitor.nim b/waku/node/delivery_monitor/send_monitor.nim
index b0b73e6b84..ce1ccf0cc9 100644
--- a/waku/node/delivery_monitor/send_monitor.nim
+++ b/waku/node/delivery_monitor/send_monitor.nim
@@ -33,7 +33,7 @@ type DeliveryInfo = object

 type SendMonitor* = ref object of PublishObserver
   publishedMessages: Table[WakuMessageHash, DeliveryInfo]
-    ## Cache that contains the delivery info per pubsub-topic.
+    ## Cache that contains the delivery info per message hash.
     ## This is needed to make sure the published messages are properly published

   msgStoredCheckerHandle: Future[void] ## handle that allows to stop the async task

From cee592d8ecddcf1ec74914567917d85dae5d1603 Mon Sep 17 00:00:00 2001
From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Date: Fri, 23 Aug 2024 17:16:59 +0200
Subject: [PATCH 8/9] fix typo

---
 waku/node/delivery_monitor/recv_monitor.nim | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/waku/node/delivery_monitor/recv_monitor.nim b/waku/node/delivery_monitor/recv_monitor.nim
index ef66f0c6e6..cae706ddd2 100644
--- a/waku/node/delivery_monitor/recv_monitor.nim
+++ b/waku/node/delivery_monitor/recv_monitor.nim
@@ -55,9 +55,9 @@ proc getMissingMsgsFromStore(
   ).valueOr:
     return err("getMissingMsgsFromStore: " & $error)

-  let otheriseMsg = WakuMessage() ## message to be returned if the Option message is none
+  let otherwiseMsg = WakuMessage() ## message to be returned if the Option message is none
   return ok(
-    storeResp.messages.mapIt((hash: it.messageHash, msg: it.message.get(otheriseMsg)))
+    storeResp.messages.mapIt((hash: it.messageHash, msg: it.message.get(otherwiseMsg)))
   )

From 1743f7078e471c118d3768ef9b815d0747a71ee1 Mon Sep 17 00:00:00 2001
From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
Date: Fri, 23 Aug 2024 17:20:59 +0200
Subject: [PATCH 9/9] Update recv_monitor.nim

---
 waku/node/delivery_monitor/recv_monitor.nim | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/waku/node/delivery_monitor/recv_monitor.nim b/waku/node/delivery_monitor/recv_monitor.nim
index cae706ddd2..3f82ddcd2e 100644
--- a/waku/node/delivery_monitor/recv_monitor.nim
+++ b/waku/node/delivery_monitor/recv_monitor.nim
@@ -55,7 +55,8 @@ proc getMissingMsgsFromStore(
   ).valueOr:
     return err("getMissingMsgsFromStore: " & $error)

-  let otherwiseMsg = WakuMessage() ## message to be returned if the Option message is none
+  let otherwiseMsg = WakuMessage()
+    ## message to be returned if the Option message is none
   return ok(
     storeResp.messages.mapIt((hash: it.messageHash, msg: it.message.get(otherwiseMsg)))
   )