diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 64ab61c392..7aee403ae7 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -470,7 +470,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = echo "Connecting to storenode: " & $(storenode.get()) node.mountLegacyStoreClient() - node.peerManager.addServicePeer(storenode.get(), WakuLegacyStoreCodec) + node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec) proc storeHandler(response: HistoryResponse) {.gcsafe.} = for msg in response.messages: diff --git a/tests/waku_store/test_wakunode_store.nim b/tests/waku_store/test_wakunode_store.nim index dd6ac60594..b2fae87c03 100644 --- a/tests/waku_store/test_wakunode_store.nim +++ b/tests/waku_store/test_wakunode_store.nim @@ -338,7 +338,7 @@ procSuite "WakuNode - Store": client.mountStoreClient() ## Given - let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) + let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic]) let serverPeer = server.peerInfo.toRemotePeerInfo() let requestProc = proc() {.async.} = @@ -348,7 +348,7 @@ procSuite "WakuNode - Store": let response = queryRes.get() check: - response.messages == msgListA + response.messages.mapIt(it.message) == msgListA for count in 0 ..< 4: waitFor requestProc() @@ -381,17 +381,16 @@ procSuite "WakuNode - Store": client.mountStoreClient() ## Given - let req = HistoryQuery(contentTopics: @[DefaultContentTopic]) + let req = StoreQueryRequest(contentTopics: @[DefaultContentTopic]) let serverPeer = server.peerInfo.toRemotePeerInfo() let successProc = proc() {.async.} = let queryRes = waitFor client.query(req, peer = serverPeer) check queryRes.isOk() - let response = queryRes.get() check: - response.messages == msgListA + response.messages.mapIt(it.message) == msgListA let failsProc = proc() {.async.} = let queryRes = waitFor client.query(req, peer = serverPeer) diff --git a/tests/wakunode_rest/test_rest_store.nim b/tests/wakunode_rest/test_rest_store.nim index 139fd0f33f..d5adabb7eb 100644 --- a/tests/wakunode_rest/test_rest_store.nim +++ b/tests/wakunode_rest/test_rest_store.nim @@ -25,6 +25,7 @@ import ../../../waku/waku_archive, ../../../waku/waku_archive/driver/queue_driver, ../../../waku/waku_store as waku_store, + ../../../waku/common/base64, ../testlib/wakucore, ../testlib/wakunode @@ -627,24 +628,17 @@ procSuite "Waku Rest API - Store v3": $response.contentType == $MIMETYPE_JSON response.data.messages.len == 1 - let storeMessage = response.data.messages[0] + let storeMessage = response.data.messages[0].message check: - storeMessage.contentTopic.isSome() - storeMessage.version.isSome() - storeMessage.timestamp.isSome() - storeMessage.ephemeral.isSome() - storeMessage.meta.isSome() - - check: - storeMessage.payload == base64.encode(msg.payload) - storeMessage.contentTopic.get() == msg.contentTopic - storeMessage.version.get() == msg.version - storeMessage.timestamp.get() == msg.timestamp - storeMessage.ephemeral.get() == msg.ephemeral - storeMessage.meta.get() == base64.encode(msg.meta) - - asyncTest "Rate limit store node history query": + storeMessage.payload == msg.payload + storeMessage.contentTopic == msg.contentTopic + storeMessage.version == msg.version + storeMessage.timestamp == msg.timestamp + storeMessage.ephemeral == msg.ephemeral + storeMessage.meta == msg.meta + + asyncTest "Rate limit store node store query": # Test adapted from the analogous present at waku_store/test_wakunode_store.nim let node = testWakuNode() await node.start() @@ -695,39 +689,36 @@ procSuite "Waku Rest API - Store v3": var pages = newSeq[seq[WakuMessage]](2) - # Fields that compose a HistoryCursor object var reqPubsubTopic = DefaultPubsubTopic - var reqSenderTime = Timestamp(0) - var reqStoreTime = Timestamp(0) - var reqDigest = waku_store.MessageDigest() + var reqHash = none(WakuMessageHash) for i in 0 ..< 2: - let response = await client.getStoreMessagesV1( + let response = await client.getStoreMessagesV3( encodeUrl(fullAddr), + "true", # include data encodeUrl(reqPubsubTopic), "", # content topics. Empty ignores the field. "", # start time. Empty ignores the field. "", # end time. Empty ignores the field. - encodeUrl($reqSenderTime), # sender time - encodeUrl($reqStoreTime), # store time - reqDigest.toRestStringMessageDigest(), - # base64-encoded digest. Empty ignores the field. - "3", # page size. Empty implies default page size. + "", # hashes + if reqHash.isSome(): + reqHash.get().toRestStringWakuMessageHash() + else: + "" + , # base64-encoded digest. Empty ignores the field. "true", # ascending + "3", # page size. Empty implies default page size. ) var wakuMessages = newSeq[WakuMessage](0) for j in 0 ..< response.data.messages.len: - wakuMessages.add(response.data.messages[j].toWakuMessage()) + wakuMessages.add(response.data.messages[j].message) pages[i] = wakuMessages # populate the cursor for next page - if response.data.cursor.isSome(): - reqPubsubTopic = response.data.cursor.get().pubsubTopic - reqDigest = response.data.cursor.get().digest - reqSenderTime = response.data.cursor.get().senderTime - reqStoreTime = response.data.cursor.get().storeTime + if response.data.paginationCursor.isSome(): + reqHash = response.data.paginationCursor check: response.status == 200 @@ -738,38 +729,44 @@ procSuite "Waku Rest API - Store v3": pages[1] == msgList[3 .. 5] # request last third will lead to rate limit rejection - var response = await client.getStoreMessagesV1( + var response = await client.getStoreMessagesV3( encodeUrl(fullAddr), + "true", # include data encodeUrl(reqPubsubTopic), "", # content topics. Empty ignores the field. "", # start time. Empty ignores the field. "", # end time. Empty ignores the field. - encodeUrl($reqSenderTime), # sender time - encodeUrl($reqStoreTime), # store time - reqDigest.toRestStringMessageDigest(), - # base64-encoded digest. Empty ignores the field. + "", # hashes + if reqHash.isSome(): + reqHash.get().toRestStringWakuMessageHash() + else: + "" + , # base64-encoded digest. Empty ignores the field. ) check: response.status == 429 $response.contentType == $MIMETYPE_TEXT - response.data.error_message.get == "Request rate limmit reached" + response.data.statusDesc == "Request rate limit reached" await sleepAsync(500.millis) # retry after respective amount of time shall succeed - response = await client.getStoreMessagesV1( + response = await client.getStoreMessagesV3( encodeUrl(fullAddr), + "true", # include data encodeUrl(reqPubsubTopic), "", # content topics. Empty ignores the field. "", # start time. Empty ignores the field. "", # end time. Empty ignores the field. - encodeUrl($reqSenderTime), # sender time - encodeUrl($reqStoreTime), # store time - reqDigest.toRestStringMessageDigest(), - # base64-encoded digest. Empty ignores the field. - "5", # page size. Empty implies default page size. + "", # hashes + if reqHash.isSome(): + reqHash.get().toRestStringWakuMessageHash() + else: + "" + , # base64-encoded digest. Empty ignores the field. "true", # ascending + "5", # page size. Empty implies default page size. ) check: @@ -778,7 +775,7 @@ procSuite "Waku Rest API - Store v3": var wakuMessages = newSeq[WakuMessage](0) for j in 0 ..< response.data.messages.len: - wakuMessages.add(response.data.messages[j].toWakuMessage()) + wakuMessages.add(response.data.messages[j].message) check wakuMessages == msgList[6 .. 9] diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index c01b048378..638e3df291 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -19,7 +19,9 @@ import ../discovery/waku_dnsdisc, ../waku_archive, ../waku_store, + ../waku_store/common as store_common, ../waku_store_legacy, + ../waku_store_legacy/common as legacy_common, ../waku_filter_v2, ../waku_peer_exchange, ../node/peer_manager, @@ -259,7 +261,7 @@ proc setupProtocols( if conf.storenode != "": let storeNode = parsePeerInfo(conf.storenode) if storeNode.isOk(): - node.peerManager.addServicePeer(storeNode.value, WakuStoreCodec) + node.peerManager.addServicePeer(storeNode.value, store_common.WakuStoreCodec) else: return err("failed to set node waku store peer: " & storeNode.error) @@ -267,7 +269,7 @@ proc setupProtocols( if conf.storenode != "": let storeNode = parsePeerInfo(conf.storenode) if storeNode.isOk(): - node.peerManager.addServicePeer(storeNode.value, WakuLegacyStoreCodec) + node.peerManager.addServicePeer(storeNode.value, legacy_common.WakuStoreCodec) else: return err("failed to set node waku legacy store peer: " & storeNode.error) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index fbf0af86e4..a2be401d76 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -734,7 +734,9 @@ proc mountLegacyStore*( # Node has started already. Let's start store too. await node.wakuLegacyStore.start() - node.switch.mount(node.wakuLegacyStore, protocolMatcher(WakuLegacyStoreCodec)) + node.switch.mount( + node.wakuLegacyStore, protocolMatcher(legacy_store_common.WakuStoreCodec) + ) proc mountLegacyStoreClient*(node: WakuNode) = info "mounting legacy store client" @@ -769,7 +771,7 @@ proc query*( if node.wakuLegacyStoreClient.isNil(): return err("waku legacy store client is nil") - let peerOpt = node.peerManager.selectPeer(WakuLegacyStoreCodec) + let peerOpt = node.peerManager.selectPeer(legacy_store_common.WakuStoreCodec) if peerOpt.isNone(): error "no suitable remote peers" return err("peer_not_found_failure") @@ -839,7 +841,9 @@ proc toStoreResult(res: ArchiveResult): StoreQueryResult = return ok(res) -proc mountStore*(node: WakuNode) {.async.} = +proc mountStore*( + node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit +) {.async.} = if node.wakuArchive.isNil(): error "failed to mount waku store protocol", error = "waku archive not set" return @@ -854,7 +858,8 @@ proc mountStore*(node: WakuNode) {.async.} = return response.toStoreResult() - node.wakuStore = store.WakuStore.new(node.peerManager, node.rng, requestHandler) + node.wakuStore = + store.WakuStore.new(node.peerManager, node.rng, requestHandler, some(rateLimit)) if node.started: await node.wakuStore.start() @@ -876,7 +881,11 @@ proc query*( return err("waku store v3 client is nil") let response = (await node.wakuStoreClient.query(request, peer)).valueOr: - return err("store client query error: " & $error) + var res = StoreQueryResponse() + res.statusCode = uint32(error.kind) + res.statusDesc = $error + + return ok(res) return ok(response) diff --git a/waku/waku_api/rest/admin/handlers.nim b/waku/waku_api/rest/admin/handlers.nim index a8fbbb6a90..7574683ab6 100644 --- a/waku/waku_api/rest/admin/handlers.nim +++ b/waku/waku_api/rest/admin/handlers.nim @@ -68,10 +68,10 @@ proc installAdminV1GetPeersHandler(router: var RestRouter, node: WakuNode) = if not node.wakuLegacyStore.isNil(): # Map WakuStore peers to WakuPeers and add to return list - let storePeers = node.peerManager.peerStore.peers(WakuLegacyStoreCodec).mapIt( + let storePeers = node.peerManager.peerStore.peers(WakuStoreCodec).mapIt( ( multiaddr: constructMultiaddrStr(it), - protocol: WakuLegacyStoreCodec, + protocol: WakuStoreCodec, connected: it.connectedness == Connectedness.Connected, ) ) diff --git a/waku/waku_api/rest/legacy_store/handlers.nim b/waku/waku_api/rest/legacy_store/handlers.nim index b05ece356e..fdf23958e1 100644 --- a/waku/waku_api/rest/legacy_store/handlers.nim +++ b/waku/waku_api/rest/legacy_store/handlers.nim @@ -243,7 +243,7 @@ proc installStoreApiHandlers*( return RestApiResponse.badRequest(error) let peerAddr = parsedPeerAddr.valueOr: - node.peerManager.selectPeer(WakuLegacyStoreCodec).valueOr: + node.peerManager.selectPeer(WakuStoreCodec).valueOr: let handler = discHandler.valueOr: return NoPeerNoDiscError diff --git a/waku/waku_api/rest/store/handlers.nim b/waku/waku_api/rest/store/handlers.nim index c6bf0b45cd..23817118b9 100644 --- a/waku/waku_api/rest/store/handlers.nim +++ b/waku/waku_api/rest/store/handlers.nim @@ -38,19 +38,20 @@ proc performStoreQuery( error msg return RestApiResponse.internalServerError(msg) - let res = queryFut.read() - if res.isErr(): - const TooManyRequestErrorStr = - $HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS) - if res.error == TooManyRequestErrorStr: - debug "Request rate limmit reached on peer ", storePeer - return RestApiResponse.tooManyRequests("Request rate limmit reached") - else: - const msg = "Error occurred in queryFut.read()" - error msg, error = res.error - return RestApiResponse.internalServerError(fmt("{msg} [{res.error}]")) + let futRes = queryFut.read() - let resp = RestApiResponse.jsonResponse(storeResp, status = Http200).valueOr: + if futRes.isErr(): + const msg = "Error occurred in queryFut.read()" + error msg, error = futRes.error + return RestApiResponse.internalServerError(fmt("{msg} [{futRes.error}]")) + + let res = futRes.get() + + if res.statusCode == uint32(ErrorCode.TOO_MANY_REQUESTS): + debug "Request rate limit reached on peer ", storePeer + return RestApiResponse.tooManyRequests("Request rate limit reached") + + let resp = RestApiResponse.jsonResponse(res, status = Http200).valueOr: const msg = "Error building the json respose" error msg, error = error return RestApiResponse.internalServerError(fmt("{msg} [{error}]")) diff --git a/waku/waku_api/rest/store/types.nim b/waku/waku_api/rest/store/types.nim index cb2aa05b96..55367b754f 100644 --- a/waku/waku_api/rest/store/types.nim +++ b/waku/waku_api/rest/store/types.nim @@ -70,17 +70,20 @@ proc writeValue*( writer: var JsonWriter, msg: WakuMessage ) {.gcsafe, raises: [IOError].} = writer.beginRecord() - writer.writeField("payload", $value.payload) - if value.contentTopic.isSome(): - writer.writeField("contentTopic", value.contentTopic.get()) - if value.version.isSome(): - writer.writeField("version", value.version.get()) - if value.timestamp.isSome(): - writer.writeField("timestamp", value.timestamp.get()) - if value.ephemeral.isSome(): - writer.writeField("ephemeral", value.ephemeral.get()) - if value.meta.isSome(): - writer.writeField("meta", value.meta.get()) + + writer.writeField("payload", base64.encode(msg.payload)) + writer.writeField("contentTopic", msg.contentTopic) + + if msg.meta.len > 0: + writer.writeField("meta", base64.encode(msg.meta)) + + writer.writeField("version", msg.version) + writer.writeField("timestamp", msg.timestamp) + writer.writeField("ephemeral", msg.ephemeral) + + if msg.proof.len > 0: + writer.writeField("proof", base64.encode(msg.proof)) + writer.endRecord() proc readValue*( @@ -108,9 +111,11 @@ proc readValue*( case fieldName of "payload": - payload = some(reader.readValue(Base64String)) + let base64String = reader.readValue(Base64String) + payload = base64.decode(base64String).valueOr: + reader.raiseUnexpectedField("Failed decoding data", "payload") of "contentTopic": - contentTopic = some(reader.readValue(ContentTopic)) + contentTopic = reader.readValue(ContentTopic) of "version": version = reader.readValue(uint32) of "timestamp": diff --git a/waku/waku_store/client.nim b/waku/waku_store/client.nim index c710c83994..5eefd8158a 100644 --- a/waku/waku_store/client.nim +++ b/waku/waku_store/client.nim @@ -43,7 +43,7 @@ proc sendStoreRequest( return err(StoreError(kind: ErrorCode.BAD_REQUEST, cause: writeRes.error.msg)) let readRes = catch: - await connection.readLp(MaxRpcSize.int) + await connection.readLp(DefaultMaxRpcSize.int) let buf = readRes.valueOr: return err(StoreError(kind: ErrorCode.BAD_RESPONSE, cause: error.msg)) @@ -54,7 +54,7 @@ proc sendStoreRequest( if res.statusCode != uint32(StatusCode.SUCCESS): waku_store_errors.inc(labelValues = [res.statusDesc]) - return err(common.StoreError.new(res.statusCode, res.statusDesc)) + return err(StoreError.new(res.statusCode, res.statusDesc)) return ok(res) diff --git a/waku/waku_store/common.nim b/waku/waku_store/common.nim index 595b86e838..b078e5574e 100644 --- a/waku/waku_store/common.nim +++ b/waku/waku_store/common.nim @@ -54,6 +54,7 @@ type SUCCESS = uint32(200) BAD_RESPONSE = uint32(300) BAD_REQUEST = uint32(400) + TOO_MANY_REQUESTS = uint32(429) SERVICE_UNAVAILABLE = uint32(503) PEER_DIAL_FAILURE = uint32(504) @@ -83,20 +84,22 @@ proc new*(T: type StoreError, code: uint32, desc: string): T = let kind = ErrorCode.parse(code) case kind - of ErrorCode.UNKNOWN: - return StoreError(kind: kind) of ErrorCode.BAD_RESPONSE: return StoreError(kind: kind, cause: desc) of ErrorCode.BAD_REQUEST: return StoreError(kind: kind, cause: desc) + of ErrorCode.TOO_MANY_REQUESTS: + return StoreError(kind: kind) of ErrorCode.SERVICE_UNAVAILABLE: return StoreError(kind: kind) of ErrorCode.PEER_DIAL_FAILURE: return StoreError(kind: kind, address: desc) + of ErrorCode.UNKNOWN: + return StoreError(kind: kind) proc parse*(T: type ErrorCode, kind: uint32): T = case kind - of 000, 300, 400, 503, 504: + of 000, 300, 400, 429, 503, 504: ErrorCode(kind) else: ErrorCode.UNKNOWN diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 5a9d0fc761..22210a64a3 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -62,20 +62,15 @@ proc handleQueryRequest*( if self.requestRateLimiter.isSome() and not self.requestRateLimiter.get().tryConsume( 1 ): - trace "store query request rejected due rate limit exceeded", - peerId = $conn.peerId, requestId = reqRpc.requestId + debug "store query request rejected due rate limit exceeded", + peerId = $requestor, requestId = requestId - let error = StoreError(kind: ErrorCode.TOO_MANY_REQUESTS) - - let response = StoreQueryResponse( - requestId: requestId, - statusCode: uint32(ErrorCode.TOO_MANY_REQUESTS), - statusDesc: $error, - ) + res.statusCode = uint32(ErrorCode.TOO_MANY_REQUESTS) + res.statusDesc = $ErrorCode.TOO_MANY_REQUESTS waku_service_requests_rejected.inc(labelValues = ["Store"]) - return response.encode().buffer + return res.encode().buffer waku_service_requests.inc(labelValues = ["Store"]) diff --git a/waku/waku_store_legacy/client.nim b/waku/waku_store_legacy/client.nim index d7481c4044..1ad0069e9b 100644 --- a/waku/waku_store_legacy/client.nim +++ b/waku/waku_store_legacy/client.nim @@ -39,7 +39,7 @@ proc new*( proc sendHistoryQueryRPC( w: WakuStoreClient, req: HistoryQuery, peer: RemotePeerInfo ): Future[HistoryResult] {.async, gcsafe.} = - let connOpt = await w.peerManager.dialPeer(peer, WakuLegacyStoreCodec) + let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec) if connOpt.isNone(): waku_legacy_store_errors.inc(labelValues = [dialFailure]) return err(HistoryError(kind: HistoryErrorKind.PEER_DIAL_FAILURE, address: $peer)) @@ -49,7 +49,9 @@ proc sendHistoryQueryRPC( let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC())) await connection.writeLP(reqRpc.encode().buffer) - let buf = await connection.readLp(MaxRpcSize.int) + #TODO: I see a challenge here, if storeNode uses a different MaxRPCSize this read will fail. + # Need to find a workaround for this. + let buf = await connection.readLp(DefaultMaxRpcSize.int) let respDecodeRes = HistoryRPC.decode(buf) if respDecodeRes.isErr(): waku_legacy_store_errors.inc(labelValues = [decodeRpcFailure]) diff --git a/waku/waku_store_legacy/common.nim b/waku/waku_store_legacy/common.nim index 1770febcd1..67af41a68a 100644 --- a/waku/waku_store_legacy/common.nim +++ b/waku/waku_store_legacy/common.nim @@ -7,7 +7,7 @@ import std/[options, sequtils], stew/results, stew/byteutils, nimcrypto/sha2 import ../waku_core, ../common/paging const - WakuLegacyStoreCodec* = "/vac/waku/store/2.0.0-beta4" + WakuStoreCodec* = "/vac/waku/store/2.0.0-beta4" DefaultPageSize*: uint64 = 20 @@ -57,6 +57,7 @@ type UNKNOWN = uint32(000) BAD_RESPONSE = uint32(300) BAD_REQUEST = uint32(400) + TOO_MANY_REQUESTS = uint32(429) SERVICE_UNAVAILABLE = uint32(503) PEER_DIAL_FAILURE = uint32(504) @@ -73,7 +74,7 @@ type proc parse*(T: type HistoryErrorKind, kind: uint32): T = case kind - of 000, 200, 300, 400, 503: + of 000, 200, 300, 400, 429, 503: HistoryErrorKind(kind) else: HistoryErrorKind.UNKNOWN @@ -86,6 +87,8 @@ proc `$`*(err: HistoryError): string = "BAD_RESPONSE: " & err.cause of HistoryErrorKind.BAD_REQUEST: "BAD_REQUEST: " & err.cause + of HistoryErrorKind.TOO_MANY_REQUESTS: + "TOO_MANY_REQUESTS" of HistoryErrorKind.SERVICE_UNAVAILABLE: "SERVICE_UNAVAILABLE" of HistoryErrorKind.UNKNOWN: diff --git a/waku/waku_store_legacy/protocol.nim b/waku/waku_store_legacy/protocol.nim index 65d35cd838..c50d8f9388 100644 --- a/waku/waku_store_legacy/protocol.nim +++ b/waku/waku_store_legacy/protocol.nim @@ -18,7 +18,14 @@ import libp2p/stream/connection, metrics import - ../waku_core, ../node/peer_manager, ./common, ./rpc, ./rpc_codec, ./protocol_metrics + ../waku_core, + ../node/peer_manager, + ./common, + ./rpc, + ./rpc_codec, + ./protocol_metrics, + ../common/ratelimit, + ../common/waku_service_metrics logScope: topics = "waku legacy store" @@ -33,12 +40,13 @@ type WakuStore* = ref object of LPProtocol peerManager: PeerManager rng: ref rand.HmacDrbgContext queryHandler*: HistoryQueryHandler + requestRateLimiter*: Option[TokenBucket] ## Protocol proc initProtocolHandler(ws: WakuStore) = proc handler(conn: Connection, proto: string) {.async.} = - let buf = await conn.readLp(MaxRpcSize.int) + let buf = await conn.readLp(DefaultMaxRpcSize.int) let decodeRes = HistoryRPC.decode(buf) if decodeRes.isErr(): @@ -55,6 +63,18 @@ proc initProtocolHandler(ws: WakuStore) = # TODO: Return (BAD_REQUEST, cause: "empty query") return + if ws.requestRateLimiter.isSome() and not ws.requestRateLimiter.get().tryConsume(1): + trace "store query request rejected due rate limit exceeded", + peerId = $conn.peerId, requestId = reqRpc.requestId + let error = HistoryError(kind: HistoryErrorKind.TOO_MANY_REQUESTS).toRPC() + let response = HistoryResponseRPC(error: error) + let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(response)) + await conn.writeLp(rpc.encode().buffer) + waku_service_requests_rejected.inc(labelValues = ["Store"]) + return + + waku_service_requests.inc(labelValues = ["Store"]) + let requestId = reqRpc.requestId request = reqRpc.query.get().toAPI() @@ -94,18 +114,24 @@ proc initProtocolHandler(ws: WakuStore) = await conn.writeLp(rpc.encode().buffer) ws.handler = handler - ws.codec = WakuLegacyStoreCodec + ws.codec = WakuStoreCodec proc new*( T: type WakuStore, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, queryHandler: HistoryQueryHandler, + rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), ): T = # Raise a defect if history query handler is nil if queryHandler.isNil(): raise newException(NilAccessDefect, "history query handler is nil") - let ws = WakuStore(rng: rng, peerManager: peerManager, queryHandler: queryHandler) + let ws = WakuStore( + rng: rng, + peerManager: peerManager, + queryHandler: queryHandler, + requestRateLimiter: newTokenBucket(rateLimitSetting), + ) ws.initProtocolHandler() ws diff --git a/waku/waku_store_legacy/rpc.nim b/waku/waku_store_legacy/rpc.nim index fac968f0cc..9a8887aeef 100644 --- a/waku/waku_store_legacy/rpc.nim +++ b/waku/waku_store_legacy/rpc.nim @@ -15,7 +15,7 @@ type PagingIndexRPC* = object pubsubTopic*: PubsubTopic senderTime*: Timestamp # the time at which the message is generated receiverTime*: Timestamp - digest*: common.MessageDigest # calculated over payload and content topic + digest*: MessageDigest # calculated over payload and content topic proc `==`*(x, y: PagingIndexRPC): bool = ## receiverTime plays no role in index equality diff --git a/waku/waku_store_legacy/rpc_codec.nim b/waku/waku_store_legacy/rpc_codec.nim index 9d0e281a40..2d5867e00b 100644 --- a/waku/waku_store_legacy/rpc_codec.nim +++ b/waku/waku_store_legacy/rpc_codec.nim @@ -6,8 +6,7 @@ else: import std/options, nimcrypto/hash import ../common/[protobuf, paging], ../waku_core, ./common, ./rpc -const MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64 * 1024 - # We add a 64kB safety buffer for protocol overhead +const DefaultMaxRpcSize* = -1 ## Pagination