Skip to content

Commit

Permalink
postgres refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed May 27, 2024
1 parent 576fd71 commit 1ec2ab6
Show file tree
Hide file tree
Showing 6 changed files with 1,650 additions and 406 deletions.
68 changes: 18 additions & 50 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@ import
../testlib/testasync,
../testlib/postgres

proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
ArchiveCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)

suite "Postgres driver":
## Unique driver instance
var driver {.threadvar.}: PostgresDriver
Expand Down Expand Up @@ -58,24 +49,19 @@ suite "Postgres driver":

let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)

let computedDigest = computeDigest(msg)
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)

let putRes = await driver.put(
DefaultPubsubTopic, msg, computedDigest, computedHash, msg.timestamp
computeMessageHash(DefaultPubsubTopic, msg), DefaultPubsubTopic, msg
)
assert putRes.isOk(), putRes.error

let storedMsg = (await driver.getAllMessages()).tryGet()

assert storedMsg.len == 1

let (pubsubTopic, actualMsg, digest, _, hash) = storedMsg[0]
let (_, pubsubTopic, actualMsg) = storedMsg[0]
assert actualMsg.contentTopic == contentTopic
assert pubsubTopic == DefaultPubsubTopic
assert toHex(computedDigest.data) == toHex(digest)
assert toHex(actualMsg.payload) == toHex(msg.payload)
assert toHex(computedHash) == toHex(hash)
assert toHex(actualMsg.meta) == toHex(msg.meta)

asyncTest "Insert and query message":
Expand All @@ -86,74 +72,64 @@ suite "Postgres driver":

let msg1 = fakeWakuMessage(contentTopic = contentTopic1)

var putRes = await driver.put(
pubsubTopic1,
msg1,
computeDigest(msg1),
computeMessageHash(pubsubTopic1, msg1),
msg1.timestamp,
)
var putRes =
await driver.put(computeMessageHash(pubsubTopic1, msg1), pubsubTopic1, msg1)
assert putRes.isOk(), putRes.error

let msg2 = fakeWakuMessage(contentTopic = contentTopic2)

putRes = await driver.put(
pubsubTopic2,
msg2,
computeDigest(msg2),
computeMessageHash(pubsubTopic2, msg2),
msg2.timestamp,
)
putRes =
await driver.put(computeMessageHash(pubsubTopic2, msg2), pubsubTopic2, msg2)
assert putRes.isOk(), putRes.error

let countMessagesRes = await driver.getMessagesCount()

assert countMessagesRes.isOk(), $countMessagesRes.error
assert countMessagesRes.get() == 2

var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1])
var messagesRes = await driver.getMessages(contentTopics = @[contentTopic1])

assert messagesRes.isOk(), $messagesRes.error
assert messagesRes.get().len == 1

# Get both content topics, check ordering
messagesRes =
await driver.getMessages(contentTopic = @[contentTopic1, contentTopic2])
await driver.getMessages(contentTopics = @[contentTopic1, contentTopic2])
assert messagesRes.isOk(), messagesRes.error

assert messagesRes.get().len == 2
assert messagesRes.get()[0][1].contentTopic == contentTopic1
assert messagesRes.get()[0][2].contentTopic == contentTopic1

# Descending order
messagesRes = await driver.getMessages(
contentTopic = @[contentTopic1, contentTopic2], ascendingOrder = false
contentTopics = @[contentTopic1, contentTopic2], ascendingOrder = false
)
assert messagesRes.isOk(), messagesRes.error

assert messagesRes.get().len == 2
assert messagesRes.get()[0][1].contentTopic == contentTopic2
assert messagesRes.get()[0][2].contentTopic == contentTopic2

# cursor
# Get both content topics
messagesRes = await driver.getMessages(
contentTopic = @[contentTopic1, contentTopic2],
cursor = some(computeTestCursor(pubsubTopic1, messagesRes.get()[1][1])),
contentTopics = @[contentTopic1, contentTopic2],
cursor = some(computeMessageHash(pubsubTopic1, messagesRes.get()[1][2])),
)
assert messagesRes.isOk()
assert messagesRes.get().len == 1

# Get both content topics but one pubsub topic
messagesRes = await driver.getMessages(
contentTopic = @[contentTopic1, contentTopic2], pubsubTopic = some(pubsubTopic1)
contentTopics = @[contentTopic1, contentTopic2], pubsubTopic = some(pubsubTopic1)
)
assert messagesRes.isOk(), messagesRes.error

assert messagesRes.get().len == 1
assert messagesRes.get()[0][1].contentTopic == contentTopic1
assert messagesRes.get()[0][2].contentTopic == contentTopic1

# Limit
messagesRes = await driver.getMessages(
contentTopic = @[contentTopic1, contentTopic2], maxPageSize = 1
contentTopics = @[contentTopic1, contentTopic2], maxPageSize = 1
)
assert messagesRes.isOk(), messagesRes.error
assert messagesRes.get().len == 1
Expand All @@ -170,11 +146,7 @@ suite "Postgres driver":
raiseAssert "could not get num mgs correctly: " & $error

var putRes = await driver.put(
DefaultPubsubTopic,
msg1,
computeDigest(msg1),
computeMessageHash(DefaultPubsubTopic, msg1),
msg1.timestamp,
computeMessageHash(DefaultPubsubTopic, msg1), DefaultPubsubTopic, msg1
)
assert putRes.isOk(), putRes.error

Expand All @@ -185,11 +157,7 @@ suite "Postgres driver":
"wrong number of messages: " & $newNumMsgs

putRes = await driver.put(
DefaultPubsubTopic,
msg2,
computeDigest(msg2),
computeMessageHash(DefaultPubsubTopic, msg2),
msg2.timestamp,
computeMessageHash(DefaultPubsubTopic, msg2), DefaultPubsubTopic, msg2
)

assert putRes.isOk()
Expand Down
92 changes: 62 additions & 30 deletions waku/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -96,36 +96,68 @@ proc new*(
return ok(res.get())
of "postgres":
when defined(postgres):
let res = PostgresDriver.new(
dbUrl = url,
maxConnections = maxNumConn,
onFatalErrorAction = onFatalErrorAction,
)
if res.isErr():
return err("failed to init postgres archive driver: " & res.error)

let driver = res.get()

# Database migration
if migrate:
let migrateRes = await archive_postgres_driver_migrations.migrate(driver)
if migrateRes.isErr():
return err("ArchiveDriver build failed in migration: " & $migrateRes.error)

## This should be started once we make sure the 'messages' table exists
## Hence, this should be run after the migration is completed.
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)

info "waiting for a partition to be created"
for i in 0 ..< 100:
if driver.containsAnyPartition():
break
await sleepAsync(chronos.milliseconds(100))

if not driver.containsAnyPartition():
onFatalErrorAction("a partition could not be created")

return ok(driver)
if legacy:
let res = LegacyPostgresDriver.new(
dbUrl = url,
maxConnections = maxNumConn,
onFatalErrorAction = onFatalErrorAction,
)
if res.isErr():
return err("failed to init postgres archive driver: " & res.error)

let driver = res.get()

# Database migration
if migrate:
let migrateRes = await archive_postgres_driver_migrations.migrate(driver)
if migrateRes.isErr():
return err("ArchiveDriver build failed in migration: " & $migrateRes.error)

## This should be started once we make sure the 'messages' table exists
## Hence, this should be run after the migration is completed.
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)

info "waiting for a partition to be created"
for i in 0 ..< 100:
if driver.containsAnyPartition():
break
await sleepAsync(chronos.milliseconds(100))

if not driver.containsAnyPartition():
onFatalErrorAction("a partition could not be created")

return ok(driver)
else:
let res = PostgresDriver.new(
dbUrl = url,
maxConnections = maxNumConn,
onFatalErrorAction = onFatalErrorAction,
)
if res.isErr():
return err("failed to init postgres archive driver: " & res.error)

let driver = res.get()

# Database migration
if migrate:
let migrateRes = await archive_postgres_driver_migrations.migrate(driver)
if migrateRes.isErr():
return err("ArchiveDriver build failed in migration: " & $migrateRes.error)

## This should be started once we make sure the 'messages' table exists
## Hence, this should be run after the migration is completed.
asyncSpawn driver.startPartitionFactory(onFatalErrorAction)

info "waiting for a partition to be created"
for i in 0 ..< 100:
if driver.containsAnyPartition():
break
await sleepAsync(chronos.milliseconds(100))

if not driver.containsAnyPartition():
onFatalErrorAction("a partition could not be created")

return ok(driver)
else:
return err(
"Postgres has been configured but not been compiled. Check compiler definitions."
Expand Down
3 changes: 2 additions & 1 deletion waku/waku_archive/driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ else:

import
./postgres_driver/postgres_driver,
./postgres_driver/postgres_driver_legacy,
./postgres_driver/partitions_manager,
./postgres_driver/postgres_healthcheck

export postgres_driver, partitions_manager, postgres_healthcheck
export postgres_driver, postgres_driver_legacy, partitions_manager, postgres_healthcheck
35 changes: 35 additions & 0 deletions waku/waku_archive/driver/postgres_driver/migrations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,38 @@ proc migrate*(
debug "finished message store's postgres database migration"

return ok()

proc migrate*(
driver: LegacyPostgresDriver, targetVersion = SchemaVersion
): Future[DatabaseResult[void]] {.async.} =
debug "starting message store's postgres database migration"

let currentVersion = (await driver.getCurrentVersion()).valueOr:
return err("migrate error could not retrieve current version: " & $error)

if currentVersion == targetVersion:
debug "database schema is up to date",
currentVersion = currentVersion, targetVersion = targetVersion
return ok()

info "database schema is outdated",
currentVersion = currentVersion, targetVersion = targetVersion

# Load migration scripts
let scripts = pg_migration_manager.getMigrationScripts(currentVersion, targetVersion)

# Run the migration scripts
for script in scripts:
for statement in script.breakIntoStatements():
debug "executing migration statement", statement = statement

(await driver.performWriteQuery(statement)).isOkOr:
error "failed to execute migration statement",
statement = statement, error = error
return err("failed to execute migration statement")

debug "migration statement executed succesfully", statement = statement

debug "finished message store's postgres database migration"

return ok()
Loading

0 comments on commit 1ec2ab6

Please sign in to comment.