Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(archive): archive and drivers refactor #2761

Merged
merged 22 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import
factory/builder,
common/utils/nat,
waku_relay,
waku_store/common,
],
./config_chat2

Expand Down Expand Up @@ -468,22 +469,30 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
# We have a viable storenode. Let's query it for historical messages.
echo "Connecting to storenode: " & $(storenode.get())

node.mountLegacyStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuLegacyStoreCodec)
node.mountStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)

proc storeHandler(response: HistoryResponse) {.gcsafe.} =
proc storeHandler(response: StoreQueryResponse) {.gcsafe.} =
for msg in response.messages:
let payload =
if msg.message.isSome():
msg.message.get().payload
else:
newSeq[byte](0)

let
pb = Chat2Message.init(msg.payload)
pb = Chat2Message.init(payload)
chatLine =
if pb.isOk:
pb[].toString()
else:
string.fromBytes(msg.payload)
string.fromBytes(payload)
echo &"{chatLine}"
info "Hit store handler"

let queryRes = await node.query(HistoryQuery(contentTopics: @[chat.contentTopic]))
let queryRes = await node.query(
StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
)
if queryRes.isOk():
storeHandler(queryRes.value)

Expand Down
12 changes: 12 additions & 0 deletions migrations/message_store_postgres/content_script_version_6.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const ContentScriptVersion_6* =
"""
-- we can drop the timestamp column because this data is also kept in the storedAt column
ALTER TABLE messages DROP COLUMN timestamp;

-- from now on we are only interested in the message timestamp
ALTER TABLE messages RENAME COLUMN storedAt TO timestamp;

-- Update to new version
UPDATE version SET version = 6 WHERE version = 5;

"""
3 changes: 2 additions & 1 deletion migrations/message_store_postgres/pg_migration_manager.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import
content_script_version_1, content_script_version_2, content_script_version_3,
content_script_version_4, content_script_version_5
content_script_version_4, content_script_version_5, content_script_version_6

type MigrationScript* = object
version*: int
Expand All @@ -16,6 +16,7 @@ const PgMigrationScripts* =
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
MigrationScript(version: 5, scriptContent: ContentScriptVersion_5),
MigrationScript(version: 6, scriptContent: ContentScriptVersion_6),
]

proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
Expand Down
12 changes: 11 additions & 1 deletion tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ import
./waku_archive/test_driver_sqlite,
./waku_archive/test_retention_policy,
./waku_archive/test_waku_archive,
./waku_archive/test_partition_manager
./waku_archive/test_partition_manager,
./waku_archive_legacy/test_driver_queue_index,
./waku_archive_legacy/test_driver_queue_pagination,
./waku_archive_legacy/test_driver_queue_query,
./waku_archive_legacy/test_driver_queue,
./waku_archive_legacy/test_driver_sqlite_query,
./waku_archive_legacy/test_driver_sqlite,
./waku_archive_legacy/test_retention_policy,
./waku_archive_legacy/test_waku_archive
SionoiS marked this conversation as resolved.
Show resolved Hide resolved

const os* {.strdefine.} = ""
when os == "Linux" and
Expand All @@ -28,6 +36,8 @@ when os == "Linux" and
import
./waku_archive/test_driver_postgres_query,
./waku_archive/test_driver_postgres,
#./waku_archive_legacy/test_driver_postgres_query,
#./waku_archive_legacy/test_driver_postgres,
./factory/test_node_factory,
./wakunode_rest/test_rest_store

Expand Down
22 changes: 11 additions & 11 deletions tests/node/test_wakunode_legacy_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import
waku_core,
waku_store_legacy,
waku_store_legacy/client,
waku_archive,
waku_archive/driver/sqlite_driver,
waku_archive_legacy,
waku_archive_legacy/driver/sqlite_driver,
common/databases/db_sqlite,
],
../waku_store_legacy/store_utils,
../waku_archive/archive_utils,
../waku_archive_legacy/archive_utils,
../testlib/[common, wakucore, wakunode, testasync, futures, testutils]

suite "Waku Store - End to End - Sorted Archive":
Expand Down Expand Up @@ -73,7 +73,7 @@ suite "Waku Store - End to End - Sorted Archive":
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))

archiveDriver = newArchiveDriverWithMessages(pubsubTopic, archiveMessages)
let mountArchiveResult = server.mountArchive(archiveDriver)
let mountArchiveResult = server.mountLegacyArchive(archiveDriver)
assert mountArchiveResult.isOk()

await server.mountLegacyStore()
Expand Down Expand Up @@ -445,7 +445,7 @@ suite "Waku Store - End to End - Sorted Archive":
otherServer =
newTestWakuNode(otherServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountOtherArchiveResult =
otherServer.mountArchive(otherArchiveDriverWithMessages)
otherServer.mountLegacyArchive(otherArchiveDriverWithMessages)
assert mountOtherArchiveResult.isOk()

await otherServer.mountLegacyStore()
Expand Down Expand Up @@ -532,7 +532,7 @@ suite "Waku Store - End to End - Unsorted Archive":
unsortedArchiveDriverWithMessages =
newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages)
mountUnsortedArchiveResult =
server.mountArchive(unsortedArchiveDriverWithMessages)
server.mountLegacyArchive(unsortedArchiveDriverWithMessages)

assert mountUnsortedArchiveResult.isOk()

Expand Down Expand Up @@ -687,7 +687,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
let archiveDriver = newSqliteArchiveDriver()
.put(pubsubTopic, archiveMessages[0 ..< 6])
.put(pubsubTopicB, archiveMessages[6 ..< 10])
let mountSortedArchiveResult = server.mountArchive(archiveDriver)
let mountSortedArchiveResult = server.mountLegacyArchive(archiveDriver)

assert mountSortedArchiveResult.isOk()

Expand Down Expand Up @@ -932,7 +932,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
ephemeralServer =
newTestWakuNode(ephemeralServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountEphemeralArchiveResult =
ephemeralServer.mountArchive(ephemeralArchiveDriver)
ephemeralServer.mountLegacyArchive(ephemeralArchiveDriver)
assert mountEphemeralArchiveResult.isOk()

await ephemeralServer.mountLegacyStore()
Expand Down Expand Up @@ -974,7 +974,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
mixedServerKey = generateSecp256k1Key()
mixedServer =
newTestWakuNode(mixedServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountMixedArchiveResult = mixedServer.mountArchive(mixedArchiveDriver)
mountMixedArchiveResult = mixedServer.mountLegacyArchive(mixedArchiveDriver)
assert mountMixedArchiveResult.isOk()

await mixedServer.mountLegacyStore()
Expand All @@ -1001,7 +1001,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
emptyServerKey = generateSecp256k1Key()
emptyServer =
newTestWakuNode(emptyServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountEmptyArchiveResult = emptyServer.mountArchive(emptyArchiveDriver)
mountEmptyArchiveResult = emptyServer.mountLegacyArchive(emptyArchiveDriver)
assert mountEmptyArchiveResult.isOk()

await emptyServer.mountLegacyStore()
Expand Down Expand Up @@ -1033,7 +1033,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
voluminousServer =
newTestWakuNode(voluminousServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountVoluminousArchiveResult =
voluminousServer.mountArchive(voluminousArchiveDriverWithMessages)
voluminousServer.mountLegacyArchive(voluminousArchiveDriverWithMessages)
assert mountVoluminousArchiveResult.isOk()

await voluminousServer.mountLegacyStore()
Expand Down
27 changes: 27 additions & 0 deletions tests/testlib/postgres_legacy.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import chronicles, chronos
import
waku/waku_archive_legacy,
waku/waku_archive_legacy/driver as driver_module,
waku/waku_archive_legacy/driver/builder,
waku/waku_archive_legacy/driver/postgres_driver

const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"

proc newTestPostgresDriver*(): Future[Result[ArchiveDriver, string]] {.
async, deprecated
.} =
proc onErr(errMsg: string) {.gcsafe, closure.} =
error "error creating ArchiveDriver", error = errMsg
quit(QuitFailure)

let
vacuum = false
migrate = true
maxNumConn = 50

let driverRes =
await ArchiveDriver.new(storeMessageDbUrl, vacuum, migrate, maxNumConn, onErr)
if driverRes.isErr():
onErr("could not create archive driver: " & driverRes.error)

return ok(driverRes.get())
17 changes: 1 addition & 16 deletions tests/waku_archive/archive_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,11 @@ proc newSqliteArchiveDriver*(): ArchiveDriver =
proc newWakuArchive*(driver: ArchiveDriver): WakuArchive =
WakuArchive.new(driver).get()

proc computeArchiveCursor*(
pubsubTopic: PubsubTopic, message: WakuMessage
): ArchiveCursor =
ArchiveCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)

proc put*(
driver: ArchiveDriver, pubsubTopic: PubSubTopic, msgList: seq[WakuMessage]
): ArchiveDriver =
for msg in msgList:
let
msgDigest = computeDigest(msg)
msgHash = computeMessageHash(pubsubTopic, msg)
_ = waitFor driver.put(pubsubTopic, msg, msgDigest, msgHash, msg.timestamp)
# discard crashes
let _ = waitFor driver.put(computeMessageHash(pubsubTopic, msg), pubsubTopic, msg)
return driver

proc newArchiveDriverWithMessages*(
Expand Down
1 change: 1 addition & 0 deletions tests/waku_archive/test_all.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ import
./test_driver_queue,
./test_driver_sqlite_query,
./test_driver_sqlite,
./test_partition_manager,
./test_retention_policy,
./test_waku_archive
Loading
Loading