diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 482c8194ff..875e781a8c 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -552,7 +552,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = await node.mountFilterClient() node.peerManager.addServicePeer(peerInfo.value, WakuFilterCodec) - proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe.} = + proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = trace "Hit filter handler", contentTopic=msg.contentTopic chat.printReceivedMessage(msg) diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 960672a4a9..9a01a4ec57 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -54,7 +54,7 @@ suite "Waku Filter": let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() let pushHandlerFuture = newFuture[(string, WakuMessage)]() - proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} = pushHandlerFuture.complete((pubsubTopic, message)) let @@ -97,7 +97,7 @@ suite "Waku Filter": let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() var pushHandlerFuture = newFuture[void]() - proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} = pushHandlerFuture.complete() let @@ -149,7 +149,7 @@ suite "Waku Filter": let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() var pushHandlerFuture = newFuture[void]() - proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} = pushHandlerFuture.complete() let @@ -214,7 +214,7 @@ suite "Waku Filter": let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() var pushHandlerFuture = newFuture[void]() - proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} = pushHandlerFuture.complete() let diff --git a/tests/v2/test_wakunode_filter.nim b/tests/v2/test_wakunode_filter.nim index 1695beb2bb..9f7f9b11ec 100644 --- a/tests/v2/test_wakunode_filter.nim +++ b/tests/v2/test_wakunode_filter.nim @@ -25,10 +25,10 @@ suite "WakuNode - Filter": clientKey = generateSecp256k1Key() client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - await allFutures(server.start(), client.start()) + waitFor allFutures(server.start(), client.start()) - await server.mountFilter() - await client.mountFilterClient() + waitFor server.mountFilter() + waitFor client.mountFilterClient() ## Given let serverPeerInfo = server.peerInfo.toRemotePeerInfo() @@ -39,18 +39,18 @@ suite "WakuNode - Filter": message = fakeWakuMessage(contentTopic=contentTopic) var filterPushHandlerFut = newFuture[(PubsubTopic, WakuMessage)]() - proc filterPushHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} = + proc filterPushHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = filterPushHandlerFut.complete((pubsubTopic, msg)) ## When await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo) # Wait for subscription to take effect - await sleepAsync(100.millis) + waitFor sleepAsync(100.millis) - await server.filterHandleMessage(pubSubTopic, message) + waitFor server.filterHandleMessage(pubSubTopic, message) - require await filterPushHandlerFut.withTimeout(5.seconds) + require waitFor filterPushHandlerFut.withTimeout(5.seconds) ## Then check filterPushHandlerFut.completed() @@ -60,4 +60,4 @@ suite "WakuNode - Filter": filterMessage == message ## Cleanup - await allFutures(client.stop(), server.stop()) + waitFor allFutures(client.stop(), server.stop()) diff --git a/tests/v2/waku_archive/test_driver_queue_query.nim b/tests/v2/waku_archive/test_driver_queue_query.nim index dacb2c4f46..179c3e1580 100644 --- a/tests/v2/waku_archive/test_driver_queue_query.nim +++ b/tests/v2/waku_archive/test_driver_queue_query.nim @@ -58,10 +58,11 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( maxPageSize=5, ascendingOrder=true ) @@ -75,7 +76,7 @@ suite "Queue driver - query by content topic": filteredMessages == expected[0..4] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "single content topic": ## Given @@ -101,10 +102,11 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], maxPageSize=2, ascendingOrder=true @@ -119,7 +121,7 @@ suite "Queue driver - query by content topic": filteredMessages == expected[2..3] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "single content topic - descending order": ## Given @@ -145,10 +147,11 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], maxPageSize=2, ascendingOrder=false @@ -163,7 +166,7 @@ suite "Queue driver - query by content topic": filteredMessages == expected[6..7].reversed() ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "multiple content topic": ## Given @@ -191,10 +194,11 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic1, contentTopic2], maxPageSize=2, ascendingOrder=true @@ -209,7 +213,7 @@ suite "Queue driver - query by content topic": filteredMessages == expected[2..3] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "single content topic - no results": ## Given @@ -230,10 +234,11 @@ suite "Queue driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], maxPageSize=2, ascendingOrder=true @@ -248,7 +253,7 @@ suite "Queue driver - query by content topic": filteredMessages.len == 0 ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "content topic and max page size - not enough messages stored": ## Given @@ -258,10 +263,11 @@ suite "Queue driver - query by content topic": for t in 0..<40: let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t)) - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[DefaultContentTopic], maxPageSize=pageSize, ascendingOrder=true @@ -276,7 +282,7 @@ suite "Queue driver - query by content topic": filteredMessages.len == 40 ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") suite "SQLite driver - query by pubsub topic": @@ -306,10 +312,11 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( pubsubTopic=some(pubsubTopic), maxPageSize=2, ascendingOrder=true @@ -325,7 +332,7 @@ suite "SQLite driver - query by pubsub topic": filteredMessages == expectedMessages[4..5] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "no pubsub topic": ## Given @@ -352,10 +359,11 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( maxPageSize=2, ascendingOrder=true ) @@ -370,7 +378,7 @@ suite "SQLite driver - query by pubsub topic": filteredMessages == expectedMessages[0..1] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "content topic and pubsub topic": ## Given @@ -398,10 +406,11 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), maxPageSize=2, @@ -418,7 +427,7 @@ suite "SQLite driver - query by pubsub topic": filteredMessages == expectedMessages[4..5] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") suite "Queue driver - query by cursor": @@ -447,12 +456,13 @@ suite "Queue driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( cursor=some(cursor), maxPageSize=2, ascendingOrder=true @@ -467,7 +477,7 @@ suite "Queue driver - query by cursor": filteredMessages == expected[5..6] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "only cursor - descending order": ## Given @@ -493,12 +503,13 @@ suite "Queue driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( cursor=some(cursor), maxPageSize=2, ascendingOrder=false @@ -513,7 +524,7 @@ suite "Queue driver - query by cursor": filteredMessages == expected[2..3].reversed() ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "content topic and cursor": ## Given @@ -537,12 +548,13 @@ suite "Queue driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], cursor=some(cursor), maxPageSize=10, @@ -558,7 +570,7 @@ suite "Queue driver - query by cursor": filteredMessages == expected[5..6] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "content topic and cursor - descending order": ## Given @@ -582,12 +594,13 @@ suite "Queue driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], cursor=some(cursor), maxPageSize=10, @@ -603,7 +616,7 @@ suite "Queue driver - query by cursor": filteredMessages == expected[2..5].reversed() ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "pubsub topic and cursor": ## Given @@ -634,12 +647,13 @@ suite "Queue driver - query by cursor": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(expected[5][0], expected[5][1]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( pubsubTopic=some(pubsubTopic), cursor=some(cursor), maxPageSize=10, @@ -656,7 +670,7 @@ suite "Queue driver - query by cursor": filteredMessages == expectedMessages[6..7] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "pubsub topic and cursor - descending order": ## Given @@ -687,12 +701,13 @@ suite "Queue driver - query by cursor": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(expected[6][0], expected[6][1]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( pubsubTopic=some(pubsubTopic), cursor=some(cursor), maxPageSize=10, @@ -709,7 +724,7 @@ suite "Queue driver - query by cursor": filteredMessages == expectedMessages[4..5].reversed() ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") suite "Queue driver - query by time range": @@ -737,10 +752,11 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( startTime=some(ts(15, timeOrigin)), maxPageSize=10, ascendingOrder=true @@ -755,7 +771,7 @@ suite "Queue driver - query by time range": filteredMessages == expected[2..6] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "end time only": ## Given @@ -780,10 +796,11 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( endTime=some(ts(45, timeOrigin)), maxPageSize=10, ascendingOrder=true @@ -798,7 +815,7 @@ suite "Queue driver - query by time range": filteredMessages == expected[0..4] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "start time and end time": ## Given @@ -829,10 +846,11 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( startTime=some(ts(15, timeOrigin)), endTime=some(ts(45, timeOrigin)), maxPageSize=10, @@ -849,7 +867,7 @@ suite "Queue driver - query by time range": filteredMessages == expectedMessages[2..4] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "invalid time range - no results": ## Given @@ -875,10 +893,11 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], startTime=some(ts(45, timeOrigin)), endTime=some(ts(15, timeOrigin)), @@ -894,9 +913,9 @@ suite "Queue driver - query by time range": filteredMessages.len == 0 ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") - test "time range start and content topic": + asynctest "time range start and content topic": ## Given const contentTopic = "test-content-topic" @@ -919,10 +938,11 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], startTime=some(ts(15, timeOrigin)), maxPageSize=10, @@ -937,7 +957,7 @@ suite "Queue driver - query by time range": filteredMessages == expected[2..6] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "time range start and content topic - descending order": ## Given @@ -965,10 +985,11 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], startTime=some(ts(15, timeOrigin)), maxPageSize=10, @@ -983,9 +1004,9 @@ suite "Queue driver - query by time range": filteredMessages == expected[2..6].reversed() ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") - test "time range start, single content topic and cursor": + asynctest "time range start, single content topic and cursor": ## Given const contentTopic = "test-content-topic" @@ -1011,12 +1032,13 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[3]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], cursor=some(cursor), startTime=some(ts(15, timeOrigin)), @@ -1032,9 +1054,9 @@ suite "Queue driver - query by time range": filteredMessages == expected[4..9] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") - test "time range start, single content topic and cursor - descending order": + asynctest "time range start, single content topic and cursor - descending order": ## Given const contentTopic = "test-content-topic" @@ -1060,12 +1082,13 @@ suite "Queue driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], cursor=some(cursor), startTime=some(ts(15, timeOrigin)), @@ -1081,7 +1104,7 @@ suite "Queue driver - query by time range": filteredMessages == expected[3..4].reversed() ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "time range, content topic, pubsub topic and cursor": ## Given @@ -1112,12 +1135,13 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), cursor=some(cursor), @@ -1136,7 +1160,7 @@ suite "Queue driver - query by time range": filteredMessages == expectedMessages[3..4] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "time range, content topic, pubsub topic and cursor - descending order": ## Given @@ -1167,12 +1191,13 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(expected[7][0], expected[7][1]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), cursor=some(cursor), @@ -1191,7 +1216,7 @@ suite "Queue driver - query by time range": filteredMessages == expectedMessages[4..5].reversed() ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range": ## Given @@ -1222,12 +1247,13 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), cursor=some(cursor), @@ -1247,7 +1273,7 @@ suite "Queue driver - query by time range": filteredMessages == expectedMessages[4..5] ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order": ## Given @@ -1278,12 +1304,13 @@ suite "Queue driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + let retFut = waitFor driver.put(topic, msg, computeDigest(msg), msg.timestamp) + require retFut.isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) ## When - let res = driver.getMessages( + let res = waitFor driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), cursor=some(cursor), @@ -1302,4 +1329,4 @@ suite "Queue driver - query by time range": filteredMessages.len == 0 ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") diff --git a/tests/v2/waku_archive/test_driver_sqlite.nim b/tests/v2/waku_archive/test_driver_sqlite.nim index 6f5d999468..4a918247ef 100644 --- a/tests/v2/waku_archive/test_driver_sqlite.nim +++ b/tests/v2/waku_archive/test_driver_sqlite.nim @@ -39,7 +39,7 @@ suite "SQLite driver": not driver.isNil() ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "insert a message": ## Given @@ -50,13 +50,13 @@ suite "SQLite driver": let msg = fakeWakuMessage(contentTopic=contentTopic) ## When - let putRes = driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) + let putRes = waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp) ## Then check: putRes.isOk() - let storedMsg = driver.getAllMessages().tryGet() + let storedMsg = (waitFor driver.getAllMessages()).tryGet() check: storedMsg.len == 1 storedMsg.all do (item: auto) -> bool: @@ -65,4 +65,4 @@ suite "SQLite driver": pubsubTopic == DefaultPubsubTopic ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") diff --git a/tests/v2/waku_archive/test_driver_sqlite_query.nim b/tests/v2/waku_archive/test_driver_sqlite_query.nim index 57c30ac284..c0cd03c399 100644 --- a/tests/v2/waku_archive/test_driver_sqlite_query.nim +++ b/tests/v2/waku_archive/test_driver_sqlite_query.nim @@ -39,7 +39,7 @@ proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveC suite "SQLite driver - query by content topic": - test "no content topic": + asyncTest "no content topic": ## Given const contentTopic = "test-content-topic" @@ -62,10 +62,10 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( maxPageSize=5, ascendingOrder=true ) @@ -79,9 +79,9 @@ suite "SQLite driver - query by content topic": filteredMessages == expected[0..4] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "single content topic": + asyncTest "single content topic": ## Given const contentTopic = "test-content-topic" @@ -105,10 +105,10 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], maxPageSize=2, ascendingOrder=true @@ -123,9 +123,9 @@ suite "SQLite driver - query by content topic": filteredMessages == expected[2..3] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "single content topic - descending order": + asyncTest "single content topic - descending order": ## Given const contentTopic = "test-content-topic" @@ -149,10 +149,10 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], maxPageSize=2, ascendingOrder=false @@ -167,9 +167,9 @@ suite "SQLite driver - query by content topic": filteredMessages == expected[6..7].reversed() ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "multiple content topic": + asyncTest "multiple content topic": ## Given const contentTopic1 = "test-content-topic-1" const contentTopic2 = "test-content-topic-2" @@ -195,10 +195,10 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic1, contentTopic2], maxPageSize=2, ascendingOrder=true @@ -213,9 +213,9 @@ suite "SQLite driver - query by content topic": filteredMessages == expected[2..3] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "single content topic - no results": + asyncTest "single content topic - no results": ## Given const contentTopic = "test-content-topic" @@ -234,10 +234,10 @@ suite "SQLite driver - query by content topic": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], maxPageSize=2, ascendingOrder=true @@ -252,9 +252,9 @@ suite "SQLite driver - query by content topic": filteredMessages.len == 0 ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "content topic and max page size - not enough messages stored": + asyncTest "content topic and max page size - not enough messages stored": ## Given const pageSize: uint = 50 @@ -262,10 +262,10 @@ suite "SQLite driver - query by content topic": for t in 0..<40: let msg = fakeWakuMessage(@[byte t], DefaultContentTopic, ts=ts(t)) - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[DefaultContentTopic], maxPageSize=pageSize, ascendingOrder=true @@ -280,12 +280,12 @@ suite "SQLite driver - query by content topic": filteredMessages.len == 40 ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") suite "SQLite driver - query by pubsub topic": - test "pubsub topic": + asyncTest "pubsub topic": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -310,10 +310,10 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( pubsubTopic=some(pubsubTopic), maxPageSize=2, ascendingOrder=true @@ -329,9 +329,9 @@ suite "SQLite driver - query by pubsub topic": filteredMessages == expectedMessages[4..5] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "no pubsub topic": + asyncTest "no pubsub topic": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -356,10 +356,10 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( maxPageSize=2, ascendingOrder=true ) @@ -374,9 +374,9 @@ suite "SQLite driver - query by pubsub topic": filteredMessages == expectedMessages[0..1] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "content topic and pubsub topic": + asyncTest "content topic and pubsub topic": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -402,10 +402,10 @@ suite "SQLite driver - query by pubsub topic": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), maxPageSize=2, @@ -422,12 +422,12 @@ suite "SQLite driver - query by pubsub topic": filteredMessages == expectedMessages[4..5] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") suite "SQLite driver - query by cursor": - test "only cursor": + asyncTest "only cursor": ## Given const contentTopic = "test-content-topic" @@ -451,12 +451,12 @@ suite "SQLite driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( cursor=some(cursor), maxPageSize=2, ascendingOrder=true @@ -471,9 +471,9 @@ suite "SQLite driver - query by cursor": filteredMessages == expected[5..6] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "only cursor - descending order": + asyncTest "only cursor - descending order": ## Given const contentTopic = "test-content-topic" @@ -497,12 +497,12 @@ suite "SQLite driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( cursor=some(cursor), maxPageSize=2, ascendingOrder=false @@ -517,9 +517,9 @@ suite "SQLite driver - query by cursor": filteredMessages == expected[2..3].reversed() ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "content topic and cursor": + asyncTest "content topic and cursor": ## Given const contentTopic = "test-content-topic" @@ -541,12 +541,12 @@ suite "SQLite driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[4]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], cursor=some(cursor), maxPageSize=10, @@ -562,9 +562,9 @@ suite "SQLite driver - query by cursor": filteredMessages == expected[5..6] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "content topic and cursor - descending order": + asyncTest "content topic and cursor - descending order": ## Given const contentTopic = "test-content-topic" @@ -586,12 +586,12 @@ suite "SQLite driver - query by cursor": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], cursor=some(cursor), maxPageSize=10, @@ -607,9 +607,9 @@ suite "SQLite driver - query by cursor": filteredMessages == expected[2..5].reversed() ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "pubsub topic and cursor": + asyncTest "pubsub topic and cursor": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -638,12 +638,12 @@ suite "SQLite driver - query by cursor": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[5][0], expected[5][1]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( pubsubTopic=some(pubsubTopic), cursor=some(cursor), maxPageSize=10, @@ -660,9 +660,9 @@ suite "SQLite driver - query by cursor": filteredMessages == expectedMessages[6..7] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "pubsub topic and cursor - descending order": + asyncTest "pubsub topic and cursor - descending order": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -691,12 +691,12 @@ suite "SQLite driver - query by cursor": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[6][0], expected[6][1]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( pubsubTopic=some(pubsubTopic), cursor=some(cursor), maxPageSize=10, @@ -713,12 +713,12 @@ suite "SQLite driver - query by cursor": filteredMessages == expectedMessages[4..5].reversed() ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") suite "SQLite driver - query by time range": - test "start time only": + asyncTest "start time only": ## Given const contentTopic = "test-content-topic" @@ -741,10 +741,10 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( startTime=some(ts(15, timeOrigin)), maxPageSize=10, ascendingOrder=true @@ -759,9 +759,9 @@ suite "SQLite driver - query by time range": filteredMessages == expected[2..6] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "end time only": + asyncTest "end time only": ## Given const contentTopic = "test-content-topic" @@ -784,10 +784,10 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( endTime=some(ts(45, timeOrigin)), maxPageSize=10, ascendingOrder=true @@ -802,9 +802,9 @@ suite "SQLite driver - query by time range": filteredMessages == expected[0..4] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "start time and end time": + asyncTest "start time and end time": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -833,10 +833,10 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( startTime=some(ts(15, timeOrigin)), endTime=some(ts(45, timeOrigin)), maxPageSize=10, @@ -853,9 +853,9 @@ suite "SQLite driver - query by time range": filteredMessages == expectedMessages[2..4] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "invalid time range - no results": + asyncTest "invalid time range - no results": ## Given const contentTopic = "test-content-topic" @@ -879,10 +879,10 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], startTime=some(ts(45, timeOrigin)), endTime=some(ts(15, timeOrigin)), @@ -898,9 +898,9 @@ suite "SQLite driver - query by time range": filteredMessages.len == 0 ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "time range start and content topic": + asyncTest "time range start and content topic": ## Given const contentTopic = "test-content-topic" @@ -923,10 +923,10 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], startTime=some(ts(15, timeOrigin)), maxPageSize=10, @@ -941,9 +941,9 @@ suite "SQLite driver - query by time range": filteredMessages == expected[2..6] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "time range start and content topic - descending order": + asyncTest "time range start and content topic - descending order": ## Given const contentTopic = "test-content-topic" @@ -969,10 +969,10 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], startTime=some(ts(15, timeOrigin)), maxPageSize=10, @@ -987,9 +987,9 @@ suite "SQLite driver - query by time range": filteredMessages == expected[2..6].reversed() ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "time range start, single content topic and cursor": + asyncTest "time range start, single content topic and cursor": ## Given const contentTopic = "test-content-topic" @@ -1015,12 +1015,12 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[3]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], cursor=some(cursor), startTime=some(ts(15, timeOrigin)), @@ -1036,9 +1036,9 @@ suite "SQLite driver - query by time range": filteredMessages == expected[4..9] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "time range start, single content topic and cursor - descending order": + asyncTest "time range start, single content topic and cursor - descending order": ## Given const contentTopic = "test-content-topic" @@ -1064,12 +1064,12 @@ suite "SQLite driver - query by time range": debug "randomized message insertion sequence", sequence=messages.mapIt(it.payload) for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[6]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], cursor=some(cursor), startTime=some(ts(15, timeOrigin)), @@ -1085,9 +1085,9 @@ suite "SQLite driver - query by time range": filteredMessages == expected[3..4].reversed() ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "time range, content topic, pubsub topic and cursor": + asyncTest "time range, content topic, pubsub topic and cursor": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -1116,12 +1116,12 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(DefaultPubsubTopic, expected[1][1]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), cursor=some(cursor), @@ -1140,9 +1140,9 @@ suite "SQLite driver - query by time range": filteredMessages == expectedMessages[3..4] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "time range, content topic, pubsub topic and cursor - descending order": + asyncTest "time range, content topic, pubsub topic and cursor - descending order": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -1171,12 +1171,12 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[7][0], expected[7][1]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), cursor=some(cursor), @@ -1195,9 +1195,9 @@ suite "SQLite driver - query by time range": filteredMessages == expectedMessages[4..5].reversed() ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range": + asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -1226,12 +1226,12 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), cursor=some(cursor), @@ -1251,9 +1251,9 @@ suite "SQLite driver - query by time range": filteredMessages == expectedMessages[4..5] ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") - test "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order": + asyncTest "time range, content topic, pubsub topic and cursor - cursor timestamp out of time range, descending order": ## Given const contentTopic = "test-content-topic" const pubsubTopic = "test-pubsub-topic" @@ -1282,12 +1282,12 @@ suite "SQLite driver - query by time range": for row in messages: let (topic, msg) = row - require driver.put(topic, msg, computeDigest(msg), msg.timestamp).isOk() + require (await driver.put(topic, msg, computeDigest(msg), msg.timestamp)).isOk() let cursor = computeTestCursor(expected[1][0], expected[1][1]) ## When - let res = driver.getMessages( + let res = await driver.getMessages( contentTopic= @[contentTopic], pubsubTopic=some(pubsubTopic), cursor=some(cursor), @@ -1306,4 +1306,4 @@ suite "SQLite driver - query by time range": filteredMessages.len == 0 ## Cleanup - driver.close().expect("driver to close") + (await driver.close()).expect("driver to close") diff --git a/tests/v2/waku_archive/test_retention_policy.nim b/tests/v2/waku_archive/test_retention_policy.nim index 9dc5072c2a..cddf259a13 100644 --- a/tests/v2/waku_archive/test_retention_policy.nim +++ b/tests/v2/waku_archive/test_retention_policy.nim @@ -40,11 +40,11 @@ suite "Waku Archive - Retention policy": for i in 1..capacity+excess: let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - require retentionPolicy.execute(driver).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (waitFor retentionPolicy.execute(driver)).isOk() ## Then - let numMessages = driver.getMessagesCount().tryGet() + let numMessages = (waitFor driver.getMessagesCount()).tryGet() check: # Expected number of messages is 120 because # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) @@ -52,7 +52,7 @@ suite "Waku Archive - Retention policy": numMessages == 120 ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") test "store capacity should be limited": ## Given @@ -76,11 +76,11 @@ suite "Waku Archive - Retention policy": ## When for msg in messages: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() - require retentionPolicy.execute(driver).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() + require (waitFor retentionPolicy.execute(driver)).isOk() ## Then - let storedMsg = driver.getAllMessages().tryGet() + let storedMsg = (waitFor driver.getAllMessages()).tryGet() check: storedMsg.len == capacity storedMsg.all do (item: auto) -> bool: @@ -89,4 +89,4 @@ suite "Waku Archive - Retention policy": pubsubTopic == DefaultPubsubTopic ## Cleanup - driver.close().expect("driver to close") + (waitFor driver.close()).expect("driver to close") diff --git a/tests/v2/waku_archive/test_waku_archive.nim b/tests/v2/waku_archive/test_waku_archive.nim index 936a0b70c6..4e3ae9e04b 100644 --- a/tests/v2/waku_archive/test_waku_archive.nim +++ b/tests/v2/waku_archive/test_waku_archive.nim @@ -4,6 +4,7 @@ import std/[options, sequtils], testutils/unittests, chronicles, + chronos, libp2p/crypto/crypto import ../../../waku/common/sqlite, @@ -47,11 +48,11 @@ suite "Waku Archive - message handling": let message = fakeWakuMessage(ephemeral=false, ts=validSenderTime) ## When - archive.handleMessage(DefaultPubSubTopic, message) + waitFor archive.handleMessage(DefaultPubSubTopic, message) ## Then check: - driver.getMessagesCount().tryGet() == 1 + (waitFor driver.getMessagesCount()).tryGet() == 1 test "it should not driver an ephemeral message": ## Setup @@ -69,11 +70,11 @@ suite "Waku Archive - message handling": ## When for msg in msgList: - archive.handleMessage(DefaultPubsubTopic, msg) + waitFor archive.handleMessage(DefaultPubsubTopic, msg) ## Then check: - driver.getMessagesCount().tryGet() == 2 + (waitFor driver.getMessagesCount()).tryGet() == 2 test "it should driver a message with no sender timestamp": ## Setup @@ -85,11 +86,11 @@ suite "Waku Archive - message handling": let message = fakeWakuMessage(ts=invalidSenderTime) ## When - archive.handleMessage(DefaultPubSubTopic, message) + waitFor archive.handleMessage(DefaultPubSubTopic, message) ## Then check: - driver.getMessagesCount().tryGet() == 1 + (waitFor driver.getMessagesCount()).tryGet() == 1 test "it should not driver a message with a sender time variance greater than max time variance (future)": ## Setup @@ -104,11 +105,11 @@ suite "Waku Archive - message handling": let message = fakeWakuMessage(ts=invalidSenderTime) ## When - archive.handleMessage(DefaultPubSubTopic, message) + waitFor archive.handleMessage(DefaultPubSubTopic, message) ## Then check: - driver.getMessagesCount().tryGet() == 0 + (waitFor driver.getMessagesCount()).tryGet() == 0 test "it should not driver a message with a sender time variance greater than max time variance (past)": ## Setup @@ -123,11 +124,11 @@ suite "Waku Archive - message handling": let message = fakeWakuMessage(ts=invalidSenderTime) ## When - archive.handleMessage(DefaultPubSubTopic, message) + waitFor archive.handleMessage(DefaultPubSubTopic, message) ## Then check: - driver.getMessagesCount().tryGet() == 0 + (waitFor driver.getMessagesCount()).tryGet() == 0 procSuite "Waku Archive - find messages": @@ -147,14 +148,14 @@ procSuite "Waku Archive - find messages": ] let archiveA = block: - let - driver = newTestArchiveDriver() - archive = newTestWakuArchive(driver) + let + driver = newTestArchiveDriver() + archive = newTestWakuArchive(driver) - for msg in msgListA: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + for msg in msgListA: + require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() - archive + archive test "handle query": ## Setup @@ -167,14 +168,14 @@ procSuite "Waku Archive - find messages": msg1 = fakeWakuMessage(contentTopic=topic) msg2 = fakeWakuMessage() - archive.handleMessage("foo", msg1) - archive.handleMessage("foo", msg2) + waitFor archive.handleMessage("foo", msg1) + waitFor archive.handleMessage("foo", msg2) ## Given let req = ArchiveQuery(contentTopics: @[topic]) ## When - let queryRes = archive.findMessages(req) + let queryRes = waitFor archive.findMessages(req) ## Then check: @@ -201,15 +202,15 @@ procSuite "Waku Archive - find messages": msg2 = fakeWakuMessage(contentTopic=topic2) msg3 = fakeWakuMessage(contentTopic=topic3) - archive.handleMessage("foo", msg1) - archive.handleMessage("foo", msg2) - archive.handleMessage("foo", msg3) + waitFor archive.handleMessage("foo", msg1) + waitFor archive.handleMessage("foo", msg2) + waitFor archive.handleMessage("foo", msg3) ## Given let req = ArchiveQuery(contentTopics: @[topic1, topic3]) ## When - let queryRes = archive.findMessages(req) + let queryRes = waitFor archive.findMessages(req) ## Then check: @@ -233,7 +234,7 @@ procSuite "Waku Archive - find messages": let req = ArchiveQuery(contentTopics: queryTopics) ## When - let queryRes = archive.findMessages(req) + let queryRes = waitFor archive.findMessages(req) ## Then check: @@ -264,9 +265,9 @@ procSuite "Waku Archive - find messages": msg2 = fakeWakuMessage(contentTopic=contentTopic2) msg3 = fakeWakuMessage(contentTopic=contentTopic3) - archive.handleMessage(pubsubtopic1, msg1) - archive.handleMessage(pubsubtopic2, msg2) - archive.handleMessage(pubsubtopic2, msg3) + waitFor archive.handleMessage(pubsubtopic1, msg1) + waitFor archive.handleMessage(pubsubtopic2, msg2) + waitFor archive.handleMessage(pubsubtopic2, msg3) ## Given # This query targets: pubsubtopic1 AND (contentTopic1 OR contentTopic3) @@ -276,7 +277,7 @@ procSuite "Waku Archive - find messages": ) ## When - let queryRes = archive.findMessages(req) + let queryRes = waitFor archive.findMessages(req) ## Then check: @@ -302,15 +303,15 @@ procSuite "Waku Archive - find messages": msg2 = fakeWakuMessage() msg3 = fakeWakuMessage() - archive.handleMessage(pubsubtopic2, msg1) - archive.handleMessage(pubsubtopic2, msg2) - archive.handleMessage(pubsubtopic2, msg3) + waitFor archive.handleMessage(pubsubtopic2, msg1) + waitFor archive.handleMessage(pubsubtopic2, msg2) + waitFor archive.handleMessage(pubsubtopic2, msg3) ## Given let req = ArchiveQuery(pubsubTopic: some(pubsubTopic1)) ## When - let res = archive.findMessages(req) + let res = waitFor archive.findMessages(req) ## Then check: @@ -333,15 +334,15 @@ procSuite "Waku Archive - find messages": msg2 = fakeWakuMessage(payload="TEST-2") msg3 = fakeWakuMessage(payload="TEST-3") - archive.handleMessage(pubsubTopic, msg1) - archive.handleMessage(pubsubTopic, msg2) - archive.handleMessage(pubsubTopic, msg3) + waitFor archive.handleMessage(pubsubTopic, msg1) + waitFor archive.handleMessage(pubsubTopic, msg2) + waitFor archive.handleMessage(pubsubTopic, msg3) ## Given let req = ArchiveQuery(pubsubTopic: some(pubsubTopic)) ## When - let res = archive.findMessages(req) + let res = waitFor archive.findMessages(req) ## Then check: @@ -368,7 +369,7 @@ procSuite "Waku Archive - find messages": var cursors = newSeq[Option[ArchiveCursor]](3) for i in 0..<3: - let res = archiveA.findMessages(nextReq) + let res = waitFor archiveA.findMessages(nextReq) require res.isOk() # Keep query response content @@ -404,7 +405,7 @@ procSuite "Waku Archive - find messages": var cursors = newSeq[Option[ArchiveCursor]](3) for i in 0..<3: - let res = archiveA.findMessages(nextReq) + let res = waitFor archiveA.findMessages(nextReq) require res.isOk() # Keep query response content @@ -446,13 +447,13 @@ procSuite "Waku Archive - find messages": ] for msg in msgList: - require driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() ## Given let req = ArchiveQuery(contentTopics: @[DefaultContentTopic]) ## When - let res = archive.findMessages(req) + let res = waitFor archive.findMessages(req) ## Then check: @@ -475,7 +476,7 @@ procSuite "Waku Archive - find messages": ) ## When - let res = archiveA.findMessages(req) + let res = waitFor archiveA.findMessages(req) ## Then check res.isOk() @@ -495,7 +496,7 @@ procSuite "Waku Archive - find messages": ) ## When - let res = archiveA.findMessages(req) + let res = waitFor archiveA.findMessages(req) ## Then check res.isOk() @@ -514,7 +515,7 @@ procSuite "Waku Archive - find messages": ) ## When - let res = archiveA.findMessages(req) + let res = waitFor archiveA.findMessages(req) ## Then check res.isOk() diff --git a/tests/v2/waku_store/test_waku_store.nim b/tests/v2/waku_store/test_waku_store.nim index f9689c6d0a..b9f22cf1c4 100644 --- a/tests/v2/waku_store/test_waku_store.nim +++ b/tests/v2/waku_store/test_waku_store.nim @@ -14,8 +14,6 @@ import ../testlib/common, ../testlib/wakucore - - proc newTestWakuStore(switch: Switch, handler: HistoryQueryHandler): Future[WakuStore] {.async.} = let peerManager = PeerManager.new(switch) @@ -27,8 +25,7 @@ proc newTestWakuStore(switch: Switch, handler: HistoryQueryHandler): Future[Waku return proto proc newTestWakuStoreClient(switch: Switch): WakuStoreClient = - let - peerManager = PeerManager.new(switch) + let peerManager = PeerManager.new(switch) WakuStoreClient.new(peerManager, rng) @@ -48,7 +45,8 @@ suite "Waku Store - query handler": let msg = fakeWakuMessage(contentTopic=DefaultContentTopic) var queryHandlerFut = newFuture[(HistoryQuery)]() - let queryHandler = proc(req: HistoryQuery): HistoryResult = + + let queryHandler = proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.} = queryHandlerFut.complete(req) return ok(HistoryResponse(messages: @[msg])) @@ -90,7 +88,7 @@ suite "Waku Store - query handler": let serverPeerInfo = serverSwitch.peerInfo.toRemotePeerInfo() var queryHandlerFut = newFuture[(HistoryQuery)]() - let queryHandler = proc(req: HistoryQuery): HistoryResult = + let queryHandler = proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.} = queryHandlerFut.complete(req) return err(HistoryError(kind: HistoryErrorKind.BAD_REQUEST)) diff --git a/tests/v2/waku_store/test_wakunode_store.nim b/tests/v2/waku_store/test_wakunode_store.nim index 37c5fb9f9f..568caf7054 100644 --- a/tests/v2/waku_store/test_wakunode_store.nim +++ b/tests/v2/waku_store/test_wakunode_store.nim @@ -25,7 +25,6 @@ import ../testlib/wakucore, ../testlib/wakunode - proc newTestArchiveDriver(): ArchiveDriver = let database = SqliteDatabase.new(":memory:").tryGet() SqliteDriver.new(database).tryGet() @@ -55,15 +54,15 @@ procSuite "WakuNode - Store": ] let archiveA = block: - let driver = newTestArchiveDriver() + let driver = newTestArchiveDriver() - for msg in msgListA: - let msg_digest = waku_archive.computeDigest(msg) - require driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp).isOk() + for msg in msgListA: + let msg_digest = waku_archive.computeDigest(msg) + require (waitFor driver.put(DefaultPubsubTopic, msg, msg_digest, msg.timestamp)).isOk() - driver + driver - asyncTest "Store protocol returns expected messages": + test "Store protocol returns expected messages": ## Setup let serverKey = generateSecp256k1Key() @@ -71,10 +70,10 @@ procSuite "WakuNode - Store": clientKey = generateSecp256k1Key() client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - await allFutures(client.start(), server.start()) + waitFor allFutures(client.start(), server.start()) server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy)) - await server.mountStore() + waitFor server.mountStore() client.mountStoreClient() @@ -83,7 +82,7 @@ procSuite "WakuNode - Store": let serverPeer = server.peerInfo.toRemotePeerInfo() ## When - let queryRes = await client.query(req, peer=serverPeer) + let queryRes = waitFor client.query(req, peer=serverPeer) ## Then check queryRes.isOk() @@ -93,9 +92,9 @@ procSuite "WakuNode - Store": response.messages == msgListA # Cleanup - await allFutures(client.stop(), server.stop()) + waitFor allFutures(client.stop(), server.stop()) - asyncTest "Store node history response - forward pagination": + test "Store node history response - forward pagination": ## Setup let serverKey = generateSecp256k1Key() @@ -103,10 +102,10 @@ procSuite "WakuNode - Store": clientKey = generateSecp256k1Key() client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - await allFutures(client.start(), server.start()) + waitFor allFutures(client.start(), server.start()) server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy)) - await server.mountStore() + waitFor server.mountStore() client.mountStoreClient() @@ -121,7 +120,7 @@ procSuite "WakuNode - Store": var cursors = newSeq[Option[HistoryCursor]](2) for i in 0..<2: - let res = await client.query(nextReq, peer=serverPeer) + let res = waitFor client.query(nextReq, peer=serverPeer) require res.isOk() # Keep query response content @@ -142,9 +141,9 @@ procSuite "WakuNode - Store": pages[1] == msgListA[7..9] # Cleanup - await allFutures(client.stop(), server.stop()) + waitFor allFutures(client.stop(), server.stop()) - asyncTest "Store node history response - backward pagination": + test "Store node history response - backward pagination": ## Setup let serverKey = generateSecp256k1Key() @@ -152,10 +151,10 @@ procSuite "WakuNode - Store": clientKey = generateSecp256k1Key() client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - await allFutures(client.start(), server.start()) + waitFor allFutures(client.start(), server.start()) server.mountArchive(some(archiveA), none(MessageValidator), none(RetentionPolicy)) - await server.mountStore() + waitFor server.mountStore() client.mountStoreClient() @@ -170,7 +169,7 @@ procSuite "WakuNode - Store": var cursors = newSeq[Option[HistoryCursor]](2) for i in 0..<2: - let res = await client.query(nextReq, peer=serverPeer) + let res = waitFor client.query(nextReq, peer=serverPeer) require res.isOk() # Keep query response content @@ -191,9 +190,9 @@ procSuite "WakuNode - Store": pages[1] == msgListA[0..2] # Cleanup - await allFutures(client.stop(), server.stop()) + waitFor allFutures(client.stop(), server.stop()) - asyncTest "Store protocol returns expected message when relay is disabled and filter enabled": + test "Store protocol returns expected message when relay is disabled and filter enabled": ## See nwaku issue #937: 'Store: ability to decouple store from relay' ## Setup let @@ -204,13 +203,13 @@ procSuite "WakuNode - Store": clientKey = generateSecp256k1Key() client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0)) - await allFutures(client.start(), server.start(), filterSource.start()) + waitFor allFutures(client.start(), server.start(), filterSource.start()) - await filterSource.mountFilter() + waitFor filterSource.mountFilter() let driver = newTestArchiveDriver() server.mountArchive(some(driver), none(MessageValidator), none(RetentionPolicy)) - await server.mountStore() - await server.mountFilterClient() + waitFor server.mountStore() + waitFor server.mountFilterClient() client.mountStoreClient() ## Given @@ -221,20 +220,20 @@ procSuite "WakuNode - Store": ## Then let filterFut = newFuture[(PubsubTopic, WakuMessage)]() - proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} = + proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = filterFut.complete((pubsubTopic, msg)) - await server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer) + waitFor server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer) - await sleepAsync(100.millis) + waitFor sleepAsync(100.millis) # Send filter push message to server from source node - await filterSource.wakuFilterLegacy.handleMessage(DefaultPubsubTopic, message) + waitFor filterSource.wakuFilterLegacy.handleMessage(DefaultPubsubTopic, message) # Wait for the server filter to receive the push message - require await filterFut.withTimeout(5.seconds) + require waitFor filterFut.withTimeout(5.seconds) - let res = await client.query(HistoryQuery(contentTopics: @[DefaultContentTopic]), peer=serverPeer) + let res = waitFor client.query(HistoryQuery(contentTopics: @[DefaultContentTopic]), peer=serverPeer) ## Then check res.isOk() @@ -250,4 +249,4 @@ procSuite "WakuNode - Store": handledMsg == message ## Cleanup - await allFutures(client.stop(), server.stop(), filterSource.stop()) + waitFor allFutures(client.stop(), server.stop(), filterSource.stop()) diff --git a/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim b/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim index 22b5593747..50fbd7bba0 100644 --- a/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim +++ b/tests/v2/wakunode_jsonrpc/test_jsonrpc_store.nim @@ -22,7 +22,7 @@ import ../../v2/testlib/wakunode -proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] = +proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] = let digest = waku_archive.computeDigest(message) receivedTime = if message.timestamp > 0: message.timestamp @@ -83,7 +83,7 @@ procSuite "Waku v2 JSON-RPC API - Store": ] for msg in msgList: - require driver.put(DefaultPubsubTopic, msg).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort, false) @@ -133,7 +133,7 @@ procSuite "Waku v2 JSON-RPC API - Store": fakeWakuMessage(@[byte 9], ts=9) ] for msg in msgList: - require driver.put(DefaultPubsubTopic, msg).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort, false) diff --git a/tests/v2/wakunode_rest/test_rest_store.nim b/tests/v2/wakunode_rest/test_rest_store.nim index 37c3bb600b..d5a9037098 100644 --- a/tests/v2/wakunode_rest/test_rest_store.nim +++ b/tests/v2/wakunode_rest/test_rest_store.nim @@ -28,7 +28,7 @@ import logScope: topics = "waku node rest store_api test" -proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Result[void, string] = +proc put(store: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage): Future[Result[void, string]] = let digest = waku_archive.computeDigest(message) receivedTime = if message.timestamp > 0: message.timestamp @@ -107,7 +107,7 @@ procSuite "Waku v2 Rest API - Store": fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("c2"), ts=9) ] for msg in msgList: - require driver.put(DefaultPubsubTopic, msg).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() let client = newRestHttpClient(initTAddress(restAddress, restPort)) @@ -178,7 +178,7 @@ procSuite "Waku v2 Rest API - Store": fakeWakuMessage(@[byte 09], ts=ts(90, timeOrigin)) ] for msg in msgList: - require driver.put(DefaultPubsubTopic, msg).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() let client = newRestHttpClient(initTAddress(restAddress, restPort)) @@ -266,7 +266,7 @@ procSuite "Waku v2 Rest API - Store": fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("2"), ts=9) ] for msg in msgList: - require driver.put(DefaultPubsubTopic, msg).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() let client = newRestHttpClient(initTAddress(restAddress, restPort)) @@ -338,7 +338,7 @@ procSuite "Waku v2 Rest API - Store": fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9) ] for msg in msgList: - require driver.put(DefaultPubsubTopic, msg).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() let client = newRestHttpClient(initTAddress(restAddress, restPort)) @@ -427,7 +427,7 @@ procSuite "Waku v2 Rest API - Store": fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9) ] for msg in msgList: - require driver.put(DefaultPubsubTopic, msg).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() let client = newRestHttpClient(initTAddress(restAddress, restPort)) @@ -482,7 +482,7 @@ procSuite "Waku v2 Rest API - Store": fakeWakuMessage(@[byte 9], contentTopic=ContentTopic("ct2"), ts=9) ] for msg in msgList: - require driver.put(DefaultPubsubTopic, msg).isOk() + require (waitFor driver.put(DefaultPubsubTopic, msg)).isOk() let client = newRestHttpClient(initTAddress(restAddress, restPort)) diff --git a/waku/v2/node/jsonrpc/filter/handlers.nim b/waku/v2/node/jsonrpc/filter/handlers.nim index 457c2e74ad..900fd85fa6 100644 --- a/waku/v2/node/jsonrpc/filter/handlers.nim +++ b/waku/v2/node/jsonrpc/filter/handlers.nim @@ -44,7 +44,7 @@ proc installFilterApiHandlers*(node: WakuNode, server: RpcServer, cache: Message pubsubTopic: PubsubTopic = topic.get(DefaultPubsubTopic) contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) - let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} = + let handler: FilterPushHandler = proc(pubsubTopic: PubsubTopic, msg: WakuMessage) {.async, gcsafe, closure.} = cache.addMessage(msg.contentTopic, msg) let subFut = node.filterSubscribe(pubsubTopic, contentTopics, handler, peerOpt.get()) diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index b30b39dcd0..7f8a60c5b9 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -264,7 +264,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = if node.wakuArchive.isNil(): return - node.wakuArchive.handleMessage(topic, msg) + await node.wakuArchive.handleMessage(topic, msg) let defaultHandler = proc(topic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} = @@ -455,11 +455,11 @@ proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: C # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled # TODO: Move this logic to wakunode2 app - let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.raises: [Exception].} = + let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.async, gcsafe, closure.} = if node.wakuRelay.isNil() and not node.wakuStore.isNil(): - node.wakuArchive.handleMessage(pubSubTopic, message) + await node.wakuArchive.handleMessage(pubSubTopic, message) - handler(pubsubTopic, message) + await handler(pubsubTopic, message) let subRes = await node.wakuFilterClientLegacy.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer) if subRes.isOk(): @@ -521,7 +521,6 @@ proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: Conte await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get()) - ## Waku archive proc mountArchive*(node: WakuNode, @@ -544,8 +543,11 @@ proc executeMessageRetentionPolicy*(node: WakuNode) = debug "executing message retention policy" - node.wakuArchive.executeMessageRetentionPolicy() - node.wakuArchive.reportStoredMessagesMetric() + try: + waitFor node.wakuArchive.executeMessageRetentionPolicy() + waitFor node.wakuArchive.reportStoredMessagesMetric() + except CatchableError: + debug "Error executing retention policy " & getCurrentExceptionMsg() proc startMessageRetentionPolicyPeriodicTask*(node: WakuNode, interval: Duration) = if node.wakuArchive.isNil(): @@ -602,14 +604,13 @@ proc mountStore*(node: WakuNode) {.async, raises: [Defect, LPError].} = return # TODO: Review this handler logic. Maybe, move it to the appplication code - let queryHandler: HistoryQueryHandler = proc(request: HistoryQuery): HistoryResult = + let queryHandler: HistoryQueryHandler = proc(request: HistoryQuery): Future[HistoryResult] {.async.} = let request = request.toArchiveQuery() - let response = node.wakuArchive.findMessages(request) - response.toHistoryResult() + let response = await node.wakuArchive.findMessages(request) + return response.toHistoryResult() node.wakuStore = WakuStore.new(node.peerManager, node.rng, queryHandler) - if node.started: # Node has started already. Let's start store too. await node.wakuStore.start() diff --git a/waku/v2/waku_archive/archive.nim b/waku/v2/waku_archive/archive.nim index 67bc99d20c..bedd738034 100644 --- a/waku/v2/waku_archive/archive.nim +++ b/waku/v2/waku_archive/archive.nim @@ -80,9 +80,9 @@ proc new*(T: type WakuArchive, retentionPolicy: retentionPolicy.get(nil) ) - - -proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage) = +proc handleMessage*(w: WakuArchive, + pubsubTopic: PubsubTopic, + msg: WakuMessage) {.async.} = if msg.ephemeral: # Ephemeral message, do not store return @@ -93,7 +93,6 @@ proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage) waku_archive_errors.inc(labelValues = [validationRes.error]) return - let insertStartTime = getTime().toUnixFloat() block: @@ -104,7 +103,7 @@ proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage) trace "handling message", pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp, digest=msgDigest - let putRes = w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime) + let putRes = await w.driver.put(pubsubTopic, msg, msgDigest, msgReceivedTime) if putRes.isErr(): error "failed to insert message", err=putRes.error waku_archive_errors.inc(labelValues = [insertFailure]) @@ -113,7 +112,7 @@ proc handleMessage*(w: WakuArchive, pubsubTopic: PubsubTopic, msg: WakuMessage) waku_archive_insert_duration_seconds.observe(insertDuration) -proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe.} = +proc findMessages*(w: WakuArchive, query: ArchiveQuery): Future[ArchiveResult] {.async, gcsafe.} = ## Search the archive to return a single page of messages matching the query criteria let qContentTopics = query.contentTopics @@ -128,10 +127,9 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe. if qContentTopics.len > 10: return err(ArchiveError.invalidQuery("too many content topics")) - let queryStartTime = getTime().toUnixFloat() - let queryRes = w.driver.getMessages( + let queryRes = await w.driver.getMessages( contentTopic = qContentTopics, pubsubTopic = qPubSubTopic, cursor = qCursor, @@ -144,16 +142,13 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe. let queryDuration = getTime().toUnixFloat() - queryStartTime waku_archive_query_duration_seconds.observe(queryDuration) - # Build response if queryRes.isErr(): return err(ArchiveError(kind: ArchiveErrorKind.DRIVER_ERROR, cause: queryRes.error)) let rows = queryRes.get() - var messages = newSeq[WakuMessage]() var cursor = none(ArchiveCursor) - if rows.len == 0: return ok(ArchiveResponse(messages: messages, cursor: cursor)) @@ -187,28 +182,27 @@ proc findMessages*(w: WakuArchive, query: ArchiveQuery): ArchiveResult {.gcsafe. if not qAscendingOrder: reverse(messages) - ok(ArchiveResponse(messages: messages, cursor: cursor)) - + return ok(ArchiveResponse(messages: messages, cursor: cursor)) # Retention policy -proc executeMessageRetentionPolicy*(w: WakuArchive) = +proc executeMessageRetentionPolicy*(w: WakuArchive) {.async.} = if w.retentionPolicy.isNil(): return if w.driver.isNil(): return - let retPolicyRes = w.retentionPolicy.execute(w.driver) + let retPolicyRes = await w.retentionPolicy.execute(w.driver) if retPolicyRes.isErr(): waku_archive_errors.inc(labelValues = [retPolicyFailure]) error "failed execution of retention policy", error=retPolicyRes.error -proc reportStoredMessagesMetric*(w: WakuArchive) = +proc reportStoredMessagesMetric*(w: WakuArchive) {.async.} = if w.driver.isNil(): return - let resCount = w.driver.getMessagesCount() + let resCount = await w.driver.getMessagesCount() if resCount.isErr(): error "failed to get messages count", error=resCount.error return diff --git a/waku/v2/waku_archive/driver.nim b/waku/v2/waku_archive/driver.nim index 4642dae9c0..cd4b21ba8d 100644 --- a/waku/v2/waku_archive/driver.nim +++ b/waku/v2/waku_archive/driver.nim @@ -5,53 +5,58 @@ else: import std/options, - stew/results + stew/results, + chronos import ../waku_core, ./common - const DefaultPageSize*: uint = 25 - type ArchiveDriverResult*[T] = Result[T, string] - ArchiveDriver* = ref object of RootObj type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp) - # ArchiveDriver interface -method put*(driver: ArchiveDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): ArchiveDriverResult[void] {.base.} = discard - - -method getAllMessages*(driver: ArchiveDriver): ArchiveDriverResult[seq[ArchiveRow]] {.base.} = discard - -method getMessages*( - driver: ArchiveDriver, - contentTopic: seq[ContentTopic] = @[], - pubsubTopic = none(PubsubTopic), - cursor = none(ArchiveCursor), - startTime = none(Timestamp), - endTime = none(Timestamp), - maxPageSize = DefaultPageSize, - ascendingOrder = true -): ArchiveDriverResult[seq[ArchiveRow]] {.base.} = discard - - -method getMessagesCount*(driver: ArchiveDriver): ArchiveDriverResult[int64] {.base.} = discard - -method getOldestMessageTimestamp*(driver: ArchiveDriver): ArchiveDriverResult[Timestamp] {.base.} = discard - -method getNewestMessageTimestamp*(driver: ArchiveDriver): ArchiveDriverResult[Timestamp] {.base.} = discard - - -method deleteMessagesOlderThanTimestamp*(driver: ArchiveDriver, ts: Timestamp): ArchiveDriverResult[void] {.base.} = discard - -method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver, limit: int): ArchiveDriverResult[void] {.base.} = discard - - -method close*(driver: ArchiveDriver): ArchiveDriverResult[void] {.base.} = - ok() +method put*(driver: ArchiveDriver, + pubsubTopic: PubsubTopic, + message: WakuMessage, + digest: MessageDigest, + receivedTime: Timestamp): + Future[ArchiveDriverResult[void]] {.base, async.} = discard + +method getAllMessages*(driver: ArchiveDriver): + Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard + +method getMessages*(driver: ArchiveDriver, + contentTopic: seq[ContentTopic] = @[], + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true): + Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard + +method getMessagesCount*(driver: ArchiveDriver): + Future[ArchiveDriverResult[int64]] {.base, async.} = discard + +method getOldestMessageTimestamp*(driver: ArchiveDriver): + Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard + +method getNewestMessageTimestamp*(driver: ArchiveDriver): + Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard + +method deleteMessagesOlderThanTimestamp*(driver: ArchiveDriver, + ts: Timestamp): + Future[ArchiveDriverResult[void]] {.base, async.} = discard + +method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver, + limit: int): + Future[ArchiveDriverResult[void]] {.base, async.} = discard + +method close*(driver: ArchiveDriver): + Future[ArchiveDriverResult[void]] {.base, async.} = discard diff --git a/waku/v2/waku_archive/driver/queue_driver/queue_driver.nim b/waku/v2/waku_archive/driver/queue_driver/queue_driver.nim index ddcd9be9fb..9a8b54f852 100644 --- a/waku/v2/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/v2/waku_archive/driver/queue_driver/queue_driver.nim @@ -7,21 +7,19 @@ import std/options, stew/results, stew/sorted_set, - chronicles + chronicles, + chronos import ../../../waku_core, ../../common, ../../driver, ./index - logScope: topics = "waku archive queue_store" - const QueueDriverDefaultMaxCapacity* = 25_000 - type IndexedWakuMessage = object # TODO: may need to rename this object as it holds both the index and the pubsub topic of a waku message @@ -43,7 +41,6 @@ proc `$`(error: QueueDriverErrorKind): string = of INVALID_CURSOR: "invalid_cursor" - type QueueDriver* = ref object of ArchiveDriver ## Bounded repository for indexed messages ## @@ -59,7 +56,6 @@ type QueueDriver* = ref object of ArchiveDriver items: SortedSet[Index, IndexedWakuMessage] # sorted set of stored messages capacity: int # Maximum amount of messages to keep - ### Helpers proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], @@ -82,14 +78,12 @@ proc walkToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], return nextItem - #### API proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T = var items = SortedSet[Index, IndexedWakuMessage].init() return QueueDriver(items: items, capacity: capacity) - proc contains*(driver: QueueDriver, index: Index): bool = ## Return `true` if the store queue already contains the `index`, `false` otherwise. driver.items.eq(index).isOk() @@ -202,7 +196,6 @@ proc last*(driver: QueueDriver): ArchiveDriverResult[IndexedWakuMessage] = return ok(res.value.data) - ## --- Queue API --- proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[void] = @@ -231,27 +224,30 @@ proc add*(driver: QueueDriver, msg: IndexedWakuMessage): ArchiveDriverResult[voi return ok() - -method put*(driver: QueueDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): ArchiveDriverResult[void] = +method put*(driver: QueueDriver, + pubsubTopic: PubsubTopic, + message: WakuMessage, + digest: MessageDigest, + receivedTime: Timestamp): + Future[ArchiveDriverResult[void]] {.async.} = let index = Index(pubsubTopic: pubsubTopic, senderTime: message.timestamp, receiverTime: receivedTime, digest: digest) let message = IndexedWakuMessage(msg: message, index: index, pubsubTopic: pubsubTopic) - driver.add(message) - + return driver.add(message) -method getAllMessages*(driver: QueueDriver): ArchiveDriverResult[seq[ArchiveRow]] = +method getAllMessages*(driver: QueueDriver): + Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = # TODO: Implement this message_store method - err("interface method not implemented") - -method getMessages*( - driver: QueueDriver, - contentTopic: seq[ContentTopic] = @[], - pubsubTopic = none(PubsubTopic), - cursor = none(ArchiveCursor), - startTime = none(Timestamp), - endTime = none(Timestamp), - maxPageSize = DefaultPageSize, - ascendingOrder = true -): ArchiveDriverResult[seq[ArchiveRow]] = + return err("interface method not implemented") + +method getMessages*(driver: QueueDriver, + contentTopic: seq[ContentTopic] = @[], + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true): + Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.}= let cursor = cursor.map(toIndex) let matchesQuery: QueryFilterMatcher = func(row: IndexedWakuMessage): bool = @@ -272,29 +268,38 @@ method getMessages*( var pageRes: QueueDriverGetPageResult try: pageRes = driver.getPage(maxPageSize, ascendingOrder, cursor, matchesQuery) - except: # TODO: Fix "BareExcept" warning + except CatchableError, Exception: return err(getCurrentExceptionMsg()) if pageRes.isErr(): return err($pageRes.error) - ok(pageRes.value) + return ok(pageRes.value) +method getMessagesCount*(driver: QueueDriver): + Future[ArchiveDriverResult[int64]] {.async} = + return ok(int64(driver.len())) -method getMessagesCount*(driver: QueueDriver): ArchiveDriverResult[int64] = - ok(int64(driver.len())) +method getOldestMessageTimestamp*(driver: QueueDriver): + Future[ArchiveDriverResult[Timestamp]] {.async.} = + return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) -method getOldestMessageTimestamp*(driver: QueueDriver): ArchiveDriverResult[Timestamp] = - driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) +method getNewestMessageTimestamp*(driver: QueueDriver): + Future[ArchiveDriverResult[Timestamp]] {.async.} = + return driver.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) -method getNewestMessageTimestamp*(driver: QueueDriver): ArchiveDriverResult[Timestamp] = - driver.last().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) - - -method deleteMessagesOlderThanTimestamp*(driver: QueueDriver, ts: Timestamp): ArchiveDriverResult[void] = +method deleteMessagesOlderThanTimestamp*(driver: QueueDriver, + ts: Timestamp): + Future[ArchiveDriverResult[void]] {.async.} = # TODO: Implement this message_store method - err("interface method not implemented") + return err("interface method not implemented") -method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver, limit: int): ArchiveDriverResult[void] = +method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver, + limit: int): + Future[ArchiveDriverResult[void]] {.async.} = # TODO: Implement this message_store method - err("interface method not implemented") + return err("interface method not implemented") + +method close*(driver: QueueDriver): + Future[ArchiveDriverResult[void]] {.async.} = + return ok() \ No newline at end of file diff --git a/waku/v2/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/v2/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 4c4b6b0749..aaa208be9b 100644 --- a/waku/v2/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/v2/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -8,7 +8,8 @@ else: import std/options, stew/[byteutils, results], - chronicles + chronicles, + chronos import ../../../../common/sqlite, ../../../waku_core, @@ -20,8 +21,6 @@ import logScope: topics = "waku archive sqlite" - - proc init(db: SqliteDatabase): ArchiveDriverResult[void] = ## Misconfiguration can lead to nil DB if db.isNil(): @@ -43,7 +42,6 @@ proc init(db: SqliteDatabase): ArchiveDriverResult[void] = ok() - type SqliteDriver* = ref object of ArchiveDriver db: SqliteDatabase insertStmt: SqliteStmt[InsertMessageParams, void] @@ -59,21 +57,13 @@ proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] = let insertStmt = db.prepareInsertMessageStmt() ok(SqliteDriver(db: db, insertStmt: insertStmt)) -method close*(s: SqliteDriver): ArchiveDriverResult[void] = - ## Close the database connection - - # Dispose statements - s.insertStmt.dispose() - - # Close connection - s.db.close() - - ok() - - -method put*(s: SqliteDriver, pubsubTopic: PubsubTopic, message: WakuMessage, digest: MessageDigest, receivedTime: Timestamp): ArchiveDriverResult[void] = +method put*(s: SqliteDriver, + pubsubTopic: PubsubTopic, + message: WakuMessage, + digest: MessageDigest, + receivedTime: Timestamp): + Future[ArchiveDriverResult[void]] {.async.} = ## Inserts a message into the store - let res = s.insertStmt.exec(( @(digest.data), # id receivedTime, # storedAt @@ -83,30 +73,27 @@ method put*(s: SqliteDriver, pubsubTopic: PubsubTopic, message: WakuMessage, dig int64(message.version), # version message.timestamp # senderTimestamp )) - if res.isErr(): - return err("message insert failed: " & res.error) - - ok() + return res -method getAllMessages*(s: SqliteDriver): ArchiveDriverResult[seq[ArchiveRow]] = +method getAllMessages*(s: SqliteDriver): + Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = ## Retrieve all messages from the store. - s.db.selectAllMessages() - - -method getMessages*( - s: SqliteDriver, - contentTopic: seq[ContentTopic] = @[], - pubsubTopic = none(PubsubTopic), - cursor = none(ArchiveCursor), - startTime = none(Timestamp), - endTime = none(Timestamp), - maxPageSize = DefaultPageSize, - ascendingOrder = true -): ArchiveDriverResult[seq[ArchiveRow]] = + return s.db.selectAllMessages() + +method getMessages*(s: SqliteDriver, + contentTopic: seq[ContentTopic] = @[], + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = DefaultPageSize, + ascendingOrder = true): + Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + let cursor = cursor.map(toDbCursor) - let rows = ?s.db.selectMessagesByHistoryQueryWithLimit( + let rowsRes = s.db.selectMessagesByHistoryQueryWithLimit( contentTopic, pubsubTopic, cursor, @@ -116,21 +103,35 @@ method getMessages*( ascending=ascendingOrder ) - ok(rows) + return rowsRes +method getMessagesCount*(s: SqliteDriver): + Future[ArchiveDriverResult[int64]] {.async.} = + return s.db.getMessageCount() -method getMessagesCount*(s: SqliteDriver): ArchiveDriverResult[int64] = - s.db.getMessageCount() +method getOldestMessageTimestamp*(s: SqliteDriver): + Future[ArchiveDriverResult[Timestamp]] {.async.} = + return s.db.selectOldestReceiverTimestamp() -method getOldestMessageTimestamp*(s: SqliteDriver): ArchiveDriverResult[Timestamp] = - s.db.selectOldestReceiverTimestamp() +method getNewestMessageTimestamp*(s: SqliteDriver): + Future[ArchiveDriverResult[Timestamp]] {.async.} = + return s.db.selectnewestReceiverTimestamp() -method getNewestMessageTimestamp*(s: SqliteDriver): ArchiveDriverResult[Timestamp] = - s.db.selectnewestReceiverTimestamp() +method deleteMessagesOlderThanTimestamp*(s: SqliteDriver, + ts: Timestamp): + Future[ArchiveDriverResult[void]] {.async.} = + return s.db.deleteMessagesOlderThanTimestamp(ts) +method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver, + limit: int): + Future[ArchiveDriverResult[void]] {.async.} = + return s.db.deleteOldestMessagesNotWithinLimit(limit) -method deleteMessagesOlderThanTimestamp*(s: SqliteDriver, ts: Timestamp): ArchiveDriverResult[void] = - s.db.deleteMessagesOlderThanTimestamp(ts) - -method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver, limit: int): ArchiveDriverResult[void] = - s.db.deleteOldestMessagesNotWithinLimit(limit) +method close*(s: SqliteDriver): + Future[ArchiveDriverResult[void]] {.async.} = + ## Close the database connection + # Dispose statements + s.insertStmt.dispose() + # Close connection + s.db.close() + return ok() diff --git a/waku/v2/waku_archive/retention_policy.nim b/waku/v2/waku_archive/retention_policy.nim index defd89f1d4..8cf63eccab 100644 --- a/waku/v2/waku_archive/retention_policy.nim +++ b/waku/v2/waku_archive/retention_policy.nim @@ -4,7 +4,8 @@ else: {.push raises: [].} import - stew/results + stew/results, + chronos import ./driver @@ -12,5 +13,5 @@ type RetentionPolicyResult*[T] = Result[T, string] type RetentionPolicy* = ref object of RootObj - -method execute*(p: RetentionPolicy, store: ArchiveDriver): RetentionPolicyResult[void] {.base.} = discard \ No newline at end of file +method execute*(p: RetentionPolicy, store: ArchiveDriver): + Future[RetentionPolicyResult[void]] {.base, async.} = discard \ No newline at end of file diff --git a/waku/v2/waku_archive/retention_policy/retention_policy_capacity.nim b/waku/v2/waku_archive/retention_policy/retention_policy_capacity.nim index e55266f212..72b243301e 100644 --- a/waku/v2/waku_archive/retention_policy/retention_policy_capacity.nim +++ b/waku/v2/waku_archive/retention_policy/retention_policy_capacity.nim @@ -5,7 +5,8 @@ else: import stew/results, - chronicles + chronicles, + chronos import ../driver, ../retention_policy @@ -13,7 +14,6 @@ import logScope: topics = "waku archive retention_policy" - const DefaultCapacity*: int = 25_000 const MaxOverflow = 1.3 @@ -38,7 +38,6 @@ type totalCapacity: int # = capacity * MaxOverflow deleteWindow: int # = capacity * (MaxOverflow - 1) / 2; half of the overflow window, the amount of messages deleted when overflow occurs - proc calculateTotalCapacity(capacity: int, overflow: float): int = int(float(capacity) * overflow) @@ -48,7 +47,6 @@ proc calculateOverflowWindow(capacity: int, overflow: float): int = proc calculateDeleteWindow(capacity: int, overflow: float): int = calculateOverflowWindow(capacity, overflow) div 2 - proc init*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T = let totalCapacity = calculateTotalCapacity(capacity, MaxOverflow) @@ -60,14 +58,21 @@ proc init*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T = deleteWindow: deleteWindow ) -method execute*(p: CapacityRetentionPolicy, driver: ArchiveDriver): RetentionPolicyResult[void] = - let numMessages = ?driver.getMessagesCount().mapErr(proc(err: string): string = "failed to get messages count: " & err) +method execute*(p: CapacityRetentionPolicy, + driver: ArchiveDriver): + Future[RetentionPolicyResult[void]] {.async.} = + + let numMessagesRes = await driver.getMessagesCount() + if numMessagesRes.isErr(): + return err("failed to get messages count: " & numMessagesRes.error) + + let numMessages = numMessagesRes.value if numMessages < p.totalCapacity: return ok() - let res = driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow) + let res = await driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow) if res.isErr(): - return err("deleting oldest messages failed: " & res.error()) + return err("deleting oldest messages failed: " & res.error) - ok() + return ok() diff --git a/waku/v2/waku_archive/retention_policy/retention_policy_time.nim b/waku/v2/waku_archive/retention_policy/retention_policy_time.nim index 69588d7173..27622f2e4d 100644 --- a/waku/v2/waku_archive/retention_policy/retention_policy_time.nim +++ b/waku/v2/waku_archive/retention_policy/retention_policy_time.nim @@ -29,21 +29,24 @@ proc init*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T = retentionTime: retentionTime.seconds ) - -method execute*(p: TimeRetentionPolicy, driver: ArchiveDriver): RetentionPolicyResult[void] = +method execute*(p: TimeRetentionPolicy, + driver: ArchiveDriver): + Future[RetentionPolicyResult[void]] {.async.} = ## Delete messages that exceed the retention time by 10% and more (batch delete for efficiency) - let oldestReceiverTimestamp = ?driver.getOldestMessageTimestamp().mapErr(proc(err: string): string = "failed to get oldest message timestamp: " & err) + let omtRes = await driver.getOldestMessageTimestamp() + if omtRes.isErr(): + return err("failed to get oldest message timestamp: " & omtRes.error) let now = getNanosecondTime(getTime().toUnixFloat()) let retentionTimestamp = now - p.retentionTime.nanoseconds let thresholdTimestamp = retentionTimestamp - p.retentionTime.nanoseconds div 10 - if thresholdTimestamp <= oldestReceiverTimestamp: + if thresholdTimestamp <= omtRes.value: return ok() - let res = driver.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp) + let res = await driver.deleteMessagesOlderThanTimestamp(ts=retentionTimestamp) if res.isErr(): - return err("failed to delete oldest messages: " & res.error()) + return err("failed to delete oldest messages: " & res.error) - ok() + return ok() diff --git a/waku/v2/waku_filter/client.nim b/waku/v2/waku_filter/client.nim index e2f431d9a5..cad3b7ada8 100644 --- a/waku/v2/waku_filter/client.nim +++ b/waku/v2/waku_filter/client.nim @@ -30,7 +30,7 @@ const Defaultstring = "/waku/2/default-waku/proto" ### Client, filter subscripton manager -type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} +type FilterPushHandler* = proc(pubsubTopic: PubsubTopic, message: WakuMessage) {.async, gcsafe, closure.} ## Subscription manager @@ -59,7 +59,7 @@ proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: PubsubTopic, try: let handler = m.subscriptions[(pubsubTopic, contentTopic)] - handler(pubsubTopic, message) + asyncSpawn handler(pubsubTopic, message) except: # TODO: Fix "BareExcept" warning discard diff --git a/waku/v2/waku_store/protocol.nim b/waku/v2/waku_store/protocol.nim index cf2ed9f7e8..692ec962bd 100644 --- a/waku/v2/waku_store/protocol.nim +++ b/waku/v2/waku_store/protocol.nim @@ -34,7 +34,7 @@ const MaxMessageTimestampVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" -type HistoryQueryHandler* = proc(req: HistoryQuery): HistoryResult {.gcsafe.} +type HistoryQueryHandler* = proc(req: HistoryQuery): Future[HistoryResult] {.async, gcsafe.} type WakuStore* = ref object of LPProtocol @@ -74,7 +74,7 @@ proc initProtocolHandler(ws: WakuStore) = var responseRes: HistoryResult try: - responseRes = ws.queryHandler(request) + responseRes = await ws.queryHandler(request) except Exception: error "history query failed", peerId= $conn.peerId, requestId=requestId, error=getCurrentExceptionMsg()