Skip to content

Commit

Permalink
feat: store v3 return pubsub topics (#2676)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored May 8, 2024
1 parent 6a1af92 commit d700006
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 22 deletions.
30 changes: 23 additions & 7 deletions tests/node/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ suite "Waku Store - End to End - Sorted Archive":
]
archiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -337,7 +339,9 @@ suite "Waku Store - End to End - Sorted Archive":
archiveMessages &
extraMessages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -392,7 +396,9 @@ suite "Waku Store - End to End - Sorted Archive":
archiveMessages &
extraMessages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -558,7 +564,9 @@ suite "Waku Store - End to End - Unsorted Archive":
]
unsortedArchiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -773,7 +781,9 @@ suite "Waku Store - End to End - Unsorted Archive without provided Timestamp":
]
unsortedArchiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down Expand Up @@ -915,14 +925,18 @@ suite "Waku Store - End to End - Archive with Multiple Topics":

archiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

for i in 6 ..< 10:
archiveMessages[i].messagehash =
computeMessageHash(pubsubTopicB, archiveMessages[i].message.get())

archiveMessages[i].pubsubTopic = some(pubsubTopicB)

let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
Expand Down Expand Up @@ -1274,7 +1288,9 @@ suite "Waku Store - End to End - Archive with Multiple Topics":

let voluminousArchiveMessages = messages.mapIt(
WakuMessageKeyValue(
messageHash: computeMessageHash(pubsubTopic, it), message: some(it)
messageHash: computeMessageHash(pubsubTopic, it),
message: some(it),
pubsubTopic: some(pubsubTopic),
)
)

Expand Down
18 changes: 15 additions & 3 deletions tests/waku_store/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,21 @@ suite "Store Client":
hash3 = computeMessageHash(DefaultPubsubTopic, message3)
messageSeq =
@[
WakuMessageKeyValue(messageHash: hash1, message: some(message1)),
WakuMessageKeyValue(messageHash: hash2, message: some(message2)),
WakuMessageKeyValue(messageHash: hash3, message: some(message3)),
WakuMessageKeyValue(
messageHash: hash1,
message: some(message1),
pubsubTopic: some(DefaultPubsubTopic),
),
WakuMessageKeyValue(
messageHash: hash2,
message: some(message2),
pubsubTopic: some(DefaultPubsubTopic),
),
WakuMessageKeyValue(
messageHash: hash3,
message: some(message3),
pubsubTopic: some(DefaultPubsubTopic),
),
]
handlerFuture = newHistoryFuture()
handler = proc(req: StoreQueryRequest): Future[StoreQueryResult] {.async, gcsafe.} =
Expand Down
4 changes: 3 additions & 1 deletion tests/waku_store/test_rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ procSuite "Waku Store - RPC codec":
let
message = fakeWakuMessage()
hash = computeMessageHash(DefaultPubsubTopic, message)
keyValue = WakuMessageKeyValue(messageHash: hash, message: some(message))
keyValue = WakuMessageKeyValue(
messageHash: hash, message: some(message), pubsubTopic: some(DefaultPubsubTopic)
)
res = StoreQueryResponse(
requestId: "1",
statusCode: 200,
Expand Down
4 changes: 3 additions & 1 deletion tests/waku_store/test_waku_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ suite "Waku Store - query handler":

let msg = fakeWakuMessage(contentTopic = DefaultContentTopic)
let hash = computeMessageHash(DefaultPubsubTopic, msg)
let kv = WakuMessageKeyValue(messageHash: hash, message: some(msg))
let kv = WakuMessageKeyValue(
messageHash: hash, message: some(msg), pubsubTopic: some(DefaultPubsubTopic)
)

var queryHandlerFut = newFuture[(StoreQueryRequest)]()

Expand Down
10 changes: 8 additions & 2 deletions tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ procSuite "WakuNode - Store":
let hashes = msgListA.mapIt(computeMessageHash(DefaultPubsubTopic, it))

let kvs = zip(hashes, msgListA).mapIt(
WakuMessageKeyValue(messageHash: it[0], message: some(it[1]))
WakuMessageKeyValue(
messageHash: it[0], message: some(it[1]), pubsubTopic: some(DefaultPubsubTopic)
)
)

let archiveA = block:
Expand Down Expand Up @@ -276,7 +278,11 @@ procSuite "WakuNode - Store":
check:
response.messages.len == 1
response.messages[0] ==
WakuMessageKeyValue(messageHash: hash, message: some(message))
WakuMessageKeyValue(
messageHash: hash,
message: some(message),
pubsubTopic: some(DefaultPubSubTopic),
)

let (handledPubsubTopic, handledMsg) = filterFut.read()
check:
Expand Down
4 changes: 2 additions & 2 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -838,13 +838,13 @@ proc toStoreResult(res: ArchiveResult): StoreQueryResult =
for i in 0 ..< response.hashes.len:
let hash = response.hashes[i]

let kv =
store_common.WakuMessageKeyValue(messageHash: hash, message: none(WakuMessage))
let kv = store_common.WakuMessageKeyValue(messageHash: hash)

res.messages.add(kv)

for i in 0 ..< response.messages.len:
res.messages[i].message = some(response.messages[i])
res.messages[i].pubsubTopic = some(response.topics[i])

if response.cursor.isSome():
res.paginationCursor = some(response.cursor.get().hash)
Expand Down
9 changes: 7 additions & 2 deletions waku/waku_archive/archive.nim
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ proc findMessages*(

var hashes = newSeq[WakuMessageHash]()
var messages = newSeq[WakuMessage]()
var topics = newSeq[PubsubTopic]()
var cursor = none(ArchiveCursor)

if rows.len == 0:
Expand All @@ -180,6 +181,7 @@ proc findMessages*(
#TODO once store v2 is removed, unzip instead of 2x map
#TODO once store v2 is removed, update driver to not return messages when not needed
if query.includeData:
topics = rows[0 ..< pageSize].mapIt(it[0])
messages = rows[0 ..< pageSize].mapIt(it[1])

hashes = rows[0 ..< pageSize].mapIt(it[4])
Expand All @@ -206,10 +208,13 @@ proc findMessages*(

# All messages MUST be returned in chronological order
if not isAscendingOrder:
reverse(messages)
reverse(hashes)
reverse(messages)
reverse(topics)

return ok(ArchiveResponse(hashes: hashes, messages: messages, cursor: cursor))
return ok(
ArchiveResponse(hashes: hashes, messages: messages, topics: topics, cursor: cursor)
)

proc findMessagesV2*(
self: WakuArchive, query: ArchiveQuery
Expand Down
1 change: 1 addition & 0 deletions waku/waku_archive/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type
ArchiveResponse* = object
hashes*: seq[WakuMessageHash]
messages*: seq[WakuMessage]
topics*: seq[PubsubTopic]
cursor*: Option[ArchiveCursor]

ArchiveErrorKind* {.pure.} = enum
Expand Down
1 change: 1 addition & 0 deletions waku/waku_store/common.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type
WakuMessageKeyValue* = object
messageHash*: WakuMessageHash
message*: Option[WakuMessage]
pubsubTopic*: Option[PubsubTopic]

StoreQueryResponse* = object
requestId*: string
Expand Down
12 changes: 8 additions & 4 deletions waku/waku_store/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ proc encode*(keyValue: WakuMessageKeyValue): ProtoBuffer =

pb.write3(1, keyValue.messageHash)

if keyValue.message.isSome():
if keyValue.message.isSome() and keyValue.pubsubTopic.isSome():
pb.write3(2, keyValue.message.get().encode())
pb.write3(3, keyValue.pubsubTopic.get())

pb.finish3()

Expand Down Expand Up @@ -164,10 +165,13 @@ proc decode*(
keyValue.messagehash = hash

var proto: ProtoBuffer
if not ?pb.getField(2, proto):
keyValue.message = none(WakuMessage)
else:
var topic: string
if ?pb.getField(2, proto) and ?pb.getField(3, topic):
keyValue.message = some(?WakuMessage.decode(proto.buffer))
keyValue.pubsubTopic = some(topic)
else:
keyValue.message = none(WakuMessage)
keyValue.pubsubTopic = none(string)

return ok(keyValue)

Expand Down

0 comments on commit d700006

Please sign in to comment.