Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(postgres): Adding a postgres async pool to make the db interactions asynchronous #1779

Merged
merged 10 commits into from
Jun 7, 2023
Merged
16 changes: 16 additions & 0 deletions tests/common/test_postgresql_asyncpool.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{.used.}

import
std/[strutils, os],
stew/results,
testutils/unittests,
chronos
import
../../waku/common/postgres/asyncpool,
../../waku/common/postgres/pg_asyncpool_opts

suite "Async pool":

asyncTest "Create connection pool":
## TODO: extend unit tests
var pgOpts = PgAsyncPoolOptions.init()
167 changes: 58 additions & 109 deletions tests/v2/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,76 +27,61 @@ suite "Postgres driver":
const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"

asyncTest "Asynchronous queries":
#TODO: make the test asynchronous
return
let driverRes = PostgresDriver.new(dbUrl = storeMessageDbUrl,
maxConnections = 100)

## When
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

## Then
require:
driverRes.isOk()
let driver = driverRes.value
discard await driver.reset()

let driver: ArchiveDriver = driverRes.tryGet()
require:
not driver.isNil()
var futures = newSeq[Future[ArchiveDriverResult[void]]](0)

let beforeSleep = now()
for _ in 1 .. 20:
discard (PostgresDriver driver).sleep(1)
for _ in 1 .. 100:
futures.add(driver.sleep(1))

require (now() - beforeSleep) < 20
await allFutures(futures)

## Cleanup
(await driver.close()).expect("driver to close")
let diff = now() - beforeSleep
# Actually, the diff randomly goes between 1 and 2 seconds.
# although in theory it should spend 1s because we establish 100
# connections and we spawn 100 tasks that spend ~1s each.
require diff < 20

asyncTest "init driver and database":
(await driver.close()).expect("driver to close")

## When
asyncTest "Init database":
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

## Then
require:
driverRes.isOk()

let driver: ArchiveDriver = driverRes.tryGet()
require:
not driver.isNil()
let driver = driverRes.value
discard await driver.reset()

discard driverRes.get().reset()
let initRes = driverRes.get().init()

require:
initRes.isOk()
let initRes = await driver.init()
assert initRes.isOk(), initRes.error

## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "insert a message":
## Given
asyncTest "Insert a message":
const contentTopic = "test-content-topic"

let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

require:
driverRes.isOk()
let driver = driverRes.get()

discard driverRes.get().reset()
discard driverRes.get().init()
discard await driver.reset()

let driver: ArchiveDriver = driverRes.tryGet()
require:
not driver.isNil()
let initRes = await driver.init()
assert initRes.isOk(), initRes.error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should follow this pattern in other test files too so we can see why the result is an error!
non-blocker

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll set a separate PR asap with this

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Morning @rymnc ! After checking the code to apply this change I see that the change would be quite wide but we won't get very great investment return by doing so, as the tests don't tend to fail very often.

Therefore, I think is better to just apply this technique from now on in the upcoming tests.

Of course, if you consider it worth the effort, then I will make it for the current tests :)


let msg = fakeWakuMessage(contentTopic=contentTopic)

let computedDigest = computeDigest(msg)
## When
let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp)

## Then
require:
putRes.isOk()
let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp)
assert putRes.isOk(), putRes.error

let storedMsg = (await driver.getAllMessages()).tryGet()
require:
Expand All @@ -108,80 +93,61 @@ suite "Postgres driver":
toHex(computedDigest.data) == toHex(digest) and
toHex(actualMsg.payload) == toHex(msg.payload)

## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "insert and query message":
## Given
asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"
const contentTopic2 = "test-content-topic-2"
const pubsubTopic1 = "pubsubtopic-1"
const pubsubTopic2 = "pubsubtopic-2"

let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

require:
driverRes.isOk()
let driver = driverRes.value

discard driverRes.get().reset()
discard driverRes.get().init()
discard await driver.reset()

let driver: ArchiveDriver = driverRes.tryGet()
require:
not driver.isNil()
let initRes = await driver.init()
assert initRes.isOk(), initRes.error

let msg1 = fakeWakuMessage(contentTopic=contentTopic1)

## When
var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp)

## Then
require:
putRes.isOk()
assert putRes.isOk(), putRes.error

let msg2 = fakeWakuMessage(contentTopic=contentTopic2)

## When
putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp)

## Then
require:
putRes.isOk()
assert putRes.isOk(), putRes.error

let countMessagesRes = await driver.getMessagesCount()

require:
countMessagesRes.isOk() and
countMessagesRes.get() == 2
require countMessagesRes.isOk() and countMessagesRes.get() == 2

var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1])

require:
messagesRes.isOk()

require:
messagesRes.get().len == 1
require messagesRes.isOk()
require messagesRes.get().len == 1

# Get both content topics, check ordering
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2])
require:
messagesRes.isOk()
assert messagesRes.isOk(), messagesRes.error

require:
messagesRes.get().len == 2 and
messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1
messagesRes.get()[0][1].contentTopic == contentTopic1

# Descending order
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2],
ascendingOrder = false)
require:
messagesRes.isOk()
assert messagesRes.isOk(), messagesRes.error

require:
messagesRes.get().len == 2 and
messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic2
messagesRes.get()[0][1].contentTopic == contentTopic2

# cursor
# Get both content topics
Expand All @@ -191,50 +157,39 @@ suite "Postgres driver":
cursor = some(
computeTestCursor(pubsubTopic1,
messagesRes.get()[0][1])))
require:
messagesRes.isOk()

require:
messagesRes.get().len == 1
require messagesRes.isOk()
require messagesRes.get().len == 1

# Get both content topics but one pubsub topic
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2],
pubsubTopic = some(pubsubTopic1))
require:
messagesRes.isOk()
assert messagesRes.isOk(), messagesRes.error

require:
messagesRes.get().len == 1 and
messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1
messagesRes.get()[0][1].contentTopic == contentTopic1

# Limit
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1,
contentTopic2],
maxPageSize = 1)
require:
messagesRes.isOk()
assert messagesRes.isOk(), messagesRes.error
require messagesRes.get().len == 1

require:
messagesRes.get().len == 1

## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "insert true duplicated messages":
asyncTest "Insert true duplicated messages":
# Validates that two completely equal messages can not be stored.
## Given
let driverRes = PostgresDriver.new(storeMessageDbUrl)
assert driverRes.isOk(), driverRes.error

require:
driverRes.isOk()
let driver = driverRes.value

discard driverRes.get().reset()
discard driverRes.get().init()
discard await driver.reset()

let driver: ArchiveDriver = driverRes.tryGet()
require:
not driver.isNil()
let initRes = await driver.init()
assert initRes.isOk(), initRes.error

let now = now()

Expand All @@ -243,14 +198,8 @@ suite "Postgres driver":

var putRes = await driver.put(DefaultPubsubTopic,
msg1, computeDigest(msg1), msg1.timestamp)
## Then
require:
putRes.isOk()
assert putRes.isOk(), putRes.error

putRes = await driver.put(DefaultPubsubTopic,
msg2, computeDigest(msg2), msg2.timestamp)
## Then
require:
not putRes.isOk()


require not putRes.isOk()
Loading