From 0b7e863bee374f4e5ad0d929d761137cac483926 Mon Sep 17 00:00:00 2001 From: ABresting Date: Mon, 2 Oct 2023 16:25:10 +0530 Subject: [PATCH 1/6] chore: added size based retention policy --- tests/waku_archive/test_retention_policy.nim | 41 +++++++++ waku/common/databases/db_sqlite.nim | 3 +- waku/waku_archive/driver.nim | 10 +++ waku/waku_archive/driver/builder.nim | 1 + .../driver/queue_driver/queue_driver.nim | 14 ++- .../driver/sqlite_driver/sqlite_driver.nim | 13 +++ .../waku_archive/retention_policy/builder.nim | 37 +++++++- .../retention_policy_size.nim | 87 +++++++++++++++++++ 8 files changed, 203 insertions(+), 3 deletions(-) create mode 100644 waku/waku_archive/retention_policy/retention_policy_size.nim diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index e08455d275..01e012b44c 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -12,6 +12,7 @@ import ../../../waku/waku_archive/driver/sqlite_driver, ../../../waku/waku_archive/retention_policy, ../../../waku/waku_archive/retention_policy/retention_policy_capacity, + ../../../waku/waku_archive/retention_policy/retention_policy_size, ../testlib/common, ../testlib/wakucore @@ -53,6 +54,45 @@ suite "Waku Archive - Retention policy": ## Cleanup (waitFor driver.close()).expect("driver to close") + + test "size retention policy - windowed message deletion": + ## Given + let + # in megabytes + sizeLimit:float = 0.05 + excess = 123 + + let driver = newTestArchiveDriver() + + let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) + + ## When + + var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() + var retentionFutures = newSeq[Future[ArchiveDriverResult[void]]]() + + for i in 1..excess: + let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) + putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), 
msg.timestamp)) + retentionFutures.add(retentionPolicy.execute(driver)) + + # waitFor is used to synchronously wait for the futures to complete. + discard waitFor allFinished(putFutures & retentionFutures) + + ## Then + # calculate the current database size + let pageSize = (waitFor driver.getPagesSize()).tryGet() + let pageCount = (waitFor driver.getPagesCount()).tryGet() + let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0) + + check: + # size of the database is used to check if the storage limit has been preserved + # check the current database size with the limitSize provided by the user + # it should be lower + sizeDB <= sizeLimit + + ## Cleanup + (waitFor driver.close()).expect("driver to close") test "store capacity should be limited": ## Given @@ -90,3 +130,4 @@ suite "Waku Archive - Retention policy": ## Cleanup (waitFor driver.close()).expect("driver to close") + diff --git a/waku/common/databases/db_sqlite.nim b/waku/common/databases/db_sqlite.nim index 48eeaa361f..837cf3ab86 100644 --- a/waku/common/databases/db_sqlite.nim +++ b/waku/common/databases/db_sqlite.nim @@ -484,4 +484,5 @@ proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] = if resVacuum.isErr(): return err("failed to execute vacuum: " & resVacuum.error) - debug "finished sqlite database vacuuming" \ No newline at end of file + debug "finished sqlite database vacuuming" + ok() diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index c3f2accd16..011e88c52b 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -45,6 +45,15 @@ method getMessages*(driver: ArchiveDriver, method getMessagesCount*(driver: ArchiveDriver): Future[ArchiveDriverResult[int64]] {.base, async.} = discard +method getPagesCount*(driver: ArchiveDriver): + Future[ArchiveDriverResult[int64]] {.base, async.} = discard + +method getPagesSize*(driver: ArchiveDriver): + Future[ArchiveDriverResult[int64]] {.base, async.} = discard + +method 
performsVacuum*(driver: ArchiveDriver): + Future[ArchiveDriverResult[void]] {.base, async.} = discard + method getOldestMessageTimestamp*(driver: ArchiveDriver): Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard @@ -61,3 +70,4 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver, method close*(driver: ArchiveDriver): Future[ArchiveDriverResult[void]] {.base, async.} = discard + diff --git a/waku/waku_archive/driver/builder.nim b/waku/waku_archive/driver/builder.nim index f8a47afd7b..a2211c0104 100644 --- a/waku/waku_archive/driver/builder.nim +++ b/waku/waku_archive/driver/builder.nim @@ -105,3 +105,4 @@ proc new*(T: type ArchiveDriver, debug "setting up in-memory waku archive driver" let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages return ok(driver) + diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index 9a8b54f852..4b0b545036 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -280,6 +280,18 @@ method getMessagesCount*(driver: QueueDriver): Future[ArchiveDriverResult[int64]] {.async} = return ok(int64(driver.len())) +method getPagesCount*(driver: QueueDriver): + Future[ArchiveDriverResult[int64]] {.async} = + return ok(int64(driver.len())) + +method getPagesSize*(driver: QueueDriver): + Future[ArchiveDriverResult[int64]] {.async} = + return ok(int64(driver.len())) + +method performsVacuum*(driver: QueueDriver): + Future[ArchiveDriverResult[void]] {.async.} = + return err("interface method not implemented") + method getOldestMessageTimestamp*(driver: QueueDriver): Future[ArchiveDriverResult[Timestamp]] {.async.} = return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime) @@ -302,4 +314,4 @@ method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver, method close*(driver: QueueDriver): 
Future[ArchiveDriverResult[void]] {.async.} = - return ok() \ No newline at end of file + return ok() diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 8746838a97..545ac3c126 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -109,6 +109,18 @@ method getMessagesCount*(s: SqliteDriver): Future[ArchiveDriverResult[int64]] {.async.} = return s.db.getMessageCount() +method getPagesCount*(s: SqliteDriver): + Future[ArchiveDriverResult[int64]] {.async.} = + return s.db.getPageCount() + +method getPagesSize*(s: SqliteDriver): + Future[ArchiveDriverResult[int64]] {.async.} = + return s.db.getPageSize() + +method performsVacuum*(s: SqliteDriver): + Future[ArchiveDriverResult[void]] {.async.} = + return s.db.performSqliteVacuum() + method getOldestMessageTimestamp*(s: SqliteDriver): Future[ArchiveDriverResult[Timestamp]] {.async.} = return s.db.selectOldestReceiverTimestamp() @@ -135,3 +147,4 @@ method close*(s: SqliteDriver): # Close connection s.db.close() return ok() + diff --git a/waku/waku_archive/retention_policy/builder.nim b/waku/waku_archive/retention_policy/builder.nim index 3cb84d7974..7c2b6175b8 100644 --- a/waku/waku_archive/retention_policy/builder.nim +++ b/waku/waku_archive/retention_policy/builder.nim @@ -11,7 +11,8 @@ import import ../retention_policy, ./retention_policy_time, - ./retention_policy_capacity + ./retention_policy_capacity, + ./retention_policy_size proc new*(T: type RetentionPolicy, retPolicy: string): @@ -51,5 +52,39 @@ proc new*(T: type RetentionPolicy, let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity) return ok(some(retPolicy)) + elif policy == "size": + var retentionSize: string + retentionSize = policyArgs + + # captures the size unit such as Gb or Mb + let sizeUnit = retentionSize.substr(retentionSize.len-2) + # captures the string type 
number data of the size provided + let sizeQuantityStr = retentionSize.substr(0,retentionSize.len-3) + # to hold the numeric value data of size + var sizeQuantity: float + + if sizeUnit in ["gb", "Gb", "GB", "gB"]: + # parse the actual value into integer type var + try: + sizeQuantity = parseFloat(sizeQuantityStr) + except ValueError: + return err("invalid size retention policy argument") + # Gb data is converted into Mb for uniform processing + sizeQuantity = sizeQuantity * 1024 + elif sizeUnit in ["mb", "Mb", "MB", "mB"]: + try: + sizeQuantity = parseFloat(sizeQuantityStr) + except ValueError: + return err("invalid size retention policy argument") + else: + return err ("""invalid size retention value unit: expected "Mb" or "Gb" but got """ & sizeUnit ) + + if sizeQuantity <= 0: + return err("invalid size retention policy argument: a non-zero value is required") + + let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity) + return ok(some(retPolicy)) + else: return err("unknown retention policy") + diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim new file mode 100644 index 0000000000..dc20e1f215 --- /dev/null +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -0,0 +1,87 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/times, + stew/results, + chronicles, + chronos +import + ../driver, + ../retention_policy + +logScope: + topics = "waku archive retention_policy" + +# default size is 30 Gb +const DefaultRetentionSize*: float = 30_720 + +# to remove 20% of the outdated data from database +const DeleteLimit = 0.80 + +type + # SizeRetentionPolicy implements auto delete as follows: + # - sizeLimit is the size in megabytes (Mbs) the database can grow upto + # to reduce the size of the databases, remove the rows/number-of-messages + # DeleteLimit is the total number of messages to delete 
beyond this limit + # when the database size crosses the sizeLimit, then only a fraction of messages are kept, + # rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(), + # upon deletion process the fragmented space is retrieve back using Vacuum process. + SizeRetentionPolicy* = ref object of RetentionPolicy + sizeLimit: float + +proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T = + SizeRetentionPolicy( + sizeLimit: size + ) + +method execute*(p: SizeRetentionPolicy, + driver: ArchiveDriver): + Future[RetentionPolicyResult[void]] {.async.} = + ## when db size overshoots the database limit, shread 20% of outdated messages + + # to get the size of the database, pageCount and PageSize is required + # get page count in "messages" database + var pageCountRes = await driver.getPagesCount() + if pageCountRes.isErr(): + return err("failed to get Pages count: " & pageCountRes.error) + + var pageCount: int64 = pageCountRes.value + + # get page size of database + let pageSizeRes = await driver.getPagesSize() + var pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024) + + if pageSize == 0: + return err("failed to get Page size: " & pageSizeRes.error) + + # database size in megabytes (Mb) + var totalSizeOfDB: float = float(pageSize * pageCount)/1024.0 + + # check if current databse size crosses the db size limit + if totalSizeOfDB < p.sizeLimit: + return ok() + + # to shread/delete messsges, get the total row/message count + var numMessagesRes = await driver.getMessagesCount() + if numMessagesRes.isErr(): + return err("failed to get messages count: " & numMessagesRes.error) + var numMessages = numMessagesRes.value + + # 80% of the total messages are to be kept, delete others + let pageDeleteWindow = int(float(numMessages) * DeleteLimit) + + let res = await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow) + if res.isErr(): + return err("deleting oldest messages failed: " & res.error) + + # vacuum to get the 
deleted pages defragments to save storage space + # this will resize the database size + let resVaccum = await driver.performsVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + + return ok() + From 9fedc9f207bfd4f0c34e9acb7e6498dd2e968e9d Mon Sep 17 00:00:00 2001 From: ABresting Date: Mon, 2 Oct 2023 23:55:01 +0530 Subject: [PATCH 2/6] chore: post review code - size based retention policy --- tests/waku_archive/test_retention_policy.nim | 20 ++++++++++++------- waku/waku_archive/driver.nim | 2 +- .../driver/sqlite_driver/sqlite_driver.nim | 2 +- .../waku_archive/retention_policy/builder.nim | 2 +- .../retention_policy_size.nim | 14 ++++++------- 5 files changed, 23 insertions(+), 17 deletions(-) diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 01e012b44c..7799c076e4 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -69,21 +69,27 @@ suite "Waku Archive - Retention policy": ## When var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() - var retentionFutures = newSeq[Future[ArchiveDriverResult[void]]]() for i in 1..excess: let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)) - retentionFutures.add(retentionPolicy.execute(driver)) # waitFor is used to synchronously wait for the futures to complete. 
- discard waitFor allFinished(putFutures & retentionFutures) - + discard waitFor allFinished(putFutures) + ## Then # calculate the current database size - let pageSize = (waitFor driver.getPagesSize()).tryGet() - let pageCount = (waitFor driver.getPagesCount()).tryGet() - let sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0) + var pageSize = (waitFor driver.getPagesSize()).tryGet() + var pageCount = (waitFor driver.getPagesCount()).tryGet() + var sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0) + # execute policy if the current db size oveflows + if sizeDB >= sizeLimit: + require (waitFor retentionPolicy.execute(driver)).isOk() + + # update the current db size + pageSize = (waitFor driver.getPagesSize()).tryGet() + pageCount = (waitFor driver.getPagesCount()).tryGet() + sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0) check: # size of the database is used to check if the storage limit has been preserved diff --git a/waku/waku_archive/driver.nim b/waku/waku_archive/driver.nim index 011e88c52b..64c6b3318a 100644 --- a/waku/waku_archive/driver.nim +++ b/waku/waku_archive/driver.nim @@ -51,7 +51,7 @@ method getPagesCount*(driver: ArchiveDriver): method getPagesSize*(driver: ArchiveDriver): Future[ArchiveDriverResult[int64]] {.base, async.} = discard -method performsVacuum*(driver: ArchiveDriver): +method performVacuum*(driver: ArchiveDriver): Future[ArchiveDriverResult[void]] {.base, async.} = discard method getOldestMessageTimestamp*(driver: ArchiveDriver): diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 545ac3c126..53da379b1a 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -117,7 +117,7 @@ method getPagesSize*(s: SqliteDriver): Future[ArchiveDriverResult[int64]] {.async.} = return s.db.getPageSize() -method performsVacuum*(s: SqliteDriver): +method performVacuum*(s: 
SqliteDriver): Future[ArchiveDriverResult[void]] {.async.} = return s.db.performSqliteVacuum() diff --git a/waku/waku_archive/retention_policy/builder.nim b/waku/waku_archive/retention_policy/builder.nim index 7c2b6175b8..2c4b3cf12a 100644 --- a/waku/waku_archive/retention_policy/builder.nim +++ b/waku/waku_archive/retention_policy/builder.nim @@ -68,7 +68,7 @@ proc new*(T: type RetentionPolicy, try: sizeQuantity = parseFloat(sizeQuantityStr) except ValueError: - return err("invalid size retention policy argument") + return err("invalid size retention policy argument: " & getCurrentExceptionMsg()) # Gb data is converted into Mb for uniform processing sizeQuantity = sizeQuantity * 1024 elif sizeUnit in ["mb", "Mb", "MB", "mB"]: diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim index dc20e1f215..9d4c099069 100644 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -44,31 +44,31 @@ method execute*(p: SizeRetentionPolicy, # to get the size of the database, pageCount and PageSize is required # get page count in "messages" database - var pageCountRes = await driver.getPagesCount() + let pageCountRes = await driver.getPagesCount() if pageCountRes.isErr(): return err("failed to get Pages count: " & pageCountRes.error) - var pageCount: int64 = pageCountRes.value + let pageCount: int64 = pageCountRes.value # get page size of database let pageSizeRes = await driver.getPagesSize() - var pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024) + let pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024) if pageSize == 0: return err("failed to get Page size: " & pageSizeRes.error) # database size in megabytes (Mb) - var totalSizeOfDB: float = float(pageSize * pageCount)/1024.0 + let totalSizeOfDB: float = float(pageSize * pageCount)/1024.0 # check if current databse size crosses the db size limit if 
totalSizeOfDB < p.sizeLimit: return ok() # to shread/delete messsges, get the total row/message count - var numMessagesRes = await driver.getMessagesCount() + let numMessagesRes = await driver.getMessagesCount() if numMessagesRes.isErr(): return err("failed to get messages count: " & numMessagesRes.error) - var numMessages = numMessagesRes.value + let numMessages = numMessagesRes.value # 80% of the total messages are to be kept, delete others let pageDeleteWindow = int(float(numMessages) * DeleteLimit) @@ -79,7 +79,7 @@ method execute*(p: SizeRetentionPolicy, # vacuum to get the deleted pages defragments to save storage space # this will resize the database size - let resVaccum = await driver.performsVacuum() + let resVaccum = await driver.performVacuum() if resVaccum.isErr(): return err("vacuumming failed: " & resVaccum.error) From d7140ddd77c417c4766f471829ab975fd56978c7 Mon Sep 17 00:00:00 2001 From: ABresting Date: Mon, 9 Oct 2023 14:02:37 +0200 Subject: [PATCH 3/6] chore: review integrated, size retention updated, vacuuming in capacity retention --- docs/operators/how-to/configure-store.md | 3 +- tests/waku_archive/test_retention_policy.nim | 47 ++++++++----- .../retention_policy_capacity.nim | 6 ++ .../retention_policy_size.nim | 67 +++++++++++-------- 4 files changed, 79 insertions(+), 44 deletions(-) diff --git a/docs/operators/how-to/configure-store.md b/docs/operators/how-to/configure-store.md index c00140d85c..d340ce9057 100644 --- a/docs/operators/how-to/configure-store.md +++ b/docs/operators/how-to/configure-store.md @@ -31,9 +31,10 @@ If the waku store node is enabled (the `--store` option is set to `true`) the no There is a set of configuration options to customize the waku store protocol's message store. These are the most relevant: -* `--store-message-retention-policy`: This option controls the retention policy i.e., how long certain messages will be persisted. 
Two different retention policies are supported: +* `--store-message-retention-policy`: This option controls the retention policy i.e., how long certain messages will be persisted. Three different retention policies are supported: + The time retention policy,`time:<duration-in-seconds>` (e.g., `time:14400`) + The capacity retention policy,`capacity:<messages-count>` (e.g, `capacity:25000`) + + The size retention policy,`size:<size-in-gb-mb>` (e.g., `size:25Gb`) + To disable the retention policy, explicitly, set this option to to `""`, an empty string. * `--store-message-db-url`: The message store database url option controls the message storage engine. This option follows the [_SQLAlchemy_ database URL format](https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls). diff --git a/tests/waku_archive/test_retention_policy.nim b/tests/waku_archive/test_retention_policy.nim index 7799c076e4..0927a15e80 100644 --- a/tests/waku_archive/test_retention_policy.nim +++ b/tests/waku_archive/test_retention_policy.nim @@ -1,7 +1,7 @@ {.used.} import - std/sequtils, + std/[sequtils,times], stew/results, testutils/unittests, chronos @@ -31,18 +31,21 @@ suite "Waku Archive - Retention policy": ## Given let capacity = 100 - excess = 65 + excess = 60 let driver = newTestArchiveDriver() let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity) + var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() ## When for i in 1..capacity+excess: let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) + putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)) + + discard waitFor allFinished(putFutures) - require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk() - require (waitFor retentionPolicy.execute(driver)).isOk() + require (waitFor retentionPolicy.execute(driver)).isOk() ## Then let numMessages = (waitFor driver.getMessagesCount()).tryGet() @@ -50,7 +53,7 @@ suite "Waku Archive - Retention 
policy": - # Expected number of messages is 120 because - # (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete) + # Expected number of messages is 115 because + # (capacity = 100) + (half of the overflow window = 15); the policy runs once, after all inserts # the window size changes when changing `const maxStoreOverflow = 1.3 in sqlite_store - numMessages == 120 + numMessages == 115 ## Cleanup (waitFor driver.close()).expect("driver to close") @@ -60,16 +63,29 @@ suite "Waku Archive - Retention policy": let # in megabytes sizeLimit:float = 0.05 - excess = 123 + excess = 325 let driver = newTestArchiveDriver() let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit) + var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() - ## When + # variables to check the db size + var pageSize = (waitFor driver.getPagesSize()).tryGet() + var pageCount = (waitFor driver.getPagesCount()).tryGet() + var sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0) - var putFutures = newSeq[Future[ArchiveDriverResult[void]]]() + # make sure that the db is empty before the test begins + let storedMsg = (waitFor driver.getAllMessages()).tryGet() + # if there are messages in db, empty them + if storedMsg.len > 0: + let now = getNanosecondTime(getTime().toUnixFloat()) + require (waitFor driver.deleteMessagesOlderThanTimestamp(ts=now)).isOk() + require (waitFor driver.performVacuum()).isOk() + + ## When + # create a number of messages so that the size of the DB overshoots for i in 1..excess: let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i)) putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)) @@ -79,12 +95,13 @@ suite "Waku Archive - Retention policy": ## Then # calculate the current database size - var pageSize = (waitFor driver.getPagesSize()).tryGet() - var pageCount = (waitFor driver.getPagesCount()).tryGet() - var sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0) - # execute policy if the current db size oveflows - if sizeDB >= sizeLimit: - require (waitFor 
retentionPolicy.execute(driver)).isOk() + pageSize = (waitFor driver.getPagesSize()).tryGet() + pageCount = (waitFor driver.getPagesCount()).tryGet() + sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0) + + # execute policy provided the current db size overflows + require (sizeDB >= sizeLimit) + require (waitFor retentionPolicy.execute(driver)).isOk() # update the current db size pageSize = (waitFor driver.getPagesSize()).tryGet() @@ -94,7 +111,7 @@ suite "Waku Archive - Retention policy": check: # size of the database is used to check if the storage limit has been preserved # check the current database size with the limitSize provided by the user - # it should be lower + # it should be lower sizeDB <= sizeLimit ## Cleanup diff --git a/waku/waku_archive/retention_policy/retention_policy_capacity.nim b/waku/waku_archive/retention_policy/retention_policy_capacity.nim index 72b243301e..73430c597f 100644 --- a/waku/waku_archive/retention_policy/retention_policy_capacity.nim +++ b/waku/waku_archive/retention_policy/retention_policy_capacity.nim @@ -75,4 +75,10 @@ method execute*(p: CapacityRetentionPolicy, if res.isErr(): return err("deleting oldest messages failed: " & res.error) + # vacuum to defragment the deleted pages and reclaim storage space + # this will shrink the database size + let resVaccum = await driver.performVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + return ok() diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim index 9d4c099069..8e6bb2538e 100644 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -7,7 +7,8 @@ import std/times, stew/results, chronicles, - chronos + chronos, + os import ../driver, ../retention_policy @@ -44,44 +45,54 @@ method execute*(p: SizeRetentionPolicy, # to get the size of the database, pageCount and PageSize is 
required # get page count in "messages" database - let pageCountRes = await driver.getPagesCount() - if pageCountRes.isErr(): - return err("failed to get Pages count: " & pageCountRes.error) - - let pageCount: int64 = pageCountRes.value + var pageCount = (await driver.getPagesCount()).valueOr: + return err("failed to get Pages count: " & $error) # get page size of database - let pageSizeRes = await driver.getPagesSize() - let pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024) + var pageSizeRes = await driver.getPagesSize() + var pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024) if pageSize == 0: return err("failed to get Page size: " & pageSizeRes.error) # database size in megabytes (Mb) - let totalSizeOfDB: float = float(pageSize * pageCount)/1024.0 + var totalSizeOfDB: float = float(pageSize * pageCount)/1024.0 # check if current databse size crosses the db size limit if totalSizeOfDB < p.sizeLimit: return ok() - # to shread/delete messsges, get the total row/message count - let numMessagesRes = await driver.getMessagesCount() - if numMessagesRes.isErr(): - return err("failed to get messages count: " & numMessagesRes.error) - let numMessages = numMessagesRes.value - - # 80% of the total messages are to be kept, delete others - let pageDeleteWindow = int(float(numMessages) * DeleteLimit) - - let res = await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow) - if res.isErr(): - return err("deleting oldest messages failed: " & res.error) - - # vacuum to get the deleted pages defragments to save storage space - # this will resize the database size - let resVaccum = await driver.performVacuum() - if resVaccum.isErr(): - return err("vacuumming failed: " & resVaccum.error) + # keep deleting until the current db size falls within size limit + while totalSizeOfDB > p.sizeLimit: + # to shread/delete messsges, get the total row/message count + let numMessagesRes = await driver.getMessagesCount() + if numMessagesRes.isErr(): + return err("failed 
to get messages count: " & numMessagesRes.error) + let numMessages = numMessagesRes.value + + # 80% of the total messages are to be kept, delete others + let pageDeleteWindow = int(float(numMessages) * DeleteLimit) + + let res = await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow) + if res.isErr(): + return err("deleting oldest messages failed: " & res.error) + + # vacuum to get the deleted pages defragments to save storage space + # this will resize the database size + let resVaccum = await driver.performVacuum() + if resVaccum.isErr(): + return err("vacuumming failed: " & resVaccum.error) + + # get the db size again for the loop condition check + pageCount = (await driver.getPagesCount()).valueOr: + return err("failed to get Pages count: " & $error) + + pageSizeRes = await driver.getPagesSize() + pageSize = int64(pageSizeRes.valueOr(0) div 1024) + + if pageSize == 0: + return err("failed to get Page size: " & pageSizeRes.error) + + totalSizeOfDB = float(pageSize * pageCount)/1024.0 return ok() - From c026cc916b1fac54cac671e9ac3215525693a4aa Mon Sep 17 00:00:00 2001 From: ABresting Date: Mon, 9 Oct 2023 14:07:37 +0200 Subject: [PATCH 4/6] chore: typo fixed --- waku/waku_archive/driver/queue_driver/queue_driver.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/waku_archive/driver/queue_driver/queue_driver.nim b/waku/waku_archive/driver/queue_driver/queue_driver.nim index 4b0b545036..cce7d895bf 100644 --- a/waku/waku_archive/driver/queue_driver/queue_driver.nim +++ b/waku/waku_archive/driver/queue_driver/queue_driver.nim @@ -288,7 +288,7 @@ method getPagesSize*(driver: QueueDriver): Future[ArchiveDriverResult[int64]] {.async} = return ok(int64(driver.len())) -method performsVacuum*(driver: QueueDriver): +method performVacuum*(driver: QueueDriver): Future[ArchiveDriverResult[void]] {.async.} = return err("interface method not implemented") From 5022d3e5df750643b6affe2f69d0b996a8be9055 Mon Sep 17 00:00:00 2001 From: 
ABresting Date: Mon, 9 Oct 2023 18:08:34 +0200 Subject: [PATCH 5/6] chore: review integrated --- .../retention_policy_size.nim | 39 +++++++------------ 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/waku/waku_archive/retention_policy/retention_policy_size.nim b/waku/waku_archive/retention_policy/retention_policy_size.nim index 8e6bb2538e..30c5858c1f 100644 --- a/waku/waku_archive/retention_policy/retention_policy_size.nim +++ b/waku/waku_archive/retention_policy/retention_policy_size.nim @@ -42,28 +42,27 @@ method execute*(p: SizeRetentionPolicy, driver: ArchiveDriver): Future[RetentionPolicyResult[void]] {.async.} = ## when db size overshoots the database limit, shread 20% of outdated messages - - # to get the size of the database, pageCount and PageSize is required - # get page count in "messages" database - var pageCount = (await driver.getPagesCount()).valueOr: - return err("failed to get Pages count: " & $error) # get page size of database - var pageSizeRes = await driver.getPagesSize() - var pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024) + let pageSizeRes = await driver.getPagesSize() + let pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024) if pageSize == 0: return err("failed to get Page size: " & pageSizeRes.error) - # database size in megabytes (Mb) - var totalSizeOfDB: float = float(pageSize * pageCount)/1024.0 + # keep deleting until the current db size falls within size limit + while true: + # to get the size of the database, pageCount and PageSize is required + # get page count in "messages" database + let pageCount = (await driver.getPagesCount()).valueOr: + return err("failed to get Pages count: " & $error) - # check if current databse size crosses the db size limit - if totalSizeOfDB < p.sizeLimit: - return ok() + # database size in megabytes (Mb) + let totalSizeOfDB: float = float(pageSize * pageCount)/1024.0 + + if totalSizeOfDB < p.sizeLimit: + break - # keep deleting until the current db size falls within size 
limit - while totalSizeOfDB > p.sizeLimit: # to shread/delete messsges, get the total row/message count let numMessagesRes = await driver.getMessagesCount() if numMessagesRes.isErr(): @@ -82,17 +81,5 @@ method execute*(p: SizeRetentionPolicy, let resVaccum = await driver.performVacuum() if resVaccum.isErr(): return err("vacuumming failed: " & resVaccum.error) - - # get the db size again for the loop condition check - pageCount = (await driver.getPagesCount()).valueOr: - return err("failed to get Pages count: " & $error) - - pageSizeRes = await driver.getPagesSize() - pageSize = int64(pageSizeRes.valueOr(0) div 1024) - - if pageSize == 0: - return err("failed to get Page size: " & pageSizeRes.error) - - totalSizeOfDB = float(pageSize * pageCount)/1024.0 return ok() From 3a719236ec7ae6ceef594edc9c07747041e7da71 Mon Sep 17 00:00:00 2001 From: ABresting Date: Tue, 10 Oct 2023 11:02:06 +0200 Subject: [PATCH 6/6] chore: external config updated to support newly added retention policy --- apps/wakunode2/external_config.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/wakunode2/external_config.nim b/apps/wakunode2/external_config.nim index 3c604dafe1..ac0c5cd309 100644 --- a/apps/wakunode2/external_config.nim +++ b/apps/wakunode2/external_config.nim @@ -223,7 +223,7 @@ type name: "storenode" }: string storeMessageRetentionPolicy* {. - desc: "Message store retention policy. Time retention policy: 'time:'. Capacity retention policy: 'capacity:'. Set to 'none' to disable.", + desc: "Message store retention policy. Time retention policy: 'time:'. Capacity retention policy: 'capacity:'. Size retention policy: 'size:'. Set to 'none' to disable.", defaultValue: "time:" & $2.days.seconds, name: "store-message-retention-policy" }: string