diff --git a/tests/node/test_wakunode_store.nim b/tests/node/test_wakunode_store.nim index b7e5651b29..903989946f 100644 --- a/tests/node/test_wakunode_store.nim +++ b/tests/node/test_wakunode_store.nim @@ -60,7 +60,9 @@ suite "Waku Store - End to End - Sorted Archive": ] archiveMessages = messages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + messageHash: computeMessageHash(pubsubTopic, it), + message: some(it), + pubsubTopic: some(pubsubTopic), ) ) @@ -337,7 +339,9 @@ suite "Waku Store - End to End - Sorted Archive": archiveMessages & extraMessages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + messageHash: computeMessageHash(pubsubTopic, it), + message: some(it), + pubsubTopic: some(pubsubTopic), ) ) @@ -392,7 +396,9 @@ suite "Waku Store - End to End - Sorted Archive": archiveMessages & extraMessages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + messageHash: computeMessageHash(pubsubTopic, it), + message: some(it), + pubsubTopic: some(pubsubTopic), ) ) @@ -558,7 +564,9 @@ suite "Waku Store - End to End - Unsorted Archive": ] unsortedArchiveMessages = messages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + messageHash: computeMessageHash(pubsubTopic, it), + message: some(it), + pubsubTopic: some(pubsubTopic), ) ) @@ -773,7 +781,9 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp": ] unsortedArchiveMessages = messages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + messageHash: computeMessageHash(pubsubTopic, it), + message: some(it), + pubsubTopic: some(pubsubTopic), ) ) @@ -915,7 +925,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics": archiveMessages = messages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + messageHash: computeMessageHash(pubsubTopic, it), + message: some(it), + pubsubTopic: some(pubsubTopic), ) ) @@ -923,6 +935,8 @@ suite "Waku Store - End to End - Archive with Multiple Topics": archiveMessages[i].messagehash = computeMessageHash(pubsubTopicB, archiveMessages[i].message.get()) + archiveMessages[i].pubsubTopic = some(pubsubTopicB) + let serverKey = generateSecp256k1Key() clientKey = generateSecp256k1Key() @@ -1274,7 +1288,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics": let voluminousArchiveMessages = messages.mapIt( WakuMessageKeyValue( - messageHash: computeMessageHash(pubsubTopic, it), message: some(it) + messageHash: computeMessageHash(pubsubTopic, it), + message: some(it), + pubsubTopic: some(pubsubTopic), ) ) diff --git a/tests/waku_store/test_client.nim b/tests/waku_store/test_client.nim index f0dacb538b..11554a3120 100644 --- a/tests/waku_store/test_client.nim +++ b/tests/waku_store/test_client.nim @@ -38,9 +38,21 @@ suite "Store Client": hash3 = computeMessageHash(DefaultPubsubTopic, message3) messageSeq = @[ - WakuMessageKeyValue(messageHash: hash1, message: some(message1)), - WakuMessageKeyValue(messageHash: hash2, message: some(message2)), - WakuMessageKeyValue(messageHash: hash3, message: some(message3)), + WakuMessageKeyValue( + messageHash: hash1, + message: some(message1), + pubsubTopic: some(DefaultPubsubTopic), + ), + WakuMessageKeyValue( + messageHash: hash2, + message: some(message2), + pubsubTopic: some(DefaultPubsubTopic), + ), + WakuMessageKeyValue( + messageHash: hash3, + message: some(message3), + pubsubTopic: some(DefaultPubsubTopic), + ), ] handlerFuture = newHistoryFuture() handler = proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.} = diff --git a/tests/waku_store/test_rpc_codec.nim b/tests/waku_store/test_rpc_codec.nim index 3d59badfe2..6f5135d154 100644 --- a/tests/waku_store/test_rpc_codec.nim +++ b/tests/waku_store/test_rpc_codec.nim @@ -58,7 +58,9 @@ procSuite "Waku Store - RPC codec": let message = fakeWakuMessage() hash = computeMessageHash(DefaultPubsubTopic, message) - keyValue = WakuMessageKeyValue(messageHash: hash, message: some(message)) + keyValue = WakuMessageKeyValue( + messageHash: hash, message: some(message), pubsubTopic: some(DefaultPubsubTopic) + ) res = StoreQueryResponse( requestId: "1", statusCode: 200, diff --git a/tests/waku_store/test_waku_store.nim b/tests/waku_store/test_waku_store.nim index b897196d82..d1f66708d2 100644 --- a/tests/waku_store/test_waku_store.nim +++ b/tests/waku_store/test_waku_store.nim @@ -29,7 +29,9 @@ suite "Waku Store - query handler": let msg = fakeWakuMessage(contentTopic = DefaultContentTopic) let hash = computeMessageHash(DefaultPubsubTopic, msg) - let kv = WakuMessageKeyValue(messageHash: hash, message: some(msg)) + let kv = WakuMessageKeyValue( + messageHash: hash, message: some(msg), pubsubTopic: some(DefaultPubsubTopic) + ) var queryHandlerFut = newFuture[(StoreQueryRequest)]() diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index 1071af19af..184cbdf865 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -50,7 +50,9 @@ procSuite "WakuNode - Store": let hashes = msgListA.mapIt(computeMessageHash(DefaultPubsubTopic, it)) let kvs = zip(hashes, msgListA).mapIt( - WakuMessageKeyValue(messageHash: it[0], message: some(it[1])) + WakuMessageKeyValue( + messageHash: it[0], message: some(it[1]), pubsubTopic: some(DefaultPubsubTopic) + ) ) let archiveA = block: @@ -276,7 +278,11 @@ procSuite "WakuNode - Store": check: response.messages.len == 1 response.messages[0] == - WakuMessageKeyValue(messageHash: hash, message: some(message)) + WakuMessageKeyValue( + messageHash: hash, + message: some(message), + pubsubTopic: some(DefaultPubSubTopic), + ) let (handledPubsubTopic, handledMsg) = filterFut.read() check: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 89c7d92c84..273a699eee 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -838,13 +838,13 @@ proc toStoreResult(res: ArchiveResult): StoreQueryResult = for i in 0 ..< response.hashes.len: let hash = response.hashes[i] - let kv = - store_common.WakuMessageKeyValue(messageHash: hash, message: none(WakuMessage)) + let kv = store_common.WakuMessageKeyValue(messageHash: hash) res.messages.add(kv) for i in 0 ..< response.messages.len: res.messages[i].message = some(response.messages[i]) + res.messages[i].pubsubTopic = some(response.topics[i]) if response.cursor.isSome(): res.paginationCursor = some(response.cursor.get().hash) diff --git a/waku/waku_archive/archive.nim b/waku/waku_archive/archive.nim index 6d2cf6ee7d..fa395c62ec 100644 --- a/waku/waku_archive/archive.nim +++ b/waku/waku_archive/archive.nim @@ -169,6 +169,7 @@ proc findMessages*( var hashes = newSeq[WakuMessageHash]() var messages = newSeq[WakuMessage]() + var topics = newSeq[PubsubTopic]() var cursor = none(ArchiveCursor) if rows.len == 0: @@ -180,6 +181,7 @@ proc findMessages*( #TODO once store v2 is removed, unzip instead of 2x map #TODO once store v2 is removed, update driver to not return messages when not needed if query.includeData: + topics = rows[0 ..< pageSize].mapIt(it[0]) messages = rows[0 ..< pageSize].mapIt(it[1]) hashes = rows[0 ..< pageSize].mapIt(it[4]) @@ -206,10 +208,13 @@ proc findMessages*( # All messages MUST be returned in chronological order if not isAscendingOrder: - reverse(messages) reverse(hashes) + reverse(messages) + reverse(topics) - return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor)) + return ok( + ArchiveResponse(hashes: hashes, messages: messages, topics: topics, cursor: cursor) + ) proc findMessagesV2*( self: WakuArchive, query: ArchiveQuery diff --git a/waku/waku_archive/common.nim b/waku/waku_archive/common.nim index adc598941f..5b14fb111c 100644 --- a/waku/waku_archive/common.nim +++ b/waku/waku_archive/common.nim @@ -56,6 +56,7 @@ type ArchiveResponse* = object hashes*: seq[WakuMessageHash] messages*: seq[WakuMessage] + topics*: seq[PubsubTopic] cursor*: Option[ArchiveCursor] ArchiveErrorKind* {.pure.} = enum diff --git a/waku/waku_store/common.nim b/waku/waku_store/common.nim index 6481b73f5a..f1e6a78bc0 100644 --- a/waku/waku_store/common.nim +++ b/waku/waku_store/common.nim @@ -38,6 +38,7 @@ type WakuMessageKeyValue* = object messageHash*: WakuMessageHash message*: Option[WakuMessage] + pubsubTopic*: Option[PubsubTopic] StoreQueryResponse* = object requestId*: string diff --git a/waku/waku_store/rpc_codec.nim b/waku/waku_store/rpc_codec.nim index 6b5c905906..13d7440edb 100644 --- a/waku/waku_store/rpc_codec.nim +++ b/waku/waku_store/rpc_codec.nim @@ -125,8 +125,9 @@ proc encode*(keyValue: WakuMessageKeyValue): ProtoBuffer = pb.write3(1, keyValue.messageHash) - if keyValue.message.isSome(): + if keyValue.message.isSome() and keyValue.pubsubTopic.isSome(): pb.write3(2, keyValue.message.get().encode()) + pb.write3(3, keyValue.pubsubTopic.get()) pb.finish3() @@ -164,10 +165,13 @@ proc decode*( keyValue.messagehash = hash var proto: ProtoBuffer - if not ?pb.getField(2, proto): - keyValue.message = none(WakuMessage) - else: + var topic: string + if ?pb.getField(2, proto) and ?pb.getField(3, topic): keyValue.message = some(?WakuMessage.decode(proto.buffer)) + keyValue.pubsubTopic = some(topic) + else: + keyValue.message = none(WakuMessage) + keyValue.pubsubTopic = none(string) return ok(keyValue)