Skip to content

Commit

Permalink
chore: added size based retention policy (#2098)
Browse files Browse the repository at this point in the history
* chore: added size based retention policy

* chore: post review code - size based retention policy

* chore: review integrated, size retention updated, vacuuming in capacity retention

* chore: typo fixed

* chore: review integrated

* chore: external config updated to support newly added retention policy
  • Loading branch information
ABresting authored Oct 10, 2023
1 parent 2c5eb42 commit 25d6e52
Show file tree
Hide file tree
Showing 11 changed files with 238 additions and 10 deletions.
2 changes: 1 addition & 1 deletion apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ type
name: "storenode" }: string

storeMessageRetentionPolicy* {.
desc: "Message store retention policy. Time retention policy: 'time:<seconds>'. Capacity retention policy: 'capacity:<count>'. Set to 'none' to disable.",
desc: "Message store retention policy. Time retention policy: 'time:<seconds>'. Capacity retention policy: 'capacity:<count>'. Size retention policy: 'size:<xMB/xGB>'. Set to 'none' to disable.",
defaultValue: "time:" & $2.days.seconds,
name: "store-message-retention-policy" }: string

Expand Down
3 changes: 2 additions & 1 deletion docs/operators/how-to/configure-store.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ If the waku store node is enabled (the `--store` option is set to `true`) the no

There is a set of configuration options to customize the waku store protocol's message store. These are the most relevant:

* `--store-message-retention-policy`: This option controls the retention policy i.e., how long certain messages will be persisted. Two different retention policies are supported:
* `--store-message-retention-policy`: This option controls the retention policy i.e., how long certain messages will be persisted. Three different retention policies are supported:
+ The time retention policy,`time:<duration-in-seconds>` (e.g., `time:14400`)
+ The capacity retention policy,`capacity:<messages-count>` (e.g, `capacity:25000`)
+ The size retention policy,`size:<size-in-gb-mb>` (e.g, `size:25Gb`)
+ To disable the retention policy, explicitly, set this option to to `""`, an empty string.
* `--store-message-db-url`: The message store database url option controls the message storage engine. This option follows the [_SQLAlchemy_ database URL format](https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls).

Expand Down
74 changes: 69 additions & 5 deletions tests/waku_archive/test_retention_policy.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/sequtils,
std/[sequtils,times],
stew/results,
testutils/unittests,
chronos
Expand All @@ -12,6 +12,7 @@ import
../../../waku/waku_archive/driver/sqlite_driver,
../../../waku/waku_archive/retention_policy,
../../../waku/waku_archive/retention_policy/retention_policy_capacity,
../../../waku/waku_archive/retention_policy/retention_policy_size,
../testlib/common,
../testlib/wakucore

Expand All @@ -30,26 +31,88 @@ suite "Waku Archive - Retention policy":
## Given
let
capacity = 100
excess = 65
excess = 60

let driver = newTestArchiveDriver()

let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

## When
for i in 1..capacity+excess:
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp))

discard waitFor allFinished(putFutures)

require (waitFor driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp)).isOk()
require (waitFor retentionPolicy.execute(driver)).isOk()
require (waitFor retentionPolicy.execute(driver)).isOk()

## Then
let numMessages = (waitFor driver.getMessagesCount()).tryGet()
check:
# Expected number of messages is 120 because
# (capacity = 100) + (half of the overflow window = 15) + (5 messages added after after the last delete)
# the window size changes when changing `const maxStoreOverflow = 1.3 in sqlite_store
numMessages == 120
numMessages == 115

## Cleanup
(waitFor driver.close()).expect("driver to close")

test "size retention policy - windowed message deletion":
## Given
let
# in megabytes
sizeLimit:float = 0.05
excess = 325

let driver = newTestArchiveDriver()

let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

# variables to check the db size
var pageSize = (waitFor driver.getPagesSize()).tryGet()
var pageCount = (waitFor driver.getPagesCount()).tryGet()
var sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)

# make sure that the db is empty to before test begins
let storedMsg = (waitFor driver.getAllMessages()).tryGet()
# if there are messages in db, empty them
if storedMsg.len > 0:
let now = getNanosecondTime(getTime().toUnixFloat())
require (waitFor driver.deleteMessagesOlderThanTimestamp(ts=now)).isOk()
require (waitFor driver.performVacuum()).isOk()

## When

# create a number of messages so that the size of the DB overshoots
for i in 1..excess:
let msg = fakeWakuMessage(payload= @[byte i], contentTopic=DefaultContentTopic, ts=Timestamp(i))
putFutures.add(driver.put(DefaultPubsubTopic, msg, computeDigest(msg), msg.timestamp))

# waitFor is used to synchronously wait for the futures to complete.
discard waitFor allFinished(putFutures)

## Then
# calculate the current database size
pageSize = (waitFor driver.getPagesSize()).tryGet()
pageCount = (waitFor driver.getPagesCount()).tryGet()
sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)

# execute policy provided the current db size oveflows
require (sizeDB >= sizeLimit)
require (waitFor retentionPolicy.execute(driver)).isOk()

# update the current db size
pageSize = (waitFor driver.getPagesSize()).tryGet()
pageCount = (waitFor driver.getPagesCount()).tryGet()
sizeDB = float(pageCount * pageSize) / (1024.0 * 1024.0)

check:
# size of the database is used to check if the storage limit has been preserved
# check the current database size with the limitSize provided by the user
# it should be lower
sizeDB <= sizeLimit

## Cleanup
(waitFor driver.close()).expect("driver to close")
Expand Down Expand Up @@ -90,3 +153,4 @@ suite "Waku Archive - Retention policy":

## Cleanup
(waitFor driver.close()).expect("driver to close")

3 changes: 2 additions & 1 deletion waku/common/databases/db_sqlite.nim
Original file line number Diff line number Diff line change
Expand Up @@ -484,4 +484,5 @@ proc performSqliteVacuum*(db: SqliteDatabase): DatabaseResult[void] =
if resVacuum.isErr():
return err("failed to execute vacuum: " & resVacuum.error)

debug "finished sqlite database vacuuming"
debug "finished sqlite database vacuuming"
ok()
10 changes: 10 additions & 0 deletions waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ method getMessages*(driver: ArchiveDriver,
method getMessagesCount*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard

method getPagesCount*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard

method getPagesSize*(driver: ArchiveDriver):
Future[ArchiveDriverResult[int64]] {.base, async.} = discard

method performVacuum*(driver: ArchiveDriver):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

method getOldestMessageTimestamp*(driver: ArchiveDriver):
Future[ArchiveDriverResult[Timestamp]] {.base, async.} = discard

Expand All @@ -61,3 +70,4 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver,

method close*(driver: ArchiveDriver):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

1 change: 1 addition & 0 deletions waku/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,4 @@ proc new*(T: type ArchiveDriver,
debug "setting up in-memory waku archive driver"
let driver = QueueDriver.new() # Defaults to a capacity of 25.000 messages
return ok(driver)

14 changes: 13 additions & 1 deletion waku/waku_archive/driver/queue_driver/queue_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,18 @@ method getMessagesCount*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))

method getPagesCount*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))

method getPagesSize*(driver: QueueDriver):
Future[ArchiveDriverResult[int64]] {.async} =
return ok(int64(driver.len()))

method performVacuum*(driver: QueueDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return err("interface method not implemented")

method getOldestMessageTimestamp*(driver: QueueDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return driver.first().map(proc(msg: IndexedWakuMessage): Timestamp = msg.index.receiverTime)
Expand All @@ -302,4 +314,4 @@ method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver,

method close*(driver: QueueDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return ok()
return ok()
13 changes: 13 additions & 0 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ method getMessagesCount*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getMessageCount()

method getPagesCount*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getPageCount()

method getPagesSize*(s: SqliteDriver):
Future[ArchiveDriverResult[int64]] {.async.} =
return s.db.getPageSize()

method performVacuum*(s: SqliteDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return s.db.performSqliteVacuum()

method getOldestMessageTimestamp*(s: SqliteDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =
return s.db.selectOldestReceiverTimestamp()
Expand All @@ -135,3 +147,4 @@ method close*(s: SqliteDriver):
# Close connection
s.db.close()
return ok()

37 changes: 36 additions & 1 deletion waku/waku_archive/retention_policy/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import
import
../retention_policy,
./retention_policy_time,
./retention_policy_capacity
./retention_policy_capacity,
./retention_policy_size

proc new*(T: type RetentionPolicy,
retPolicy: string):
Expand Down Expand Up @@ -51,5 +52,39 @@ proc new*(T: type RetentionPolicy,
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
return ok(some(retPolicy))

elif policy == "size":
var retentionSize: string
retentionSize = policyArgs

# captures the size unit such as Gb or Mb
let sizeUnit = retentionSize.substr(retentionSize.len-2)
# captures the string type number data of the size provided
let sizeQuantityStr = retentionSize.substr(0,retentionSize.len-3)
# to hold the numeric value data of size
var sizeQuantity: float

if sizeUnit in ["gb", "Gb", "GB", "gB"]:
# parse the actual value into integer type var
try:
sizeQuantity = parseFloat(sizeQuantityStr)
except ValueError:
return err("invalid size retention policy argument: " & getCurrentExceptionMsg())
# Gb data is converted into Mb for uniform processing
sizeQuantity = sizeQuantity * 1024
elif sizeUnit in ["mb", "Mb", "MB", "mB"]:
try:
sizeQuantity = parseFloat(sizeQuantityStr)
except ValueError:
return err("invalid size retention policy argument")
else:
return err ("""invalid size retention value unit: expected "Mb" or "Gb" but got """ & sizeUnit )

if sizeQuantity <= 0:
return err("invalid size retention policy argument: a non-zero value is required")

let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity)
return ok(some(retPolicy))

else:
return err("unknown retention policy")

Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,10 @@ method execute*(p: CapacityRetentionPolicy,
if res.isErr():
return err("deleting oldest messages failed: " & res.error)

# vacuum to get the deleted pages defragments to save storage space
# this will resize the database size
let resVaccum = await driver.performVacuum()
if resVaccum.isErr():
return err("vacuumming failed: " & resVaccum.error)

return ok()
85 changes: 85 additions & 0 deletions waku/waku_archive/retention_policy/retention_policy_size.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
std/times,
stew/results,
chronicles,
chronos,
os
import
../driver,
../retention_policy

logScope:
topics = "waku archive retention_policy"

# default size is 30 Gb
const DefaultRetentionSize*: float = 30_720

# to remove 20% of the outdated data from database
const DeleteLimit = 0.80

type
# SizeRetentionPolicy implements auto delete as follows:
# - sizeLimit is the size in megabytes (Mbs) the database can grow upto
# to reduce the size of the databases, remove the rows/number-of-messages
# DeleteLimit is the total number of messages to delete beyond this limit
# when the database size crosses the sizeLimit, then only a fraction of messages are kept,
# rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(),
# upon deletion process the fragmented space is retrieve back using Vacuum process.
SizeRetentionPolicy* = ref object of RetentionPolicy
sizeLimit: float

proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
SizeRetentionPolicy(
sizeLimit: size
)

method execute*(p: SizeRetentionPolicy,
driver: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.async.} =
## when db size overshoots the database limit, shread 20% of outdated messages

# get page size of database
let pageSizeRes = await driver.getPagesSize()
let pageSize: int64 = int64(pageSizeRes.valueOr(0) div 1024)

if pageSize == 0:
return err("failed to get Page size: " & pageSizeRes.error)

# keep deleting until the current db size falls within size limit
while true:
# to get the size of the database, pageCount and PageSize is required
# get page count in "messages" database
let pageCount = (await driver.getPagesCount()).valueOr:
return err("failed to get Pages count: " & $error)

# database size in megabytes (Mb)
let totalSizeOfDB: float = float(pageSize * pageCount)/1024.0

if totalSizeOfDB < p.sizeLimit:
break

# to shread/delete messsges, get the total row/message count
let numMessagesRes = await driver.getMessagesCount()
if numMessagesRes.isErr():
return err("failed to get messages count: " & numMessagesRes.error)
let numMessages = numMessagesRes.value

# 80% of the total messages are to be kept, delete others
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)

let res = await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)
if res.isErr():
return err("deleting oldest messages failed: " & res.error)

# vacuum to get the deleted pages defragments to save storage space
# this will resize the database size
let resVaccum = await driver.performVacuum()
if resVaccum.isErr():
return err("vacuumming failed: " & resVaccum.error)

return ok()

0 comments on commit 25d6e52

Please sign in to comment.