From 7df6f4c851ee4994c2e6cc3fc9163ea2d9844024 Mon Sep 17 00:00:00 2001
From: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>
Date: Wed, 31 May 2023 10:28:48 +0200
Subject: [PATCH] feat(postgresql): align Andrea's PR#1590 changes into master
 (#1764)

* Add postgres basic support

* test_driver_postgres.nim: adding simple test to validate asynchronous-ness

* Aligning the changes made by Andrea to the current master state

* test_driver_postgres.nim: new unit test (checking duplicate insertion)

---------

Co-authored-by: Andrea Maria Piana
---
 tests/all_tests_v2.nim                         |   4 +
 tests/docker-compose.yml                       |  11 +
 .../v2/waku_archive/test_driver_postgres.nim   | 256 ++++++++++++++++
 .../waku_archive/driver/postgres_driver.nim    |   8 +
 .../postgres_driver/postgres_driver.nim        | 290 ++++++++++++++++++
 5 files changed, 569 insertions(+)
 create mode 100644 tests/docker-compose.yml
 create mode 100644 tests/v2/waku_archive/test_driver_postgres.nim
 create mode 100644 waku/v2/waku_archive/driver/postgres_driver.nim
 create mode 100644 waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim

diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim
index f60095a976..d2303c8f5d 100644
--- a/tests/all_tests_v2.nim
+++ b/tests/all_tests_v2.nim
@@ -19,6 +19,10 @@ import
   ./v2/waku_archive/test_retention_policy,
   ./v2/waku_archive/test_waku_archive
 
+# TODO: add the postgres test when we can mock the database.
+# The postgres tests currently need a PostgreSQL database running locally.
+# ./v2/waku_archive/test_driver_postgres,
+
 # Waku store test suite
 import
   ./v2/waku_store/test_rpc_codec,
diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml
new file mode 100644
index 0000000000..14fcdb1686
--- /dev/null
+++ b/tests/docker-compose.yml
@@ -0,0 +1,11 @@
+version: "3.8"
+
+
+services:
+  db:
+    image: postgres:9.6-alpine
+    restart: always
+    environment:
+      POSTGRES_PASSWORD: test123
+    ports:
+      - "5432:5432"
diff --git a/tests/v2/waku_archive/test_driver_postgres.nim b/tests/v2/waku_archive/test_driver_postgres.nim
new file mode 100644
index 0000000000..c75549e448
--- /dev/null
+++ b/tests/v2/waku_archive/test_driver_postgres.nim
@@ -0,0 +1,256 @@
+{.used.}
+
+import
+  std/[sequtils,times,options],
+  testutils/unittests,
+  chronos
+import
+  ../../../waku/v2/waku_archive,
+  ../../../waku/v2/waku_archive/driver/postgres_driver,
+  ../../../waku/v2/waku_core,
+  ../testlib/wakucore
+
+proc now():int64 = getTime().toUnix()
+
+proc computeTestCursor(pubsubTopic: PubsubTopic,
+                       message: WakuMessage):
+                       ArchiveCursor =
+  ArchiveCursor(
+    pubsubTopic: pubsubTopic,
+    senderTime: message.timestamp,
+    storeTime: message.timestamp,
+    digest: computeDigest(message)
+  )
+
+suite "Postgres driver":
+
+  const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"
+
+  asyncTest "Asynchronous queries":
+    # TODO: make the test asynchronous
+    return
+
+    ## When
+    let driverRes = PostgresDriver.new(storeMessageDbUrl)
+
+    ## Then
+    require:
+      driverRes.isOk()
+
+    let driver: ArchiveDriver = driverRes.tryGet()
+    require:
+      not driver.isNil()
+
+    let beforeSleep = now()
+    for _ in 1 .. 20:
+      discard (PostgresDriver driver).sleep(1)
+
+    require (now() - beforeSleep) < 20
+
+    ## Cleanup
+    (await driver.close()).expect("driver to close")
+
+  asyncTest "init driver and database":
+
+    ## When
+    let driverRes = PostgresDriver.new(storeMessageDbUrl)
+
+    ## Then
+    require:
+      driverRes.isOk()
+
+    let driver: ArchiveDriver = driverRes.tryGet()
+    require:
+      not driver.isNil()
+
+    discard driverRes.get().reset()
+    let initRes = driverRes.get().init()
+
+    require:
+      initRes.isOk()
+
+    ## Cleanup
+    (await driver.close()).expect("driver to close")
+
+  asyncTest "insert a message":
+    ## Given
+    const contentTopic = "test-content-topic"
+
+    let driverRes = PostgresDriver.new(storeMessageDbUrl)
+
+    require:
+      driverRes.isOk()
+
+    discard driverRes.get().reset()
+    discard driverRes.get().init()
+
+    let driver: ArchiveDriver = driverRes.tryGet()
+    require:
+      not driver.isNil()
+
+    let msg = fakeWakuMessage(contentTopic=contentTopic)
+
+    let computedDigest = computeDigest(msg)
+    ## When
+    let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp)
+
+    ## Then
+    require:
+      putRes.isOk()
+
+    let storedMsg = (await driver.getAllMessages()).tryGet()
+    require:
+      storedMsg.len == 1
+      storedMsg.all do (item: auto) -> bool:
+        let (pubsubTopic, actualMsg, digest, storeTimestamp) = item
+        actualMsg.contentTopic == contentTopic and
+        pubsubTopic == DefaultPubsubTopic and
+        toHex(computedDigest.data) == toHex(digest) and
+        toHex(actualMsg.payload) == toHex(msg.payload)
+
+    ## Cleanup
+    (await driver.close()).expect("driver to close")
+
+  asyncTest "insert and query message":
+    ## Given
+    const contentTopic1 = "test-content-topic-1"
+    const contentTopic2 = "test-content-topic-2"
+    const pubsubTopic1 = "pubsubtopic-1"
+    const pubsubTopic2 = "pubsubtopic-2"
+
+    let driverRes = PostgresDriver.new(storeMessageDbUrl)
+
+    require:
+      driverRes.isOk()
+
+    discard driverRes.get().reset()
+    discard driverRes.get().init()
+
+    let driver: ArchiveDriver = driverRes.tryGet()
+    require:
+      not driver.isNil()
+
+    let msg1 = fakeWakuMessage(contentTopic=contentTopic1)
+
+    ## When
+    var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp)
+
+    ## Then
+    require:
+      putRes.isOk()
+
+    let msg2 = fakeWakuMessage(contentTopic=contentTopic2)
+
+    ## When
+    putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp)
+
+    ## Then
+    require:
+      putRes.isOk()
+
+    let countMessagesRes = await driver.getMessagesCount()
+
+    require:
+      countMessagesRes.isOk() and
+      countMessagesRes.get() == 2
+
+    var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1])
+
+    require:
+      messagesRes.isOk()
+
+    require:
+      messagesRes.get().len == 1
+
+    # Get both content topics, check ordering
+    messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
+                                                            contentTopic2])
+    require:
+      messagesRes.isOk()
+
+    require:
+      messagesRes.get().len == 2 and
+      messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1
+
+    # Descending order
+    messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
+                                                            contentTopic2],
+                                           ascendingOrder = false)
+    require:
+      messagesRes.isOk()
+
+    require:
+      messagesRes.get().len == 2 and
+      messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic2
+
+    # cursor
+    # Get both content topics
+    messagesRes =
+      await driver.getMessages(contentTopic = @[contentTopic1,
+                                                contentTopic2],
+                               cursor = some(
+                                 computeTestCursor(pubsubTopic1,
+                                                   messagesRes.get()[0][1])))
+    require:
+      messagesRes.isOk()
+
+    require:
+      messagesRes.get().len == 1
+
+    # Get both content topics but one pubsub topic
+    messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
+                                                            contentTopic2],
+                                           pubsubTopic = some(pubsubTopic1))
+    require:
+      messagesRes.isOk()
+
+    require:
+      messagesRes.get().len == 1 and
+      messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1
+
+    # Limit
+    messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
+                                                            contentTopic2],
+                                           maxPageSize = 1)
+    require:
+      messagesRes.isOk()
+
+    require:
+      messagesRes.get().len == 1
+
+    ## Cleanup
+    (await driver.close()).expect("driver to close")
+
+  asyncTest "insert true duplicated messages":
+    # Validates that two completely equal messages cannot be stored.
+    ## Given
+    let driverRes = PostgresDriver.new(storeMessageDbUrl)
+
+    require:
+      driverRes.isOk()
+
+    discard driverRes.get().reset()
+    discard driverRes.get().init()
+
+    let driver: ArchiveDriver = driverRes.tryGet()
+    require:
+      not driver.isNil()
+
+    let now = now()
+
+    let msg1 = fakeWakuMessage(ts = now)
+    let msg2 = fakeWakuMessage(ts = now)
+
+    var putRes = await driver.put(DefaultPubsubTopic,
+                                  msg1, computeDigest(msg1), msg1.timestamp)
+    ## Then
+    require:
+      putRes.isOk()
+
+    putRes = await driver.put(DefaultPubsubTopic,
+                              msg2, computeDigest(msg2), msg2.timestamp)
+    ## Then
+    require:
+      not putRes.isOk()
+
+
diff --git a/waku/v2/waku_archive/driver/postgres_driver.nim b/waku/v2/waku_archive/driver/postgres_driver.nim
new file mode 100644
index 0000000000..496005cbec
--- /dev/null
+++ b/waku/v2/waku_archive/driver/postgres_driver.nim
@@ -0,0 +1,8 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import ./postgres_driver/postgres_driver
+
+export postgres_driver
diff --git a/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim
new file mode 100644
index 0000000000..71e157527c
--- /dev/null
+++ b/waku/v2/waku_archive/driver/postgres_driver/postgres_driver.nim
@@ -0,0 +1,290 @@
+when (NimMajor, NimMinor) < (1, 4):
+  {.push raises: [Defect].}
+else:
+  {.push raises: [].}
+
+import
+  std/db_postgres,
+  std/strformat,
+  std/nre,
+  std/options,
+  std/strutils,
+  stew/[results,byteutils],
+  chronos
+
+import
+  ../../../waku_core,
+  ../../common,
+  ../../driver
+
+export postgres_driver
+
+type PostgresDriver* = ref object of ArchiveDriver
+  connection: DbConn
+  preparedInsert: SqlPrepared
+
+proc dropTableQuery(): string =
+  "DROP TABLE messages"
+
+proc createTableQuery(): string =
+  "CREATE TABLE IF NOT EXISTS messages (" &
+  " pubsubTopic VARCHAR NOT NULL," &
+  " contentTopic VARCHAR NOT NULL," &
+  " payload VARCHAR," &
+  " version INTEGER NOT NULL," &
+  " timestamp BIGINT NOT NULL," &
+  " id VARCHAR NOT NULL," &
+  " storedAt BIGINT NOT NULL," &
+  " CONSTRAINT messageIndex PRIMARY KEY (storedAt, id, pubsubTopic)" &
+  ");"
+
+proc insertRow(): string =
+  """INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic,
+  version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""
+
+proc new*(T: type PostgresDriver, storeMessageDbUrl: string): ArchiveDriverResult[T] =
+  var host: string
+  var user: string
+  var password: string
+  var dbName: string
+  var port: string
+  var connectionString: string
+  var dbConn: DbConn
+  try:
+    let regex = re("""^postgres:\/\/([^:]+):([^@]+)@([^:]+):(\d+)\/(.+)$""")
+    let matches = find(storeMessageDbUrl,regex).get.captures
+    user = matches[0]
+    password = matches[1]
+    host = matches[2]
+    port = matches[3]
+    dbName = matches[4]
+    connectionString = "user={user} host={host} port={port} dbname={dbName} password={password}".fmt
+  except KeyError, InvalidUnicodeError, RegexInternalError, ValueError, StudyError, SyntaxError:
+    return err("could not parse postgres string")
+
+  try:
+    dbConn = open("","", "", connectionString)
+  except DbError:
+    return err("could not connect to postgres")
+
+  return ok(PostgresDriver(connection: dbConn))
+
+method reset*(s: PostgresDriver): ArchiveDriverResult[void] {.base.} =
+  try:
+    let res = s.connection.tryExec(sql(dropTableQuery()))
+    if not res:
+      return err("failed to reset database")
+  except DbError:
+    return err("failed to reset database")
+
+  return ok()
+
+method init*(s: PostgresDriver): ArchiveDriverResult[void] {.base.} =
+  try:
+    let res = s.connection.tryExec(sql(createTableQuery()))
+    if not res:
+      return err("failed to initialize")
+    s.preparedInsert = prepare(s.connection, "insertRow", sql(insertRow()), 7)
+  except DbError:
+    let
+      e = getCurrentException()
+      msg = getCurrentExceptionMsg()
+      exceptionMessage = "failed to init driver, got exception " &
+                         repr(e) & " with message " & msg
+    return err(exceptionMessage)
+
+  return ok()
+
+method put*(s: PostgresDriver,
+            pubsubTopic: PubsubTopic,
+            message: WakuMessage,
+            digest: MessageDigest,
+            receivedTime: Timestamp):
+            Future[ArchiveDriverResult[void]] {.async.} =
+  try:
+    let res = s.connection.tryExec(s.preparedInsert,
+                                   toHex(digest.data),
+                                   receivedTime,
+                                   message.contentTopic,
+                                   toHex(message.payload),
+                                   pubsubTopic,
+                                   int64(message.version),
+                                   message.timestamp)
+    if not res:
+      return err("failed to insert into database")
+  except DbError:
+    return err("failed to insert into database")
+
+  return ok()
+
+proc extractRow(r: Row): ArchiveDriverResult[ArchiveRow] =
+  var wakuMessage: WakuMessage
+  var timestamp: Timestamp
+  var version: uint
+  var pubSubTopic: string
+  var contentTopic: string
+  var storedAt: int64
+  var digest: string
+  var payload: string
+
+  try:
+    storedAt = parseInt(r[0])
+    contentTopic = r[1]
+    payload = parseHexStr(r[2])
+    pubSubTopic = r[3]
+    version = parseUInt(r[4])
+    timestamp = parseInt(r[5])
+    digest = parseHexStr(r[6])
+  except ValueError:
+    return err("could not parse timestamp")
+
+  wakuMessage.timestamp = timestamp
+  wakuMessage.version = uint32(version)
+  wakuMessage.contentTopic = contentTopic
+  wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high))
+
+  return ok((pubSubTopic,
+             wakuMessage,
+             @(digest.toOpenArrayByte(0, digest.high)),
+             storedAt))
+
+method getAllMessages*(s: PostgresDriver):
+                       Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
+  ## Retrieve all messages from the store.
+  var rows: seq[Row]
+  var results: seq[ArchiveRow]
+  try:
+    rows = s.connection.getAllRows(sql("""SELECT storedAt, contentTopic,
+                                          payload, pubsubTopic, version, timestamp,
+                                          id FROM messages ORDER BY storedAt ASC"""))
+  except DbError:
+    return err("failed to query rows")
+
+  for r in rows:
+    let rowRes = extractRow(r)
+    if rowRes.isErr():
+      return err("failed to extract row")
+
+    results.add(rowRes.get())
+
+  return ok(results)
+
+method getMessages*(s: PostgresDriver,
+                    contentTopic: seq[ContentTopic] = @[],
+                    pubsubTopic = none(PubsubTopic),
+                    cursor = none(ArchiveCursor),
+                    startTime = none(Timestamp),
+                    endTime = none(Timestamp),
+                    maxPageSize = DefaultPageSize,
+                    ascendingOrder = true):
+                    Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
+  var query = """SELECT storedAt, contentTopic, payload,
+                 pubsubTopic, version, timestamp, id FROM messages"""
+  var statements: seq[string]
+  var args: seq[string]
+
+  if contentTopic.len > 0:
+    let cstmt = "contentTopic IN (" & "?".repeat(contentTopic.len).join(",") & ")"
+    statements.add(cstmt)
+    for t in contentTopic:
+      args.add(t)
+
+  if pubsubTopic.isSome():
+    statements.add("pubsubTopic = ?")
+    args.add(pubsubTopic.get())
+
+  if cursor.isSome():
+    let comp = if ascendingOrder: ">" else: "<"
+    statements.add("(storedAt, id) " & comp & " (?,?)")
+    args.add($cursor.get().storeTime)
+    args.add($cursor.get().digest.data)
+
+  if startTime.isSome():
+    statements.add("storedAt >= ?")
+    args.add($startTime.get())
+
+  if endTime.isSome():
+    statements.add("storedAt <= ?")
+    args.add($endTime.get())
+
+  if statements.len > 0:
+    query &= " WHERE " & statements.join(" AND ")
+
+  var direction: string
+  if ascendingOrder:
+    direction = "ASC"
+  else:
+    direction = "DESC"
+
+  query &= " ORDER BY storedAt " & direction & ", id " & direction
+
+  query &= " LIMIT ?"
+  args.add($maxPageSize)
+
+  var rows: seq[Row]
+  var results: seq[ArchiveRow]
+  try:
+    rows = s.connection.getAllRows(sql(query), args)
+  except DbError:
+    return err("failed to query rows")
+
+  for r in rows:
+    let rowRes = extractRow(r)
+    if rowRes.isErr():
+      return err("failed to extract row")
+
+    results.add(rowRes.get())
+
+  return ok(results)
+
+method getMessagesCount*(s: PostgresDriver):
+                         Future[ArchiveDriverResult[int64]] {.async.} =
+  var count: int64
+  try:
+    let row = s.connection.getRow(sql("""SELECT COUNT(1) FROM messages"""))
+    count = parseInt(row[0])
+
+  except DbError:
+    return err("failed to query count")
+  except ValueError:
+    return err("failed to parse query count result")
+
+  return ok(count)
+
+method getOldestMessageTimestamp*(s: PostgresDriver):
+                                  Future[ArchiveDriverResult[Timestamp]] {.async.} =
+  return err("not implemented")
+
+method getNewestMessageTimestamp*(s: PostgresDriver):
+                                  Future[ArchiveDriverResult[Timestamp]] {.async.} =
+  return err("not implemented")
+
+method deleteMessagesOlderThanTimestamp*(s: PostgresDriver,
+                                         ts: Timestamp):
+                                         Future[ArchiveDriverResult[void]] {.async.} =
+  return err("not implemented")
+
+method deleteOldestMessagesNotWithinLimit*(s: PostgresDriver,
+                                           limit: int):
+                                           Future[ArchiveDriverResult[void]] {.async.} =
+  return err("not implemented")
+
+method close*(s: PostgresDriver):
+              Future[ArchiveDriverResult[void]] {.async.} =
+  ## Close the database connection
+  s.connection.close()
+  return ok()
+
+proc sleep*(s: PostgresDriver, seconds: int):
+            Future[ArchiveDriverResult[void]] {.async.} =
+  # This is for testing purposes only. It is intended to verify that requests
+  # are handled asynchronously. It merely triggers a sleep in the database for
+  # the number of seconds given as a parameter.
+  try:
+    let params = @[$seconds]
+    s.connection.exec(sql"SELECT pg_sleep(?)", params)
+  except DbError:
+    # This always raises an exception, although the sleep does work
+    return err("exception sleeping: " & getCurrentExceptionMsg())
+
+  return ok()
\ No newline at end of file
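
Reviewer note (not part of the patch): below is a minimal usage sketch of the new PostgresDriver, mirroring the calls exercised in test_driver_postgres.nim. It assumes the docker-compose database above is running locally (postgres://postgres:test123@localhost:5432/postgres, started e.g. with `docker-compose up -d` from tests/); the relative import paths and the fakeWakuMessage helper are borrowed from the test suite and may need adjusting outside tests/v2/waku_archive/.

  # Usage sketch only; not part of the patch.
  import chronos, stew/results
  import
    ../../../waku/v2/waku_archive,
    ../../../waku/v2/waku_archive/driver/postgres_driver,
    ../../../waku/v2/waku_core,
    ../testlib/wakucore

  proc smokeTest() {.async.} =
    # Build the driver from the same connection URL used by the tests.
    let driver = PostgresDriver.new(
      "postgres://postgres:test123@localhost:5432/postgres").tryGet()

    discard driver.reset()   # drop any previous messages table
    discard driver.init()    # create the table and the prepared insert statement

    # Store a single message and read the count back.
    let msg = fakeWakuMessage(contentTopic = "example-content-topic")
    (await driver.put(DefaultPubsubTopic, msg,
                      computeDigest(msg), msg.timestamp)).expect("insert to work")
    echo (await driver.getMessagesCount()).tryGet()   # expected: 1

    (await driver.close()).expect("driver to close")

  waitFor smokeTest()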