Skip to content

Commit

Permalink
refactor: new proc to foster different size retention policy implemen…
Browse files Browse the repository at this point in the history
…tations (#2463)

* new proc to foster different size retention policy implementations
  The new proc, decreaseDatabaseSize, will have different implementations
  per each driver. For example, in future commits we will implement a size
  retention policy thanks to partitions management, in Postgres.
* RetentionPolicy: use of new instead of init for ref object types
* waku_archive: fix signatures in decreaseDatabaseSize methods
* retention_policy_size: minor cleanup of comments and imports
  • Loading branch information
Ivansete-status authored Feb 22, 2024
1 parent f6332ac commit d530528
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 46 deletions.
6 changes: 3 additions & 3 deletions tests/waku_archive/test_retention_policy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ suite "Waku Archive - Retention policy":

let driver = newSqliteArchiveDriver()

let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
let retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.new(capacity=capacity)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

## When
Expand Down Expand Up @@ -61,7 +61,7 @@ suite "Waku Archive - Retention policy":

let driver = newSqliteArchiveDriver()

let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.init(size=sizeLimit)
let retentionPolicy: RetentionPolicy = SizeRetentionPolicy.new(size=sizeLimit)
var putFutures = newSeq[Future[ArchiveDriverResult[void]]]()

# make sure that the db is empty to before test begins
Expand Down Expand Up @@ -115,7 +115,7 @@ suite "Waku Archive - Retention policy":

let
driver = newSqliteArchiveDriver()
retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.init(capacity=capacity)
retentionPolicy: RetentionPolicy = CapacityRetentionPolicy.new(capacity=capacity)

let messages = @[
fakeWakuMessage(contentTopic=DefaultContentTopic, ts=ts(0)),
Expand Down
4 changes: 4 additions & 0 deletions waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ method deleteOldestMessagesNotWithinLimit*(driver: ArchiveDriver,
limit: int):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

method decreaseDatabaseSize*(driver: ArchiveDriver,
targetSizeInBytes: int64):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

method close*(driver: ArchiveDriver):
Future[ArchiveDriverResult[void]] {.base, async.} = discard

34 changes: 34 additions & 0 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,40 @@ method deleteOldestMessagesNotWithinLimit*(

return ok()

method decreaseDatabaseSize*(driver: PostgresDriver,
targetSizeInBytes: int64):
Future[ArchiveDriverResult[void]] {.async.} =
## TODO: refactor this implementation and use partition management instead
## To remove 20% of the outdated data from database
const DeleteLimit = 0.80

## when db size overshoots the database limit, shread 20% of outdated messages
## get size of database
let dbSize = (await driver.getDatabaseSize()).valueOr:
return err("failed to get database size: " & $error)

## database size in bytes
let totalSizeOfDB: int64 = int64(dbSize)

if totalSizeOfDB < targetSizeInBytes:
return ok()

## to shread/delete messsges, get the total row/message count
let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)

## NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
## if vacumming is done automatically then we aim to check DB size periodially for efficient
## retention policy implementation.

## 80% of the total messages are to be kept, delete others
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)

(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)

return ok()

method close*(s: PostgresDriver):
Future[ArchiveDriverResult[void]] {.async.} =
## Close the database connection
Expand Down
5 changes: 5 additions & 0 deletions waku/waku_archive/driver/queue_driver/queue_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ method deleteOldestMessagesNotWithinLimit*(driver: QueueDriver,
# TODO: Implement this message_store method
return err("interface method not implemented")

method decreaseDatabaseSize*(driver: QueueDriver,
targetSizeInBytes: int64):
Future[ArchiveDriverResult[void]] {.async.} =
return err("interface method not implemented")

method close*(driver: QueueDriver):
Future[ArchiveDriverResult[void]] {.async.} =
return ok()
34 changes: 34 additions & 0 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,40 @@ method deleteOldestMessagesNotWithinLimit*(s: SqliteDriver,
Future[ArchiveDriverResult[void]] {.async.} =
return s.db.deleteOldestMessagesNotWithinLimit(limit)

method decreaseDatabaseSize*(driver: SqliteDriver,
targetSizeInBytes: int64):
Future[ArchiveDriverResult[void]] {.async.} =

## To remove 20% of the outdated data from database
const DeleteLimit = 0.80

## when db size overshoots the database limit, shread 20% of outdated messages
## get size of database
let dbSize = (await driver.getDatabaseSize()).valueOr:
return err("failed to get database size: " & $error)

## database size in bytes
let totalSizeOfDB: int64 = int64(dbSize)

if totalSizeOfDB < targetSizeInBytes:
return ok()

## to shread/delete messsges, get the total row/message count
let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)

## NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
## if vacumming is done automatically then we aim to check DB size periodially for efficient
## retention policy implementation.

## 80% of the total messages are to be kept, delete others
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)

(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)

return ok()

method close*(s: SqliteDriver):
Future[ArchiveDriverResult[void]] {.async.} =
## Close the database connection
Expand Down
6 changes: 3 additions & 3 deletions waku/waku_archive/retention_policy/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ proc new*(T: type RetentionPolicy,
except ValueError:
return err("invalid time retention policy argument")

let retPolicy: RetentionPolicy = TimeRetentionPolicy.init(retentionTimeSeconds)
let retPolicy: RetentionPolicy = TimeRetentionPolicy.new(retentionTimeSeconds)
return ok(some(retPolicy))

elif policy == "capacity":
Expand All @@ -49,7 +49,7 @@ proc new*(T: type RetentionPolicy,
except ValueError:
return err("invalid capacity retention policy argument")

let retPolicy: RetentionPolicy = CapacityRetentionPolicy.init(retentionCapacity)
let retPolicy: RetentionPolicy = CapacityRetentionPolicy.new(retentionCapacity)
return ok(some(retPolicy))

elif policy == "size":
Expand Down Expand Up @@ -85,7 +85,7 @@ proc new*(T: type RetentionPolicy,
if sizeQuantity <= 0:
return err("invalid size retention policy argument: a non-zero value is required")

let retPolicy: RetentionPolicy = SizeRetentionPolicy.init(sizeQuantity)
let retPolicy: RetentionPolicy = SizeRetentionPolicy.new(sizeQuantity)
return ok(some(retPolicy))

else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ proc calculateOverflowWindow(capacity: int, overflow: float): int =
proc calculateDeleteWindow(capacity: int, overflow: float): int =
calculateOverflowWindow(capacity, overflow) div 2

proc init*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T =
proc new*(T: type CapacityRetentionPolicy, capacity=DefaultCapacity): T =
let
totalCapacity = calculateTotalCapacity(capacity, MaxOverflow)
deleteWindow = calculateDeleteWindow(capacity, MaxOverflow)
Expand Down
43 changes: 5 additions & 38 deletions waku/waku_archive/retention_policy/retention_policy_size.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@ else:
{.push raises: [].}

import
std/times,
stew/results,
chronicles,
chronos,
os
chronos
import
../driver,
../retention_policy
Expand All @@ -19,51 +17,20 @@ logScope:
# default size is 30 GiB or 32212254720.0 in bytes
const DefaultRetentionSize*: int64 = 32212254720

# to remove 20% of the outdated data from database
const DeleteLimit = 0.80

type
# SizeRetentionPolicy implements auto delete as follows:
# - sizeLimit is the size in bytes the database can grow upto
# to reduce the size of the databases, remove the rows/number-of-messages
# DeleteLimit is the total number of messages to delete beyond this limit
# when the database size crosses the sizeLimit, then only a fraction of messages are kept,
# rest of the outdated message are deleted using deleteOldestMessagesNotWithinLimit(),
# upon deletion process the fragmented space is retrieve back using Vacuum process.
SizeRetentionPolicy* = ref object of RetentionPolicy
sizeLimit: int64
sizeLimit: int64

proc init*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
proc new*(T: type SizeRetentionPolicy, size=DefaultRetentionSize): T =
SizeRetentionPolicy(
sizeLimit: size
)

method execute*(p: SizeRetentionPolicy,
driver: ArchiveDriver):
Future[RetentionPolicyResult[void]] {.async.} =
## when db size overshoots the database limit, shread 20% of outdated messages
# get size of database
let dbSize = (await driver.getDatabaseSize()).valueOr:
return err("failed to get database size: " & $error)

# database size in bytes
let totalSizeOfDB: int64 = int64(dbSize)

if totalSizeOfDB < p.sizeLimit:
return ok()

# to shread/delete messsges, get the total row/message count
let numMessages = (await driver.getMessagesCount()).valueOr:
return err("failed to get messages count: " & error)

# NOTE: Using SQLite vacuuming is done manually, we delete a percentage of rows
# if vacumming is done automatically then we aim to check DB size periodially for efficient
# retention policy implementation.

# 80% of the total messages are to be kept, delete others
let pageDeleteWindow = int(float(numMessages) * DeleteLimit)

(await driver.deleteOldestMessagesNotWithinLimit(limit=pageDeleteWindow)).isOkOr:
return err("deleting oldest messages failed: " & error)
(await driver.decreaseDatabaseSize(p.sizeLimit)).isOkOr:
return err("decreaseDatabaseSize failed: " & $error)

return ok()
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type TimeRetentionPolicy* = ref object of RetentionPolicy
retentionTime: chronos.Duration


proc init*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T =
proc new*(T: type TimeRetentionPolicy, retentionTime=DefaultRetentionTime): T =
TimeRetentionPolicy(
retentionTime: retentionTime.seconds
)
Expand Down

0 comments on commit d530528

Please sign in to comment.