Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring timestamps #842

Merged
merged 24 commits into from
Feb 17, 2022
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ The full list of changes is below.

### Changes

- ...
- A new type `Timestamp` for all timestamps is introduced (currently an alias for int64).
- All timestamps now have nanosecond resolution.

### Fixes

- ...
Expand Down
9 changes: 5 additions & 4 deletions tests/v2/test_jsonrpc_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import
../../waku/v2/protocol/waku_swap/waku_swap,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,
../test_helpers

template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0]
Expand Down Expand Up @@ -102,7 +103,7 @@ procSuite "Waku v2 JSON-RPC API":
response == true

# Publish a message on the default topic
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(epochTime())))
response = await client.post_waku_v2_relay_v1_message(defaultTopic, WakuRelayMessage(payload: @[byte 1], contentTopic: some(defaultContentTopic), timestamp: some(getNanosecondTime(epochTime()))))

check:
# @TODO poll topic to verify message has been published
Expand Down Expand Up @@ -260,7 +261,7 @@ procSuite "Waku v2 JSON-RPC API":
let client = newRpcHttpClient()
await client.connect("127.0.0.1", rpcPort, false)

let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(0.float64), some(9.float64), some(StorePagingOptions()))
let response = await client.get_waku_v2_store_v1_messages(some(defaultTopic), some(@[HistoryContentFilter(contentTopic: defaultContentTopic)]), some(Timestamp(0)), some(Timestamp(9)), some(StorePagingOptions()))
check:
response.messages.len() == 8
response.pagingOptions.isSome()
Expand Down Expand Up @@ -573,7 +574,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()

await node1.start()
Expand Down Expand Up @@ -664,7 +665,7 @@ procSuite "Waku v2 JSON-RPC API":
pubSubTopic = "polling"
contentTopic = defaultContentTopic
payload = @[byte 9]
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(epochTime()))
message = WakuRelayMessage(payload: payload, contentTopic: some(contentTopic), timestamp: some(getNanosecondTime(epochTime())))
topicCache = newTable[string, seq[WakuMessage]]()

await node1.start()
Expand Down
21 changes: 11 additions & 10 deletions tests/v2/test_message_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/storage/sqlite,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/utils/time,
./utils

suite "Message Store":
Expand All @@ -16,9 +17,9 @@ suite "Message Store":
topic = ContentTopic("/waku/2/default-content/proto")
pubsubTopic = "/waku/2/default-waku/proto"

t1 = epochTime()
t2 = epochTime()
t3 = high(float64)
t1 = getNanosecondTime(epochTime())
t2 = getNanosecondTime(epochTime())
t3 = getNanosecondTime(high(float64))
var msgs = @[
WakuMessage(payload: @[byte 1, 2, 3], contentTopic: topic, version: uint32(0), timestamp: t1),
WakuMessage(payload: @[byte 1, 2, 3, 4], contentTopic: topic, version: uint32(1), timestamp: t2),
Expand All @@ -45,7 +46,7 @@ suite "Message Store":
var msgFlag, psTopicFlag = true

var responseCount = 0
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1

# Note: cannot use `check` within `{.raises: [Defect].}` block:
Expand Down Expand Up @@ -136,7 +137,7 @@ suite "Message Store":

for i in 1..capacity:
let
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: i.float)
msg = WakuMessage(payload: @[byte i], contentTopic: contentTopic, version: uint32(0), timestamp: Timestamp(i))
index = computeIndex(msg)
output = store.put(index, msg, pubsubTopic)

Expand All @@ -145,9 +146,9 @@ suite "Message Store":

var
responseCount = 0
lastMessageTimestamp = 0.float
lastMessageTimestamp = Timestamp(0)

proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) {.raises: [Defect].} =
responseCount += 1
lastMessageTimestamp = msg.timestamp

Expand All @@ -157,7 +158,7 @@ suite "Message Store":
check:
resMax.isOk
responseCount == capacity # We retrieved all items
lastMessageTimestamp == capacity.float # Returned rows were ordered correctly
lastMessageTimestamp == Timestamp(capacity) # Returned rows were ordered correctly

# Now test getAll with a limit smaller than total stored items
responseCount = 0 # Reset response count
Expand All @@ -167,7 +168,7 @@ suite "Message Store":
check:
resLimit.isOk
responseCount == capacity - 2 # We retrieved limited number of items
lastMessageTimestamp == capacity.float # We retrieved the youngest items in the store, in order
lastMessageTimestamp == Timestamp(capacity) # We retrieved the youngest items in the store, in order

# Test zero limit
responseCount = 0 # Reset response count
Expand All @@ -177,4 +178,4 @@ suite "Message Store":
check:
resZero.isOk
responseCount == 0 # No items retrieved
lastMessageTimestamp == 0.float # No items retrieved
lastMessageTimestamp == Timestamp(0) # No items retrieved
7 changes: 4 additions & 3 deletions tests/v2/test_waku_pagination.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import
testutils/unittests, nimcrypto/sha2,
libp2p/protobuf/minprotobuf,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/utils/time,
../test_helpers


Expand All @@ -12,7 +13,7 @@ proc createSampleList(s: int): seq[IndexedWakuMessage] =
var data {.noinit.}: array[32, byte]
for x in data.mitems: x = 1
for i in 0..<s:
result.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]), index: Index(receiverTime: float64(i), senderTime: float64(i), digest: MDigest[256](data: data)) ))
result.add(IndexedWakuMessage(msg: WakuMessage(payload: @[byte i]), index: Index(receiverTime: Timestamp(i), senderTime: Timestamp(i), digest: MDigest[256](data: data)) ))

procSuite "pagination":
test "Index computation test":
Expand Down Expand Up @@ -305,7 +306,7 @@ suite "time-window history query":
let
version = 0'u32
payload = @[byte 0, 1, 2]
timestamp = float64(10)
timestamp = Timestamp(10)
msg = WakuMessage(payload: payload, version: version, timestamp: timestamp)
pb = msg.encode()

Expand Down Expand Up @@ -338,4 +339,4 @@ suite "time-window history query":
let
timestampDecoded = msgDecoded.value.timestamp
check:
timestampDecoded == float64(0)
timestampDecoded == Timestamp(0)
77 changes: 39 additions & 38 deletions tests/v2/test_waku_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/node/storage/message/waku_message_store,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/time,
../test_helpers, ./utils

procSuite "Waku Store":
Expand Down Expand Up @@ -525,7 +526,7 @@ procSuite "Waku Store":
let
index = computeIndex(WakuMessage(payload: @[byte 1], contentTopic: defaultContentTopic))
pagingInfo = PagingInfo(pageSize: 1, cursor: index, direction: PagingDirection.BACKWARD)
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: float64(10), endTime: float64(11))
query = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: defaultContentTopic), HistoryContentFilter(contentTopic: defaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
pb = query.encode()
decodedQuery = HistoryQuery.init(pb.buffer)

Expand Down Expand Up @@ -575,25 +576,25 @@ procSuite "Waku Store":
key2 = PrivateKey.random(ECDSA, rng[]).get()
# peer2 = PeerInfo.new(key2)
var
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(9))]

msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: float(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: float(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: float(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: float(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: float(7))]
msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: Timestamp(7)),
WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: Timestamp(8)),
WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: Timestamp(9))]

msgList2 = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"), timestamp: Timestamp(0)),
WakuMessage(payload: @[byte 11],contentTopic: ContentTopic("1"), timestamp: Timestamp(1)),
WakuMessage(payload: @[byte 12],contentTopic: ContentTopic("2"), timestamp: Timestamp(2)),
WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3)),
WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4)),
WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(5)),
WakuMessage(payload: @[byte 13],contentTopic: ContentTopic("2"), timestamp: Timestamp(6)),
WakuMessage(payload: @[byte 14],contentTopic: ContentTopic("1"), timestamp: Timestamp(7))]

#--------------------
# setup default test store
Expand Down Expand Up @@ -643,11 +644,11 @@ procSuite "Waku Store":
proc handler(response: HistoryResponse) {.gcsafe, closure.} =
check:
response.messages.len() == 2
response.messages.anyIt(it.timestamp == float(3))
response.messages.anyIt(it.timestamp == float(5))
response.messages.anyIt(it.timestamp == Timestamp(3))
response.messages.anyIt(it.timestamp == Timestamp(5))
completionFut.complete(true)

let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(5))
await proto.query(rpc, handler)

check:
Expand All @@ -663,7 +664,7 @@ procSuite "Waku Store":
response.messages.len() == 0
completionFut.complete(true)

let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(2), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(2), endTime: Timestamp(2))
await proto.query(rpc, handler)

check:
Expand All @@ -680,7 +681,7 @@ procSuite "Waku Store":
completionFut.complete(true)

# time window is invalid since start time > end time
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: float(5), endTime: float(2))
let rpc = HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: ContentTopic("1"))], startTime: Timestamp(5), endTime: Timestamp(2))
await proto.query(rpc, handler)

check:
Expand All @@ -689,18 +690,18 @@ procSuite "Waku Store":
test "find last seen message":
var
msgList = @[IndexedWakuMessage(msg: WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2"))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: float(1))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: float(2))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: float(3))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: float(4))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: float(9))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: float(6))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: float(7))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: float(8))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: float(5)))]
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 1],contentTopic: ContentTopic("1"), timestamp: Timestamp(1))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 2],contentTopic: ContentTopic("2"), timestamp: Timestamp(2))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 3],contentTopic: ContentTopic("1"), timestamp: Timestamp(3))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 4],contentTopic: ContentTopic("2"), timestamp: Timestamp(4))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 5],contentTopic: ContentTopic("1"), timestamp: Timestamp(9))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 6],contentTopic: ContentTopic("2"), timestamp: Timestamp(6))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 7],contentTopic: ContentTopic("1"), timestamp: Timestamp(7))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 8],contentTopic: ContentTopic("2"), timestamp: Timestamp(8))),
IndexedWakuMessage(msg: WakuMessage(payload: @[byte 9],contentTopic: ContentTopic("1"),timestamp: Timestamp(5)))]

check:
findLastSeen(msgList) == float(9)
findLastSeen(msgList) == Timestamp(9)

asyncTest "resume message history":
# starts a new node
Expand All @@ -727,7 +728,7 @@ procSuite "Waku Store":
response.messages.len() == 4
completionFut.complete(true)

let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))
let successResult = await proto.queryFrom(rpc, handler, listenSwitch.peerInfo.toRemotePeerInfo())

check:
Expand All @@ -737,7 +738,7 @@ procSuite "Waku Store":

asyncTest "queryFromWithPaging with empty pagingInfo":

let rpc = HistoryQuery(startTime: float(2), endTime: float(5))
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5))

let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

Expand All @@ -747,7 +748,7 @@ procSuite "Waku Store":

asyncTest "queryFromWithPaging with pagination":
var pinfo = PagingInfo(direction:PagingDirection.FORWARD, pageSize: 1)
let rpc = HistoryQuery(startTime: float(2), endTime: float(5), pagingInfo: pinfo)
let rpc = HistoryQuery(startTime: Timestamp(2), endTime: Timestamp(5), pagingInfo: pinfo)

let messagesResult = await proto.queryFromWithPaging(rpc, listenSwitch.peerInfo.toRemotePeerInfo())

Expand Down
3 changes: 2 additions & 1 deletion tests/v2/test_wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/utils/peers,
../../waku/v2/utils/time,
../../waku/v2/node/wakunode2,
../test_helpers

Expand Down Expand Up @@ -1104,7 +1105,7 @@ procSuite "WakuNode":

# count the total number of retrieved messages from the database
var responseCount = 0
proc data(receiverTimestamp: float64, msg: WakuMessage, psTopic: string) =
proc data(receiverTimestamp: Timestamp, msg: WakuMessage, psTopic: string) =
responseCount += 1
# retrieve all the messages in the db
let res = store.getAll(data)
Expand Down
3 changes: 2 additions & 1 deletion waku/common/wakubridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import
# Waku v2 imports
libp2p/crypto/crypto,
../v2/utils/namespacing,
../v2/utils/time,
../v2/node/wakunode2,
# Common cli config
./config_bridge
Expand Down Expand Up @@ -91,7 +92,7 @@ func toWakuMessage(env: Envelope): WakuMessage =
# Translate a Waku v1 envelope to a Waku v2 message
WakuMessage(payload: env.data,
contentTopic: toV2ContentTopic(env.topic),
timestamp: float64(env.expiry - env.ttl),
timestamp: (getNanosecondTime(env.expiry) - getNanosecondTime(env.ttl)),
version: 1)

proc toWakuV2(bridge: WakuBridge, env: Envelope) {.async.} =
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/node/jsonrpc/jsonrpc_callsigs.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool

# Store API

proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[float64], endTime: Option[float64], pagingOptions: Option[StorePagingOptions]): StoreResponse
proc get_waku_v2_store_v1_messages(pubsubTopicOption: Option[string], contentFiltersOption: Option[seq[HistoryContentFilter]], startTime: Option[Timestamp], endTime: Option[Timestamp], pagingOptions: Option[StorePagingOptions]): StoreResponse

# Filter API

Expand Down
5 changes: 3 additions & 2 deletions waku/v2/node/jsonrpc/jsonrpc_types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import
std/[options,tables],
eth/keys,
../../protocol/waku_message,
../../utils/pagination
../../utils/pagination,
../../utils/time

type
StoreResponse* = object
Expand All @@ -21,7 +22,7 @@ type
payload*: seq[byte]
contentTopic*: Option[ContentTopic]
# sender generated timestamp
timestamp*: Option[float64]
timestamp*: Option[Timestamp]

WakuPeer* = object
multiaddr*: string
Expand Down
Loading