From d2e54d3073e79e807e8023cc240f9c393d6cd894 Mon Sep 17 00:00:00 2001 From: ABresting Date: Thu, 21 Dec 2023 16:47:26 +0530 Subject: [PATCH 1/4] postgres vacuum enabled with test case --- tests/waku_archive/test_retention_policy.nim | 53 +++++++++++-------- waku/waku_archive/driver.nim | 3 ++ .../postgres_driver/postgres_driver.nim | 17 ++++++ .../driver/queue_driver/queue_driver.nim | 3 ++ .../driver/sqlite_driver/sqlite_driver.nim | 3 ++ .../retention_policy_size.nim | 41 +++++++++----- 6 files changed, 84 insertions(+), 36 deletions(-) diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 5233d69240..0aac229308 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -4,13 +4,14 @@ import std/[sequtils,times], stew/results, testutils/unittests, - chronos + chronos, + os import ../../../waku/common/databases/db_sqlite, ../../../waku/waku_core, ../../../waku/waku_core/message/digest, ../../../waku/waku_archive, - ../../../waku/waku_archive/driver/sqlite_driver, + ../../../waku/waku_archive/driver/postgres_driver, ../../../waku/waku_archive/retention_policy, ../../../waku/waku_archive/retention_policy/retention_policy_capacity, ../../../waku/waku_archive/retention_policy/retention_policy_size, @@ -19,6 +20,18 @@ import ../testlib/wakucore +const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres" + +proc newTestPostgresDriver(): ArchiveDriver = + let driver = PostgresDriver.new(dbUrl = storeMessageDbUrl).tryGet() + discard waitFor driver.reset() + + let initRes = waitFor driver.init() + assert initRes.isOk(), initRes.error + + return driver + + suite "Waku Archive - Retention policy": test "capacity retention policy - windowed message deletion": @@ -27,7 +40,7 @@ suite "Waku Archive - Retention policy": capacity = 100 excess = 60 - let driver = newSqliteArchiveDriver() + let driver = newTestPostgresDriver() let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() @@ -54,15 +67,8 @@ suite "Waku Archive - Retention policy": test "size retention policy - windowed message deletion": ## Given - let - # in bytes - sizeLimit:int64 = 52428 - excess = 325 - - let driver = newSqliteArchiveDriver() - - let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) - var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() + let driver = newTestPostgresDriver() + let dbEngine = driver.getDbType() # make sure that the db is empty to before test begins let storedMsg = (waitFor driver.getAllMessages()).tryGet() @@ -72,6 +78,13 @@ suite "Waku Archive - Retention policy": require (waitFor driver.deleteMessagesOlderThanTimestamp(ts=now)).isOk() require (waitFor driver.performVacuum()).isOk() + # in bytes + let sizeLimit:int64 = 10122851 + let excess = 69 + + let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) + var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() + ## When ## @@ -85,25 +98,19 @@ suite "Waku Archive - Retention policy": ## Then # calculate the current database size - let sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet()) - - # NOTE: since vacuumin is done manually, this needs to be revisited if vacuuming done automatically - - # get the rows count pre-deletion - let rowsCountBeforeDeletion = (waitFor driver.getMessagesCount()).tryGet() + var sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet()) - # execute policy provided the current db size oveflows, results in rows deletion require (sizeDB >= sizeLimit) require (waitFor retentionPolicy.execute(driver)).isOk() - # get the number or rows from database - let rowCountAfterDeletion = (waitFor driver.getMessagesCount()).tryGet() + # get the updated DB size post vacuum + sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet()) check: # size of the database is used to check if the storage limit has been preserved # check the current database size with the limitSize provided by the user # it should be lower - rowCountAfterDeletion <= rowsCountBeforeDeletion + sizeDB <= sizeLimit ## Cleanup (waitFor driver.close()).expect("driver to close") @@ -114,7 +121,7 @@ suite "Waku Archive - Retention policy": const contentTopic = "test-content-topic" let - driver = newSqliteArchiveDriver() + driver = newTestPostgresDriver() retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) let messages = @[ diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index 3a9262f482..9ed378a202 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -43,6 +43,9 @@ method getMessages*(driver: ArchiveDriver, ascendingOrder = true): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.base, async.} = discard +method getDbType*(driver: ArchiveDriver): string {.base, raises: [ValueError].} = + raise newException(ValueError, "Database type method not implemented in subclass") + method getMessagesCount*(driver: ArchiveDriver): Future[ArchiveDriverResult[int64]] {.base, async.} = discard diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index a2450c6ce5..b6e0a193dc 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -24,6 +24,9 @@ type PostgresDriver* = ref object of ArchiveDriver writeConnPool: PgAsyncPool readConnPool: PgAsyncPool +method getDbType*(s: PostgresDriver): string = + return "postgres" + proc dropTableQuery(): string = "DROP TABLE messages" @@ -535,3 +538,17 @@ proc sleep*(s: PostgresDriver, seconds: int): return err("exception sleeping: " & getCurrentExceptionMsg()) return ok() + +method performVacuum*(s: PostgresDriver): + Future[ArchiveDriverResult[void]] {.async.} = + ## Perform a VACUUM operation to clean up the database. + debug "starting Postgres database vacuuming" + try: + let execRes = await s.writeConnPool.pgQuery("VACUUM") + if execRes.isErr: + return err("error in Postgres Vacuum operation: " & execRes.error) + except DbError as e: + return err("Database error occurred during Postgres VACUUM: " & $e.msg) + + debug "finished Postgres database vacuuming" + return ok() diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index 9fd266e6d8..9b61e976a4 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -84,6 +84,9 @@ proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T var items = SortedSet[Index, IndexedWakuMessage].init() return QueueDriver(items: items, capacity: capacity) +method getDbType*(driver: QueueDriver): string = + return "sqlite" + proc contains*(driver: QueueDriver, index: Index): bool = ## Return `true` if the store queue already contains the `index`, `false` otherwise. driver.items.eq(index).isOk() diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 1d0cbc5bc5..66a135f6c7 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -47,6 +47,9 @@ type SqliteDriver* = ref object of ArchiveDriver db: SqliteDatabase insertStmt: SqliteStmt[InsertMessageParams, void] +method getDbType*(s: SqliteDriver): string = + return "sqlite" + proc new*(T: type SqliteDriver, db: SqliteDatabase): ArchiveDriverResult[T] = # Database initialization diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim index 8b512230b2..b5da29fb89 100644 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -41,29 +41,44 @@ proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T = method execute*(p: SizeRetentionPolicy, driver: ArchiveDriver): Future[RetentionPolicyResult[void]] {.async.} = + + # In SQLite vacuuming is done manually, + let dbEngine = driver.getDbType() + if dbEngine == "sqlite": + return ok() + ## when db size overshoots the database limit, shread 20% of outdated messages # get size of database - let dbSize = (await driver.getDatabaseSize()).valueOr: + var dbSize = (await driver.getDatabaseSize()).valueOr: return err("failed to get database size: " & $error) # database size in bytes - let totalSizeOfDB: int64 = int64(dbSize) + var totalSizeOfDB: int64 = int64(dbSize) if totalSizeOfDB < p.sizeLimit: return ok() + + # NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows + # if vacumming is done automatically then we aim to check DB size periodially for efficient + # retention policy implementation. + while (totalSizeOfDB > p.sizeLimit): + # to shread/delete messsges, get the total row/message count + let numMessages = (await driver.getMessagesCount()).valueOr: + return err("failed to get messages count: " & error) - # to shread/delete messsges, get the total row/message count - let numMessages = (await driver.getMessagesCount()).valueOr: - return err("failed to get messages count: " & error) - - # NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows - # if vacumming is done automatically then we aim to check DB size periodially for efficient - # retention policy implementation. + # 80% of the total messages are to be kept, delete others + let pageDeleteWindow = int(float(numMessages) * DeleteLimit) - # 80% of the total messages are to be kept, delete others - let pageDeleteWindow = int(float(numMessages) * DeleteLimit) + (await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr: + return err("deleting oldest messages failed: " & error) - (await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr: - return err("deleting oldest messages failed: " & error) + # perform vacuum + let resVaccum = await driver.performVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + # recompute the DB size + dbSize = (await driver.getDatabaseSize()).valueOr: + return err("failed to get database size: " & $error) + totalSizeOfDB = int64(dbSize) return ok() From 606b7da1ec71ce286c005888d2f04764e9b7105c Mon Sep 17 00:00:00 2001 From: ABresting Date: Fri, 22 Dec 2023 01:05:25 +0530 Subject: [PATCH 2/4] postgres vacuum updated --- tests/waku_archive/test_retention_policy.nim | 15 +++++++++------ .../retention_policy/retention_policy_size.nim | 9 +++++++-- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 0aac229308..63e94f7f67 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -79,18 +79,18 @@ suite "Waku Archive - Retention policy": require (waitFor driver.performVacuum()).isOk() # in bytes - let sizeLimit:int64 = 10122851 - let excess = 69 + let sizeLimit:int64 = 10422851 + let excess = 800 let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() ## When - ## + ## # create a number of messages so that the size of the DB overshoots for i in 1..excess: - let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) + let msg = fakeWakuMessage(payload= @[byte (i*10)], contentTopic=DefaultContentTopic, ts=Timestamp(i)) putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)) # waitFor is used to synchronously wait for the futures to complete. @@ -100,9 +100,12 @@ suite "Waku Archive - Retention policy": # calculate the current database size var sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet()) - require (sizeDB >= sizeLimit) + if (sizeDB < sizeLimit): + # we put a warning/debug that the size of the DB is less than the limit already + echo ("WARNING: DB size is less than the limit. This test may not work as expected") + require (waitFor retentionPolicy.execute(driver)).isOk() - + # get the updated DB size post vacuum sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet()) diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim index b5da29fb89..274f46408a 100644 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -54,6 +54,8 @@ method execute*(p: SizeRetentionPolicy, # database size in bytes var totalSizeOfDB: int64 = int64(dbSize) + let retryLimit = 2 + var retryCounter:int = 0 if totalSizeOfDB < p.sizeLimit: return ok() @@ -61,7 +63,7 @@ method execute*(p: SizeRetentionPolicy, # NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows # if vacumming is done automatically then we aim to check DB size periodially for efficient # retention policy implementation. - while (totalSizeOfDB > p.sizeLimit): + while (totalSizeOfDB > p.sizeLimit) and (retryLimit > retryCounter): # to shread/delete messsges, get the total row/message count let numMessages = (await driver.getMessagesCount()).valueOr: return err("failed to get messages count: " & error) @@ -76,9 +78,12 @@ method execute*(p: SizeRetentionPolicy, let resVaccum = await driver.performVacuum() if resVaccum.isErr(): return err("vacuumming failed: " & resVaccum.error) - # recompute the DB size + # recompute the DB size to check if the size has actually reduced or not + dbSize = (await driver.getDatabaseSize()).valueOr: return err("failed to get database size: " & $error) totalSizeOfDB = int64(dbSize) + retryCounter += 1 + sleep(150*retryCounter) return ok() From 9eedf812ab9d9a7c4deaa6abb982e4ad6ea24f72 Mon Sep 17 00:00:00 2001 From: ABresting Date: Fri, 22 Dec 2023 10:13:57 +0530 Subject: [PATCH 3/4] postgres vacuum excluded for MacOS --- tests/all_tests_waku.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index 35e9f41a60..d924c8f258 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -17,7 +17,6 @@ import ./waku_archive/test_driver_queue, ./waku_archive/test_driver_sqlite_query, ./waku_archive/test_driver_sqlite, - ./waku_archive/test_retention_policy, ./waku_archive/test_waku_archive const os* {.strdefine.} = "" @@ -27,6 +26,7 @@ when os == "Linux" and defined(postgres): import ./waku_archive/test_driver_postgres_query, + ./waku_archive/test_retention_policy, ./waku_archive/test_driver_postgres # Waku store test suite From b182bdb801de665e1096b4e9e61a04d07794f1ae Mon Sep 17 00:00:00 2001 From: ABresting Date: Thu, 28 Dec 2023 19:11:13 +0530 Subject: [PATCH 4/4] updated review code --- tests/waku_archive/test_retention_policy.nim | 52 +++++++++--------- .../driver/queue_driver/queue_driver.nim | 2 +- .../retention_policy_capacity.nim | 8 +++ .../retention_policy_size.nim | 54 +++++++++---------- .../retention_policy_time.nim | 8 +++ 5 files changed, 66 insertions(+), 58 deletions(-) diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 63e94f7f67..3e21bcea30 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -11,7 +11,7 @@ import ../../../waku/waku_core, ../../../waku/waku_core/message/digest, ../../../waku/waku_archive, - ../../../waku/waku_archive/driver/postgres_driver, + ../../../waku/waku_archive/driver/postgres_driver, ../../../waku/waku_archive/retention_policy, ../../../waku/waku_archive/retention_policy/retention_policy_capacity, ../../../waku/waku_archive/retention_policy/retention_policy_size, @@ -57,9 +57,8 @@ suite "Waku Archive - Retention policy": ## Then let numMessages = (waitFor driver.getMessagesCount()).tryGet() check: - # Expected number of messages is 120 because - # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) - # the window size changes when changing `const maxStoreOverflow = 1.3 in sqlite_store + # Expected number of messages is 115 because + # (capacity = 100) + (half of the overflow window = 15) numMessages == 115 ## Cleanup @@ -68,52 +67,51 @@ suite "Waku Archive - Retention policy": test "size retention policy - windowed message deletion": ## Given let driver = newTestPostgresDriver() - let dbEngine = driver.getDbType() # make sure that the db is empty to before test begins let storedMsg = (waitFor driver.getAllMessages()).tryGet() - # if there are messages in db, empty them + # if there are messages in db, delete them before the test begins if storedMsg.len > 0: let now = getNanosecondTime(getTime().toUnixFloat()) require (waitFor driver.deleteMessagesOlderThanTimestamp(ts=now)).isOk() require (waitFor driver.performVacuum()).isOk() - # in bytes - let sizeLimit:int64 = 10422851 - let excess = 800 + # get the minimum/empty size of the database + let sizeLimit = int64((waitFor driver.getDatabaseSize()).tryGet()) + let num_messages = 100 + let retryLimit = 4 let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() ## When - ## - - # create a number of messages so that the size of the DB overshoots - for i in 1..excess: - let msg = fakeWakuMessage(payload= @[byte (i*10)], contentTopic=DefaultContentTopic, ts=Timestamp(i)) - putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)) + # create a number of messages to increase DB size + for i in 1..num_messages: + let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) + putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), computeMessageHash(DefaultPubsubTopic, msg), msg.timestamp)) # waitFor is used to synchronously wait for the futures to complete. discard waitFor allFinished(putFutures) + sleep(150) ## Then # calculate the current database size - var sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet()) + let sizeBeforeRetPolicy = int64((waitFor driver.getDatabaseSize()).tryGet()) + var sizeAfterRetPolicy:int64 = sizeBeforeRetPolicy + var retryCounter = 0 - if (sizeDB < sizeLimit): - # we put a warning/debug that the size of the DB is less than the limit already - echo ("WARNING: DB size is less than the limit. This test may not work as expected") - - require (waitFor retentionPolicy.execute(driver)).isOk() + while (sizeAfterRetPolicy >= sizeBeforeRetPolicy) and (retryCounter < retryLimit): + # execute the retention policy + require (waitFor retentionPolicy.execute(driver)).isOk() - # get the updated DB size post vacuum - sizeDB = int64((waitFor driver.getDatabaseSize()).tryGet()) + # get the updated DB size post vacuum + sizeAfterRetPolicy = int64((waitFor driver.getDatabaseSize()).tryGet()) + retryCounter += 1 + sleep(150) check: - # size of the database is used to check if the storage limit has been preserved - # check the current database size with the limitSize provided by the user - # it should be lower - sizeDB <= sizeLimit + # check if the size of the database has been reduced after executing the retention policy + sizeAfterRetPolicy < sizeBeforeRetPolicy ## Cleanup (waitFor driver.close()).expect("driver to close") diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index 9b61e976a4..9ad6cb36fd 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -85,7 +85,7 @@ proc new*(T: type QueueDriver, capacity: int = QueueDriverDefaultMaxCapacity): T return QueueDriver(items: items, capacity: capacity) method getDbType*(driver: QueueDriver): string = - return "sqlite" + return "queue" proc contains*(driver: QueueDriver, index: Index): bool = ## Return `true` if the store queue already contains the `index`, `false` otherwise. diff --git a/waku/waku_archive/retention_policy/retention_policy_capacity.nim b/waku/waku_archive/retention_policy/retention_policy_capacity.nim index bb1d146cf1..f34a3681dc 100644 --- a/waku/waku_archive/retention_policy/retention_policy_capacity.nim +++ b/waku/waku_archive/retention_policy/retention_policy_capacity.nim @@ -71,4 +71,12 @@ method execute*(p: CapacityRetentionPolicy, (await driver.deleteOldestMessagesNotWithinLimit(limit=p.capacity + p.deleteWindow)).isOkOr: return err("deleting oldest messages failed: " & error) + # perform vacuum + let resVaccum = await driver.performVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + + # sleep to give it some time to complete vacuuming + await sleepAsync(350) + return ok() diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim index 274f46408a..02c57cbad9 100644 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -49,41 +49,35 @@ method execute*(p: SizeRetentionPolicy, ## when db size overshoots the database limit, shread 20% of outdated messages # get size of database - var dbSize = (await driver.getDatabaseSize()).valueOr: + let dbSize = (await driver.getDatabaseSize()).valueOr: return err("failed to get database size: " & $error) # database size in bytes - var totalSizeOfDB: int64 = int64(dbSize) - let retryLimit = 2 - var retryCounter:int = 0 + let totalSizeOfDB: int64 = int64(dbSize) if totalSizeOfDB < p.sizeLimit: return ok() - - # NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows - # if vacumming is done automatically then we aim to check DB size periodially for efficient - # retention policy implementation. - while (totalSizeOfDB > p.sizeLimit) and (retryLimit > retryCounter): - # to shread/delete messsges, get the total row/message count - let numMessages = (await driver.getMessagesCount()).valueOr: - return err("failed to get messages count: " & error) - - # 80% of the total messages are to be kept, delete others - let pageDeleteWindow = int(float(numMessages) * DeleteLimit) - - (await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr: - return err("deleting oldest messages failed: " & error) - - # perform vacuum - let resVaccum = await driver.performVacuum() - if resVaccum.isErr(): - return err("vacuumming failed: " & resVaccum.error) - # recompute the DB size to check if the size has actually reduced or not - - dbSize = (await driver.getDatabaseSize()).valueOr: - return err("failed to get database size: " & $error) - totalSizeOfDB = int64(dbSize) - retryCounter += 1 - sleep(150*retryCounter) + + # NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows + # if vacumming is done automatically then we aim to check DB size periodially for efficient + # retention policy implementation. + # to shread/delete messsges, get the total row/message count + let numMessages = (await driver.getMessagesCount()).valueOr: + return err("failed to get messages count: " & error) + + # 80% of the total messages are to be kept, delete others + let pageDeleteWindow = int(float(numMessages) * DeleteLimit) + echo ("deleting oldest messages not within limit: " & $pageDeleteWindow) + + (await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr: + return err("deleting oldest messages failed: " & error) + + # perform vacuum + let resVaccum = await driver.performVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + + # sleep to give it some time to complete vacuuming + await sleepAsync(350) return ok() diff --git a/waku/waku_archive/retention_policy/retention_policy_time.nim b/waku/waku_archive/retention_policy/retention_policy_time.nim index 27622f2e4d..33b8246aa4 100644 --- a/waku/waku_archive/retention_policy/retention_policy_time.nim +++ b/waku/waku_archive/retention_policy/retention_policy_time.nim @@ -49,4 +49,12 @@ method execute*(p: TimeRetentionPolicy, if res.isErr(): return err("failed to delete oldest messages: " & res.error) + # perform vacuum + let resVaccum = await driver.performVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + + # sleep to give it some time to complete vacuuming + await sleepAsync(350) + return ok()