Skip to content

Commit

Permalink
libwaku store enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Dec 13, 2024
1 parent ab0c1d4 commit aad18f8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 27 deletions.
2 changes: 1 addition & 1 deletion library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ proc waku_store_query(
handleRequest(
ctx,
RequestType.STORE,
JsonStoreQueryRequest.createShared(jsonQuery, peerAddr, timeoutMs),
StoreRequest.createShared(StoreReqType.REMOTE_QUERY, jsonQuery, peerAddr, timeoutMs),
callback,
userData,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,27 @@ import
type StoreReqType* = enum
REMOTE_QUERY ## to perform a query to another Store node

type JsonStoreQueryRequest* = object
type StoreRequest* = object
operation: StoreReqType
jsonQuery: cstring
peerAddr: cstring
timeoutMs: cint

type StoreRequest* = object
operation: StoreReqType
storeReq: pointer

func fromJsonNode(
T: type JsonStoreQueryRequest, jsonContent: JsonNode
): Result[StoreQueryRequest, string] =
func fromJsonNode(T: type StoreRequest, jsonContent: JsonNode): StoreQueryRequest =
let contentTopics = collect(newSeq):
for cTopic in jsonContent["content_topics"].getElems():
cTopic.getStr()

let msgHashes = collect(newSeq):
for hashJsonObj in jsonContent["message_hashes"].getElems():
var hash: WakuMessageHash
var count: int = 0
for byteValue in hashJsonObj.getElems():
hash[count] = byteValue.getInt().byte
count.inc()
if jsonContent.contains("message_hashes"):
for hashJsonObj in jsonContent["message_hashes"].getElems():
var hash: WakuMessageHash
var count: int = 0
for byteValue in hashJsonObj.getElems():
hash[count] = byteValue.getInt().byte
count.inc()

hash
hash

let pubsubTopic =
if jsonContent.contains("pubsub_topic"):
Expand Down Expand Up @@ -84,24 +80,26 @@ func fromJsonNode(
)

proc createShared*(
T: type JsonStoreQueryRequest,
T: type StoreRequest,
op: StoreReqType,
jsonQuery: cstring,
peerAddr: cstring,
timeoutMs: cint,
): ptr type T =
var ret = createShared(T)
ret[].operation = op
ret[].timeoutMs = timeoutMs
ret[].jsonQuery = jsonQuery.alloc()
ret[].peerAddr = peerAddr.alloc()
return ret

proc destroyShared(self: ptr JsonStoreQueryRequest) =
proc destroyShared(self: ptr StoreRequest) =
deallocShared(self[].jsonQuery)
deallocShared(self[].peerAddr)
deallocShared(self)

proc process(
self: ptr JsonStoreQueryRequest, waku: ptr Waku
proc process_remote_query(
self: ptr StoreRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)
Expand All @@ -110,17 +108,15 @@ proc process(
parseJson($self[].jsonQuery)

if jsonContentRes.isErr():
return err(
"JsonStoreQueryRequest failed parsing store request: " & jsonContentRes.error.msg
)
return err("StoreRequest failed parsing store request: " & jsonContentRes.error.msg)

let storeQueryRequest = JsonStoreQueryRequest.fromJsonNode(jsonContentRes.get())
let storeQueryRequest = StoreRequest.fromJsonNode(jsonContentRes.get())

let peer = peers.parsePeerInfo(($self[].peerAddr).split(",")).valueOr:
return err("JsonStoreQueryRequest failed to parse peer addr: " & $error)
return err("StoreRequest failed to parse peer addr: " & $error)

let queryResponse = (await waku.node.wakuStoreClient.query(?storeQueryRequest, peer)).valueOr:
return err("JsonStoreQueryRequest failed store query: " & $error)
return err("StoreRequest failed store query: " & $error)

return ok($(%*queryResponse)) ## returning the response in json format

Expand All @@ -132,7 +128,7 @@ proc process*(

case self.operation
of REMOTE_QUERY:
return await cast[ptr JsonStoreQueryRequest](self[].storeReq).process(waku)
return await self.process_remote_query(waku)

error "store request not handled at all"
return err("store request not handled at all")

0 comments on commit aad18f8

Please sign in to comment.