Skip to content

Commit

Permalink
chore: Postgres enhance get oldest timestamp (#2687)
Browse files Browse the repository at this point in the history
* postgres: consider also the existing paritions when getting oldest timestamp
* test_driver_postgres_query: adapt test to oldest timestamp
  • Loading branch information
Ivansete-status committed May 14, 2024
1 parent 474a50f commit 13eb1f7
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
19 changes: 17 additions & 2 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
{.used.}

import
std/[options, sequtils, random, algorithm], testutils/unittests, chronos, chronicles
std/[options, sequtils, strformat, random, algorithm],
testutils/unittests,
chronos,
chronicles
import
../../../waku/waku_archive,
../../../waku/waku_archive/driver as driver_module,
Expand Down Expand Up @@ -1767,9 +1770,21 @@ suite "Postgres driver - queries":
)
).isOk()

## just keep the second resolution.
## Notice that the oldest timestamps considers the minimum partition timestamp, which
## is expressed in seconds.
let oldestPartitionTimestamp =
Timestamp(float(oldestTime) / 1_000_000_000) * 1_000_000_000

var res = await driver.getOldestMessageTimestamp()
assert res.isOk(), res.error
assert res.get() == oldestTime, "Failed to retrieve the latest timestamp"

## We give certain margin of error. The oldest timestamp is obtained from
## the oldest partition timestamp and there might be at most one second of difference
## between the time created in the test and the oldest-partition-timestamp created within
## the driver logic.
assert abs(res.get() - oldestPartitionTimestamp) < (2 * 1_000_000_000),
fmt"Failed to retrieve the latest timestamp {res.get()} != {oldestPartitionTimestamp}"

res = await driver.getNewestMessageTimestamp()
assert res.isOk(), res.error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ proc getLastMoment*(partition: Partition): int64 =
let lastTimeInSec = partition.timeRange.`end`
return lastTimeInSec

proc getPartitionStartTimeInNanosec*(partition: Partition): int64 =
return partition.timeRange.beginning * 1_000_000_000

proc containsMoment*(partition: Partition, time: int64): bool =
## Returns true if the given moment is contained within the partition window,
## 'false' otherwise.
Expand Down
18 changes: 15 additions & 3 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,10 @@ proc getInt(
try:
retInt = parseInt(str)
except ValueError:
return err("exception in getInt, parseInt: " & getCurrentExceptionMsg())
return err(
"exception in getInt, parseInt, str: " & str & " query: " & query & " exception: " &
getCurrentExceptionMsg()
)

return ok(retInt)

Expand All @@ -726,11 +729,20 @@ method getMessagesCount*(
method getOldestMessageTimestamp*(
s: PostgresDriver
): Future[ArchiveDriverResult[Timestamp]] {.async.} =
## In some cases it could happen that we have
## empty partitions which are older than the current stored rows.
## In those cases we want to consider those older partitions as the oldest considered timestamp.
let oldestPartition = s.partitionMngr.getOldestPartition().valueOr:
return err("could not get oldest partition: " & $error)

let oldestPartitionTimeNanoSec = oldestPartition.getPartitionStartTimeInNanosec()

let intRes = await s.getInt("SELECT MIN(storedAt) FROM messages")
if intRes.isErr():
return err("error in getOldestMessageTimestamp: " & intRes.error)
## Just return the oldest partition time considering the partitions set
return ok(Timestamp(oldestPartitionTimeNanoSec))

return ok(Timestamp(intRes.get()))
return ok(Timestamp(min(intRes.get(), oldestPartitionTimeNanoSec)))

method getNewestMessageTimestamp*(
s: PostgresDriver
Expand Down

0 comments on commit 13eb1f7

Please sign in to comment.