Skip to content

Commit

Permalink
chore: Postgres migrations (#2477)
Browse files Browse the repository at this point in the history
* Add postgres_driver/migrations.nim
* Postgres and archive logic adaptation to the migration implementation
* libwaku: adapt node_lifecycle_request.nim to migration refactoring
* test_app.nim: add more detail for test that only fails in CI
* postgres migrations: store the migration scripts inside the resulting wakunode binary instead of external .sql files.
  • Loading branch information
Ivansete-status authored Mar 1, 2024
1 parent 88ff928 commit 560f949
Show file tree
Hide file tree
Showing 15 changed files with 400 additions and 253 deletions.
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ proc setupProtocols(node: WakuNode,

if conf.store:
# Archive setup
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
let archiveDriverRes = waitFor ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ proc configureStore(node: WakuNode,
discard

# Archive setup
let archiveDriverRes = ArchiveDriver.new(storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onFatalErrorAction)
let archiveDriverRes = await ArchiveDriver.new(storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onFatalErrorAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

Expand Down
20 changes: 20 additions & 0 deletions migrations/message_store_postgres/content_script_version_1.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
const ContentScriptVersion_1* = """
CREATE TABLE IF NOT EXISTS messages (
pubsubTopic VARCHAR NOT NULL,
contentTopic VARCHAR NOT NULL,
payload VARCHAR,
version INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
id VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
storedAt BIGINT NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (messageHash)
);
CREATE TABLE iF NOT EXISTS version (
version INTEGER NOT NULL
);
INSERT INTO version (version) VALUES(1);
"""
68 changes: 68 additions & 0 deletions migrations/message_store_postgres/content_script_version_2.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
const ContentScriptVersion_2* = """
ALTER TABLE messages RENAME TO messages_backup;
ALTER TABLE messages_backup DROP CONSTRAINT messageIndex;
CREATE TABLE IF NOT EXISTS messages (
pubsubTopic VARCHAR NOT NULL,
contentTopic VARCHAR NOT NULL,
payload VARCHAR,
version INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
id VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
storedAt BIGINT NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt)
) PARTITION BY RANGE (storedAt);
DO $$
DECLARE
min_storedAt numeric;
max_storedAt numeric;
min_storedAtSeconds integer = 0;
max_storedAtSeconds integer = 0;
partition_name TEXT;
create_partition_stmt TEXT;
BEGIN
SELECT MIN(storedAt) into min_storedAt
FROM messages_backup;
SELECT MAX(storedAt) into max_storedAt
FROM messages_backup;
min_storedAtSeconds := min_storedAt / 1000000000;
max_storedAtSeconds := max_storedAt / 1000000000;
partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds;
create_partition_stmt := 'CREATE TABLE ' || partition_name ||
' PARTITION OF messages FOR VALUES FROM (' ||
min_storedAt || ') TO (' || (max_storedAt + 1) || ')';
IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN
EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt;
END IF;
END $$;
INSERT INTO messages (
pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
)
SELECT pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
FROM messages_backup;
DROP TABLE messages_backup;
UPDATE version SET version = 2 WHERE version = 1;
"""
37 changes: 37 additions & 0 deletions migrations/message_store_postgres/pg_migration_manager.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

import
content_script_version_1,
content_script_version_2

type
MigrationScript* = object
version*: int
scriptContent*: string

proc init*(T: type MigrationScript,
targetVersion: int,
scriptContent: string): T =

return MigrationScript(
targetVersion: targetVersion,
scriptContent: scriptContent)

const PgMigrationScripts* = @[
MigrationScript(
version: 1,
scriptContent: ContentScriptVersion_1),
MigrationScript(
version: 2,
scriptContent: ContentScriptVersion_2)
]

proc getMigrationScripts*(currentVersion: int64,
targetVersion: int64): seq[string] =
var ret = newSeq[string]()
var v = currentVersion
while v < targetVersion:
ret.add(PgMigrationScripts[v].scriptContent)
v.inc()
return ret


33 changes: 33 additions & 0 deletions tests/testlib/postgres.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

import
chronicles,
chronos
import
../../../waku/waku_archive,
../../../waku/waku_archive/driver as driver_module,
../../../waku/waku_archive/driver/builder,
../../../waku/waku_archive/driver/postgres_driver

const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"

proc newTestPostgresDriver*(): Future[Result[ArchiveDriver, string]] {.async.} =

proc onErr(errMsg: string) {.gcsafe, closure.} =
error "error creating ArchiveDriver", error = errMsg
quit(QuitFailure)

let
vacuum = false
migrate = true
maxNumConn = 50

let driverRes = await ArchiveDriver.new(storeMessageDbUrl,
vacuum,
migrate,
maxNumConn,
onErr)
if driverRes.isErr():
onErr("could not create archive driver: " & driverRes.error)

return ok(driverRes.get())

72 changes: 16 additions & 56 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../testlib/wakucore
../testlib/wakucore,
../testlib/testasync,
../testlib/postgres

proc now():int64 = getTime().toUnix()

Expand All @@ -24,18 +26,24 @@ proc computeTestCursor(pubsubTopic: PubsubTopic,
)

suite "Postgres driver":
## Unique driver instance
var driver {.threadvar.}: PostgresDriver

const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"
asyncSetup:
let driverRes = await newTestPostgresDriver()
if driverRes.isErr():
assert false, driverRes.error

asyncTest "Asynchronous queries":
let driverRes = PostgresDriver.new(dbUrl = storeMessageDbUrl,
maxConnections = 100)
driver = PostgresDriver(driverRes.get())

assert driverRes.isOk(), driverRes.error
asyncTeardown:
let resetRes = await driver.reset()
if resetRes.isErr():
assert false, resetRes.error

let driver = driverRes.value
discard await driver.reset()
(await driver.close()).expect("driver to close")

asyncTest "Asynchronous queries":
var futures = newSeq[Future[ArchiveDriverResult[void]]](0)

let beforeSleep = now()
Expand All @@ -50,33 +58,9 @@ suite "Postgres driver":
# connections and we spawn 100 tasks that spend ~1s each.
require diff < 20

(await driver.close()).expect("driver to close")

asyncTest "Init database":
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

let driver = driverRes.value
discard await driver.reset()

let initRes = await driver.init()
assert initRes.isOk(), initRes.error

(await driver.close()).expect("driver to close")

asyncTest "Insert a message":
const contentTopic = "test-content-topic"

let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

let driver = driverRes.get()

discard await driver.reset()

let initRes = await driver.init()
assert initRes.isOk(), initRes.error

let msg = fakeWakuMessage(contentTopic=contentTopic)

let computedDigest = computeDigest(msg)
Expand All @@ -94,24 +78,12 @@ suite "Postgres driver":
toHex(computedDigest.data) == toHex(digest) and
toHex(actualMsg.payload) == toHex(msg.payload)

(await driver.close()).expect("driver to close")

asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"
const contentTopic2 = "test-content-topic-2"
const pubsubTopic1 = "pubsubtopic-1"
const pubsubTopic2 = "pubsubtopic-2"

let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

let driver = driverRes.value

discard await driver.reset()

let initRes = await driver.init()
assert initRes.isOk(), initRes.error

let msg1 = fakeWakuMessage(contentTopic=contentTopic1)

var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), computeMessageHash(pubsubTopic1, msg1), msg1.timestamp)
Expand Down Expand Up @@ -178,19 +150,8 @@ suite "Postgres driver":
assert messagesRes.isOk(), messagesRes.error
require messagesRes.get().len == 1

(await driver.close()).expect("driver to close")

asyncTest "Insert true duplicated messages":
# Validates that two completely equal messages can not be stored.
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

let driver = driverRes.value

discard await driver.reset()

let initRes = await driver.init()
assert initRes.isOk(), initRes.error

let now = now()

Expand All @@ -205,4 +166,3 @@ suite "Postgres driver":
msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp)
require not putRes.isOk()

(await driver.close()).expect("driver to close")
Loading

0 comments on commit 560f949

Please sign in to comment.