Skip to content

Commit

Permalink
Refactoring timestamps (#842)
Browse files Browse the repository at this point in the history
* Refactor timestamps type from float64 to int64 (milliseconds resolution)

* Revert epochs to float64

* Update 00002_addSenderTimeStamp.up.sql

* Update quicksim2.nim

* Add files via upload

* Delete 00003_convertTimestampsToInts.up.sql

* Add files via upload

* Rename 00003_convertTimestampsToInts.up.sql to 00003_addTimestampsToInts.up.sql

* Delete 00003_addTimestampsToInts.up.sql

* Rln-relay integration into chat2 (#835)

* adds ProofMetadata

* adds EPOCH_INTERVAL

* adds messageLog field

* adds updateLog, toEpoch, fromEpoch, getEpoch, compareTo

* adds unit test for toEpoch and fromEpoch

* adds unit test for Epoch comparison

* adds result codes for updateLog

* adds unit test for update log

* renames epoch related consts

* modifies updateLog with new return type and new logic of spam detection

* adds unit text for the modified updateLog

* changes max epoch gap type size

* splits updateLog into two procs isSpam and updateLog

* updates unittests

* fixes a bug, returns false when the message is not spam

* renames messageLog to nullifierLog

* renames isSpam to hasDuplicate

* updates the rln validator, adds comments

* adds appendRLNProof proc plus some code beatification

* unit test for validate message

* adds unhappy test to validateMessage unit test

* renames EPOCH_UNIT_SECONDS

* renames MAX_CLOCK_GAP_SECONDS

* WIP: integration test

* fixes compile errors

* sets a real epoch value

* updates on old unittests

* adds comments to the rln relay tests

* adds more comments

* makes rln import conditional

* adds todos

* adds more todos

* adds rln-relay mount process into chat2

* further todos

* logs contentTopic

* introduces rln relay configs

* changes default pubsub topic

* adds contentTopic config

* imports rln relay dependencies

* consolidates imports

* removes module identifier from ContentTopic

* adds contentTopic field

* adds contentTopic argument to mountRlnRelay calls

* appends rln proof to chat2 messages

* changes the default chat2 contentTopic

* adds missing content topic fields

* fixes a bug

* adds a new logic about empty content topics

* appends proof only when rln flag is active

* removes unnecessary todos

* fixes an indentation issue

* adds log messages

* verifies the proof against the concatenation of msg payload and content topic

* a bug fix

* removes duplicate epoch time calculation

* updates log level to trace

* updates default rln-relay content topic

* adds support for empty content topics

* updates changelog

* changelog updates

* removes a commented code block

* updates addRLNRelayValidator string doc

* Squashed commit of the following:

commit bc36c99
Merge: dc2b294 5a77d6e
Author: G <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 01:10:06 2022 +0100

    Merge branch 'master' into int64-timestamps-ns

commit dc2b294
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 00:24:45 2022 +0100

    Fix

commit f97b95a
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 00:13:18 2022 +0100

    Missing import

commit 060c4f8
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Sat Feb 5 00:10:36 2022 +0100

    Fixed typo

commit 08ca99b
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:59:20 2022 +0100

    Time util file

commit 2b5c360
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:33:20 2022 +0100

    Moved time utility functions to utils/time

commit fdaf121
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:10:25 2022 +0100

    Fix comment

commit c7e06ab
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 23:04:13 2022 +0100

    Restore previous migration script

commit 80282db
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 22:54:15 2022 +0100

    Typo

commit b9d67f8
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 22:49:29 2022 +0100

    Added utilities to get int64 nanosecond, microsecond, millisecond time resolution from float

commit 0130d49
Author: s1fr0 <28568419+s1fr0@users.noreply.github.com>
Date:   Fri Feb 4 22:36:35 2022 +0100

    Switched to nanoseconds support.

* Update CHANGELOG.md

* Create 00003_convertTimestampsToInt64.up.sql

Migration script

* Moved migration script to right location

* Update waku_rln_relay_utils.nim

* Update waku_rln_relay_utils.nim

* Addressed reviewers' comments

* Update default fleet metrics dashboard (#844)

* Fix

* No need for float

* Aligning master to changes in PR

* Further fixes

Co-authored-by: Sanaz Taheri Boshrooyeh <35961250+staheri14@users.noreply.github.com>
Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 17, 2022
1 parent bb3e594 commit 21cac6d
Show file tree
Hide file tree
Showing 23 changed files with 226 additions and 142 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ The full list of changes is below.

### Changes

- ...
- A new type `Timestamp` for all timestamps is introduced (currently an alias for int64).
- All timestamps now have nanosecond resolution.

### Fixes

- ...
Expand Down
9 changes: 5 additions & 4 deletions tests/v2/test_jsonrpc_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,
../test_helpers

template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
Expand Down Expand Up @@ -102,7 +103,7 @@ procSuite "Waku v2 JSON-RPC API":
response == true

# Publish a message on the default topic
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))

check:
# @TODO poll topic to verify message has been published
Expand Down Expand Up @@ -260,7 +261,7 @@ procSuite "Waku v2 JSON-RPC API":
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)

let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.float64), some(9.float64), some(StorePagingOptions()))
let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
check:
response.messages.len() == 8
response.pagingOptions.isSome()
Expand Down Expand Up @@ -573,7 +574,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()

await node1.start()
Expand Down Expand Up @@ -664,7 +665,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()

await node1.start()
Expand Down
21 changes: 11 additions & 10 deletions tests/v2/test_message_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/utils/time,
./utils

suite "Message Store":
Expand All @@ -16,9 +17,9 @@ suite "Message Store":
topic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"

t1 = epochTime()
t2 = epochTime()
t3 = high(float64)
t1 = getNanosecondTime(epochTime())
t2 = getNanosecondTime(epochTime())
t3 = getNanosecondTime(high(float64))
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
Expand All @@ -45,7 +46,7 @@ suite "Message Store":
var msgFlag, psTopicFlag = true

var responseCount = 0
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1

# Note: cannot use `check` within `{.raises: [Defect].}` block:
Expand Down Expand Up @@ -136,7 +137,7 @@ suite "Message Store":

for i in 1..capacity:
let
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.float)
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = computeIndex(msg)
output = store.put(index, msg, pubsubTopic)

Expand All @@ -145,9 +146,9 @@ suite "Message Store":

var
responseCount = 0
lastMessageTimestamp = 0.float
lastMessageTimestamp = Timestamp(0)

proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
lastMessageTimestamp = msg.timestamp

Expand All @@ -157,7 +158,7 @@ suite "Message Store":
check:
resMax.isOk
responseCount == capacity # We retrieved all items
lastMessageTimestamp == capacity.float # Returned rows were ordered correctly
lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly

# Now test getAll with a limit smaller than total stored items
responseCount = 0 # Reset response count
Expand All @@ -167,7 +168,7 @@ suite "Message Store":
check:
resLimit.isOk
responseCount == capacity - 2 # We retrieved limited number of items
lastMessageTimestamp == capacity.float # We retrieved the youngest items in the store, in order
lastMessageTimestamp == Timestamp(capacity) # We retrieved the youngest items in the store, in order

# Test zero limit
responseCount = 0 # Reset response count
Expand All @@ -177,4 +178,4 @@ suite "Message Store":
check:
resZero.isOk
responseCount == 0 # No items retrieved
lastMessageTimestamp == 0.float # No items retrieved
lastMessageTimestamp == Timestamp(0) # No items retrieved
31 changes: 16 additions & 15 deletions tests/v2/test_pagination_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import
chronos,
stew/byteutils,
libp2p/crypto/crypto,
../../waku/v2/utils/pagination
../../waku/v2/utils/pagination,
../../waku/v2/utils/time

procSuite "Pagination utils":

Expand All @@ -24,26 +25,26 @@ procSuite "Pagination utils":
## Test vars
let
smallIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: 0.00,
senderTime: 1000.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime
receiverTime: 0.00,
senderTime: 1000.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
largeIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: 0.00,
senderTime: 9000.00) # only senderTime differ from smallIndex1
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1
largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1
receiverTime: 0.00,
senderTime: 1000.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
eqIndex1 = Index(digest: hashFromStr("0003"),
receiverTime: 0.00,
senderTime: 54321.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex2 = Index(digest: hashFromStr("0003"),
receiverTime: 0.00,
senderTime: 54321.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex3 = Index(digest: hashFromStr("0003"),
receiverTime: 9999.00, # receiverTime difference should have no effect on comparisons
senderTime: 54321.00)
receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons
senderTime: getNanosecondTime(54321))


## Test suite
Expand Down
9 changes: 5 additions & 4 deletions tests/v2/test_waku_pagination.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import
testutils/unittests, nimcrypto/sha2,
libp2p/protobuf/minprotobuf,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/utils/time,
../test_helpers


Expand All @@ -17,8 +18,8 @@ proc createSampleStoreQueue(s: int): StoreQueueRef =

for i in 0..<s:
discard testStoreQueue.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]),
index: Index(receiverTime: float64(i),
senderTime: float64(i),
index: Index(receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MDigest[256](data: data)) ))

return testStoreQueue
Expand Down Expand Up @@ -273,7 +274,7 @@ suite "time-window history query":
let
version = 0'u32
payload = @[byte 0, 1, 2]
timestamp = float64(10)
timestamp = Timestamp(10)
msg = WakuMessage(payload: payload, version: version, timestamp: timestamp)
pb = msg.encode()

Expand Down Expand Up @@ -307,4 +308,4 @@ suite "time-window history query":
let
timestampDecoded = msgDecoded.value.timestamp
check:
timestampDecoded == float64(0)
timestampDecoded == Timestamp(0)
61 changes: 31 additions & 30 deletions tests/v2/test_waku_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/time,
../test_helpers, ./utils

procSuite "Waku Store":
Expand Down Expand Up @@ -525,7 +526,7 @@ procSuite "Waku Store":
let
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
pb = query.encode()
decodedQuery = HistoryQuery.init(pb.buffer)

Expand Down Expand Up @@ -575,25 +576,25 @@ procSuite "Waku Store":
key2 = PrivateKey.random(ECDSA, rng[]).get()
# peer2 = PeerInfo.new(key2)
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))]

msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: float(7))]
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: Timestamp(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: Timestamp(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: Timestamp(9))]

msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: Timestamp(7))]

#--------------------
# setup default test store
Expand Down Expand Up @@ -641,11 +642,11 @@ procSuite "Waku Store":
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 2
response.messages.anyIt(it.timestamp == float(3))
response.messages.anyIt(it.timestamp == float(5))
response.messages.anyIt(it.timestamp == Timestamp(3))
response.messages.anyIt(it.timestamp == Timestamp(5))
completionFut.complete(true)

let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(5))
await proto.query(rpc, handler)

check:
Expand All @@ -661,7 +662,7 @@ procSuite "Waku Store":
response.messages.len() == 0
completionFut.complete(true)

let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(2))
await proto.query(rpc, handler)

check:
Expand All @@ -678,7 +679,7 @@ procSuite "Waku Store":
completionFut.complete(true)

# time window is invalid since start time > end time
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(5), endTime: Timestamp(2))
await proto.query(rpc, handler)

check:
Expand Down Expand Up @@ -709,7 +710,7 @@ procSuite "Waku Store":
response.messages.len() == 4
completionFut.complete(true)

let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))
let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo())

check:
Expand All @@ -719,7 +720,7 @@ procSuite "Waku Store":

asyncTest "queryFromWithPaging with empty pagingInfo":

let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))

let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

Expand All @@ -729,7 +730,7 @@ procSuite "Waku Store":

asyncTest "queryFromWithPaging with pagination":
var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo)
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo)

let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

Expand Down Expand Up @@ -790,14 +791,14 @@ procSuite "Waku Store":
let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity)

for i in 1..capacity:
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: i.float64))
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: Timestamp(i)))
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

check:
store.messages.len == capacity # Store is at capacity

# Test that capacity holds
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: (capacity + 1).float64))
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: Timestamp(capacity + 1)))

check:
store.messages.len == capacity # Store is still at capacity
Expand Down
Loading

0 comments on commit 21cac6d

Please sign in to comment.