Skip to content

Commit

Permalink
Merge cc7b0a6 into e269dca
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Jul 12, 2024
2 parents e269dca + cc7b0a6 commit 8dc0454
Show file tree
Hide file tree
Showing 71 changed files with 12,540 additions and 2,103 deletions.
21 changes: 15 additions & 6 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import
factory/builder,
common/utils/nat,
waku_relay,
waku_store/common,
],
./config_chat2

Expand Down Expand Up @@ -468,22 +469,30 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
# We have a viable storenode. Let's query it for historical messages.
echo "Connecting to storenode: " & $(storenode.get())

node.mountLegacyStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuLegacyStoreCodec)
node.mountStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)

proc storeHandler(response: HistoryResponse) {.gcsafe.} =
proc storeHandler(response: StoreQueryResponse) {.gcsafe.} =
for msg in response.messages:
let payload =
if msg.message.isSome():
msg.message.get().payload
else:
newSeq[byte](0)

let
pb = Chat2Message.init(msg.payload)
pb = Chat2Message.init(payload)
chatLine =
if pb.isOk:
pb[].toString()
else:
string.fromBytes(msg.payload)
string.fromBytes(payload)
echo &"{chatLine}"
info "Hit store handler"

let queryRes = await node.query(HistoryQuery(contentTopics: @[chat.contentTopic]))
let queryRes = await node.query(
StoreQueryRequest(contentTopics: @[chat.contentTopic]), storenode.get()
)
if queryRes.isOk():
storeHandler(queryRes.value)

Expand Down
12 changes: 12 additions & 0 deletions migrations/message_store_postgres/content_script_version_6.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const ContentScriptVersion_6* =
"""
-- we can drop the timestamp column because this data is also kept in the storedAt column
ALTER TABLE messages DROP COLUMN timestamp;
-- from now on we are only interested in the message timestamp
ALTER TABLE messages RENAME COLUMN storedAt TO timestamp;
-- Update to new version
UPDATE version SET version = 6 WHERE version = 5;
"""
3 changes: 2 additions & 1 deletion migrations/message_store_postgres/pg_migration_manager.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import
content_script_version_1, content_script_version_2, content_script_version_3,
content_script_version_4, content_script_version_5
content_script_version_4, content_script_version_5, content_script_version_6

type MigrationScript* = object
version*: int
Expand All @@ -16,6 +16,7 @@ const PgMigrationScripts* =
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
MigrationScript(version: 5, scriptContent: ContentScriptVersion_5),
MigrationScript(version: 6, scriptContent: ContentScriptVersion_6),
]

proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
Expand Down
12 changes: 11 additions & 1 deletion tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,15 @@ import
./waku_archive/test_driver_sqlite,
./waku_archive/test_retention_policy,
./waku_archive/test_waku_archive,
./waku_archive/test_partition_manager
./waku_archive/test_partition_manager,
./waku_archive_legacy/test_driver_queue_index,
./waku_archive_legacy/test_driver_queue_pagination,
./waku_archive_legacy/test_driver_queue_query,
./waku_archive_legacy/test_driver_queue,
./waku_archive_legacy/test_driver_sqlite_query,
./waku_archive_legacy/test_driver_sqlite,
./waku_archive_legacy/test_retention_policy,
./waku_archive_legacy/test_waku_archive

const os* {.strdefine.} = ""
when os == "Linux" and
Expand All @@ -28,6 +36,8 @@ when os == "Linux" and
import
./waku_archive/test_driver_postgres_query,
./waku_archive/test_driver_postgres,
#./waku_archive_legacy/test_driver_postgres_query,
#./waku_archive_legacy/test_driver_postgres,
./factory/test_node_factory,
./wakunode_rest/test_rest_store

Expand Down
22 changes: 11 additions & 11 deletions tests/node/test_wakunode_legacy_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import
waku_core,
waku_store_legacy,
waku_store_legacy/client,
waku_archive,
waku_archive/driver/sqlite_driver,
waku_archive_legacy,
waku_archive_legacy/driver/sqlite_driver,
common/databases/db_sqlite,
],
../waku_store_legacy/store_utils,
../waku_archive/archive_utils,
../waku_archive_legacy/archive_utils,
../testlib/[common, wakucore, wakunode, testasync, futures, testutils]

suite "Waku Store - End to End - Sorted Archive":
Expand Down Expand Up @@ -73,7 +73,7 @@ suite "Waku Store - End to End - Sorted Archive":
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))

archiveDriver = newArchiveDriverWithMessages(pubsubTopic, archiveMessages)
let mountArchiveResult = server.mountArchive(archiveDriver)
let mountArchiveResult = server.mountLegacyArchive(archiveDriver)
assert mountArchiveResult.isOk()

await server.mountLegacyStore()
Expand Down Expand Up @@ -445,7 +445,7 @@ suite "Waku Store - End to End - Sorted Archive":
otherServer =
newTestWakuNode(otherServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountOtherArchiveResult =
otherServer.mountArchive(otherArchiveDriverWithMessages)
otherServer.mountLegacyArchive(otherArchiveDriverWithMessages)
assert mountOtherArchiveResult.isOk()

await otherServer.mountLegacyStore()
Expand Down Expand Up @@ -532,7 +532,7 @@ suite "Waku Store - End to End - Unsorted Archive":
unsortedArchiveDriverWithMessages =
newArchiveDriverWithMessages(pubsubTopic, unsortedArchiveMessages)
mountUnsortedArchiveResult =
server.mountArchive(unsortedArchiveDriverWithMessages)
server.mountLegacyArchive(unsortedArchiveDriverWithMessages)

assert mountUnsortedArchiveResult.isOk()

Expand Down Expand Up @@ -687,7 +687,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
let archiveDriver = newSqliteArchiveDriver()
.put(pubsubTopic, archiveMessages[0 ..< 6])
.put(pubsubTopicB, archiveMessages[6 ..< 10])
let mountSortedArchiveResult = server.mountArchive(archiveDriver)
let mountSortedArchiveResult = server.mountLegacyArchive(archiveDriver)

assert mountSortedArchiveResult.isOk()

Expand Down Expand Up @@ -932,7 +932,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
ephemeralServer =
newTestWakuNode(ephemeralServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountEphemeralArchiveResult =
ephemeralServer.mountArchive(ephemeralArchiveDriver)
ephemeralServer.mountLegacyArchive(ephemeralArchiveDriver)
assert mountEphemeralArchiveResult.isOk()

await ephemeralServer.mountLegacyStore()
Expand Down Expand Up @@ -974,7 +974,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
mixedServerKey = generateSecp256k1Key()
mixedServer =
newTestWakuNode(mixedServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountMixedArchiveResult = mixedServer.mountArchive(mixedArchiveDriver)
mountMixedArchiveResult = mixedServer.mountLegacyArchive(mixedArchiveDriver)
assert mountMixedArchiveResult.isOk()

await mixedServer.mountLegacyStore()
Expand All @@ -1001,7 +1001,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
emptyServerKey = generateSecp256k1Key()
emptyServer =
newTestWakuNode(emptyServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountEmptyArchiveResult = emptyServer.mountArchive(emptyArchiveDriver)
mountEmptyArchiveResult = emptyServer.mountLegacyArchive(emptyArchiveDriver)
assert mountEmptyArchiveResult.isOk()

await emptyServer.mountLegacyStore()
Expand Down Expand Up @@ -1033,7 +1033,7 @@ suite "Waku Store - End to End - Archive with Multiple Topics":
voluminousServer =
newTestWakuNode(voluminousServerKey, ValidIpAddress.init("0.0.0.0"), Port(0))
mountVoluminousArchiveResult =
voluminousServer.mountArchive(voluminousArchiveDriverWithMessages)
voluminousServer.mountLegacyArchive(voluminousArchiveDriverWithMessages)
assert mountVoluminousArchiveResult.isOk()

await voluminousServer.mountLegacyStore()
Expand Down
27 changes: 27 additions & 0 deletions tests/testlib/postgres_legacy.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import chronicles, chronos
import
waku/waku_archive_legacy,
waku/waku_archive_legacy/driver as driver_module,
waku/waku_archive_legacy/driver/builder,
waku/waku_archive_legacy/driver/postgres_driver

const storeMessageDbUrl = "postgres://postgres:test123@localhost:5432/postgres"

proc newTestPostgresDriver*(): Future[Result[ArchiveDriver, string]] {.
async, deprecated
.} =
proc onErr(errMsg: string) {.gcsafe, closure.} =
error "error creating ArchiveDriver", error = errMsg
quit(QuitFailure)

let
vacuum = false
migrate = true
maxNumConn = 50

let driverRes =
await ArchiveDriver.new(storeMessageDbUrl, vacuum, migrate, maxNumConn, onErr)
if driverRes.isErr():
onErr("could not create archive driver: " & driverRes.error)

return ok(driverRes.get())
17 changes: 1 addition & 16 deletions tests/waku_archive/archive_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,11 @@ proc newSqliteArchiveDriver*(): ArchiveDriver =
proc newWakuArchive*(driver: ArchiveDriver): WakuArchive =
WakuArchive.new(driver).get()

proc computeArchiveCursor*(
pubsubTopic: PubsubTopic, message: WakuMessage
): ArchiveCursor =
ArchiveCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: computeDigest(message),
hash: computeMessageHash(pubsubTopic, message),
)

proc put*(
driver: ArchiveDriver, pubsubTopic: PubSubTopic, msgList: seq[WakuMessage]
): ArchiveDriver =
for msg in msgList:
let
msgDigest = computeDigest(msg)
msgHash = computeMessageHash(pubsubTopic, msg)
_ = waitFor driver.put(pubsubTopic, msg, msgDigest, msgHash, msg.timestamp)
# discard crashes
let _ = waitFor driver.put(computeMessageHash(pubsubTopic, msg), pubsubTopic, msg)
return driver

proc newArchiveDriverWithMessages*(
Expand Down
1 change: 1 addition & 0 deletions tests/waku_archive/test_all.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ import
./test_driver_queue,
./test_driver_sqlite_query,
./test_driver_sqlite,
./test_partition_manager,
./test_retention_policy,
./test_waku_archive
Loading

0 comments on commit 8dc0454

Please sign in to comment.