Skip to content

Commit

Permalink
tests and merge fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Apr 23, 2024
1 parent 21b9225 commit 4edee18
Show file tree
Hide file tree
Showing 17 changed files with 153 additions and 112 deletions.
2 changes: 1 addition & 1 deletion apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
echo "Connecting to storenode: " & $(storenode.get())

node.mountLegacyStoreClient()
node.peerManager.addServicePeer(storenode.get(), WakuLegacyStoreCodec)
node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec)

proc storeHandler(response: HistoryResponse) {.gcsafe.} =
for msg in response.messages:
Expand Down
9 changes: 4 additions & 5 deletions tests/waku_store/test_wakunode_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ procSuite "WakuNode - Store":
client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let requestProc = proc() {.async.} =
Expand All @@ -348,7 +348,7 @@ procSuite "WakuNode - Store":

let response = queryRes.get()
check:
response.messages == msgListA
response.messages.mapIt(it.message) == msgListA

for count in 0 ..< 4:
waitFor requestProc()
Expand Down Expand Up @@ -381,17 +381,16 @@ procSuite "WakuNode - Store":
client.mountStoreClient()

## Given
let req = HistoryQuery(contentTopics: @[DefaultContentTopic])
let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic])
let serverPeer = server.peerInfo.toRemotePeerInfo()

let successProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)

check queryRes.isOk()

let response = queryRes.get()
check:
response.messages == msgListA
response.messages.mapIt(it.message) == msgListA

let failsProc = proc() {.async.} =
let queryRes = waitFor client.query(req, peer = serverPeer)
Expand Down
87 changes: 42 additions & 45 deletions tests/wakunode_rest/test_rest_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import
../../../waku/waku_archive,
../../../waku/waku_archive/driver/queue_driver,
../../../waku/waku_store as waku_store,
../../../waku/common/base64,
../testlib/wakucore,
../testlib/wakunode

Expand Down Expand Up @@ -627,24 +628,17 @@ procSuite "Waku Rest API - Store v3":
$response.contentType == $MIMETYPE_JSON
response.data.messages.len == 1

let storeMessage = response.data.messages[0]
let storeMessage = response.data.messages[0].message

check:
storeMessage.contentTopic.isSome()
storeMessage.version.isSome()
storeMessage.timestamp.isSome()
storeMessage.ephemeral.isSome()
storeMessage.meta.isSome()

check:
storeMessage.payload == base64.encode(msg.payload)
storeMessage.contentTopic.get() == msg.contentTopic
storeMessage.version.get() == msg.version
storeMessage.timestamp.get() == msg.timestamp
storeMessage.ephemeral.get() == msg.ephemeral
storeMessage.meta.get() == base64.encode(msg.meta)

asyncTest "Rate limit store node history query":
storeMessage.payload == msg.payload
storeMessage.contentTopic == msg.contentTopic
storeMessage.version == msg.version
storeMessage.timestamp == msg.timestamp
storeMessage.ephemeral == msg.ephemeral
storeMessage.meta == msg.meta

asyncTest "Rate limit store node store query":
# Test adapted from the analogous present at waku_store/test_wakunode_store.nim
let node = testWakuNode()
await node.start()
Expand Down Expand Up @@ -695,39 +689,36 @@ procSuite "Waku Rest API - Store v3":

var pages = newSeq[seq[WakuMessage]](2)

# Fields that compose a HistoryCursor object
var reqPubsubTopic = DefaultPubsubTopic
var reqSenderTime = Timestamp(0)
var reqStoreTime = Timestamp(0)
var reqDigest = waku_store.MessageDigest()
var reqHash = none(WakuMessageHash)

for i in 0 ..< 2:
let response = await client.getStoreMessagesV1(
let response = await client.getStoreMessagesV3(
encodeUrl(fullAddr),
"true", # include data
encodeUrl(reqPubsubTopic),
"", # content topics. Empty ignores the field.
"", # start time. Empty ignores the field.
"", # end time. Empty ignores the field.
encodeUrl($reqSenderTime), # sender time
encodeUrl($reqStoreTime), # store time
reqDigest.toRestStringMessageDigest(),
# base64-encoded digest. Empty ignores the field.
"3", # page size. Empty implies default page size.
"", # hashes
if reqHash.isSome():
reqHash.get().toRestStringWakuMessageHash()
else:
""
, # base64-encoded digest. Empty ignores the field.
"true", # ascending
"3", # page size. Empty implies default page size.
)

var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].toWakuMessage())
wakuMessages.add(response.data.messages[j].message)

pages[i] = wakuMessages

# populate the cursor for next page
if response.data.cursor.isSome():
reqPubsubTopic = response.data.cursor.get().pubsubTopic
reqDigest = response.data.cursor.get().digest
reqSenderTime = response.data.cursor.get().senderTime
reqStoreTime = response.data.cursor.get().storeTime
if response.data.paginationCursor.isSome():
reqHash = response.data.paginationCursor

check:
response.status == 200
Expand All @@ -738,38 +729,44 @@ procSuite "Waku Rest API - Store v3":
pages[1] == msgList[3 .. 5]

# request last third will lead to rate limit rejection
var response = await client.getStoreMessagesV1(
var response = await client.getStoreMessagesV3(
encodeUrl(fullAddr),
"true", # include data
encodeUrl(reqPubsubTopic),
"", # content topics. Empty ignores the field.
"", # start time. Empty ignores the field.
"", # end time. Empty ignores the field.
encodeUrl($reqSenderTime), # sender time
encodeUrl($reqStoreTime), # store time
reqDigest.toRestStringMessageDigest(),
# base64-encoded digest. Empty ignores the field.
"", # hashes
if reqHash.isSome():
reqHash.get().toRestStringWakuMessageHash()
else:
""
, # base64-encoded digest. Empty ignores the field.
)

check:
response.status == 429
$response.contentType == $MIMETYPE_TEXT
response.data.error_message.get == "Request rate limmit reached"
response.data.statusDesc == "Request rate limit reached"

await sleepAsync(500.millis)

# retry after respective amount of time shall succeed
response = await client.getStoreMessagesV1(
response = await client.getStoreMessagesV3(
encodeUrl(fullAddr),
"true", # include data
encodeUrl(reqPubsubTopic),
"", # content topics. Empty ignores the field.
"", # start time. Empty ignores the field.
"", # end time. Empty ignores the field.
encodeUrl($reqSenderTime), # sender time
encodeUrl($reqStoreTime), # store time
reqDigest.toRestStringMessageDigest(),
# base64-encoded digest. Empty ignores the field.
"5", # page size. Empty implies default page size.
"", # hashes
if reqHash.isSome():
reqHash.get().toRestStringWakuMessageHash()
else:
""
, # base64-encoded digest. Empty ignores the field.
"true", # ascending
"5", # page size. Empty implies default page size.
)

check:
Expand All @@ -778,7 +775,7 @@ procSuite "Waku Rest API - Store v3":

var wakuMessages = newSeq[WakuMessage](0)
for j in 0 ..< response.data.messages.len:
wakuMessages.add(response.data.messages[j].toWakuMessage())
wakuMessages.add(response.data.messages[j].message)

check wakuMessages == msgList[6 .. 9]

Expand Down
6 changes: 4 additions & 2 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import
../discovery/waku_dnsdisc,
../waku_archive,
../waku_store,
../waku_store/common as store_common,
../waku_store_legacy,
../waku_store_legacy/common as legacy_common,
../waku_filter_v2,
../waku_peer_exchange,
../node/peer_manager,
Expand Down Expand Up @@ -259,15 +261,15 @@ proc setupProtocols(
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec)
node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec)
else:
return err("failed to set node waku store peer: " & storeNode.error)

mountLegacyStoreClient(node)
if conf.storenode != "":
let storeNode = parsePeerInfo(conf.storenode)
if storeNode.isOk():
node.peerManager.addServicePeer(storeNode.value, WakuLegacyStoreCodec)
node.peerManager.addServicePeer(storeNode.value, legacy_common.WakuStoreCodec)
else:
return err("failed to set node waku legacy store peer: " & storeNode.error)

Expand Down
19 changes: 14 additions & 5 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,9 @@ proc mountLegacyStore*(
# Node has started already. Let's start store too.
await node.wakuLegacyStore.start()

node.switch.mount(node.wakuLegacyStore, protocolMatcher(WakuLegacyStoreCodec))
node.switch.mount(
node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuStoreCodec)
)

proc mountLegacyStoreClient*(node: WakuNode) =
info "mounting legacy store client"
Expand Down Expand Up @@ -769,7 +771,7 @@ proc query*(
if node.wakuLegacyStoreClient.isNil():
return err("waku legacy store client is nil")

let peerOpt = node.peerManager.selectPeer(WakuLegacyStoreCodec)
let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuStoreCodec)
if peerOpt.isNone():
error "no suitable remote peers"
return err("peer_not_found_failure")
Expand Down Expand Up @@ -839,7 +841,9 @@ proc toStoreResult(res: ArchiveResult): StoreQueryResult =

return ok(res)

proc mountStore*(node: WakuNode) {.async.} =
proc mountStore*(
node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit
) {.async.} =
if node.wakuArchive.isNil():
error "failed to mount waku store protocol", error = "waku archive not set"
return
Expand All @@ -854,7 +858,8 @@ proc mountStore*(node: WakuNode) {.async.} =

return response.toStoreResult()

node.wakuStore = store.WakuStore.new(node.peerManager, node.rng, requestHandler)
node.wakuStore =
store.WakuStore.new(node.peerManager, node.rng, requestHandler, some(rateLimit))

if node.started:
await node.wakuStore.start()
Expand All @@ -876,7 +881,11 @@ proc query*(
return err("waku store v3 client is nil")

let response = (await node.wakuStoreClient.query(request, peer)).valueOr:
return err("store client query error: " & $error)
var res = StoreQueryResponse()
res.statusCode = uint32(error.kind)
res.statusDesc = $error

return ok(res)

return ok(response)

Expand Down
4 changes: 2 additions & 2 deletions waku/waku_api/rest/admin/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) =

if not node.wakuLegacyStore.isNil():
# Map WakuStore peers to WakuPeers and add to return list
let storePeers = node.peerManager.peerStore.peers(WakuLegacyStoreCodec).mapIt(
let storePeers = node.peerManager.peerStore.peers(WakuStoreCodec).mapIt(
(
multiaddr: constructMultiaddrStr(it),
protocol: WakuLegacyStoreCodec,
protocol: WakuStoreCodec,
connected: it.connectedness == Connectedness.Connected,
)
)
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_api/rest/legacy_store/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ proc installStoreApiHandlers*(
return RestApiResponse.badRequest(error)

let peerAddr = parsedPeerAddr.valueOr:
node.peerManager.selectPeer(WakuLegacyStoreCodec).valueOr:
node.peerManager.selectPeer(WakuStoreCodec).valueOr:
let handler = discHandler.valueOr:
return NoPeerNoDiscError

Expand Down
25 changes: 13 additions & 12 deletions waku/waku_api/rest/store/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,20 @@ proc performStoreQuery(
error msg
return RestApiResponse.internalServerError(msg)

let res = queryFut.read()
if res.isErr():
const TooManyRequestErrorStr =
$HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS)
if res.error == TooManyRequestErrorStr:
debug "Request rate limmit reached on peer ", storePeer
return RestApiResponse.tooManyRequests("Request rate limmit reached")
else:
const msg = "Error occurred in queryFut.read()"
error msg, error = res.error
return RestApiResponse.internalServerError(fmt("{msg} [{res.error}]"))
let futRes = queryFut.read()

let resp = RestApiResponse.jsonResponse(storeResp, status = Http200).valueOr:
if futRes.isErr():
const msg = "Error occurred in queryFut.read()"
error msg, error = futRes.error
return RestApiResponse.internalServerError(fmt("{msg} [{futRes.error}]"))

let res = futRes.get()

if res.statusCode == uint32(ErrorCode.TOO_MANY_REQUESTS):
debug "Request rate limit reached on peer ", storePeer
return RestApiResponse.tooManyRequests("Request rate limit reached")

let resp = RestApiResponse.jsonResponse(res, status = Http200).valueOr:
const msg = "Error building the json respose"
error msg, error = error
return RestApiResponse.internalServerError(fmt("{msg} [{error}]"))
Expand Down
31 changes: 18 additions & 13 deletions waku/waku_api/rest/store/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,20 @@ proc writeValue*(
writer: var JsonWriter, msg: WakuMessage
) {.gcsafe, raises: [IOError].} =
writer.beginRecord()
writer.writeField("payload", $value.payload)
if value.contentTopic.isSome():
writer.writeField("contentTopic", value.contentTopic.get())
if value.version.isSome():
writer.writeField("version", value.version.get())
if value.timestamp.isSome():
writer.writeField("timestamp", value.timestamp.get())
if value.ephemeral.isSome():
writer.writeField("ephemeral", value.ephemeral.get())
if value.meta.isSome():
writer.writeField("meta", value.meta.get())

writer.writeField("payload", base64.encode(msg.payload))
writer.writeField("contentTopic", msg.contentTopic)

if msg.meta.len > 0:
writer.writeField("meta", base64.encode(msg.meta))

writer.writeField("version", msg.version)
writer.writeField("timestamp", msg.timestamp)
writer.writeField("ephemeral", msg.ephemeral)

if msg.proof.len > 0:
writer.writeField("proof", base64.encode(msg.proof))

writer.endRecord()

proc readValue*(
Expand Down Expand Up @@ -108,9 +111,11 @@ proc readValue*(

case fieldName
of "payload":
payload = some(reader.readValue(Base64String))
let base64String = reader.readValue(Base64String)
payload = base64.decode(base64String).valueOr:
reader.raiseUnexpectedField("Failed decoding data", "payload")
of "contentTopic":
contentTopic = some(reader.readValue(ContentTopic))
contentTopic = reader.readValue(ContentTopic)
of "version":
version = reader.readValue(uint32)
of "timestamp":
Expand Down
Loading

0 comments on commit 4edee18

Please sign in to comment.