chore: Postgres migrations #2477

Merged: 20 commits, merged Mar 1, 2024
Commits
9d933f1
step fwd in migration implementation and refactor sqlite migration as…
Ivansete-status Feb 23, 2024
26d7cf6
Add postgres_driver/migrations.nim
Ivansete-status Feb 23, 2024
afe5223
Postgres and archive logic adaptation to the migration implementation
Ivansete-status Feb 23, 2024
0ff9711
Add postgres migration scripts and migration utils
Ivansete-status Feb 24, 2024
fa492fe
libwaku: adapt node_lifecycle_request.nim to migration refactoring
Ivansete-status Feb 24, 2024
cc6e377
test_app.nim: add more detail for test that only fails in CI
Ivansete-status Feb 26, 2024
a2b07a8
postgres migrations: store the migration scripts inside the resulting…
Ivansete-status Feb 26, 2024
850cabc
db_sqlite: revert getUserVersion rename
Ivansete-status Feb 26, 2024
545cae4
db_sqlite.nim: revert too many changes
Ivansete-status Feb 26, 2024
b9c3a6b
Rm uneeded sql files for postgres migration logic
Ivansete-status Feb 27, 2024
c45bc69
partially revert migration-logic changes in sql
Ivansete-status Feb 27, 2024
f5848dc
postgres_driver/migrations.nim: remove unused variables
Ivansete-status Feb 27, 2024
72b62fd
databases/db_sqlite: reverting completely the sqlite.nim changes
Ivansete-status Feb 27, 2024
fd7828d
Remove migration_utils module
Ivansete-status Feb 27, 2024
e7df8bb
postgres_driver/migrations.nim: add breakInfoStatements proc
Ivansete-status Feb 27, 2024
d1ac066
postgres_driver/migrations.nim: remove unused import
Ivansete-status Feb 27, 2024
1723133
test_driver_postgres_query.nim: more generic suite title
Ivansete-status Feb 29, 2024
6de70a5
testlib/postgres.nim: add error log in case of error initializing Arc…
Ivansete-status Feb 29, 2024
8bb6813
testlib/postgres.nim: better error logs
Ivansete-status Feb 29, 2024
4b8d76e
test postgres: avoid use of cast[PostgresDriver](..)
Ivansete-status Feb 29, 2024
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
@@ -498,7 +498,7 @@ proc setupProtocols(node: WakuNode,

if conf.store:
# Archive setup
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
let archiveDriverRes = waitFor ArchiveDriver.new(conf.storeMessageDbUrl,
Contributor:
I don't think you can waitFor in an async proc. That would prevent the runtime from progressing, no?

Contributor:
I think you can; it just means you intend to wait until the future becomes available...
although I'm not sure whether it would be better to wait with a timeout?

Collaborator (author):
Thanks for the comments!
@SionoiS - yes, this is feasible. In this case we block on purpose, because this condition needs to be satisfied before continuing.
@NagyZoltanPeter - I will submit a separate PR where we apply that great proposal of using withTimeout.

conf.storeMessageDbVacuum,
conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections,
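The withTimeout idea discussed above was deferred to a follow-up PR. Purely as an illustration (the proc names, the 30-second value and the error handling are assumptions, not code from this PR), awaiting a chronos future with a timeout could look roughly like this:

import chronos
import stew/results

# Stand-in for an async constructor such as ArchiveDriver.new(...);
# the name and body are hypothetical and only illustrate the pattern.
proc connectDriver(): Future[Result[void, string]] {.async.} =
  await sleepAsync(100.milliseconds)
  return ok()

proc setupWithTimeout(): Future[Result[void, string]] {.async.} =
  let fut = connectDriver()
  # withTimeout completes with `false` when the wrapped future
  # did not finish within the given duration.
  if not await fut.withTimeout(30.seconds):
    return err("timed out while setting up the archive driver")
  return fut.read()

when isMainModule:
  echo waitFor setupWithTimeout()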
@@ -70,11 +70,11 @@ proc configureStore(node: WakuNode,
discard

# Archive setup
let archiveDriverRes = ArchiveDriver.new(storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onFatalErrorAction)
let archiveDriverRes = await ArchiveDriver.new(storeDbUrl,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onFatalErrorAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

20 changes: 20 additions & 0 deletions migrations/message_store_postgres/content_script_version_1.nim
@@ -0,0 +1,20 @@
const ContentScriptVersion_1* = """
CREATE TABLE IF NOT EXISTS messages (
pubsubTopic VARCHAR NOT NULL,
contentTopic VARCHAR NOT NULL,
payload VARCHAR,
version INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
id VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
storedAt BIGINT NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (messageHash)
);

CREATE TABLE IF NOT EXISTS version (
version INTEGER NOT NULL
);

INSERT INTO version (version) VALUES(1);

"""
68 changes: 68 additions & 0 deletions migrations/message_store_postgres/content_script_version_2.nim
@@ -0,0 +1,68 @@
const ContentScriptVersion_2* = """
ALTER TABLE messages RENAME TO messages_backup;
ALTER TABLE messages_backup DROP CONSTRAINT messageIndex;

CREATE TABLE IF NOT EXISTS messages (
pubsubTopic VARCHAR NOT NULL,
contentTopic VARCHAR NOT NULL,
payload VARCHAR,
version INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
id VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
storedAt BIGINT NOT NULL,
CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt)
Contributor:
For this migration I think it is better to add this index later, after the data is inserted; it saves time, as it is more efficient for the db engine to build it once.

Collaborator (author):
Ok, good point. I will cover that in a separate PR.

) PARTITION BY RANGE (storedAt);

DO $$
DECLARE
min_storedAt numeric;
max_storedAt numeric;
min_storedAtSeconds integer = 0;
max_storedAtSeconds integer = 0;
partition_name TEXT;
create_partition_stmt TEXT;
BEGIN
SELECT MIN(storedAt) into min_storedAt
FROM messages_backup;

SELECT MAX(storedAt) into max_storedAt
FROM messages_backup;

min_storedAtSeconds := min_storedAt / 1000000000;
max_storedAtSeconds := max_storedAt / 1000000000;

partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds;
create_partition_stmt := 'CREATE TABLE ' || partition_name ||
' PARTITION OF messages FOR VALUES FROM (' ||
min_storedAt || ') TO (' || (max_storedAt + 1) || ')';
IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN
EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt;
END IF;
END $$;

INSERT INTO messages (
pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
)
SELECT pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
FROM messages_backup;

DROP TABLE messages_backup;

UPDATE version SET version = 2 WHERE version = 1;

"""
37 changes: 37 additions & 0 deletions migrations/message_store_postgres/pg_migration_manager.nim
@@ -0,0 +1,37 @@

import
content_script_version_1,
content_script_version_2

type
MigrationScript* = object
version*: int
scriptContent*: string

proc init*(T: type MigrationScript,
targetVersion: int,
scriptContent: string): T =

return MigrationScript(
version: targetVersion,
scriptContent: scriptContent)

const PgMigrationScripts* = @[
MigrationScript(
version: 1,
scriptContent: ContentScriptVersion_1),
MigrationScript(
version: 2,
scriptContent: ContentScriptVersion_2)
]

proc getMigrationScripts*(currentVersion: int64,
targetVersion: int64): seq[string] =
var ret = newSeq[string]()
var v = currentVersion
while v < targetVersion:
ret.add(PgMigrationScripts[v].scriptContent)
v.inc()
return ret
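
A hypothetical usage sketch of getMigrationScripts (not taken from this PR): it assumes the current schema version has already been read from the version table, and only prints the pending scripts instead of executing them.

import pg_migration_manager

let currentVersion = 1'i64                       # e.g. value read from the version table
let targetVersion = PgMigrationScripts.len.int64 # latest known script version

for script in getMigrationScripts(currentVersion, targetVersion):
  # each returned script would be executed against the database, in order
  echo "pending migration script of ", script.len, " characters"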


33 changes: 33 additions & 0 deletions tests/testlib/postgres.nim
@@ -0,0 +1,33 @@

import
chronicles,
chronos
import
../../../waku/waku_archive,
../../../waku/waku_archive/driver as driver_module,
../../../waku/waku_archive/driver/builder,
../../../waku/waku_archive/driver/postgres_driver

const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"

proc newTestPostgresDriver*(): Future[Result[ArchiveDriver, string]] {.async.} =

proc onErr(errMsg: string) {.gcsafe, closure.} =
error "error creating ArchiveDriver", error = errMsg
quit(QuitFailure)

let
vacuum = false
migrate = true
maxNumConn = 50

let driverRes = await ArchiveDriver.new(storeMessageDbUrl,
vacuum,
migrate,
maxNumConn,
onErr)
if driverRes.isErr():
onErr("could not create archive driver: " & driverRes.error)

return ok(driverRes.get())

72 changes: 16 additions & 56 deletions tests/waku_archive/test_driver_postgres.nim
@@ -9,7 +9,9 @@ import
../../../waku/waku_archive/driver/postgres_driver,
../../../waku/waku_core,
../../../waku/waku_core/message/digest,
../testlib/wakucore
../testlib/wakucore,
../testlib/testasync,
../testlib/postgres

proc now():int64 = getTime().toUnix()

@@ -24,18 +26,24 @@ proc computeTestCursor(pubsubTopic: PubsubTopic,
)

suite "Postgres driver":
## Unique driver instance
var driver {.threadvar.}: PostgresDriver

const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"
asyncSetup:
let driverRes = await newTestPostgresDriver()
if driverRes.isErr():
assert false, driverRes.error

asyncTest "Asynchronous queries":
let driverRes = PostgresDriver.new(dbUrl = storeMessageDbUrl,
maxConnections = 100)
driver = PostgresDriver(driverRes.get())

assert driverRes.isOk(), driverRes.error
asyncTeardown:
let resetRes = await driver.reset()
if resetRes.isErr():
assert false, resetRes.error

let driver = driverRes.value
discard await driver.reset()
(await driver.close()).expect("driver to close")

asyncTest "Asynchronous queries":
var futures = newSeq[Future[ArchiveDriverResult[void]]](0)

let beforeSleep = now()
@@ -50,33 +58,9 @@
# connections and we spawn 100 tasks that spend ~1s each.
require diff < 20

(await driver.close()).expect("driver to close")

asyncTest "Init database":
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

let driver = driverRes.value
discard await driver.reset()

let initRes = await driver.init()
assert initRes.isOk(), initRes.error

(await driver.close()).expect("driver to close")

asyncTest "Insert a message":
const contentTopic = "test-content-topic"

let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

let driver = driverRes.get()

discard await driver.reset()

let initRes = await driver.init()
assert initRes.isOk(), initRes.error

let msg = fakeWakuMessage(contentTopic=contentTopic)

let computedDigest = computeDigest(msg)
@@ -94,24 +78,12 @@
toHex(computedDigest.data) == toHex(digest) and
toHex(actualMsg.payload) == toHex(msg.payload)

(await driver.close()).expect("driver to close")

asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"
const contentTopic2 = "test-content-topic-2"
const pubsubTopic1 = "pubsubtopic-1"
const pubsubTopic2 = "pubsubtopic-2"

let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

let driver = driverRes.value

discard await driver.reset()

let initRes = await driver.init()
assert initRes.isOk(), initRes.error

let msg1 = fakeWakuMessage(contentTopic=contentTopic1)

var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), computeMessageHash(pubsubTopic1, msg1), msg1.timestamp)
@@ -178,19 +150,8 @@
assert messagesRes.isOk(), messagesRes.error
require messagesRes.get().len == 1

(await driver.close()).expect("driver to close")

asyncTest "Insert true duplicated messages":
# Validates that two completely equal messages can not be stored.
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

let driver = driverRes.value

discard await driver.reset()

let initRes = await driver.init()
assert initRes.isOk(), initRes.error

let now = now()

Expand All @@ -205,4 +166,3 @@ suite "Postgres driver":
msg2, computeDigest(msg2), computeMessageHash(DefaultPubsubTopic, msg2), msg2.timestamp)
require not putRes.isOk()

(await driver.close()).expect("driver to close")