From 21cac6d491a6d995a7a8ba84c85fecc7817b3d8b Mon Sep 17 00:00:00 2001 From: G <28568419+s1fr0@users.noreply.github.com> Date: Thu, 17 Feb 2022 16:00:15 +0100 Subject: [PATCH] Refactoring timestamps (#842) * Refactor timestamps type from float64 to int64 (milliseconds resolution) * Revert epochs to float64 * Update 00002_addSenderTimeStamp.up.sql * Update quicksim2.nim * Add files via upload * Delete 00003_convertTimestampsToInts.up.sql * Add files via upload * Rename 00003_convertTimestampsToInts.up.sql to 00003_addTimestampsToInts.up.sql * Delete 00003_addTimestampsToInts.up.sql * Rln-relay integration into chat2 (#835) * adds ProofMetadata * adds EPOCH_INTERVAL * adds messageLog field * adds updateLog, toEpoch, fromEpoch, getEpoch, compareTo * adds unit test for toEpoch and fromEpoch * adds unit test for Epoch comparison * adds result codes for updateLog * adds unit test for update log * renames epoch related consts * modifies updateLog with new return type and new logic of spam detection * adds unit text for the modified updateLog * changes max epoch gap type size * splits updateLog into two procs isSpam and updateLog * updates unittests * fixes a bug, returns false when the message is not spam * renames messageLog to nullifierLog * renames isSpam to hasDuplicate * updates the rln validator, adds comments * adds appendRLNProof proc plus some code beatification * unit test for validate message * adds unhappy test to validateMessage unit test * renames EPOCH_UNIT_SECONDS * renames MAX_CLOCK_GAP_SECONDS * WIP: integration test * fixes compile errors * sets a real epoch value * updates on old unittests * adds comments to the rln relay tests * adds more comments * makes rln import conditional * adds todos * adds more todos * adds rln-relay mount process into chat2 * further todos * logs contentTopic * introduces rln relay configs * changes default pubsub topic * adds contentTopic config * imports rln relay dependencies * consolidates imports * removes module identifier from ContentTopic * adds contentTopic field * adds contentTopic argument to mountRlnRelay calls * appends rln proof to chat2 messages * changes the default chat2 contentTopic * adds missing content topic fields * fixes a bug * adds a new logic about empty content topics * appends proof only when rln flag is active * removes unnecessary todos * fixes an indentation issue * adds log messages * verifies the proof against the concatenation of msg payload and content topic * a bug fix * removes duplicate epoch time calculation * updates log level to trace * updates default rln-relay content topic * adds support for empty content topics * updates changelog * changelog updates * removes a commented code block * updates addRLNRelayValidator string doc * Squashed commit of the following: commit bc36c99ab202d07baa0a5f0100bd10d1d76fdfa1 Merge: dc2b2946 5a77d6e2 Author: G <28568419+s1fr0@users.noreply.github.com> Date: Sat Feb 5 01:10:06 2022 +0100 Merge branch 'master' into int64-timestamps-ns commit dc2b294667bb5770cc32b93cc560638cf5ce7087 Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Sat Feb 5 00:24:45 2022 +0100 Fix commit f97b95a036a197938df38a5adaea46fca778016d Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Sat Feb 5 00:13:18 2022 +0100 Missing import commit 060c4f8d64e1b6e7c0593540fa8fa7f4cadf6df7 Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Sat Feb 5 00:10:36 2022 +0100 Fixed typo commit 08ca99b6f692d3df6d4c7c2312c7cada05fc0041 Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Fri Feb 4 23:59:20 2022 +0100 Time util file commit 2b5c360746990936dec256e90d08dae3c3e35a94 Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Fri Feb 4 23:33:20 2022 +0100 Moved time utility functions to utils/time commit fdaf121f089aa011855303cc8dd1ce52aec506ad Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Fri Feb 4 23:10:25 2022 +0100 Fix comment commit c7e06ab4e7618d9a3fe8aa744dd48bf3f7d8754c Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Fri Feb 4 23:04:13 2022 +0100 Restore previous migration script commit 80282db1d79df676255d4b8e6e09d9f8a2b00fd3 Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Fri Feb 4 22:54:15 2022 +0100 Typo commit b9d67f89b0eea11a8362dbb10b5f9d6894343352 Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Fri Feb 4 22:49:29 2022 +0100 Added utilities to get int64 nanosecond, microsecond, millisecond time resolution from float commit 0130d496e694a01cfc9eeb90b7cbc77764490bf9 Author: s1fr0 <28568419+s1fr0@users.noreply.github.com> Date: Fri Feb 4 22:36:35 2022 +0100 Switched to nanoseconds support. * Update CHANGELOG.md * Create 00003_convertTimestampsToInt64.up.sql Migration script * Moved migration script to right location * Update waku_rln_relay_utils.nim * Update waku_rln_relay_utils.nim * Addressed reviewers' comments * Update default fleet metrics dashboard (#844) * Fix * No need for float * Aligning master to changes in PR * Further fixes Co-authored-by: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com> Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> --- CHANGELOG.md | 4 +- tests/v2/test_jsonrpc_waku.nim | 9 +-- tests/v2/test_message_store.nim | 21 ++++--- tests/v2/test_pagination_utils.nim | 31 +++++----- tests/v2/test_waku_pagination.nim | 9 +-- tests/v2/test_waku_store.nim | 61 ++++++++++--------- tests/v2/test_waku_store_queue.nim | 47 +++++++------- tests/v2/test_wakunode.nim | 3 +- waku/common/wakubridge.nim | 3 +- waku/v2/node/jsonrpc/jsonrpc_callsigs.nim | 2 +- waku/v2/node/jsonrpc/jsonrpc_types.nim | 5 +- waku/v2/node/jsonrpc/jsonrpc_utils.nim | 9 +-- waku/v2/node/jsonrpc/store_api.nim | 7 ++- waku/v2/node/quicksim2.nim | 11 ++-- .../v2/node/storage/message/message_store.nim | 3 +- .../storage/message/waku_message_store.nim | 17 +++--- .../00003_convertTimestampsToInt64.up.sql | 30 +++++++++ waku/v2/protocol/waku_message.nim | 13 ++-- .../waku_rln_relay/waku_rln_relay_utils.nim | 1 - waku/v2/protocol/waku_store/waku_store.nim | 48 +++++++++------ .../protocol/waku_store/waku_store_types.nim | 5 +- waku/v2/utils/pagination.nim | 5 +- waku/v2/utils/time.nim | 24 ++++++++ 23 files changed, 226 insertions(+), 142 deletions(-) create mode 100644 waku/v2/node/storage/migration/migrations_scripts/message/00003_convertTimestampsToInt64.up.sql create mode 100644 waku/v2/utils/time.nim diff --git a/CHANGELOG.md b/CHANGELOG.md index a45b7515b8..42686c638c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,9 @@ The full list of changes is below. ### Changes -- ... +- A new type `Timestamp` for all timestamps is introduced (currently an alias for int64). +- All timestamps now have nanosecond resolution. + ### Fixes - ... diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 973d4e3137..19fe4d4b9a 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -25,6 +25,7 @@ import ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/utils/peers, + ../../waku/v2/utils/time, ../test_helpers template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] @@ -102,7 +103,7 @@ procSuite "Waku v2 JSON-RPC API": response == true # Publish a message on the default topic - response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(epochTime()))) + response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime())))) check: # @TODO poll topic to verify message has been published @@ -260,7 +261,7 @@ procSuite "Waku v2 JSON-RPC API": let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort, false) - let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.float64), some(9.float64), some(StorePagingOptions())) + let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions())) check: response.messages.len() == 8 response.pagingOptions.isSome() @@ -573,7 +574,7 @@ procSuite "Waku v2 JSON-RPC API": pubSubTopic = "polling" contentTopic = defaultContentTopic payload = @[byte 9] - message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime())) + message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime()))) topicCache = newTable[string, seq[WakuMessage]]() await node1.start() @@ -664,7 +665,7 @@ procSuite "Waku v2 JSON-RPC API": pubSubTopic = "polling" contentTopic = defaultContentTopic payload = @[byte 9] - message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime())) + message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime()))) topicCache = newTable[string, seq[WakuMessage]]() await node1.start() diff --git a/tests/v2/test_message_store.nim b/tests/v2/test_message_store.nim index 0f898d4c0a..fc70318047 100644 --- a/tests/v2/test_message_store.nim +++ b/tests/v2/test_message_store.nim @@ -6,6 +6,7 @@ import ../../waku/v2/node/storage/message/waku_message_store, ../../waku/v2/node/storage/sqlite, ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/utils/time, ./utils suite "Message Store": @@ -16,9 +17,9 @@ suite "Message Store": topic = ContentTopic("/waku/2/default-content/proto") pubsubTopic = "/waku/2/default-waku/proto" - t1 = epochTime() - t2 = epochTime() - t3 = high(float64) + t1 = getNanosecondTime(epochTime()) + t2 = getNanosecondTime(epochTime()) + t3 = getNanosecondTime(high(float64)) var msgs = @[ WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1), WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2), @@ -45,7 +46,7 @@ suite "Message Store": var msgFlag, psTopicFlag = true var responseCount = 0 - proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = + proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = responseCount += 1 # Note: cannot use `check` within `{.raises: [Defect].}` block: @@ -136,7 +137,7 @@ suite "Message Store": for i in 1..capacity: let - msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.float) + msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i)) index = computeIndex(msg) output = store.put(index, msg, pubsubTopic) @@ -145,9 +146,9 @@ suite "Message Store": var responseCount = 0 - lastMessageTimestamp = 0.float + lastMessageTimestamp = Timestamp(0) - proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = + proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} = responseCount += 1 lastMessageTimestamp = msg.timestamp @@ -157,7 +158,7 @@ suite "Message Store": check: resMax.isOk responseCount == capacity # We retrieved all items - lastMessageTimestamp == capacity.float # Returned rows were ordered correctly + lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly # Now test getAll with a limit smaller than total stored items responseCount = 0 # Reset response count @@ -167,7 +168,7 @@ suite "Message Store": check: resLimit.isOk responseCount == capacity - 2 # We retrieved limited number of items - lastMessageTimestamp == capacity.float # We retrieved the youngest items in the store, in order + lastMessageTimestamp == Timestamp(capacity) # We retrieved the youngest items in the store, in order # Test zero limit responseCount = 0 # Reset response count @@ -177,4 +178,4 @@ suite "Message Store": check: resZero.isOk responseCount == 0 # No items retrieved - lastMessageTimestamp == 0.float # No items retrieved + lastMessageTimestamp == Timestamp(0) # No items retrieved diff --git a/tests/v2/test_pagination_utils.nim b/tests/v2/test_pagination_utils.nim index 62033efff2..b4360805c6 100644 --- a/tests/v2/test_pagination_utils.nim +++ b/tests/v2/test_pagination_utils.nim @@ -5,7 +5,8 @@ import chronos, stew/byteutils, libp2p/crypto/crypto, - ../../waku/v2/utils/pagination + ../../waku/v2/utils/pagination, + ../../waku/v2/utils/time procSuite "Pagination utils": @@ -24,26 +25,26 @@ procSuite "Pagination utils": ## Test vars let smallIndex1 = Index(digest: hashFromStr("1234"), - receiverTime: 0.00, - senderTime: 1000.00) + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(1000)) smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime - receiverTime: 0.00, - senderTime: 1000.00) + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(1000)) largeIndex1 = Index(digest: hashFromStr("1234"), - receiverTime: 0.00, - senderTime: 9000.00) # only senderTime differ from smallIndex1 + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1 largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1 - receiverTime: 0.00, - senderTime: 1000.00) + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(1000)) eqIndex1 = Index(digest: hashFromStr("0003"), - receiverTime: 0.00, - senderTime: 54321.00) + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(54321)) eqIndex2 = Index(digest: hashFromStr("0003"), - receiverTime: 0.00, - senderTime: 54321.00) + receiverTime: getNanosecondTime(0), + senderTime: getNanosecondTime(54321)) eqIndex3 = Index(digest: hashFromStr("0003"), - receiverTime: 9999.00, # receiverTime difference should have no effect on comparisons - senderTime: 54321.00) + receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons + senderTime: getNanosecondTime(54321)) ## Test suite diff --git a/tests/v2/test_waku_pagination.nim b/tests/v2/test_waku_pagination.nim index d4ff5b95bb..b4c4cb04e4 100644 --- a/tests/v2/test_waku_pagination.nim +++ b/tests/v2/test_waku_pagination.nim @@ -4,6 +4,7 @@ import testutils/unittests, nimcrypto/sha2, libp2p/protobuf/minprotobuf, ../../waku/v2/protocol/waku_store/waku_store, + ../../waku/v2/utils/time, ../test_helpers @@ -17,8 +18,8 @@ proc createSampleStoreQueue(s: int): StoreQueueRef = for i in 0.. end time - let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2)) + let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(5), endTime: Timestamp(2)) await proto.query(rpc, handler) check: @@ -709,7 +710,7 @@ procSuite "Waku Store": response.messages.len() == 4 completionFut.complete(true) - let rpc = HistoryQuery(startTime: float(2), endTime: float(5)) + let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5)) let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo()) check: @@ -719,7 +720,7 @@ procSuite "Waku Store": asyncTest "queryFromWithPaging with empty pagingInfo": - let rpc = HistoryQuery(startTime: float(2), endTime: float(5)) + let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5)) let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) @@ -729,7 +730,7 @@ procSuite "Waku Store": asyncTest "queryFromWithPaging with pagination": var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1) - let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo) + let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo) let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo()) @@ -790,14 +791,14 @@ procSuite "Waku Store": let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity) for i in 1..capacity: - await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: i.float64)) + await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: Timestamp(i))) await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically check: store.messages.len == capacity # Store is at capacity # Test that capacity holds - await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: (capacity + 1).float64)) + await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: Timestamp(capacity + 1))) check: store.messages.len == capacity # Store is still at capacity diff --git a/tests/v2/test_waku_store_queue.nim b/tests/v2/test_waku_store_queue.nim index 3a88768162..224d7fc7b4 100644 --- a/tests/v2/test_waku_store_queue.nim +++ b/tests/v2/test_waku_store_queue.nim @@ -3,7 +3,8 @@ import std/sequtils, testutils/unittests, - ../../waku/v2/protocol/waku_store/waku_store_types + ../../waku/v2/protocol/waku_store/waku_store_types, + ../../waku/v2/utils/time procSuite "Sorted store queue": @@ -12,8 +13,8 @@ procSuite "Sorted store queue": ## Use i to generate an IndexedWakuMessage var data {.noinit.}: array[32, byte] for x in data.mitems: x = i.byte - return IndexedWakuMessage(msg: WakuMessage(payload: @[byte i], timestamp: float64(i)), - index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data))) + return IndexedWakuMessage(msg: WakuMessage(payload: @[byte i], timestamp: Timestamp(i)), + index: Index(receiverTime: Timestamp(i), senderTime: Timestamp(i), digest: MDigest[256](data: data))) # Test variables let @@ -62,8 +63,8 @@ procSuite "Sorted store queue": let first = testStoreQueue.first() check: first.isOk() - first.get().msg.timestamp == 1.0 - + first.get().msg.timestamp == Timestamp(1) + # Error condition let emptyQ = StoreQueueRef.new(capacity) check: @@ -73,7 +74,7 @@ procSuite "Sorted store queue": let last = testStoreQueue.last() check: last.isOk() - last.get().msg.timestamp == 5.0 + last.get().msg.timestamp == Timestamp(5) # Error condition let emptyQ = StoreQueueRef.new(capacity) @@ -91,7 +92,7 @@ procSuite "Sorted store queue": # First page pInfo.pageSize == 3 pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == 3.0 + pInfo.cursor.senderTime == Timestamp(3) err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[1,2,3] @@ -103,7 +104,7 @@ procSuite "Sorted store queue": # Second page pInfo.pageSize == 2 pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == 5.0 + pInfo.cursor.senderTime == Timestamp(5) err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[4,5] @@ -114,7 +115,7 @@ procSuite "Sorted store queue": # Empty last page pInfo.pageSize == 0 pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == 5.0 + pInfo.cursor.senderTime == Timestamp(5) err == HistoryResponseError.NONE res.len == 0 @@ -129,7 +130,7 @@ procSuite "Sorted store queue": # First page pInfo.pageSize == 3 pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == 3.0 + pInfo.cursor.senderTime == Timestamp(3) err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[3,4,5] @@ -141,7 +142,7 @@ procSuite "Sorted store queue": # Second page pInfo.pageSize == 2 pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == 1.0 + pInfo.cursor.senderTime == Timestamp(1) err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[1,2] @@ -152,7 +153,7 @@ procSuite "Sorted store queue": # Empty last page pInfo.pageSize == 0 pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == 1.0 + pInfo.cursor.senderTime == Timestamp(1) err == HistoryResponseError.NONE res.len == 0 @@ -170,7 +171,7 @@ procSuite "Sorted store queue": # First page pInfo.pageSize == 2 pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == 4.0 + pInfo.cursor.senderTime == Timestamp(4) err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[2,4] @@ -181,7 +182,7 @@ procSuite "Sorted store queue": # Empty next page pInfo.pageSize == 0 pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == 4.0 + pInfo.cursor.senderTime == Timestamp(4) err == HistoryResponseError.NONE res.len == 0 @@ -195,7 +196,7 @@ procSuite "Sorted store queue": # First page pInfo.pageSize == 2 pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == 3.0 + pInfo.cursor.senderTime == Timestamp(3) err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[3,5] @@ -206,7 +207,7 @@ procSuite "Sorted store queue": # Next page pInfo.pageSize == 1 pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == 1.0 + pInfo.cursor.senderTime == Timestamp(1) err == HistoryResponseError.NONE res.mapIt(it.timestamp.int) == @[1] @@ -217,7 +218,7 @@ procSuite "Sorted store queue": # Empty last page pInfo.pageSize == 0 pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == 1.0 + pInfo.cursor.senderTime == Timestamp(1) err == HistoryResponseError.NONE res.len == 0 @@ -228,14 +229,14 @@ procSuite "Sorted store queue": var (res, pInfo, err) = testStoreQueue.getPage(predicate, PagingInfo(pageSize: 3, - cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()), + cursor: Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()), direction: PagingDirection.BACKWARD)) check: # Empty response with error pInfo.pageSize == 0 pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == 3.0 + pInfo.cursor.senderTime == Timestamp(3) err == HistoryResponseError.INVALID_CURSOR res.len == 0 @@ -243,14 +244,14 @@ procSuite "Sorted store queue": (res, pInfo, err) = testStoreQueue.getPage(predicate, PagingInfo(pageSize: 3, - cursor: Index(receiverTime: float64(3), senderTime: float64(3), digest: MDigest[256]()), + cursor: Index(receiverTime: Timestamp(3), senderTime: Timestamp(3), digest: MDigest[256]()), direction: PagingDirection.FORWARD)) check: # Empty response with error pInfo.pageSize == 0 pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == 3.0 + pInfo.cursor.senderTime == Timestamp(3) err == HistoryResponseError.INVALID_CURSOR res.len == 0 @@ -271,7 +272,7 @@ procSuite "Sorted store queue": # Empty response pInfo.pageSize == 0 pInfo.direction == PagingDirection.BACKWARD - pInfo.cursor.senderTime == 0.0 + pInfo.cursor.senderTime == Timestamp(0) err == HistoryResponseError.NONE res.len == 0 @@ -285,7 +286,7 @@ procSuite "Sorted store queue": # Empty response pInfo.pageSize == 0 pInfo.direction == PagingDirection.FORWARD - pInfo.cursor.senderTime == 0.0 + pInfo.cursor.senderTime == Timestamp(0) err == HistoryResponseError.NONE res.len == 0 diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 1bee86608e..2eef235769 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -22,6 +22,7 @@ import ../../waku/v2/protocol/waku_lightpush/waku_lightpush, ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/utils/peers, + ../../waku/v2/utils/time, ../../waku/v2/node/wakunode2, ../test_helpers @@ -1156,7 +1157,7 @@ procSuite "WakuNode": # count the total number of retrieved messages from the database var responseCount = 0 - proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) = + proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) = responseCount += 1 # retrieve all the messages in the db let res = store.getAll(data) diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index b134ce0754..d0a54898cb 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -13,6 +13,7 @@ import # Waku v2 imports libp2p/crypto/crypto, ../v2/utils/namespacing, + ../v2/utils/time, ../v2/node/wakunode2, # Common cli config ./config_bridge @@ -91,7 +92,7 @@ func toWakuMessage(env: Envelope): WakuMessage = # Translate a Waku v1 envelope to a Waku v2 message WakuMessage(payload: env.data, contentTopic: toV2ContentTopic(env.topic), - timestamp: float64(env.expiry - env.ttl), + timestamp: (getNanosecondTime(env.expiry) - getNanosecondTime(env.ttl)), version: 1) proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} = diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index 240fcca3cc..24aba43227 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool # Store API -proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]): StoreResponse +proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse # Filter API diff --git a/waku/v2/node/jsonrpc/jsonrpc_types.nim b/waku/v2/node/jsonrpc/jsonrpc_types.nim index fa4219744c..077991021c 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_types.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_types.nim @@ -4,7 +4,8 @@ import std/[options,tables], eth/keys, ../../protocol/waku_message, - ../../utils/pagination + ../../utils/pagination, + ../../utils/time type StoreResponse* = object @@ -21,7 +22,7 @@ type payload*: seq[byte] contentTopic*: Option[ContentTopic] # sender generated timestamp - timestamp*: Option[float64] + timestamp*: Option[Timestamp] WakuPeer* = object multiaddr*: string diff --git a/waku/v2/node/jsonrpc/jsonrpc_utils.nim b/waku/v2/node/jsonrpc/jsonrpc_utils.nim index d4ddaa2371..ddff71654b 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_utils.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_utils.nim @@ -6,6 +6,7 @@ import ../../../v1/node/rpc/hexstrings, ../../protocol/waku_store/waku_store_types, ../../protocol/waku_message, + ../../utils/time, ../waku_payload, ./jsonrpc_types @@ -41,12 +42,12 @@ proc toStoreResponse*(historyResponse: HistoryResponse): StoreResponse = proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32): WakuMessage = const defaultCT = ContentTopic("/waku/2/default-content/proto") - var t: float64 + var t: Timestamp if relayMessage.timestamp.isSome: t = relayMessage.timestamp.get else: # incoming WakuRelayMessages with no timestamp will get 0 timestamp - t = float64(0) + t = Timestamp(0) WakuMessage(payload: relayMessage.payload, contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, version: version, @@ -60,12 +61,12 @@ proc toWakuMessage*(relayMessage: WakuRelayMessage, version: uint32, rng: ref Br dst: pubKey, symkey: symkey) - var t: float64 + var t: Timestamp if relayMessage.timestamp.isSome: t = relayMessage.timestamp.get else: # incoming WakuRelayMessages with no timestamp will get 0 timestamp - t = float64(0) + t = Timestamp(0) WakuMessage(payload: payload.encode(version, rng[]).get(), contentTopic: if relayMessage.contentTopic.isSome: relayMessage.contentTopic.get else: defaultCT, diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 0914ce8a61..5b3e99abb3 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -5,6 +5,7 @@ import chronicles, json_rpc/rpcserver, ../wakunode2, + ../../utils/time, ./jsonrpc_types, ./jsonrpc_utils export jsonrpc_types @@ -17,7 +18,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Store API version 1 definitions - rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: + rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: ## Returns history for a list of content topics with optional paging debug "get_waku_v2_store_v1_messages" @@ -29,8 +30,8 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = let historyQuery = HistoryQuery(pubsubTopic: if pubsubTopicOption.isSome: pubsubTopicOption.get() else: "", contentFilters: if contentFiltersOption.isSome: contentFiltersOption.get() else: @[], - startTime: if startTime.isSome: startTime.get() else: 0.float64, - endTime: if endTime.isSome: endTime.get() else: 0.float64, + startTime: if startTime.isSome: startTime.get() else: Timestamp(0), + endTime: if endTime.isSome: endTime.get() else: Timestamp(0), pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) await node.query(historyQuery, queryFuncHandler) diff --git a/waku/v2/node/quicksim2.nim b/waku/v2/node/quicksim2.nim index 3273889474..f3cac49edf 100644 --- a/waku/v2/node/quicksim2.nim +++ b/waku/v2/node/quicksim2.nim @@ -12,6 +12,7 @@ import ../protocol/waku_filter/waku_filter_types, ../protocol/waku_store/waku_store_types, ../protocol/waku_message, + ../utils/time, ./wakunode2, ./waku_payload, ./jsonrpc/[jsonrpc_types,jsonrpc_utils] @@ -63,11 +64,11 @@ os.sleep(2000) for i in 0..