Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring timestamps #842

Merged
merged 24 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ The full list of changes is below.

### Changes

- ...
- A new type `Timestamp` for all timestamps is introduced (currently an alias for int64).
- All timestamps now have nanosecond resolution.

### Fixes

- ...
Expand Down
9 changes: 5 additions & 4 deletions tests/v2/test_jsonrpc_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,
../test_helpers

template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
Expand Down Expand Up @@ -102,7 +103,7 @@ procSuite "Waku v2 JSON-RPC API":
response == true

# Publish a message on the default topic
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))

check:
# @TODO poll topic to verify message has been published
Expand Down Expand Up @@ -260,7 +261,7 @@ procSuite "Waku v2 JSON-RPC API":
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)

let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.float64), some(9.float64), some(StorePagingOptions()))
let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
check:
response.messages.len() == 8
response.pagingOptions.isSome()
Expand Down Expand Up @@ -573,7 +574,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()

await node1.start()
Expand Down Expand Up @@ -664,7 +665,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()

await node1.start()
Expand Down
21 changes: 11 additions & 10 deletions tests/v2/test_message_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/utils/time,
./utils

suite "Message Store":
Expand All @@ -16,9 +17,9 @@ suite "Message Store":
topic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"

t1 = epochTime()
t2 = epochTime()
t3 = high(float64)
t1 = getNanosecondTime(epochTime())
t2 = getNanosecondTime(epochTime())
t3 = getNanosecondTime(high(float64))
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
Expand All @@ -45,7 +46,7 @@ suite "Message Store":
var msgFlag, psTopicFlag = true

var responseCount = 0
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1

# Note: cannot use `check` within `{.raises: [Defect].}` block:
Expand Down Expand Up @@ -136,7 +137,7 @@ suite "Message Store":

for i in 1..capacity:
let
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.float)
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = computeIndex(msg)
output = store.put(index, msg, pubsubTopic)

Expand All @@ -145,9 +146,9 @@ suite "Message Store":

var
responseCount = 0
lastMessageTimestamp = 0.float
lastMessageTimestamp = Timestamp(0)

proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
lastMessageTimestamp = msg.timestamp

Expand All @@ -157,7 +158,7 @@ suite "Message Store":
check:
resMax.isOk
responseCount == capacity # We retrieved all items
lastMessageTimestamp == capacity.float # Returned rows were ordered correctly
lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly

# Now test getAll with a limit smaller than total stored items
responseCount = 0 # Reset response count
Expand All @@ -167,7 +168,7 @@ suite "Message Store":
check:
resLimit.isOk
responseCount == capacity - 2 # We retrieved limited number of items
lastMessageTimestamp == capacity.float # We retrieved the youngest items in the store, in order
lastMessageTimestamp == Timestamp(capacity) # We retrieved the youngest items in the store, in order

# Test zero limit
responseCount = 0 # Reset response count
Expand All @@ -177,4 +178,4 @@ suite "Message Store":
check:
resZero.isOk
responseCount == 0 # No items retrieved
lastMessageTimestamp == 0.float # No items retrieved
lastMessageTimestamp == Timestamp(0) # No items retrieved
31 changes: 16 additions & 15 deletions tests/v2/test_pagination_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import
chronos,
stew/byteutils,
libp2p/crypto/crypto,
../../waku/v2/utils/pagination
../../waku/v2/utils/pagination,
../../waku/v2/utils/time

procSuite "Pagination utils":

Expand All @@ -24,26 +25,26 @@ procSuite "Pagination utils":
## Test vars
let
smallIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: 0.00,
senderTime: 1000.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
smallIndex2 = Index(digest: hashFromStr("1234567"), # digest is less significant than senderTime
receiverTime: 0.00,
senderTime: 1000.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
largeIndex1 = Index(digest: hashFromStr("1234"),
receiverTime: 0.00,
senderTime: 9000.00) # only senderTime differ from smallIndex1
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(9000)) # only senderTime differ from smallIndex1
largeIndex2 = Index(digest: hashFromStr("12345"), # only digest differs from smallIndex1
receiverTime: 0.00,
senderTime: 1000.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(1000))
eqIndex1 = Index(digest: hashFromStr("0003"),
receiverTime: 0.00,
senderTime: 54321.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex2 = Index(digest: hashFromStr("0003"),
receiverTime: 0.00,
senderTime: 54321.00)
receiverTime: getNanosecondTime(0),
senderTime: getNanosecondTime(54321))
eqIndex3 = Index(digest: hashFromStr("0003"),
receiverTime: 9999.00, # receiverTime difference should have no effect on comparisons
senderTime: 54321.00)
receiverTime: getNanosecondTime(9999), # receiverTime difference should have no effect on comparisons
senderTime: getNanosecondTime(54321))


## Test suite
Expand Down
9 changes: 5 additions & 4 deletions tests/v2/test_waku_pagination.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import
testutils/unittests, nimcrypto/sha2,
libp2p/protobuf/minprotobuf,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/utils/time,
../test_helpers


Expand All @@ -17,8 +18,8 @@ proc createSampleStoreQueue(s: int): StoreQueueRef =

for i in 0..<s:
discard testStoreQueue.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]),
index: Index(receiverTime: float64(i),
senderTime: float64(i),
index: Index(receiverTime: Timestamp(i),
senderTime: Timestamp(i),
digest: MDigest[256](data: data)) ))

return testStoreQueue
Expand Down Expand Up @@ -273,7 +274,7 @@ suite "time-window history query":
let
version = 0'u32
payload = @[byte 0, 1, 2]
timestamp = float64(10)
timestamp = Timestamp(10)
msg = WakuMessage(payload: payload, version: version, timestamp: timestamp)
pb = msg.encode()

Expand Down Expand Up @@ -307,4 +308,4 @@ suite "time-window history query":
let
timestampDecoded = msgDecoded.value.timestamp
check:
timestampDecoded == float64(0)
timestampDecoded == Timestamp(0)
61 changes: 31 additions & 30 deletions tests/v2/test_waku_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/time,
../test_helpers, ./utils

procSuite "Waku Store":
Expand Down Expand Up @@ -525,7 +526,7 @@ procSuite "Waku Store":
let
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
pb = query.encode()
decodedQuery = HistoryQuery.init(pb.buffer)

Expand Down Expand Up @@ -575,25 +576,25 @@ procSuite "Waku Store":
key2 = PrivateKey.random(ECDSA, rng[]).get()
# peer2 = PeerInfo.new(key2)
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))]

msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: float(7))]
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: Timestamp(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: Timestamp(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: Timestamp(9))]

msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: Timestamp(7))]

#--------------------
# setup default test store
Expand Down Expand Up @@ -641,11 +642,11 @@ procSuite "Waku Store":
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 2
response.messages.anyIt(it.timestamp == float(3))
response.messages.anyIt(it.timestamp == float(5))
response.messages.anyIt(it.timestamp == Timestamp(3))
response.messages.anyIt(it.timestamp == Timestamp(5))
completionFut.complete(true)

let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(5))
await proto.query(rpc, handler)

check:
Expand All @@ -661,7 +662,7 @@ procSuite "Waku Store":
response.messages.len() == 0
completionFut.complete(true)

let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(2))
await proto.query(rpc, handler)

check:
Expand All @@ -678,7 +679,7 @@ procSuite "Waku Store":
completionFut.complete(true)

# time window is invalid since start time > end time
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(5), endTime: Timestamp(2))
await proto.query(rpc, handler)

check:
Expand Down Expand Up @@ -709,7 +710,7 @@ procSuite "Waku Store":
response.messages.len() == 4
completionFut.complete(true)

let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))
let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo())

check:
Expand All @@ -719,7 +720,7 @@ procSuite "Waku Store":

asyncTest "queryFromWithPaging with empty pagingInfo":

let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))

let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

Expand All @@ -729,7 +730,7 @@ procSuite "Waku Store":

asyncTest "queryFromWithPaging with pagination":
var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo)
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo)

let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

Expand Down Expand Up @@ -790,14 +791,14 @@ procSuite "Waku Store":
let store = WakuStore.init(PeerManager.new(newStandardSwitch()), crypto.newRng(), capacity = capacity)

for i in 1..capacity:
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: i.float64))
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte i], contentTopic: contentTopic, timestamp: Timestamp(i)))
await sleepAsync(1.millis) # Sleep a millisecond to ensure messages are stored chronologically

check:
store.messages.len == capacity # Store is at capacity

# Test that capacity holds
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: (capacity + 1).float64))
await store.handleMessage(pubsubTopic, WakuMessage(payload: @[byte (capacity + 1)], contentTopic: contentTopic, timestamp: Timestamp(capacity + 1)))

check:
store.messages.len == capacity # Store is still at capacity
Expand Down
Loading