Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(archive): refactor archive drivers for store v3 #2729

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 2 additions & 17 deletions tests/waku_archive/archive_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import
node/peer_manager,
waku_core,
waku_archive,
waku_archive/common,
waku_archive/driver/sqlite_driver,
common/databases/db_sqlite,
],
Expand All @@ -23,26 +22,12 @@ proc newSqliteArchiveDriver*(): ArchiveDriver =
proc newWakuArchive*(driver: ArchiveDriver): WakuArchive =
WakuArchive.new(driver).get()

proc computeArchiveCursor*(
pubsubTopic: PubsubTopic, message: WakuMessage
): ArchiveCursor =
ArchiveCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)

proc put*(
driver: ArchiveDriver, pubsubTopic: PubSubTopic, msgList: seq[WakuMessage]
): ArchiveDriver =
for msg in msgList:
let
msgDigest = computeDigest(msg)
msgHash = computeMessageHash(pubsubTopic, msg)
_ = waitFor driver.put(pubsubTopic, msg, msgDigest, msgHash, msg.timestamp)
# discard crashes
discard waitFor driver.put(computeMessageHash(pubsubTopic, msg), pubsubTopic, msg)

return driver

proc newArchiveDriverWithMessages*(
Expand Down
68 changes: 18 additions & 50 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,6 @@ import
../testlib/testasync,
../testlib/postgres

proc computeTestCursor(pubsubTopic: PubsubTopic, message: WakuMessage): ArchiveCursor =
ArchiveCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)

suite "Postgres driver":
## Unique driver instance
var driver {.threadvar.}: PostgresDriver
Expand Down Expand Up @@ -58,24 +49,19 @@ suite "Postgres driver":

let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)

let computedDigest = computeDigest(msg)
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)

let putRes = await driver.put(
DefaultPubsubTopic, msg, computedDigest, computedHash, msg.timestamp
computeMessageHash(DefaultPubsubTopic, msg), DefaultPubsubTopic, msg
)
assert putRes.isOk(), putRes.error

let storedMsg = (await driver.getAllMessages()).tryGet()

assert storedMsg.len == 1

let (pubsubTopic, actualMsg, digest, _, hash) = storedMsg[0]
let (_, pubsubTopic, actualMsg) = storedMsg[0]
assert actualMsg.contentTopic == contentTopic
assert pubsubTopic == DefaultPubsubTopic
assert toHex(computedDigest.data) == toHex(digest)
assert toHex(actualMsg.payload) == toHex(msg.payload)
assert toHex(computedHash) == toHex(hash)
assert toHex(actualMsg.meta) == toHex(msg.meta)

asyncTest "Insert and query message":
Expand All @@ -86,74 +72,64 @@ suite "Postgres driver":

let msg1 = fakeWakuMessage(contentTopic = contentTopic1)

var putRes = await driver.put(
pubsubTopic1,
msg1,
computeDigest(msg1),
computeMessageHash(pubsubTopic1, msg1),
msg1.timestamp,
)
var putRes =
await driver.put(computeMessageHash(pubsubTopic1, msg1), pubsubTopic1, msg1)
assert putRes.isOk(), putRes.error

let msg2 = fakeWakuMessage(contentTopic = contentTopic2)

putRes = await driver.put(
pubsubTopic2,
msg2,
computeDigest(msg2),
computeMessageHash(pubsubTopic2, msg2),
msg2.timestamp,
)
putRes =
await driver.put(computeMessageHash(pubsubTopic2, msg2), pubsubTopic2, msg2)
assert putRes.isOk(), putRes.error

let countMessagesRes = await driver.getMessagesCount()

assert countMessagesRes.isOk(), $countMessagesRes.error
assert countMessagesRes.get() == 2

var messagesRes = await driver.getMessages(contentTopic = @[contentTopic1])
var messagesRes = await driver.getMessages(contentTopics = @[contentTopic1])

assert messagesRes.isOk(), $messagesRes.error
assert messagesRes.get().len == 1

# Get both content topics, check ordering
messagesRes =
await driver.getMessages(contentTopic = @[contentTopic1, contentTopic2])
await driver.getMessages(contentTopics = @[contentTopic1, contentTopic2])
assert messagesRes.isOk(), messagesRes.error

assert messagesRes.get().len == 2
assert messagesRes.get()[0][1].contentTopic == contentTopic1
assert messagesRes.get()[0][2].contentTopic == contentTopic1

# Descending order
messagesRes = await driver.getMessages(
contentTopic = @[contentTopic1, contentTopic2], ascendingOrder = false
contentTopics = @[contentTopic1, contentTopic2], ascendingOrder = false
)
assert messagesRes.isOk(), messagesRes.error

assert messagesRes.get().len == 2
assert messagesRes.get()[0][1].contentTopic == contentTopic2
assert messagesRes.get()[0][2].contentTopic == contentTopic2

# cursor
# Get both content topics
messagesRes = await driver.getMessages(
contentTopic = @[contentTopic1, contentTopic2],
cursor = some(computeTestCursor(pubsubTopic1, messagesRes.get()[1][1])),
contentTopics = @[contentTopic1, contentTopic2],
cursor = some(computeMessageHash(pubsubTopic1, messagesRes.get()[1][2])),
)
assert messagesRes.isOk()
assert messagesRes.get().len == 1

# Get both content topics but one pubsub topic
messagesRes = await driver.getMessages(
contentTopic = @[contentTopic1, contentTopic2], pubsubTopic = some(pubsubTopic1)
contentTopics = @[contentTopic1, contentTopic2], pubsubTopic = some(pubsubTopic1)
)
assert messagesRes.isOk(), messagesRes.error

assert messagesRes.get().len == 1
assert messagesRes.get()[0][1].contentTopic == contentTopic1
assert messagesRes.get()[0][2].contentTopic == contentTopic1

# Limit
messagesRes = await driver.getMessages(
contentTopic = @[contentTopic1, contentTopic2], maxPageSize = 1
contentTopics = @[contentTopic1, contentTopic2], maxPageSize = 1
)
assert messagesRes.isOk(), messagesRes.error
assert messagesRes.get().len == 1
Expand All @@ -170,11 +146,7 @@ suite "Postgres driver":
raiseAssert "could not get num mgs correctly: " & $error

var putRes = await driver.put(
DefaultPubsubTopic,
msg1,
computeDigest(msg1),
computeMessageHash(DefaultPubsubTopic, msg1),
msg1.timestamp,
computeMessageHash(DefaultPubsubTopic, msg1), DefaultPubsubTopic, msg1
)
assert putRes.isOk(), putRes.error

Expand All @@ -185,11 +157,7 @@ suite "Postgres driver":
"wrong number of messages: " & $newNumMsgs

putRes = await driver.put(
DefaultPubsubTopic,
msg2,
computeDigest(msg2),
computeMessageHash(DefaultPubsubTopic, msg2),
msg2.timestamp,
computeMessageHash(DefaultPubsubTopic, msg2), DefaultPubsubTopic, msg2
)

assert putRes.isOk()
Expand Down
Loading
Loading