-
Notifications
You must be signed in to change notification settings - Fork 54
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(postgresql): align Andrea's PR#1590 changes into master (#1764)
* Add postgres basic support * test_driver_postgres.nim: adding simple test to validate asynchronous-ness * Aligning the changes made by Andrea to the current master state * test_driver_postgres.nim: new unit test (checking duplicate insertion) --------- Co-authored-by: Andrea Maria Piana <andrea.maria.piana@gmail.com>
- Loading branch information
1 parent
047d1cf
commit 7df6f4c
Showing
5 changed files
with
569 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
version: "3.8" | ||
|
||
|
||
services: | ||
db: | ||
image: postgres:9.6-alpine | ||
restart: always | ||
environment: | ||
POSTGRES_PASSWORD: test123 | ||
ports: | ||
- "5432:5432" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,256 @@ | ||
{.used.} | ||
|
||
import | ||
std/[sequtils,times,options], | ||
testutils/unittests, | ||
chronos | ||
import | ||
../../../waku/v2/waku_archive, | ||
../../../waku/v2/waku_archive/driver/postgres_driver, | ||
../../../waku/v2/waku_core, | ||
../testlib/wakucore | ||
|
||
proc now():int64 = getTime().toUnix() | ||
|
||
proc computeTestCursor(pubsubTopic: PubsubTopic, | ||
message: WakuMessage): | ||
ArchiveCursor = | ||
ArchiveCursor( | ||
pubsubTopic: pubsubTopic, | ||
senderTime: message.timestamp, | ||
storeTime: message.timestamp, | ||
digest: computeDigest(message) | ||
) | ||
|
||
suite "Postgres driver": | ||
|
||
const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres" | ||
|
||
asyncTest "Asynchronous queries": | ||
#TODO: make the test asynchronous | ||
return | ||
|
||
## When | ||
let driverRes = PostgresDriver.new(storeMessageDbUrl) | ||
|
||
## Then | ||
require: | ||
driverRes.isOk() | ||
|
||
let driver: ArchiveDriver = driverRes.tryGet() | ||
require: | ||
not driver.isNil() | ||
|
||
let beforeSleep = now() | ||
for _ in 1 .. 20: | ||
discard (PostgresDriver driver).sleep(1) | ||
|
||
require (now() - beforeSleep) < 20 | ||
|
||
## Cleanup | ||
(await driver.close()).expect("driver to close") | ||
|
||
asyncTest "init driver and database": | ||
|
||
## When | ||
let driverRes = PostgresDriver.new(storeMessageDbUrl) | ||
|
||
## Then | ||
require: | ||
driverRes.isOk() | ||
|
||
let driver: ArchiveDriver = driverRes.tryGet() | ||
require: | ||
not driver.isNil() | ||
|
||
discard driverRes.get().reset() | ||
let initRes = driverRes.get().init() | ||
|
||
require: | ||
initRes.isOk() | ||
|
||
## Cleanup | ||
(await driver.close()).expect("driver to close") | ||
|
||
asyncTest "insert a message": | ||
## Given | ||
const contentTopic = "test-content-topic" | ||
|
||
let driverRes = PostgresDriver.new(storeMessageDbUrl) | ||
|
||
require: | ||
driverRes.isOk() | ||
|
||
discard driverRes.get().reset() | ||
discard driverRes.get().init() | ||
|
||
let driver: ArchiveDriver = driverRes.tryGet() | ||
require: | ||
not driver.isNil() | ||
|
||
let msg = fakeWakuMessage(contentTopic=contentTopic) | ||
|
||
let computedDigest = computeDigest(msg) | ||
## When | ||
let putRes = await driver.put(DefaultPubsubTopic, msg, computedDigest, msg.timestamp) | ||
|
||
## Then | ||
require: | ||
putRes.isOk() | ||
|
||
let storedMsg = (await driver.getAllMessages()).tryGet() | ||
require: | ||
storedMsg.len == 1 | ||
storedMsg.all do (item: auto) -> bool: | ||
let (pubsubTopic, actualMsg, digest, storeTimestamp) = item | ||
actualMsg.contentTopic == contentTopic and | ||
pubsubTopic == DefaultPubsubTopic and | ||
toHex(computedDigest.data) == toHex(digest) and | ||
toHex(actualMsg.payload) == toHex(msg.payload) | ||
|
||
## Cleanup | ||
(await driver.close()).expect("driver to close") | ||
|
||
asyncTest "insert and query message": | ||
## Given | ||
const contentTopic1 = "test-content-topic-1" | ||
const contentTopic2 = "test-content-topic-2" | ||
const pubsubTopic1 = "pubsubtopic-1" | ||
const pubsubTopic2 = "pubsubtopic-2" | ||
|
||
let driverRes = PostgresDriver.new(storeMessageDbUrl) | ||
|
||
require: | ||
driverRes.isOk() | ||
|
||
discard driverRes.get().reset() | ||
discard driverRes.get().init() | ||
|
||
let driver: ArchiveDriver = driverRes.tryGet() | ||
require: | ||
not driver.isNil() | ||
|
||
let msg1 = fakeWakuMessage(contentTopic=contentTopic1) | ||
|
||
## When | ||
var putRes = await driver.put(pubsubTopic1, msg1, computeDigest(msg1), msg1.timestamp) | ||
|
||
## Then | ||
require: | ||
putRes.isOk() | ||
|
||
let msg2 = fakeWakuMessage(contentTopic=contentTopic2) | ||
|
||
## When | ||
putRes = await driver.put(pubsubTopic2, msg2, computeDigest(msg2), msg2.timestamp) | ||
|
||
## Then | ||
require: | ||
putRes.isOk() | ||
|
||
let countMessagesRes = await driver.getMessagesCount() | ||
|
||
require: | ||
countMessagesRes.isOk() and | ||
countMessagesRes.get() == 2 | ||
|
||
var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1]) | ||
|
||
require: | ||
messagesRes.isOk() | ||
|
||
require: | ||
messagesRes.get().len == 1 | ||
|
||
# Get both content topics, check ordering | ||
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1, | ||
contentTopic2]) | ||
require: | ||
messagesRes.isOk() | ||
|
||
require: | ||
messagesRes.get().len == 2 and | ||
messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1 | ||
|
||
# Descending order | ||
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1, | ||
contentTopic2], | ||
ascendingOrder = false) | ||
require: | ||
messagesRes.isOk() | ||
|
||
require: | ||
messagesRes.get().len == 2 and | ||
messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic2 | ||
|
||
# cursor | ||
# Get both content topics | ||
messagesRes = | ||
await driver.getMessages(contentTopic = @[contentTopic1, | ||
contentTopic2], | ||
cursor = some( | ||
computeTestCursor(pubsubTopic1, | ||
messagesRes.get()[0][1]))) | ||
require: | ||
messagesRes.isOk() | ||
|
||
require: | ||
messagesRes.get().len == 1 | ||
|
||
# Get both content topics but one pubsub topic | ||
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1, | ||
contentTopic2], | ||
pubsubTopic = some(pubsubTopic1)) | ||
require: | ||
messagesRes.isOk() | ||
|
||
require: | ||
messagesRes.get().len == 1 and | ||
messagesRes.get()[0][1].WakuMessage.contentTopic == contentTopic1 | ||
|
||
# Limit | ||
messagesRes = await driver.getMessages(contentTopic = @[contentTopic1, | ||
contentTopic2], | ||
maxPageSize = 1) | ||
require: | ||
messagesRes.isOk() | ||
|
||
require: | ||
messagesRes.get().len == 1 | ||
|
||
## Cleanup | ||
(await driver.close()).expect("driver to close") | ||
|
||
asyncTest "insert true duplicated messages": | ||
# Validates that two completely equal messages can not be stored. | ||
## Given | ||
let driverRes = PostgresDriver.new(storeMessageDbUrl) | ||
|
||
require: | ||
driverRes.isOk() | ||
|
||
discard driverRes.get().reset() | ||
discard driverRes.get().init() | ||
|
||
let driver: ArchiveDriver = driverRes.tryGet() | ||
require: | ||
not driver.isNil() | ||
|
||
let now = now() | ||
|
||
let msg1 = fakeWakuMessage(ts = now) | ||
let msg2 = fakeWakuMessage(ts = now) | ||
|
||
var putRes = await driver.put(DefaultPubsubTopic, | ||
msg1, computeDigest(msg1), msg1.timestamp) | ||
## Then | ||
require: | ||
putRes.isOk() | ||
|
||
putRes = await driver.put(DefaultPubsubTopic, | ||
msg2, computeDigest(msg2), msg2.timestamp) | ||
## Then | ||
require: | ||
not putRes.isOk() | ||
|
||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
when (NimMajor, NimMinor) < (1, 4): | ||
{.push raises: [Defect].} | ||
else: | ||
{.push raises: [].} | ||
|
||
import ./postgres_driver/postgres_driver | ||
|
||
export postgres_driver |
Oops, something went wrong.