diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 35e9f41a60..d924c8f258 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -17,7 +17,6 @@ import ./waku_archive/test_driver_queue, ./waku_archive/test_driver_sqlite_query, ./waku_archive/test_driver_sqlite, - ./waku_archive/test_retention_policy, ./waku_archive/test_waku_archive const os* {.strdefine.} = "" @@ -27,6 +26,7 @@ when os == "Linux" and defined(postgres): import ./waku_archive/test_driver_postgres_query, + ./waku_archive/test_retention_policy, ./waku_archive/test_driver_postgres # Waku store test suite diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 5233d69240..3e21bcea30 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -4,13 +4,14 @@ import std/[sequtils,times], stew/results, testutils/unittests, - chronos + chronos, + os import ../../../waku/common/databases/db_sqlite, ../../../waku/waku_core, ../../../waku/waku_core/message/digest, ../../../waku/waku_archive, - ../../../waku/waku_archive/driver/sqlite_driver, + ../../../waku/waku_archive/driver/postgres_driver, ../../../waku/waku_archive/retention_policy, ../../../waku/waku_archive/retention_policy/retention_policy_capacity, ../../../waku/waku_archive/retention_policy/retention_policy_size, @@ -19,6 +20,18 @@ import ../testlib/wakucore +const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres" + +proc newTestPostgresDriver(): ArchiveDriver = + let driver = PostgresDriver.new(dbUrl = storeMessageDbUrl).tryGet() + discard waitFor driver.reset() + + let initRes = waitFor driver.init() + assert initRes.isOk(), initRes.error + + return driver + + suite "Waku Archive - Retention policy": test "capacity retention policy - windowed message deletion": @@ -27,7 +40,7 @@ suite "Waku Archive - Retention policy": capacity = 100 excess = 60 - let driver = newSqliteArchiveDriver() + let driver = newTestPostgresDriver() let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() @@ -44,9 +57,8 @@ suite "Waku Archive - Retention policy": ## Then let numMessages = (waitFor driver.getMessagesCount()).tryGet() check: - # Expected number of messages is 120 because - # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) - # the window size changes when changing `const maxStoreOverflow = 1.3 in sqlite_store + # Expected number of messages is 115 because + # (capacity = 100) + (half of the overflow window = 15) numMessages == 115 ## Cleanup @@ -54,56 +66,52 @@ suite "Waku Archive - Retention policy": test "size retention policy - windowed message deletion": ## Given - let - # in bytes - sizeLimit:int64 = 52428 - excess = 325 - - let driver = newSqliteArchiveDriver() - - let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) - var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() + let driver = newTestPostgresDriver() # make sure that the db is empty to before test begins let storedMsg = (waitFor driver.getAllMessages()).tryGet() - # if there are messages in db, empty them + # if there are messages in db, delete them before the test begins if storedMsg.len > 0: let now = getNanosecondTime(getTime().toUnixFloat()) require (waitFor driver.deleteMessagesOlderThanTimestamp(ts=now)).isOk() require (waitFor driver.performVacuum()).isOk() - ## When - ## + # get the minimum/empty size of the database + let sizeLimit = int64((waitFor driver.getDatabaseSize()).tryGet()) + let num_messages = 100 + let retryLimit = 4 - # create a number of messages so that the size of the DB overshoots - for i in 1..excess: - let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) - putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)) + let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) + var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() + + ## When + # create a number of messages to increase DB size + for i in 1..num_messages: + let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) + putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)) # waitFor is used to synchronously wait for the futures to complete. discard waitFor allFinished(putFutures) + sleep(150) ## Then # calculate the current database size - let sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet()) - - # NOTE: since vacuumin is done manually, this needs to be revisited if vacuuming done automatically + let sizeBeforeRetPolicy = int64((waitFor driver.getDatabaseSize()).tryGet()) + var sizeAfterRetPolicy:int64 = sizeBeforeRetPolicy + var retryCounter = 0 - # get the rows count pre-deletion - let rowsCountBeforeDeletion = (waitFor driver.getMessagesCount()).tryGet() + while (sizeAfterRetPolicy >= sizeBeforeRetPolicy) and (retryCounter < retryLimit): + # execute the retention policy + require (waitFor retentionPolicy.execute(driver)).isOk() - # execute policy provided the current db size oveflows, results in rows deletion - require (sizeDB >= sizeLimit) - require (waitFor retentionPolicy.execute(driver)).isOk() - - # get the number or rows from database - let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet() + # get the updated DB size post vacuum + sizeAfterRetPolicy = int64((waitFor driver.getDatabaseSize()).tryGet()) + retryCounter += 1 + sleep(150) check: - # size of the database is used to check if the storage limit has been preserved - # check the current database size with the limitSize provided by the user - # it should be lower - rowCountAfterDeletion <= rowsCountBeforeDeletion + # check if the size of the database has been reduced after executing the retention policy + sizeAfterRetPolicy < sizeBeforeRetPolicy ## Cleanup (waitFor driver.close()).expect("driver to close") @@ -114,7 +122,7 @@ suite "Waku Archive - Retention policy": const contentTopic = "test-content-topic" let - driver = newSqliteArchiveDriver() + driver = newTestPostgresDriver() retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) let messages = @[ diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index 3a9262f482..9ed378a202 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -43,6 +43,9 @@ method getMessages*(driver: ArchiveDriver, ascendingOrder = true): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard +method getDbType*(driver: ArchiveDriver): string {.base, raises: [ValueError].} = + raise newException(ValueError, "Database type method not implemented in subclass") + method getMessagesCount*(driver: ArchiveDriver): Future[ArchiveDriverResult[int64]] {.base, async.} = discard diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index a2450c6ce5..b6e0a193dc 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -24,6 +24,9 @@ type PostgresDriver* = ref object of ArchiveDriver writeConnPool: PgAsyncPool readConnPool: PgAsyncPool +method getDbType*(s: PostgresDriver): string = + return "postgres" + proc dropTableQuery(): string = "DROP TABLE messages" @@ -535,3 +538,17 @@ proc sleep*(s: PostgresDriver, seconds: int): return err("exception sleeping: " & getCurrentExceptionMsg()) return ok() + +method performVacuum*(s: PostgresDriver): + Future[ArchiveDriverResult[void]] {.async.} = + ## Perform a VACUUM operation to clean up the database. + debug "starting Postgres database vacuuming" + try: + let execRes = await s.writeConnPool.pgQuery("VACUUM") + if execRes.isErr: + return err("error in Postgres Vacuum operation: " & execRes.error) + except DbError as e: + return err("Database error occurred during Postgres VACUUM: " & $e.msg) + + debug "finished Postgres database vacuuming" + return ok() diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index 9fd266e6d8..9ad6cb36fd 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -84,6 +84,9 @@ proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T var items = SortedSet[Index, IndexedWakuMessage].init() return QueueDriver(items: items, capacity: capacity) +method getDbType*(driver: QueueDriver): string = + return "queue" + proc contains*(driver: QueueDriver, index: Index): bool = ## Return `true` if the store queue already contains the `index`, `false` otherwise. driver.items.eq(index).isOk() diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 1d0cbc5bc5..66a135f6c7 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -47,6 +47,9 @@ type SqliteDriver* = ref object of ArchiveDriver db: SqliteDatabase insertStmt: SqliteStmt[InsertMessageParams, void] +method getDbType*(s: SqliteDriver): string = + return "sqlite" + proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] = # Database initialization diff --git a/waku/waku_archive/retention_policy/retention_policy_capacity.nim b/waku/waku_archive/retention_policy/retention_policy_capacity.nim index bb1d146cf1..f34a3681dc 100644 --- a/waku/waku_archive/retention_policy/retention_policy_capacity.nim +++ b/waku/waku_archive/retention_policy/retention_policy_capacity.nim @@ -71,4 +71,12 @@ method execute*(p: CapacityRetentionPolicy, (await driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)).isOkOr: return err("deleting oldest messages failed: " & error) + # perform vacuum + let resVaccum = await driver.performVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + + # sleep to give it some time to complete vacuuming + await sleepAsync(350) + return ok() diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim index 8b512230b2..02c57cbad9 100644 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -41,6 +41,12 @@ proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T = method execute*(p: SizeRetentionPolicy, driver: ArchiveDriver): Future[RetentionPolicyResult[void]] {.async.} = + + # In SQLite vacuuming is done manually, + let dbEngine = driver.getDbType() + if dbEngine == "sqlite": + return ok() + ## when db size overshoots the database limit, shread 20% of outdated messages # get size of database let dbSize = (await driver.getDatabaseSize()).valueOr: @@ -52,18 +58,26 @@ method execute*(p: SizeRetentionPolicy, if totalSizeOfDB < p.sizeLimit: return ok() - # to shread/delete messsges, get the total row/message count - let numMessages = (await driver.getMessagesCount()).valueOr: - return err("failed to get messages count: " & error) - # NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows # if vacumming is done automatically then we aim to check DB size periodially for efficient # retention policy implementation. + # to shread/delete messsges, get the total row/message count + let numMessages = (await driver.getMessagesCount()).valueOr: + return err("failed to get messages count: " & error) # 80% of the total messages are to be kept, delete others let pageDeleteWindow = int(float(numMessages) * DeleteLimit) + echo ("deleting oldest messages not within limit: " & $pageDeleteWindow) (await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr: return err("deleting oldest messages failed: " & error) + # perform vacuum + let resVaccum = await driver.performVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + + # sleep to give it some time to complete vacuuming + await sleepAsync(350) + return ok() diff --git a/waku/waku_archive/retention_policy/retention_policy_time.nim b/waku/waku_archive/retention_policy/retention_policy_time.nim index 27622f2e4d..33b8246aa4 100644 --- a/waku/waku_archive/retention_policy/retention_policy_time.nim +++ b/waku/waku_archive/retention_policy/retention_policy_time.nim @@ -49,4 +49,12 @@ method execute*(p: TimeRetentionPolicy, if res.isErr(): return err("failed to delete oldest messages: " & res.error) + # perform vacuum + let resVaccum = await driver.performVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + + # sleep to give it some time to complete vacuuming + await sleepAsync(350) + return ok()