Skip to content

Commit

Permalink
Merge b182bdb into 07beea0
Browse files Browse the repository at this point in the history
  • Loading branch information
ABresting authored Dec 28, 2023
2 parents 07beea0 + b182bdb commit 6164530
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 43 deletions.
2 changes: 1 addition & 1 deletion tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import
./waku_archive/test_driver_queue,
./waku_archive/test_driver_sqlite_query,
./waku_archive/test_driver_sqlite,
./waku_archive/test_retention_policy,
./waku_archive/test_waku_archive

const os* {.strdefine.} = ""
Expand All @@ -27,6 +26,7 @@ when os == "Linux" and
defined(postgres):
import
./waku_archive/test_driver_postgres_query,
./waku_archive/test_retention_policy,
./waku_archive/test_driver_postgres

# Waku store test suite
Expand Down
84 changes: 46 additions & 38 deletions tests/waku_archive/test_retention_policy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import
std/[sequtils,times],
stew/results,
testutils/unittests,
chronos
chronos,
os
import
../../../waku/common/databases/db_sqlite,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../../../waku/waku_archive,
../../../waku/waku_archive/driver/sqlite_driver,
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_archive/retention_policy,
../../../waku/waku_archive/retention_policy/retention_policy_capacity,
../../../waku/waku_archive/retention_policy/retention_policy_size,
Expand All @@ -19,6 +20,18 @@ import
../testlib/wakucore


const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"

proc newTestPostgresDriver(): ArchiveDriver =
let driver = PostgresDriver.new(dbUrl = storeMessageDbUrl).tryGet()
discard waitFor driver.reset()

let initRes = waitFor driver.init()
assert initRes.isOk(), initRes.error

return driver


suite "Waku Archive - Retention policy":

test "capacity retention policy - windowed message deletion":
Expand All @@ -27,7 +40,7 @@ suite "Waku Archive - Retention policy":
capacity = 100
excess = 60

let driver = newSqliteArchiveDriver()
let driver = newTestPostgresDriver()

let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
Expand All @@ -44,66 +57,61 @@ suite "Waku Archive - Retention policy":
## Then
let numMessages = (waitFor driver.getMessagesCount()).tryGet()
check:
# Expected number of messages is 120 because
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
# the window size changes when changing `const maxStoreOverflow = 1.3 in sqlite_store
# Expected number of messages is 115 because
# (capacity = 100) + (half of the overflow window = 15)
numMessages == 115

## Cleanup
(waitFor driver.close()).expect("driver to close")

test "size retention policy - windowed message deletion":
## Given
let
# in bytes
sizeLimit:int64 = 52428
excess = 325

let driver = newSqliteArchiveDriver()

let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()
let driver = newTestPostgresDriver()

# make sure that the db is empty to before test begins
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
# if there are messages in db, empty them
# if there are messages in db, delete them before the test begins
if storedMsg.len > 0:
let now = getNanosecondTime(getTime().toUnixFloat())
require (waitFor driver.deleteMessagesOlderThanTimestamp(ts=now)).isOk()
require (waitFor driver.performVacuum()).isOk()

## When
##
# get the minimum/empty size of the database
let sizeLimit = int64((waitFor driver.getDatabaseSize()).tryGet())
let num_messages = 100
let retryLimit = 4

# create a number of messages so that the size of the DB overshoots
for i in 1..excess:
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp))
let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

## When
# create a number of messages to increase DB size
for i in 1..num_messages:
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp))

# waitFor is used to synchronously wait for the futures to complete.
discard waitFor allFinished(putFutures)
sleep(150)

## Then
# calculate the current database size
let sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet())

# NOTE: since vacuumin is done manually, this needs to be revisited if vacuuming done automatically
let sizeBeforeRetPolicy = int64((waitFor driver.getDatabaseSize()).tryGet())
var sizeAfterRetPolicy:int64 = sizeBeforeRetPolicy
var retryCounter = 0

# get the rows count pre-deletion
let rowsCountBeforeDeletion = (waitFor driver.getMessagesCount()).tryGet()
while (sizeAfterRetPolicy >= sizeBeforeRetPolicy) and (retryCounter < retryLimit):
# execute the retention policy
require (waitFor retentionPolicy.execute(driver)).isOk()

# execute policy provided the current db size oveflows, results in rows deletion
require (sizeDB >= sizeLimit)
require (waitFor retentionPolicy.execute(driver)).isOk()

# get the number or rows from database
let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet()
# get the updated DB size post vacuum
sizeAfterRetPolicy = int64((waitFor driver.getDatabaseSize()).tryGet())
retryCounter += 1
sleep(150)

check:
# size of the database is used to check if the storage limit has been preserved
# check the current database size with the limitSize provided by the user
# it should be lower
rowCountAfterDeletion <= rowsCountBeforeDeletion
# check if the size of the database has been reduced after executing the retention policy
sizeAfterRetPolicy < sizeBeforeRetPolicy

## Cleanup
(waitFor driver.close()).expect("driver to close")
Expand All @@ -114,7 +122,7 @@ suite "Waku Archive - Retention policy":
const contentTopic = "test-content-topic"

let
driver = newSqliteArchiveDriver()
driver = newTestPostgresDriver()
retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)

let messages = @[
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ method getMessages*(driver: ArchiveDriver,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard

method getDbType*(driver: ArchiveDriver): string {.base, raises: [ValueError].} =
raise newException(ValueError, "Database type method not implemented in subclass")

method getMessagesCount*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard

Expand Down
17 changes: 17 additions & 0 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type PostgresDriver* = ref object of ArchiveDriver
writeConnPool: PgAsyncPool
readConnPool: PgAsyncPool

method getDbType*(s: PostgresDriver): string =
return "postgres"

proc dropTableQuery(): string =
"DROP TABLE messages"

Expand Down Expand Up @@ -535,3 +538,17 @@ proc sleep*(s: PostgresDriver, seconds: int):
return err("exception sleeping: " & getCurrentExceptionMsg())

return ok()

method performVacuum*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
## Perform a VACUUM operation to clean up the database.
debug "starting Postgres database vacuuming"
try:
let execRes = await s.writeConnPool.pgQuery("VACUUM")
if execRes.isErr:
return err("error in Postgres Vacuum operation: " & execRes.error)
except DbError as e:
return err("Database error occurred during Postgres VACUUM: " & $e.msg)

debug "finished Postgres database vacuuming"
return ok()
3 changes: 3 additions & 0 deletions waku/waku_archive/driver/queue_driver/queue_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T
var items = SortedSet[Index, IndexedWakuMessage].init()
return QueueDriver(items: items, capacity: capacity)

method getDbType*(driver: QueueDriver): string =
return "queue"

proc contains*(driver: QueueDriver, index: Index): bool =
## Return `true` if the store queue already contains the `index`, `false` otherwise.
driver.items.eq(index).isOk()
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type SqliteDriver* = ref object of ArchiveDriver
db: SqliteDatabase
insertStmt: SqliteStmt[InsertMessageParams, void]

method getDbType*(s: SqliteDriver): string =
return "sqlite"

proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] =

# Database initialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,12 @@ method execute*(p: CapacityRetentionPolicy,
(await driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)

# perform vacuum
let resVaccum = await driver.performVacuum()
if resVaccum.isErr():
return err("vacuumming failed: " & resVaccum.error)

# sleep to give it some time to complete vacuuming
await sleepAsync(350)

return ok()
22 changes: 18 additions & 4 deletions waku/waku_archive/retention_policy/retention_policy_size.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
method execute*(p: SizeRetentionPolicy,
driver: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.async.} =

# In SQLite vacuuming is done manually,
let dbEngine = driver.getDbType()
if dbEngine == "sqlite":
return ok()

## when db size overshoots the database limit, shread 20% of outdated messages
# get size of database
let dbSize = (await driver.getDatabaseSize()).valueOr:
Expand All @@ -52,18 +58,26 @@ method execute*(p: SizeRetentionPolicy,
if totalSizeOfDB < p.sizeLimit:
return ok()

# to shread/delete messsges, get the total row/message count
let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)

# NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
# if vacumming is done automatically then we aim to check DB size periodially for efficient
# retention policy implementation.
# to shread/delete messsges, get the total row/message count
let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)

# 80% of the total messages are to be kept, delete others
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)
echo ("deleting oldest messages not within limit: " & $pageDeleteWindow)

(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)

# perform vacuum
let resVaccum = await driver.performVacuum()
if resVaccum.isErr():
return err("vacuumming failed: " & resVaccum.error)

# sleep to give it some time to complete vacuuming
await sleepAsync(350)

return ok()
8 changes: 8 additions & 0 deletions waku/waku_archive/retention_policy/retention_policy_time.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,12 @@ method execute*(p: TimeRetentionPolicy,
if res.isErr():
return err("failed to delete oldest messages: " & res.error)

# perform vacuum
let resVaccum = await driver.performVacuum()
if resVaccum.isErr():
return err("vacuumming failed: " & resVaccum.error)

# sleep to give it some time to complete vacuuming
await sleepAsync(350)

return ok()

0 comments on commit 6164530

Please sign in to comment.