From 43ff606fa69dabb5de64d27419f7d1d0525ab44b Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Thu, 12 Sep 2024 12:35:16 +0200 Subject: [PATCH 1/9] Enhanced peer-ex protocol - added rate limiting, added response status and desc to the rpc --- tests/waku_peer_exchange/test_protocol.nim | 10 ++- tests/waku_peer_exchange/test_rpc_codec.nim | 38 +++++++--- waku/factory/node_factory.nim | 5 +- waku/node/waku_node.nim | 7 +- waku/waku_peer_exchange/protocol.nim | 84 ++++++++++++++------- waku/waku_peer_exchange/rpc.nim | 35 ++++++++- waku/waku_peer_exchange/rpc_codec.nim | 72 ++++++++++++++++-- 7 files changed, 198 insertions(+), 53 deletions(-) diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index e80386d5e5..9bc57083f8 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -385,7 +385,7 @@ suite "Waku Peer Exchange": let conn = connOpt.get() # Send bytes so that they directly hit the handler - let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: 1)) + let rpc = PeerExchangeRpc.makeRequest(1) var buffer: seq[byte] await conn.writeLP(rpc.encode().buffer) @@ -397,5 +397,9 @@ suite "Waku Peer Exchange": # Check we got back the enr we mocked check: - decodedBuff.get().response.peerInfos.len == 1 - decodedBuff.get().response.peerInfos[0].enr == enr1.raw + decodedBuff.get().responseStatus.isSome() + decodedBuff.get().responseStatus.get().status == + PeerExchangeResponseStatusCode.SUCCESS + decodedBuff.get().response.isSome() + decodedBuff.get().response.get().peerInfos.len == 1 + decodedBuff.get().response.get().peerInfos[0].enr == enr1.raw diff --git a/tests/waku_peer_exchange/test_rpc_codec.nim b/tests/waku_peer_exchange/test_rpc_codec.nim index 0393fb4894..c5fa79b11b 100644 --- a/tests/waku_peer_exchange/test_rpc_codec.nim +++ b/tests/waku_peer_exchange/test_rpc_codec.nim @@ -23,6 +23,17 @@ import suite "Peer Exchange RPC": asyncTest "Encode - Decode": # Setup + let rpcReq = PeerExchangeRpc.makeRequest(2) + let rpcReqBuffer: seq[byte] = rpcReq.encode().buffer + let resReq = PeerExchangeRpc.decode(rpcReqBuffer) + + check: + resReq.isOk + resReq.get().response.isNone() + resReq.get().responseStatus.isNone() + resReq.get().request.isSome() + resReq.get().request.get().numPeers == 2 + var enr1 = enr.Record(seqNum: 0, raw: @[]) enr2 = enr.Record(seqNum: 0, raw: @[]) @@ -35,28 +46,35 @@ suite "Peer Exchange RPC": "enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq" ) - let - peerInfos = - @[PeerExchangePeerInfo(enr: enr1.raw), PeerExchangePeerInfo(enr: enr2.raw)] - rpc = PeerExchangeRpc(response: PeerExchangeResponse(peerInfos: peerInfos)) + let peerInfos = + @[PeerExchangePeerInfo(enr: enr1.raw), PeerExchangePeerInfo(enr: enr2.raw)] + let rpc = PeerExchangeRpc.makeResponse(peerInfos) # When encoding and decoding - let - rpcBuffer: seq[byte] = rpc.encode().buffer - res = PeerExchangeRpc.decode(rpcBuffer) + let rpcBuffer: seq[byte] = rpc.encode().buffer + let res = PeerExchangeRpc.decode(rpcBuffer) # Then the peerInfos match the originals check: res.isOk - res.get().response.peerInfos == peerInfos + res.get().request.isNone() + res.get().response.isSome() + res.get().responseStatus.isSome() + res.get().responseStatus.get().status == PeerExchangeResponseStatusCode.SUCCESS + res.get().response.get().peerInfos == peerInfos # When using the decoded responses to create new enrs var resEnr1 = enr.Record(seqNum: 0, raw: @[]) resEnr2 = enr.Record(seqNum: 0, raw: @[]) - discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr) - discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr) + check: + res.get().response.isSome() + res.get().responseStatus.isSome() + res.get().responseStatus.get().status == PeerExchangeResponseStatusCode.SUCCESS + + discard resEnr1.fromBytes(res.get().response.get().peerInfos[0].enr) + discard resEnr2.fromBytes(res.get().response.get().peerInfos[1].enr) # Then they match the original enrs check: diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d6672f8a8a..042c11cee1 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -368,7 +368,10 @@ proc setupProtocols( # waku peer exchange setup if conf.peerExchange: try: - await mountPeerExchange(node, some(conf.clusterId)) + let rateLimitSetting: RateLimitSetting = + (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) + + await mountPeerExchange(node, some(conf.clusterId), rateLimitSetting) except CatchableError: return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 2bcb065940..9d41260127 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1144,11 +1144,14 @@ proc mountRlnRelay*( ## Waku peer-exchange proc mountPeerExchange*( - node: WakuNode, cluster: Option[uint16] = none(uint16) + node: WakuNode, + cluster: Option[uint16] = none(uint16), + rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit, ) {.async: (raises: []).} = info "mounting waku peer exchange" - node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, cluster) + node.wakuPeerExchange = + WakuPeerExchange.new(node.peerManager, cluster, some(rateLimit)) if node.started: try: diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index e64395dd41..a75b4b2007 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -13,7 +13,8 @@ import ../waku_core, ../discovery/waku_discv5, ./rpc, - ./rpc_codec + ./rpc_codec, + ../common/rate_limit/request_limiter declarePublicGauge waku_px_peers_received_total, "number of ENRs received via peer exchange" @@ -52,11 +53,12 @@ type enrCache*: seq[enr.Record] cluster*: Option[uint16] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ + requestRateLimiter*: RequestRateLimiter proc request*( wpx: WakuPeerExchange, numPeers: uint64, conn: Connection ): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = - let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: numPeers)) + let rpc = PeerExchangeRpc.makeRequest(numPeers) var buffer: seq[byte] var error: string @@ -76,7 +78,9 @@ proc request*( let decodedBuff = PeerExchangeRpc.decode(buffer) if decodedBuff.isErr(): return err("decode failed: " & $decodedBuff.error) - return ok(decodedBuff.get().response) + if decodedBuff.get().response.isNone(): + return err("bad response - missing value") + return ok(decodedBuff.get().response.get()) proc request*( wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo @@ -101,10 +105,23 @@ proc request*( proc respond( wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection ): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let rpc = PeerExchangeRpc( - response: - PeerExchangeResponse(peerInfos: enrs.mapIt(PeerExchangePeerInfo(enr: it.raw))) - ) + let rpc = PeerExchangeRpc.makeResponse(enrs.mapIt(PeerExchangePeerInfo(enr: it.raw))) + + try: + await conn.writeLP(rpc.encode().buffer) + except CatchableError as exc: + waku_px_errors.inc(labelValues = [exc.msg]) + return err(exc.msg) + + return ok() + +proc respondError( + wpx: WakuPeerExchange, + status: PeerExchangeResponseStatusCode, + desc: Option[string], + conn: Connection, +): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = + let rpc = PeerExchangeRpc.makeErrorResponse(status, desc) try: await conn.writeLP(rpc.encode().buffer) @@ -169,25 +186,33 @@ proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} = proc initProtocolHandler(wpx: WakuPeerExchange) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = var buffer: seq[byte] - try: - buffer = await conn.readLp(DefaultMaxRpcSize.int) - except CatchableError as exc: - waku_px_errors.inc(labelValues = [exc.msg]) - return - - let decBuf = PeerExchangeRpc.decode(buffer) - if decBuf.isErr(): - waku_px_errors.inc(labelValues = [decodeRpcFailure]) - return - - let rpc = decBuf.get() - trace "peer exchange request received" - let enrs = wpx.getEnrsFromCache(rpc.request.numPeers) - let res = await wpx.respond(enrs, conn) - if res.isErr: - waku_px_errors.inc(labelValues = [res.error]) - else: - waku_px_peers_sent.inc(enrs.len().int64()) + wpx.requestRateLimiter.checkUsageLimit(WakuPeerExchangeCodec, conn): + try: + buffer = await conn.readLp(DefaultMaxRpcSize.int) + except CatchableError as exc: + waku_px_errors.inc(labelValues = [exc.msg]) + discard await wpx.respondError( + PeerExchangeResponseStatusCode.BAD_REQUEST, some(exc.msg), conn + ) + return + + let decBuf = PeerExchangeRpc.decode(buffer) + if decBuf.isErr() or decBuf.get().request.isNone(): + waku_px_errors.inc(labelValues = [decodeRpcFailure]) + discard await wpx.respondError( + PeerExchangeResponseStatusCode.BAD_REQUEST, some($decBuf.error), conn + ) + return + + let request = decBuf.get().request.get() + trace "peer exchange request received" + let enrs = wpx.getEnrsFromCache(request.numPeers) + (await wpx.respond(enrs, conn)).isErrOr: + waku_px_peers_sent.inc(enrs.len().int64()) + do: + discard await wpx.respondError( + PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS, none(string), conn + ) # close, no data is expected await conn.closeWithEof() @@ -199,8 +224,13 @@ proc new*( T: type WakuPeerExchange, peerManager: PeerManager, cluster: Option[uint16] = none(uint16), + rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), ): T = - let wpx = WakuPeerExchange(peerManager: peerManager, cluster: cluster) + let wpx = WakuPeerExchange( + peerManager: peerManager, + cluster: cluster, + requestRateLimiter: newRequestRateLimiter(rateLimitSetting), + ) wpx.initProtocolHandler() asyncSpawn wpx.updatePxEnrCache() return wpx diff --git a/waku/waku_peer_exchange/rpc.nim b/waku/waku_peer_exchange/rpc.nim index 0b248d9356..f40ce7db08 100644 --- a/waku/waku_peer_exchange/rpc.nim +++ b/waku/waku_peer_exchange/rpc.nim @@ -1,3 +1,5 @@ +import std/options + type PeerExchangePeerInfo* = object enr*: seq[byte] # RLP encoded ENR: https://eips.ethereum.org/EIPS/eip-778 @@ -8,6 +10,35 @@ type PeerExchangeResponse* = object peerInfos*: seq[PeerExchangePeerInfo] + PeerExchangeResponseStatusCode* {.pure.} = enum + UNKNOWN = uint32(000) + SUCCESS = uint32(200) + BAD_REQUEST = uint32(400) + TOO_MANY_REQUESTS = uint32(429) + SERVICE_UNAVAILABLE = uint32(503) + + PeerExchangeResponseStatus* = object + status*: PeerExchangeResponseStatusCode + desc*: Option[string] + PeerExchangeRpc* = object - request*: PeerExchangeRequest - response*: PeerExchangeResponse + request*: Option[PeerExchangeRequest] + response*: Option[PeerExchangeResponse] + responseStatus*: Option[PeerExchangeResponseStatus] + +proc makeRequest*(T: type PeerExchangeRpc, numPeers: uint64): T = + return T(request: some(PeerExchangeRequest(numPeers: numpeers))) + +proc makeResponse*(T: type PeerExchangeRpc, peerInfos: seq[PeerExchangePeerInfo]): T = + return T( + response: some(PeerExchangeResponse(peerInfos: peerInfos)), + responseStatus: + some(PeerExchangeResponseStatus(status: PeerExchangeResponseStatusCode.SUCCESS)), + ) + +proc makeErrorResponse*( + T: type PeerExchangeRpc, + status: PeerExchangeResponseStatusCode, + desc: Option[string] = none(string), +): T = + return T(responseStatus: some(PeerExchangeResponseStatus(status: status, desc: desc))) diff --git a/waku/waku_peer_exchange/rpc_codec.nim b/waku/waku_peer_exchange/rpc_codec.nim index 92ebd70842..48dd5c009c 100644 --- a/waku/waku_peer_exchange/rpc_codec.nim +++ b/waku/waku_peer_exchange/rpc_codec.nim @@ -1,5 +1,6 @@ {.push raises: [].} +import std/options import ../common/protobuf, ./rpc proc encode*(rpc: PeerExchangeRequest): ProtoBuffer = @@ -59,26 +60,81 @@ proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] = ok(rpc) +proc parse*(T: type PeerExchangeResponseStatusCode, status: uint32): T = + case status + of 200, 400, 429, 503: + PeerExchangeResponseStatusCode(status) + else: + PeerExchangeResponseStatusCode.UNKNOWN + +proc encode*(rpc: PeerExchangeResponseStatus): ProtoBuffer = + var pb = initProtoBuffer() + + pb.write3(1, rpc.status.uint32) + pb.write3(2, rpc.desc) + + pb.finish3() + + pb + +proc decode*(T: type PeerExchangeResponseStatus, buffer: seq[byte]): ProtobufResult[T] = + var pb = initProtoBuffer(buffer) + var rpc = PeerExchangeResponseStatus(status: PeerExchangeResponseStatusCode.UNKNOWN) + + var status: uint32 + if ?pb.getField(1, status): + rpc.status = PeerExchangeResponseStatusCode.parse(status) + else: + return err(ProtobufError.missingRequiredField("status")) + + var desc: string + if ?pb.getField(2, desc): + rpc.desc = some(desc) + else: + rpc.desc = none(string) + + ok(rpc) + proc encode*(rpc: PeerExchangeRpc): ProtoBuffer = var pb = initProtoBuffer() - pb.write3(1, rpc.request.encode()) - pb.write3(2, rpc.response.encode()) + if rpc.request.isSome(): + pb.write3(1, rpc.request.get().encode()) + if rpc.response.isSome(): + pb.write3(2, rpc.response.get().encode()) + if rpc.responseStatus.isSome(): + pb.write3(10, rpc.responseStatus.get().encode()) + pb.finish3() pb -proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtobufResult[T] = let pb = initProtoBuffer(buffer) var rpc = PeerExchangeRpc() var requestBuffer: seq[byte] - if not ?pb.getField(1, requestBuffer): - return err(ProtoError.RequiredFieldMissing) - rpc.request = ?PeerExchangeRequest.decode(requestBuffer) + let isRequest = ?pb.getField(1, requestBuffer) var responseBuffer: seq[byte] - discard ?pb.getField(2, responseBuffer) - rpc.response = ?PeerExchangeResponse.decode(responseBuffer) + let isResponse = ?pb.getField(2, responseBuffer) + + if isRequest and isResponse: + return err(ProtobufError.missingRequiredField("request and response are exclusive")) + + if not isRequest and not isResponse: + return err(ProtobufError.missingRequiredField("request")) + + if isRequest: + rpc.request = some(?PeerExchangeRequest.decode(requestBuffer)) + + if isResponse: + rpc.response = some(?PeerExchangeResponse.decode(responseBuffer)) + + var status: seq[byte] + if ?pb.getField(10, status): + rpc.responseStatus = some(?PeerExchangeResponseStatus.decode(status)) + elif not isRequest: + return err(ProtobufError.missingRequiredField("responseStatus")) ok(rpc) From 91596c945e8e78b7c81e29b5314978bd2745ce0c Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Thu, 12 Sep 2024 14:58:24 +0200 Subject: [PATCH 2/9] Better error result handling for PeerEx request, adjusted tests --- tests/waku_peer_exchange/test_protocol.nim | 8 +-- tests/waku_peer_exchange/test_rpc_codec.nim | 3 +- waku/waku_peer_exchange/protocol.nim | 58 ++++++++++++++++----- waku/waku_peer_exchange/rpc.nim | 15 ++++++ 4 files changed, 65 insertions(+), 19 deletions(-) diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index 9bc57083f8..450eb12924 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -1,11 +1,10 @@ {.used.} import - std/[options, sequtils, tables], + std/[options, sequtils, tables, net], testutils/unittests, chronos, chronicles, - stew/shims/net, libp2p/[switch, peerId, crypto/crypto, multistream, muxers/muxer], eth/[keys, p2p/discoveryv5/enr] @@ -223,6 +222,7 @@ suite "Waku Peer Exchange": # Check that it failed gracefully check: response.isErr + response.error.status == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE asyncTest "Request 0 peers, with 0 peers in PeerExchange": # Given a disconnected PeerExchange @@ -237,7 +237,7 @@ suite "Waku Peer Exchange": # Then the response should be an error check: response.isErr - response.error == "peer_not_found_failure" + response.error.status == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE asyncTest "Pool filtering": let @@ -331,7 +331,7 @@ suite "Waku Peer Exchange": # Then the response should be an error check: response.isErr - response.error == "dial_failure" + response.error.status == PeerExchangeResponseStatusCode.DIAL_FAILURE asyncTest "Connections are closed after response is sent": # Create 3 nodes diff --git a/tests/waku_peer_exchange/test_rpc_codec.nim b/tests/waku_peer_exchange/test_rpc_codec.nim index c5fa79b11b..e7ddc60f66 100644 --- a/tests/waku_peer_exchange/test_rpc_codec.nim +++ b/tests/waku_peer_exchange/test_rpc_codec.nim @@ -1,10 +1,9 @@ {.used.} import - std/[options], + std/[options, net], testutils/unittests, chronos, - stew/shims/net, libp2p/switch, libp2p/peerId, libp2p/crypto/crypto, diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index a75b4b2007..b815fc7895 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -46,7 +46,7 @@ const pxFailure = "px_failure" type - WakuPeerExchangeResult*[T] = Result[T, string] + WakuPeerExchangeResult*[T] = Result[T, PeerExchangeResponseStatus] WakuPeerExchange* = ref object of LPProtocol peerManager*: PeerManager @@ -61,25 +61,33 @@ proc request*( let rpc = PeerExchangeRpc.makeRequest(numPeers) var buffer: seq[byte] - var error: string + var callResult = + PeerExchangeResponseStatus(status: PeerExchangeResponseStatusCode.SUCCESS) try: await conn.writeLP(rpc.encode().buffer) buffer = await conn.readLp(DefaultMaxRpcSize.int) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) - error = $exc.msg + callResult = PeerExchangeResponseStatus( + status: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, desc: some($exc.msg) + ) finally: # close, no more data is expected await conn.closeWithEof() - if error.len > 0: - return err("write/read failed: " & error) + if callResult.status != PeerExchangeResponseStatusCode.SUCCESS: + return err(callResult) let decodedBuff = PeerExchangeRpc.decode(buffer) if decodedBuff.isErr(): - return err("decode failed: " & $decodedBuff.error) - if decodedBuff.get().response.isNone(): - return err("bad response - missing value") + return err( + PeerExchangeResponseStatus( + status: PeerExchangeResponseStatusCode.BAD_RESPONSE, + desc: some($decodedBuff.error), + ) + ) + if decodedBuff.get().response.isNone() and decodedBuff.get().responseStatus.isSome(): + return err(decodedBuff.get().responseStatus.get()) return ok(decodedBuff.get().response.get()) proc request*( @@ -88,10 +96,19 @@ proc request*( try: let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) if connOpt.isNone(): - return err(dialFailure) + return err( + PeerExchangeResponseStatus( + status: PeerExchangeResponseStatusCode.DIAL_FAILURE, desc: some(dialFailure) + ) + ) return await wpx.request(numPeers, connOpt.get()) except CatchableError: - return err("exception dialing peer: " & getCurrentExceptionMsg()) + return err( + PeerExchangeResponseStatus( + status: PeerExchangeResponseStatusCode.BAD_RESPONSE, + desc: some("exception dialing peer: " & getCurrentExceptionMsg()), + ) + ) proc request*( wpx: WakuPeerExchange, numPeers: uint64 @@ -99,7 +116,12 @@ proc request*( let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) + return err( + PeerExchangeResponseStatus( + status: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + desc: some(peerNotFoundFailure), + ) + ) return await wpx.request(numPeers, peerOpt.get()) proc respond( @@ -111,7 +133,12 @@ proc respond( await conn.writeLP(rpc.encode().buffer) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) - return err(exc.msg) + return err( + PeerExchangeResponseStatus( + status: PeerExchangeResponseStatusCode.DIAL_FAILURE, + desc: some("exception dialing peer: " & exc.msg), + ) + ) return ok() @@ -127,7 +154,12 @@ proc respondError( await conn.writeLP(rpc.encode().buffer) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) - return err(exc.msg) + return err( + PeerExchangeResponseStatus( + status: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + desc: some("exception dialing peer: " & exc.msg), + ) + ) return ok() diff --git a/waku/waku_peer_exchange/rpc.nim b/waku/waku_peer_exchange/rpc.nim index f40ce7db08..c4c36212f2 100644 --- a/waku/waku_peer_exchange/rpc.nim +++ b/waku/waku_peer_exchange/rpc.nim @@ -14,8 +14,10 @@ type UNKNOWN = uint32(000) SUCCESS = uint32(200) BAD_REQUEST = uint32(400) + BAD_RESPONSE = uint32(401) TOO_MANY_REQUESTS = uint32(429) SERVICE_UNAVAILABLE = uint32(503) + DIAL_FAILURE = uint32(599) PeerExchangeResponseStatus* = object status*: PeerExchangeResponseStatusCode @@ -42,3 +44,16 @@ proc makeErrorResponse*( desc: Option[string] = none(string), ): T = return T(responseStatus: some(PeerExchangeResponseStatus(status: status, desc: desc))) + +proc `$`*(statusCode: PeerExchangeResponseStatusCode): string = + case statusCode + of PeerExchangeResponseStatusCode.UNKNOWN: "UNKNOWN" + of PeerExchangeResponseStatusCode.SUCCESS: "SUCCESS" + of PeerExchangeResponseStatusCode.BAD_REQUEST: "BAD_REQUEST" + of PeerExchangeResponseStatusCode.BAD_RESPONSE: "BAD_RESPONSE" + of PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS: "TOO_MANY_REQUESTS" + of PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE: "SERVICE_UNAVAILABLE" + of PeerExchangeResponseStatusCode.DIAL_FAILURE: "DIAL_FAILURE" + +proc `$`*(pxResponseStatus: PeerExchangeResponseStatus): string = + return $pxResponseStatus.status & " - " & pxResponseStatus.desc.get("") From 4da68d6a2bacd8eb31a70b641a8f7864e1a7f8bf Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 13 Sep 2024 02:11:03 +0200 Subject: [PATCH 3/9] Refactored RateLimit configuration option for better CLI UX - now possible to set separate limits per protocol. Adjusted mountings. Added and adjusted tests --- tests/common/test_all.nim | 1 + tests/common/test_ratelimit_setting.nim | 150 +++++++++++++++++++++ tests/common/test_requestratelimiter.nim | 14 ++ tests/node/test_wakunode_peer_exchange.nim | 7 +- tests/waku_peer_exchange/test_protocol.nim | 56 ++++++++ waku/common/rate_limit/setting.nim | 118 +++++++++++++++- waku/factory/builder.nim | 16 ++- waku/factory/external_config.nim | 26 ++-- waku/factory/node_factory.nim | 20 ++- waku/node/waku_node.nim | 22 ++- waku/waku_filter_v2/subscriptions.nim | 6 - waku/waku_peer_exchange.nim | 4 +- waku/waku_peer_exchange/rpc_codec.nim | 6 +- 13 files changed, 401 insertions(+), 45 deletions(-) create mode 100644 tests/common/test_ratelimit_setting.nim diff --git a/tests/common/test_all.nim b/tests/common/test_all.nim index 1afef23a90..7756f23adf 100644 --- a/tests/common/test_all.nim +++ b/tests/common/test_all.nim @@ -8,4 +8,5 @@ import ./test_parse_size, ./test_tokenbucket, ./test_requestratelimiter, + ./test_ratelimit_setting, ./test_timed_map diff --git a/tests/common/test_ratelimit_setting.nim b/tests/common/test_ratelimit_setting.nim new file mode 100644 index 0000000000..d14bf9b87d --- /dev/null +++ b/tests/common/test_ratelimit_setting.nim @@ -0,0 +1,150 @@ +# Chronos Test Suite +# (c) Copyright 2022-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +{.used.} + +import testutils/unittests +import chronos, libp2p/stream/connection +import std/[sequtils, options, tables] + +import ../../waku/common/rate_limit/request_limiter +import ../../waku/common/rate_limit/timed_map + +let proto = "ProtocolDescriptor" + +let conn1 = Connection(peerId: PeerId.random().tryGet()) +let conn2 = Connection(peerId: PeerId.random().tryGet()) +let conn3 = Connection(peerId: PeerId.random().tryGet()) + +suite "RateLimitSetting": + test "Parse rate limit setting - ok": + let test1 = "10/2m" + let test2 = " store : 10 /1h" + let test2a = "storev2 : 10 /1h" + let test2b = "storeV3: 12 /1s" + let test3 = "LIGHTPUSH: 10/ 1m" + let test4 = "px:10/2 s " + let test5 = "filter:42/66ms" + + let expU = UnlimitedRateLimit + let exp1: RateLimitSetting = (10, 2.minutes) + let exp2: RateLimitSetting = (10, 1.hours) + let exp2a: RateLimitSetting = (10, 1.hours) + let exp2b: RateLimitSetting = (12, 1.seconds) + let exp3: RateLimitSetting = (10, 1.minutes) + let exp4: RateLimitSetting = (10, 2.seconds) + let exp5: RateLimitSetting = (42, 66.milliseconds) + + let res1 = ProtocolRateLimitSettings.parse(@[test1]) + let res2 = ProtocolRateLimitSettings.parse(@[test2]) + let res2a = ProtocolRateLimitSettings.parse(@[test2a]) + let res2b = ProtocolRateLimitSettings.parse(@[test2b]) + let res3 = ProtocolRateLimitSettings.parse(@[test3]) + let res4 = ProtocolRateLimitSettings.parse(@[test4]) + let res5 = ProtocolRateLimitSettings.parse(@[test5]) + + check: + res1.isOk() + res1.get() == {GLOBAL: exp1}.toTable() + res2.isOk() + res2.get() == {GLOBAL: expU, STOREV2: exp2, STOREV3: exp2}.toTable() + res2a.isOk() + res2a.get() == {GLOBAL: expU, STOREV2: exp2a}.toTable() + res2b.isOk() + res2b.get() == {GLOBAL: expU, STOREV3: exp2b}.toTable() + res3.isOk() + res3.get() == {GLOBAL: expU, LIGHTPUSH: exp3}.toTable() + res4.isOk() + res4.get() == {GLOBAL: expU, PEEREXCHG: exp4}.toTable() + res5.isOk() + res5.get() == {GLOBAL: expU, FILTER: exp5}.toTable() + + test "Parse rate limit setting - err": + let test1 = "10/2d" + let test2 = " stre : 10 /1h" + let test2a = "storev2 10 /1h" + let test2b = "storev3: 12 1s" + let test3 = "somethingelse: 10/ 1m" + let test4 = ":px:10/2 s " + let test5 = "filter:p42/66ms" + + let res1 = ProtocolRateLimitSettings.parse(@[test1]) + let res2 = ProtocolRateLimitSettings.parse(@[test2]) + let res2a = ProtocolRateLimitSettings.parse(@[test2a]) + let res2b = ProtocolRateLimitSettings.parse(@[test2b]) + let res3 = ProtocolRateLimitSettings.parse(@[test3]) + let res4 = ProtocolRateLimitSettings.parse(@[test4]) + let res5 = ProtocolRateLimitSettings.parse(@[test5]) + + check: + res1.isErr() + res2.isErr() + res2a.isErr() + res2b.isErr() + res3.isErr() + res4.isErr() + res5.isErr() + + test "Parse rate limit setting - complex": + let expU = UnlimitedRateLimit + + let test1 = @["lightpush:2/2ms", "10/2m", " store: 3/3s", " storev2:12/12s"] + let exp1 = { + GLOBAL: (10, 2.minutes), + LIGHTPUSH: (2, 2.milliseconds), + STOREV3: (3, 3.seconds), + STOREV2: (12, 12.seconds), + }.toTable() + + let res1 = ProtocolRateLimitSettings.parse(test1) + + check: + res1.isOk() + res1.get() == exp1 + res1.get().getSetting(PEEREXCHG) == (10, 2.minutes) + res1.get().getSetting(STOREV2) == (12, 12.seconds) + res1.get().getSetting(STOREV3) == (3, 3.seconds) + res1.get().getSetting(LIGHTPUSH) == (2, 2.milliseconds) + + let test2 = @["lightpush:2/2ms", " store: 3/3s", "px:10/10h", "filter:4/42ms"] + let exp2 = { + GLOBAL: expU, + LIGHTPUSH: (2, 2.milliseconds), + STOREV3: (3, 3.seconds), + STOREV2: (3, 3.seconds), + FILTER: (4, 42.milliseconds), + PEEREXCHG: (10, 10.hours), + }.toTable() + + let res2 = ProtocolRateLimitSettings.parse(test2) + + check: + res2.isOk() + res2.get() == exp2 + + let test3 = + @["storev2:1/1s", "store:3/3s", "storev3:4/42ms", "storev3:5/5s", "storev3:6/6s"] + let exp3 = + {GLOBAL: expU, STOREV3: (6, 6.seconds), STOREV2: (1, 1.seconds)}.toTable() + + let res3 = ProtocolRateLimitSettings.parse(test3) + + check: + res3.isOk() + res3.get() == exp3 + res3.get().getSetting(LIGHTPUSH) == expU + + let test4 = newSeq[string](0) + let exp4 = {GLOBAL: expU}.toTable() + + let res4 = ProtocolRateLimitSettings.parse(test4) + + check: + res4.isOk() + res4.get() == exp4 + res3.get().getSetting(LIGHTPUSH) == expU diff --git a/tests/common/test_requestratelimiter.nim b/tests/common/test_requestratelimiter.nim index 256e48118c..0b494c1bec 100644 --- a/tests/common/test_requestratelimiter.nim +++ b/tests/common/test_requestratelimiter.nim @@ -82,3 +82,17 @@ suite "RequestRateLimiter": # requests of other peers can also go check limiter.checkUsage(proto, conn2, now + 4100.milliseconds) == true check limiter.checkUsage(proto, conn3, now + 5.minutes) == true + + test "RequestRateLimiter lowest possible volume": + # keep limits low for easier calculation of ratios + let rateLimit: RateLimitSetting = (1, 1.seconds) + var limiter = newRequestRateLimiter(some(rateLimit)) + + let now = Moment.now() + # with first use we register the peer also and start its timer + check limiter.checkUsage(proto, conn1, now + 500.milliseconds) == true + + # run out of main tokens but still used one more token from the peer's bucket + check limiter.checkUsage(proto, conn1, now + 800.milliseconds) == false + check limiter.checkUsage(proto, conn1, now + 1499.milliseconds) == false + check limiter.checkUsage(proto, conn1, now + 1501.milliseconds) == true diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim index c2a235045e..1abcdfd23c 100644 --- a/tests/node/test_wakunode_peer_exchange.nim +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -84,7 +84,8 @@ suite "Waku Peer Exchange": # Then no peers are fetched check: node.peerManager.peerStore.peers.len == 0 - res.error == "PeerExchange is not mounted" + res.error.status == SERVICE_UNAVAILABLE + res.error.desc == some("PeerExchange is not mounted") asyncTest "Node fetches with mounted peer exchange, but no peers": # Given a node with peer exchange mounted @@ -92,7 +93,9 @@ suite "Waku Peer Exchange": # When a node fetches peers let res = await node.fetchPeerExchangePeers(1) - check res.error == "Peer exchange failure: peer_not_found_failure" + check: + res.error.status == SERVICE_UNAVAILABLE + res.error.desc == some("peer_not_found_failure") # Then no peers are fetched check node.peerManager.peerStore.peers.len == 0 diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index 450eb12924..b7c4e636ff 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -403,3 +403,59 @@ suite "Waku Peer Exchange": decodedBuff.get().response.isSome() decodedBuff.get().response.get().peerInfos.len == 1 decodedBuff.get().response.get().peerInfos[0].enr == enr1.raw + + asyncTest "RateLimit as expected": + let + node1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures( + [ + node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)), + node2.mountPeerExchange(), + ] + ) + + # Create connection + let connOpt = await node2.peerManager.dialPeer( + node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec + ) + require: + connOpt.isSome + + # Create some enr and add to peer exchange (simulating disv5) + var enr1, enr2 = enr.Record() + check enr1.fromUri( + "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" + ) + check enr2.fromUri( + "enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB" + ) + + # Mock that we have discovered these enrs + node1.wakuPeerExchange.enrCache.add(enr1) + node1.wakuPeerExchange.enrCache.add(enr2) + + await sleepAsync(150.milliseconds) + + # Request 2 peer from px. Test all request variants + let response1 = await node2.wakuPeerExchange.request(1) + check: + response1.isOk + response1.get().peerInfos.len == 1 + + let response2 = + await node2.wakuPeerExchange.request(1, node1.peerInfo.toRemotePeerInfo()) + check: + response2.isErr + response2.error().status == PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS + + await sleepAsync(150.milliseconds) + let response3 = await node2.wakuPeerExchange.request(1, connOpt.get()) + check: + response3.isOk + response3.get().peerInfos.len == 1 diff --git a/waku/common/rate_limit/setting.nim b/waku/common/rate_limit/setting.nim index 420be9f717..5f4ea0af5d 100644 --- a/waku/common/rate_limit/setting.nim +++ b/waku/common/rate_limit/setting.nim @@ -1,12 +1,35 @@ {.push raises: [].} -import chronos/timer +import chronos/timer, std/[tables, strutils, options], regex, results # Setting for TokenBucket defined as volume over period of time type RateLimitSetting* = tuple[volume: int, period: Duration] +type RateLimitedProtocol* = enum + GLOBAL + STOREV2 + STOREV3 + LIGHTPUSH + PEEREXCHG + FILTER + +type ProtocolRateLimitSettings* = Table[RateLimitedProtocol, RateLimitSetting] +type ProtocolRateLimit = tuple[protocol: RateLimitedProtocol, setting: RateLimitSetting] + # Set the default to switch off rate limiting for now let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (0, 0.minutes) +let UnlimitedRateLimit*: RateLimitSetting = (0, 0.seconds) + +# Acceptable call frequence from one peer using filter service +# Assumption is having to set up a subscription with max 30 calls than using ping in every min +# While subscribe/unsubscribe events are distributed in time among clients, pings will happen regularly from +# all subscribed peers +let FilterDefaultPerPeerRateLimit*: RateLimitSetting = (30, 1.minutes) + +# For being used under GC-safe condition must use threadvar +var DefaultProtocolRateLimit* {.threadvar.}: ProtocolRateLimitSettings +DefaultProtocolRateLimit = + {GLOBAL: UnlimitedRateLimit, FILTER: FilterDefaultPerPeerRateLimit}.toTable() proc isUnlimited*(t: RateLimitSetting): bool {.inline.} = return t.volume <= 0 or t.period <= 0.seconds @@ -17,3 +40,96 @@ func `$`*(t: RateLimitSetting): string {.inline.} = "no-limit" else: $t.volume & "/" & $t.period + +proc translate(sProtocol: string): RateLimitedProtocol = + if sProtocol.len == 0: + return GLOBAL + + case sProtocol + of "global": + return GLOBAL + of "storev2": + return STOREV2 + of "storev3": + return STOREV3 + of "lightpush": + return LIGHTPUSH + of "px": + return PEEREXCHG + of "filter": + return FILTER + +proc fillSettingTable( + t: var ProtocolRateLimitSettings, sProtocol: var string, setting: RateLimitSetting +) = + let protocol = translate(sProtocol) + + if sProtocol == "store": + # generic store will only applies to version which is not listed directly + discard t.hasKeyOrPut(STOREV2, setting) + discard t.hasKeyOrPut(STOREV3, setting) + else: + # always overrides, last one wins if same protocol duplicated + t[protocol] = setting + +proc parse*( + T: type ProtocolRateLimitSettings, settings: seq[string] +): Result[ProtocolRateLimitSettings, string] = + var settingsTable: ProtocolRateLimitSettings = + initTable[RateLimitedProtocol, RateLimitSetting]() + + ## Following regex can match the exact syntax of how rate limit can be set for different protocol or global. + ## It uses capture groups + ## group0: Will be check if protocol name is followed by a colon but only if protocol name is set. + ## group1: Protocol name, if empty we take it as "global" setting + ## group2: Volume of tokens - only integer + ## group3: Duration of period - only integer + ## group4: Unit of period - only h:hour, m:minute, s:second, ms:millisecond allowed + ## whitespaces are allowed lazily + const parseRegex = + """^\s*((store|storev2|storev3|lightpush|px|filter)\s*:)?\s*(\d+)\s*\/\s*(\d+)\s*(s|h|m|ms)\s*$""" + const regexParseSize = re2(parseRegex) + for settingStr in settings: + let aSetting = settingStr.toLower() + try: + var m: RegexMatch2 + if aSetting.match(regexParseSize, m) == false: + return err("Invalid rate-limit setting: " & settingStr) + + var sProtocol = aSetting[m.captures[1]] + let volume = aSetting[m.captures[2]].parseInt() + let duration = aSetting[m.captures[3]].parseInt() + let periodUnit = aSetting[m.captures[4]] + + var period = 0.seconds + case periodUnit + of "ms": + period = duration.milliseconds + of "s": + period = duration.seconds + of "m": + period = duration.minutes + of "h": + period = duration.hours + + fillSettingTable(settingsTable, sProtocol, (volume, period)) + except ValueError: + return err("Invalid rate-limit setting: " & settingStr) + + # If there were no global setting predefined, we set unlimited + # due it is taken for protocols not defined in the list - thus those will not apply accidentally wrong settings. + discard settingsTable.hasKeyOrPut(GLOBAL, UnlimitedRateLimit) + discard settingsTable.hasKeyOrPut(FILTER, FilterDefaultPerPeerRateLimit) + + return ok(settingsTable) + +proc parse*( + T: type ProtocolRateLimitSettings, settings: string +): Result[ProtocolRateLimitSettings, string] = + return ok(settingsTable) + +proc getSetting*( + t: ProtocolRateLimitSettings, protocol: RateLimitedProtocol +): RateLimitSetting = + let default = t.getOrDefault(GLOBAL, UnlimitedRateLimit) + return t.getOrDefault(protocol, default) diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index 1451a8a39b..7e203fe72b 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -8,7 +8,12 @@ import libp2p/builders, libp2p/nameresolving/nameresolver, libp2p/transports/wstransport -import ../waku_enr, ../discovery/waku_discv5, ../waku_node, ../node/peer_manager +import + ../waku_enr, + ../discovery/waku_discv5, + ../waku_node, + ../node/peer_manager, + ../common/rate_limit/setting type WakuNodeBuilder* = object # General @@ -34,6 +39,9 @@ type switchSslSecureCert: Option[string] switchSendSignedPeerRecord: Option[bool] + #Rate limit configs for non-relay req-resp protocols + rateLimitSettings: Option[seq[string]] + WakuNodeBuilderResult* = Result[void, string] ## Init @@ -105,6 +113,9 @@ proc withPeerManagerConfig*( proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) = builder.colocationLimit = colocationLimit +proc withRateLimit*(builder: var WakuNodeBuilder, limits: seq[string]) = + builder.rateLimitSettings = some(limits) + ## Waku switch proc withSwitchConfiguration*( @@ -184,4 +195,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = except Exception: return err("failed to build WakuNode instance: " & getCurrentExceptionMsg()) + if builder.rateLimitSettings.isSome(): + ?node.setRateLimits(builder.rateLimitSettings.get()) + ok(node) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 532a657fd0..812cffb768 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -670,21 +670,18 @@ with the drawback of consuming some more bandwitdh.""", name: "websocket-secure-cert-path" .}: string - ## Rate limitation config - ## Currently default to switch of rate limit until become official - requestRateLimit* {. + ## Rate limitation config, if not set, rate limit checks will not be performed + rateLimits* {. desc: - "Number of requests to serve by each service in the specified period. Set it to 0 for unlimited", - defaultValue: 0, - name: "request-rate-limit" - .}: int - - ## Currently default to switch of rate limit until become official - requestRatePeriod* {. - desc: "Period of request rate limitation in seconds. Set it to 0 for unlimited", - defaultValue: 0, - name: "request-rate-period" - .}: int64 + "Rate limit settings for different protocols." & + "Format: protocol:volume/period" & + " Where 'protocol' can be one of: if not defined it means a global setting" & + " 'volume' and period must be an integer value. " & + " 'unit' must be one of - hours, minutes, seconds, milliseconds respectively. " & + "Argument may be repeated.", + defaultValue: newSeq[string](0), + name: "rate-limit" + .}: seq[string] ## Parsing @@ -850,6 +847,7 @@ proc load*(T: type WakuNodeConf, version = ""): ConfResult[T] = sources.addConfigFile(Toml, conf.configFile.get()) , ) + ok(conf) except CatchableError: err(getCurrentExceptionMsg()) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 042c11cee1..dcd2179043 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -102,6 +102,7 @@ proc initNode( builder.withPeerManagerConfig( maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement ) + builder.withRateLimit(conf.rateLimits) node = ?builder.build().mapErr( @@ -274,20 +275,17 @@ proc setupProtocols( if mountArcRes.isErr(): return err("failed to mount waku archive protocol: " & mountArcRes.error) - let rateLimitSetting: RateLimitSetting = - (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) - if conf.legacyStore: # Store legacy setup try: - await mountLegacyStore(node, rateLimitSetting) + await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2)) except CatchableError: return err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg()) # Store setup try: - await mountStore(node, rateLimitSetting) + await mountStore(node, node.rateLimitSettings.getSetting(STOREV3)) except CatchableError: return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) @@ -326,9 +324,7 @@ proc setupProtocols( # NOTE Must be mounted after relay if conf.lightpush: try: - let rateLimitSetting: RateLimitSetting = - (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) - await mountLightPush(node, rateLimitSetting) + await mountLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH)) except CatchableError: return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) @@ -348,6 +344,7 @@ proc setupProtocols( subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout), maxFilterPeers = conf.filterMaxPeersToServe, maxFilterCriteriaPerPeer = conf.filterMaxCriteria, + rateLimitSetting = node.rateLimitSettings.getSetting(FILTER), ) except CatchableError: return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) @@ -368,10 +365,9 @@ proc setupProtocols( # waku peer exchange setup if conf.peerExchange: try: - let rateLimitSetting: RateLimitSetting = - (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) - - await mountPeerExchange(node, some(conf.clusterId), rateLimitSetting) + await mountPeerExchange( + node, some(conf.clusterId), node.rateLimitSettings.getSetting(PEEREXCHG) + ) except CatchableError: return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 9d41260127..c54f97dc41 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -114,6 +114,7 @@ type started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] contentTopicHandlers: Table[ContentTopic, TopicHandler] + rateLimitSettings*: ProtocolRateLimitSettings proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService = ## AutonatService request other peers to dial us back @@ -164,6 +165,7 @@ proc new*( enr: enr, announcedAddresses: netConfig.announcedAddresses, topicSubscriptionQueue: queue, + rateLimitSettings: DefaultProtocolRateLimit, ) return node @@ -481,7 +483,7 @@ proc mountFilter*( maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer, messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL, - rateLimitSetting: RateLimitSetting = FilterPerPeerRateLimit, + rateLimitSetting: RateLimitSetting = FilterDefaultPerPeerRateLimit, ) {.async: (raises: []).} = ## Mounting filter v2 protocol @@ -1166,10 +1168,15 @@ proc mountPeerExchange*( proc fetchPeerExchangePeers*( node: Wakunode, amount: uint64 -): Future[Result[int, string]] {.async: (raises: []).} = +): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} = if node.wakuPeerExchange.isNil(): error "could not get peers from px, waku peer-exchange is nil" - return err("PeerExchange is not mounted") + return err( + PeerExchangeResponseStatus( + status: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + desc: some("PeerExchange is not mounted"), + ) + ) info "Retrieving peer info via peer exchange protocol" let pxPeersRes = await node.wakuPeerExchange.request(amount) @@ -1187,7 +1194,7 @@ proc fetchPeerExchangePeers*( else: warn "failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error - return err("Peer exchange failure: " & $pxPeersRes.error) + return err(pxPeersRes.error) # TODO: Move to application module (e.g., wakunode2.nim) proc setPeerExchangePeer*( @@ -1376,3 +1383,10 @@ proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} = return true return await node.wakuRlnRelay.isReady() ## TODO: add other protocol `isReady` checks + +proc setRateLimits*(node: WakuNode, limits: seq[string]): Result[void, string] = + let rateLimitConfig = ProtocolRateLimitSettings.parse(limits) + if rateLimitConfig.isErr(): + return err("invalid rate limit settings:" & rateLimitConfig.error) + node.rateLimitSettings = rateLimitConfig.get() + ok() diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index f8f2987081..7df21ea0f4 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -12,12 +12,6 @@ const DefaultSubscriptionTimeToLiveSec* = 5.minutes MessageCacheTTL* = 2.minutes - # Acceptable call frequence from one peer using filter service - # Assumption is having to set up a subscription with max 30 calls than using ping in every min - # While subscribe/unsubscribe events are distributed in time among clients, pings will happen regularly from - # all subscribed peers - FilterPerPeerRateLimit*: RateLimitSetting = (30, 1.minutes) - type # a single filter criterion is fully defined by a pubsub topic and content topic FilterCriterion* = tuple[pubsubTopic: PubsubTopic, contentTopic: ContentTopic] diff --git a/waku/waku_peer_exchange.nim b/waku/waku_peer_exchange.nim index ca707f1621..994989df44 100644 --- a/waku/waku_peer_exchange.nim +++ b/waku/waku_peer_exchange.nim @@ -1,5 +1,5 @@ {.push raises: [].} -import ./waku_peer_exchange/protocol +import ./waku_peer_exchange/[protocol, rpc] -export protocol +export protocol, rpc diff --git a/waku/waku_peer_exchange/rpc_codec.nim b/waku/waku_peer_exchange/rpc_codec.nim index 48dd5c009c..e7a680e8c5 100644 --- a/waku/waku_peer_exchange/rpc_codec.nim +++ b/waku/waku_peer_exchange/rpc_codec.nim @@ -122,9 +122,6 @@ proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtobufResult[T] = if isRequest and isResponse: return err(ProtobufError.missingRequiredField("request and response are exclusive")) - if not isRequest and not isResponse: - return err(ProtobufError.missingRequiredField("request")) - if isRequest: rpc.request = some(?PeerExchangeRequest.decode(requestBuffer)) @@ -134,6 +131,9 @@ proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtobufResult[T] = var status: seq[byte] if ?pb.getField(10, status): rpc.responseStatus = some(?PeerExchangeResponseStatus.decode(status)) + if rpc.responseStatus.get().status == PeerExchangeResponseStatusCode.SUCCESS and + not isResponse: + return err(ProtobufError.missingRequiredField("response")) elif not isRequest: return err(ProtobufError.missingRequiredField("responseStatus")) From 4b99834544b0fbdbc0be619772af8597dfeda82d Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 13 Sep 2024 02:54:58 +0200 Subject: [PATCH 4/9] Fix libwaku due to changes of error return type of fetchPeerExchangePeers --- .../inter_thread_communication/requests/discovery_request.nim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim index 3cbf1de7c5..e23b9be26a 100644 --- a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim @@ -104,7 +104,8 @@ proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, str proc performPeerExchangeRequestTo( numPeers: uint64, waku: ptr Waku ): Future[Result[int, string]] {.async.} = - return await waku.node.fetchPeerExchangePeers(numPeers) + return (await waku.node.fetchPeerExchangePeers(numPeers)).isOkOr: + return err($error) proc process*( self: ptr DiscoveryRequest, waku: ptr Waku From a25c0d4fff944072999d2d88dc640197de1a56ac Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 13 Sep 2024 08:45:38 +0200 Subject: [PATCH 5/9] Fix rate limit setting tests due to changed defaults --- tests/common/test_ratelimit_setting.nim | 33 ++++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/tests/common/test_ratelimit_setting.nim b/tests/common/test_ratelimit_setting.nim index d14bf9b87d..6f6ac8d38e 100644 --- a/tests/common/test_ratelimit_setting.nim +++ b/tests/common/test_ratelimit_setting.nim @@ -50,17 +50,27 @@ suite "RateLimitSetting": check: res1.isOk() - res1.get() == {GLOBAL: exp1}.toTable() + res1.get() == {GLOBAL: exp1, FILTER: FilterDefaultPerPeerRateLimit}.toTable() res2.isOk() - res2.get() == {GLOBAL: expU, STOREV2: exp2, STOREV3: exp2}.toTable() + res2.get() == + { + GLOBAL: expU, + FILTER: FilterDefaultPerPeerRateLimit, + STOREV2: exp2, + STOREV3: exp2, + }.toTable() res2a.isOk() - res2a.get() == {GLOBAL: expU, STOREV2: exp2a}.toTable() + res2a.get() == + {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, STOREV2: exp2a}.toTable() res2b.isOk() - res2b.get() == {GLOBAL: expU, STOREV3: exp2b}.toTable() + res2b.get() == + {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, STOREV3: exp2b}.toTable() res3.isOk() - res3.get() == {GLOBAL: expU, LIGHTPUSH: exp3}.toTable() + res3.get() == + {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, LIGHTPUSH: exp3}.toTable() res4.isOk() - res4.get() == {GLOBAL: expU, PEEREXCHG: exp4}.toTable() + res4.get() == + {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, PEEREXCHG: exp4}.toTable() res5.isOk() res5.get() == {GLOBAL: expU, FILTER: exp5}.toTable() @@ -96,6 +106,7 @@ suite "RateLimitSetting": let test1 = @["lightpush:2/2ms", "10/2m", " store: 3/3s", " storev2:12/12s"] let exp1 = { GLOBAL: (10, 2.minutes), + FILTER: FilterDefaultPerPeerRateLimit, LIGHTPUSH: (2, 2.milliseconds), STOREV3: (3, 3.seconds), STOREV2: (12, 12.seconds), @@ -129,8 +140,12 @@ suite "RateLimitSetting": let test3 = @["storev2:1/1s", "store:3/3s", "storev3:4/42ms", "storev3:5/5s", "storev3:6/6s"] - let exp3 = - {GLOBAL: expU, STOREV3: (6, 6.seconds), STOREV2: (1, 1.seconds)}.toTable() + let exp3 = { + GLOBAL: expU, + FILTER: FilterDefaultPerPeerRateLimit, + STOREV3: (6, 6.seconds), + STOREV2: (1, 1.seconds), + }.toTable() let res3 = ProtocolRateLimitSettings.parse(test3) @@ -140,7 +155,7 @@ suite "RateLimitSetting": res3.get().getSetting(LIGHTPUSH) == expU let test4 = newSeq[string](0) - let exp4 = {GLOBAL: expU}.toTable() + let exp4 = {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit}.toTable() let res4 = ProtocolRateLimitSettings.parse(test4) From fa461481238c13a5187e6dd5c29a13d8333ae2d4 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 13 Sep 2024 09:45:16 +0200 Subject: [PATCH 6/9] Introduce new gauge to help dasboard effectively show current rate limit applied for protocol --- waku/common/rate_limit/service_metrics.nim | 12 +++++++++++- waku/common/rate_limit/setting.nim | 5 +++++ waku/waku_filter_v2/protocol.nim | 1 + waku/waku_lightpush/protocol.nim | 1 + waku/waku_peer_exchange/protocol.nim | 1 + waku/waku_store/protocol.nim | 2 +- waku/waku_store_legacy/protocol.nim | 1 + 7 files changed, 21 insertions(+), 2 deletions(-) diff --git a/waku/common/rate_limit/service_metrics.nim b/waku/common/rate_limit/service_metrics.nim index fe0bbf05c8..339bf7f38b 100644 --- a/waku/common/rate_limit/service_metrics.nim +++ b/waku/common/rate_limit/service_metrics.nim @@ -1,9 +1,19 @@ {.push raises: [].} -import metrics +import std/options +import metrics, setting + +declarePublicGauge waku_service_requests_limit, + "Applied rate limit of non-relay service", ["service"] declarePublicCounter waku_service_requests, "number of non-relay service requests received", ["service", "state"] declarePublicCounter waku_service_network_bytes, "total incoming traffic of specific waku services", labels = ["service", "direction"] + +proc setServiceLimitMetric*(service: string, limit: Option[RateLimitSetting]) = + if limit.isSome() and not limit.get().isUnlimited(): + waku_service_requests_limit.set( + limit.get().calculateLimitPerSecond(), labelValues = [service] + ) diff --git a/waku/common/rate_limit/setting.nim b/waku/common/rate_limit/setting.nim index 5f4ea0af5d..98dc4472ab 100644 --- a/waku/common/rate_limit/setting.nim +++ b/waku/common/rate_limit/setting.nim @@ -133,3 +133,8 @@ proc getSetting*( ): RateLimitSetting = let default = t.getOrDefault(GLOBAL, UnlimitedRateLimit) return t.getOrDefault(protocol, default) + +proc calculateLimitPerSecond*(setting: RateLimitSetting): float64 = + if setting.isUnlimited(): + return 0.float64 + return (setting.volume.float64 / setting.period.milliseconds.float64) * 1000.float64 diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 4c39a70a76..695093fe57 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -328,6 +328,7 @@ proc new*( ) wf.initProtocolHandler() + setServiceLimitMetric(WakuFilterSubscribeCodec, rateLimitSetting) return wf const MaintainSubscriptionsInterval* = 1.minutes diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index b646a1571f..b47f6e7ad9 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -110,4 +110,5 @@ proc new*( requestRateLimiter: newRequestRateLimiter(rateLimitSetting), ) wl.initProtocolHandler() + setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting) return wl diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index b815fc7895..4c295fc7bc 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -264,5 +264,6 @@ proc new*( requestRateLimiter: newRequestRateLimiter(rateLimitSetting), ) wpx.initProtocolHandler() + setServiceLimitMetric(WakuPeerExchangeCodec, rateLimitSetting) asyncSpawn wpx.updatePxEnrCache() return wpx diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 4e94d4c489..c4e1cd36c6 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -154,5 +154,5 @@ proc new*( ) store.initProtocolHandler() - + setServiceLimitMetric(WakuStoreCodec, rateLimitSetting) return store diff --git a/waku/waku_store_legacy/protocol.nim b/waku/waku_store_legacy/protocol.nim index 11e3e9be9d..a7886165d9 100644 --- a/waku/waku_store_legacy/protocol.nim +++ b/waku/waku_store_legacy/protocol.nim @@ -153,4 +153,5 @@ proc new*( requestRateLimiter: newRequestRateLimiter(rateLimitSetting), ) ws.initProtocolHandler() + setServiceLimitMetric(WakuLegacyStoreCodec, rateLimitSetting) ws From 3a883392c1bc5efc76c5508e81cd5667cf787b3d Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 13 Sep 2024 17:22:52 +0200 Subject: [PATCH 7/9] Adjust timeing in filter rate limit test to let macos CI test run ok. --- tests/waku_filter_v2/test_waku_filter_dos_protection.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/waku_filter_v2/test_waku_filter_dos_protection.nim b/tests/waku_filter_v2/test_waku_filter_dos_protection.nim index c584d22472..c751114c14 100644 --- a/tests/waku_filter_v2/test_waku_filter_dos_protection.nim +++ b/tests/waku_filter_v2/test_waku_filter_dos_protection.nim @@ -146,7 +146,7 @@ suite "Waku Filter - DOS protection": some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS) # ensure period of time has passed and clients can again use the service - await sleepAsync(700.milliseconds) + await sleepAsync(1000.milliseconds) check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) == none(FilterSubscribeErrorKind) check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) == From e709a31dbf67347cefffb8550195c3c1368c80cc Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Tue, 17 Sep 2024 15:09:48 +0200 Subject: [PATCH 8/9] Address review findings, namings, error logs, removed left-overs --- waku/common/rate_limit/setting.nim | 12 ++++------- waku/node/waku_node.nim | 2 +- waku/waku_peer_exchange/protocol.nim | 31 +++++++++++++++++++--------- waku/waku_peer_exchange/rpc.nim | 2 +- 4 files changed, 27 insertions(+), 20 deletions(-) diff --git a/waku/common/rate_limit/setting.nim b/waku/common/rate_limit/setting.nim index 98dc4472ab..7bf533ad59 100644 --- a/waku/common/rate_limit/setting.nim +++ b/waku/common/rate_limit/setting.nim @@ -14,7 +14,6 @@ type RateLimitedProtocol* = enum FILTER type ProtocolRateLimitSettings* = Table[RateLimitedProtocol, RateLimitSetting] -type ProtocolRateLimit = tuple[protocol: RateLimitedProtocol, setting: RateLimitSetting] # Set the default to switch off rate limiting for now let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (0, 0.minutes) @@ -41,7 +40,7 @@ func `$`*(t: RateLimitSetting): string {.inline.} = else: $t.volume & "/" & $t.period -proc translate(sProtocol: string): RateLimitedProtocol = +proc translate(sProtocol: string): RateLimitedProtocol {.raises: [ValueError].} = if sProtocol.len == 0: return GLOBAL @@ -58,10 +57,12 @@ proc translate(sProtocol: string): RateLimitedProtocol = return PEEREXCHG of "filter": return FILTER + else: + raise newException(ValueError, "Unknown protocol definition: " & sProtocol) proc fillSettingTable( t: var ProtocolRateLimitSettings, sProtocol: var string, setting: RateLimitSetting -) = +) {.raises: [ValueError].} = let protocol = translate(sProtocol) if sProtocol == "store": @@ -123,11 +124,6 @@ proc parse*( return ok(settingsTable) -proc parse*( - T: type ProtocolRateLimitSettings, settings: string -): Result[ProtocolRateLimitSettings, string] = - return ok(settingsTable) - proc getSetting*( t: ProtocolRateLimitSettings, protocol: RateLimitedProtocol ): RateLimitSetting = diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index c54f97dc41..156090c7d6 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1389,4 +1389,4 @@ proc setRateLimits*(node: WakuNode, limits: seq[string]): Result[void, string] = if rateLimitConfig.isErr(): return err("invalid rate limit settings:" & rateLimitConfig.error) node.rateLimitSettings = rateLimitConfig.get() - ok() + return ok() diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index 4c295fc7bc..80a95323d8 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -223,17 +223,26 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = buffer = await conn.readLp(DefaultMaxRpcSize.int) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) - discard await wpx.respondError( - PeerExchangeResponseStatusCode.BAD_REQUEST, some(exc.msg), conn - ) + + ( + await wpx.respondError( + PeerExchangeResponseStatusCode.BAD_REQUEST, some(exc.msg), conn + ) + ).isOkOr: + error "Failed to respond with BAD_REQUEST:", error = $error return let decBuf = PeerExchangeRpc.decode(buffer) if decBuf.isErr() or decBuf.get().request.isNone(): waku_px_errors.inc(labelValues = [decodeRpcFailure]) - discard await wpx.respondError( - PeerExchangeResponseStatusCode.BAD_REQUEST, some($decBuf.error), conn - ) + error "Failed to decode PeerExchange request", error = $decBuf.error + + ( + await wpx.respondError( + PeerExchangeResponseStatusCode.BAD_REQUEST, some($decBuf.error), conn + ) + ).isOkOr: + error "Failed to respond with BAD_REQUEST:", error = $error return let request = decBuf.get().request.get() @@ -242,10 +251,12 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = (await wpx.respond(enrs, conn)).isErrOr: waku_px_peers_sent.inc(enrs.len().int64()) do: - discard await wpx.respondError( - PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS, none(string), conn - ) - + ( + await wpx.respondError( + PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS, none(string), conn + ) + ).isOkOr: + error "Failed to respond with TOO_MANY_REQUESTS:", error = $error # close, no data is expected await conn.closeWithEof() diff --git a/waku/waku_peer_exchange/rpc.nim b/waku/waku_peer_exchange/rpc.nim index c4c36212f2..dc3d22d172 100644 --- a/waku/waku_peer_exchange/rpc.nim +++ b/waku/waku_peer_exchange/rpc.nim @@ -29,7 +29,7 @@ type responseStatus*: Option[PeerExchangeResponseStatus] proc makeRequest*(T: type PeerExchangeRpc, numPeers: uint64): T = - return T(request: some(PeerExchangeRequest(numPeers: numpeers))) + return T(request: some(PeerExchangeRequest(numPeers: numPeers))) proc makeResponse*(T: type PeerExchangeRpc, peerInfos: seq[PeerExchangePeerInfo]): T = return T( From 90c4a08f74ecd2a7e2e83783dbbd88864da1ef65 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Wed, 18 Sep 2024 02:46:17 +0200 Subject: [PATCH 9/9] Changes to reflect latest spec agreement and changes. PeerExchange RPC is changed the now respond structure will contain status_code and status_desc. --- tests/node/test_wakunode_peer_exchange.nim | 8 +- tests/waku_peer_exchange/test_protocol.nim | 17 ++--- tests/waku_peer_exchange/test_rpc_codec.nim | 20 ++--- waku/common/rate_limit/setting.nim | 3 +- waku/node/waku_node.nim | 6 +- waku/waku_peer_exchange/protocol.nim | 67 ++++++++-------- waku/waku_peer_exchange/rpc.nim | 50 ++++++------ waku/waku_peer_exchange/rpc_codec.nim | 84 +++++++-------------- 8 files changed, 111 insertions(+), 144 deletions(-) diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim index 1abcdfd23c..49f61d2959 100644 --- a/tests/node/test_wakunode_peer_exchange.nim +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -84,8 +84,8 @@ suite "Waku Peer Exchange": # Then no peers are fetched check: node.peerManager.peerStore.peers.len == 0 - res.error.status == SERVICE_UNAVAILABLE - res.error.desc == some("PeerExchange is not mounted") + res.error.status_code == SERVICE_UNAVAILABLE + res.error.status_desc == some("PeerExchange is not mounted") asyncTest "Node fetches with mounted peer exchange, but no peers": # Given a node with peer exchange mounted @@ -94,8 +94,8 @@ suite "Waku Peer Exchange": # When a node fetches peers let res = await node.fetchPeerExchangePeers(1) check: - res.error.status == SERVICE_UNAVAILABLE - res.error.desc == some("peer_not_found_failure") + res.error.status_code == SERVICE_UNAVAILABLE + res.error.status_desc == some("peer_not_found_failure") # Then no peers are fetched check node.peerManager.peerStore.peers.len == 0 diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index b7c4e636ff..6c044586a4 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -222,7 +222,7 @@ suite "Waku Peer Exchange": # Check that it failed gracefully check: response.isErr - response.error.status == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE + response.error.status_code == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE asyncTest "Request 0 peers, with 0 peers in PeerExchange": # Given a disconnected PeerExchange @@ -237,7 +237,7 @@ suite "Waku Peer Exchange": # Then the response should be an error check: response.isErr - response.error.status == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE + response.error.status_code == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE asyncTest "Pool filtering": let @@ -331,7 +331,7 @@ suite "Waku Peer Exchange": # Then the response should be an error check: response.isErr - response.error.status == PeerExchangeResponseStatusCode.DIAL_FAILURE + response.error.status_code == PeerExchangeResponseStatusCode.DIAL_FAILURE asyncTest "Connections are closed after response is sent": # Create 3 nodes @@ -397,12 +397,9 @@ suite "Waku Peer Exchange": # Check we got back the enr we mocked check: - decodedBuff.get().responseStatus.isSome() - decodedBuff.get().responseStatus.get().status == - PeerExchangeResponseStatusCode.SUCCESS - decodedBuff.get().response.isSome() - decodedBuff.get().response.get().peerInfos.len == 1 - decodedBuff.get().response.get().peerInfos[0].enr == enr1.raw + decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS + decodedBuff.get().response.peerInfos.len == 1 + decodedBuff.get().response.peerInfos[0].enr == enr1.raw asyncTest "RateLimit as expected": let @@ -452,7 +449,7 @@ suite "Waku Peer Exchange": await node2.wakuPeerExchange.request(1, node1.peerInfo.toRemotePeerInfo()) check: response2.isErr - response2.error().status == PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS + response2.error().status_code == PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS await sleepAsync(150.milliseconds) let response3 = await node2.wakuPeerExchange.request(1, connOpt.get()) diff --git a/tests/waku_peer_exchange/test_rpc_codec.nim b/tests/waku_peer_exchange/test_rpc_codec.nim index e7ddc60f66..84aec7ec42 100644 --- a/tests/waku_peer_exchange/test_rpc_codec.nim +++ b/tests/waku_peer_exchange/test_rpc_codec.nim @@ -28,10 +28,7 @@ suite "Peer Exchange RPC": check: resReq.isOk - resReq.get().response.isNone() - resReq.get().responseStatus.isNone() - resReq.get().request.isSome() - resReq.get().request.get().numPeers == 2 + resReq.get().request.numPeers == 2 var enr1 = enr.Record(seqNum: 0, raw: @[]) @@ -56,11 +53,8 @@ suite "Peer Exchange RPC": # Then the peerInfos match the originals check: res.isOk - res.get().request.isNone() - res.get().response.isSome() - res.get().responseStatus.isSome() - res.get().responseStatus.get().status == PeerExchangeResponseStatusCode.SUCCESS - res.get().response.get().peerInfos == peerInfos + res.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS + res.get().response.peerInfos == peerInfos # When using the decoded responses to create new enrs var @@ -68,12 +62,10 @@ suite "Peer Exchange RPC": resEnr2 = enr.Record(seqNum: 0, raw: @[]) check: - res.get().response.isSome() - res.get().responseStatus.isSome() - res.get().responseStatus.get().status == PeerExchangeResponseStatusCode.SUCCESS + res.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS - discard resEnr1.fromBytes(res.get().response.get().peerInfos[0].enr) - discard resEnr2.fromBytes(res.get().response.get().peerInfos[1].enr) + discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr) + discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr) # Then they match the original enrs check: diff --git a/waku/common/rate_limit/setting.nim b/waku/common/rate_limit/setting.nim index 7bf533ad59..70f0ee7212 100644 --- a/waku/common/rate_limit/setting.nim +++ b/waku/common/rate_limit/setting.nim @@ -63,13 +63,12 @@ proc translate(sProtocol: string): RateLimitedProtocol {.raises: [ValueError].} proc fillSettingTable( t: var ProtocolRateLimitSettings, sProtocol: var string, setting: RateLimitSetting ) {.raises: [ValueError].} = - let protocol = translate(sProtocol) - if sProtocol == "store": # generic store will only applies to version which is not listed directly discard t.hasKeyOrPut(STOREV2, setting) discard t.hasKeyOrPut(STOREV3, setting) else: + let protocol = translate(sProtocol) # always overrides, last one wins if same protocol duplicated t[protocol] = setting diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 156090c7d6..e137f3ed07 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1172,9 +1172,9 @@ proc fetchPeerExchangePeers*( if node.wakuPeerExchange.isNil(): error "could not get peers from px, waku peer-exchange is nil" return err( - PeerExchangeResponseStatus( - status: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, - desc: some("PeerExchange is not mounted"), + ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some("PeerExchange is not mounted"), ) ) diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index 80a95323d8..0374e12772 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -62,33 +62,40 @@ proc request*( var buffer: seq[byte] var callResult = - PeerExchangeResponseStatus(status: PeerExchangeResponseStatusCode.SUCCESS) + (status_code: PeerExchangeResponseStatusCode.SUCCESS, status_desc: none(string)) try: await conn.writeLP(rpc.encode().buffer) buffer = await conn.readLp(DefaultMaxRpcSize.int) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) - callResult = PeerExchangeResponseStatus( - status: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, desc: some($exc.msg) + callResult = ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some($exc.msg), ) finally: # close, no more data is expected await conn.closeWithEof() - if callResult.status != PeerExchangeResponseStatusCode.SUCCESS: + if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS: return err(callResult) let decodedBuff = PeerExchangeRpc.decode(buffer) if decodedBuff.isErr(): return err( - PeerExchangeResponseStatus( - status: PeerExchangeResponseStatusCode.BAD_RESPONSE, - desc: some($decodedBuff.error), + ( + status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE, + status_desc: some($decodedBuff.error), + ) + ) + if decodedBuff.get().response.status_code != PeerExchangeResponseStatusCode.SUCCESS: + return err( + ( + status_code: decodedBuff.get().response.status_code, + status_desc: decodedBuff.get().response.status_desc, ) ) - if decodedBuff.get().response.isNone() and decodedBuff.get().responseStatus.isSome(): - return err(decodedBuff.get().responseStatus.get()) - return ok(decodedBuff.get().response.get()) + + return ok(decodedBuff.get().response) proc request*( wpx: WakuPeerExchange, numPeers: uint64, peer: RemotePeerInfo @@ -97,16 +104,17 @@ proc request*( let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) if connOpt.isNone(): return err( - PeerExchangeResponseStatus( - status: PeerExchangeResponseStatusCode.DIAL_FAILURE, desc: some(dialFailure) + ( + status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE, + status_desc: some(dialFailure), ) ) return await wpx.request(numPeers, connOpt.get()) except CatchableError: return err( - PeerExchangeResponseStatus( - status: PeerExchangeResponseStatusCode.BAD_RESPONSE, - desc: some("exception dialing peer: " & getCurrentExceptionMsg()), + ( + status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE, + status_desc: some("exception dialing peer: " & getCurrentExceptionMsg()), ) ) @@ -117,9 +125,9 @@ proc request*( if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) return err( - PeerExchangeResponseStatus( - status: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, - desc: some(peerNotFoundFailure), + ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some(peerNotFoundFailure), ) ) return await wpx.request(numPeers, peerOpt.get()) @@ -134,9 +142,9 @@ proc respond( except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) return err( - PeerExchangeResponseStatus( - status: PeerExchangeResponseStatusCode.DIAL_FAILURE, - desc: some("exception dialing peer: " & exc.msg), + ( + status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE, + status_desc: some("exception dialing peer: " & exc.msg), ) ) @@ -144,20 +152,20 @@ proc respond( proc respondError( wpx: WakuPeerExchange, - status: PeerExchangeResponseStatusCode, - desc: Option[string], + status_code: PeerExchangeResponseStatusCode, + status_desc: Option[string], conn: Connection, ): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let rpc = PeerExchangeRpc.makeErrorResponse(status, desc) + let rpc = PeerExchangeRpc.makeErrorResponse(status_code, status_desc) try: await conn.writeLP(rpc.encode().buffer) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) return err( - PeerExchangeResponseStatus( - status: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, - desc: some("exception dialing peer: " & exc.msg), + ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some("exception dialing peer: " & exc.msg), ) ) @@ -233,7 +241,7 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = return let decBuf = PeerExchangeRpc.decode(buffer) - if decBuf.isErr() or decBuf.get().request.isNone(): + if decBuf.isErr(): waku_px_errors.inc(labelValues = [decodeRpcFailure]) error "Failed to decode PeerExchange request", error = $decBuf.error @@ -245,9 +253,8 @@ proc initProtocolHandler(wpx: WakuPeerExchange) = error "Failed to respond with BAD_REQUEST:", error = $error return - let request = decBuf.get().request.get() trace "peer exchange request received" - let enrs = wpx.getEnrsFromCache(request.numPeers) + let enrs = wpx.getEnrsFromCache(decBuf.get().request.numPeers) (await wpx.respond(enrs, conn)).isErrOr: waku_px_peers_sent.inc(enrs.len().int64()) do: diff --git a/waku/waku_peer_exchange/rpc.nim b/waku/waku_peer_exchange/rpc.nim index dc3d22d172..4a91e8db1e 100644 --- a/waku/waku_peer_exchange/rpc.nim +++ b/waku/waku_peer_exchange/rpc.nim @@ -1,15 +1,6 @@ import std/options type - PeerExchangePeerInfo* = object - enr*: seq[byte] # RLP encoded ENR: https://eips.ethereum.org/EIPS/eip-778 - - PeerExchangeRequest* = object - numPeers*: uint64 - - PeerExchangeResponse* = object - peerInfos*: seq[PeerExchangePeerInfo] - PeerExchangeResponseStatusCode* {.pure.} = enum UNKNOWN = uint32(000) SUCCESS = uint32(200) @@ -19,31 +10,42 @@ type SERVICE_UNAVAILABLE = uint32(503) DIAL_FAILURE = uint32(599) - PeerExchangeResponseStatus* = object - status*: PeerExchangeResponseStatusCode - desc*: Option[string] + PeerExchangePeerInfo* = object + enr*: seq[byte] # RLP encoded ENR: https://eips.ethereum.org/EIPS/eip-778 + + PeerExchangeRequest* = object + numPeers*: uint64 + + PeerExchangeResponse* = object + peerInfos*: seq[PeerExchangePeerInfo] + status_code*: PeerExchangeResponseStatusCode + status_desc*: Option[string] + + PeerExchangeResponseStatus* = + tuple[status_code: PeerExchangeResponseStatusCode, status_desc: Option[string]] PeerExchangeRpc* = object - request*: Option[PeerExchangeRequest] - response*: Option[PeerExchangeResponse] - responseStatus*: Option[PeerExchangeResponseStatus] + request*: PeerExchangeRequest + response*: PeerExchangeResponse proc makeRequest*(T: type PeerExchangeRpc, numPeers: uint64): T = - return T(request: some(PeerExchangeRequest(numPeers: numPeers))) + return T(request: PeerExchangeRequest(numPeers: numPeers)) proc makeResponse*(T: type PeerExchangeRpc, peerInfos: seq[PeerExchangePeerInfo]): T = return T( - response: some(PeerExchangeResponse(peerInfos: peerInfos)), - responseStatus: - some(PeerExchangeResponseStatus(status: PeerExchangeResponseStatusCode.SUCCESS)), + response: PeerExchangeResponse( + peerInfos: peerInfos, status_code: PeerExchangeResponseStatusCode.SUCCESS + ) ) proc makeErrorResponse*( T: type PeerExchangeRpc, - status: PeerExchangeResponseStatusCode, - desc: Option[string] = none(string), + status_code: PeerExchangeResponseStatusCode, + status_desc: Option[string] = none(string), ): T = - return T(responseStatus: some(PeerExchangeResponseStatus(status: status, desc: desc))) + return T( + response: PeerExchangeResponse(status_code: status_code, status_desc: status_desc) + ) proc `$`*(statusCode: PeerExchangeResponseStatusCode): string = case statusCode @@ -55,5 +57,5 @@ proc `$`*(statusCode: PeerExchangeResponseStatusCode): string = of PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE: "SERVICE_UNAVAILABLE" of PeerExchangeResponseStatusCode.DIAL_FAILURE: "DIAL_FAILURE" -proc `$`*(pxResponseStatus: PeerExchangeResponseStatus): string = - return $pxResponseStatus.status & " - " & pxResponseStatus.desc.get("") +# proc `$`*(pxResponseStatus: PeerExchangeResponseStatus): string = +# return $pxResponseStatus.status & " - " & pxResponseStatus.desc.get("") diff --git a/waku/waku_peer_exchange/rpc_codec.nim b/waku/waku_peer_exchange/rpc_codec.nim index e7a680e8c5..e5e982938f 100644 --- a/waku/waku_peer_exchange/rpc_codec.nim +++ b/waku/waku_peer_exchange/rpc_codec.nim @@ -39,17 +39,26 @@ proc decode*(T: type PeerExchangePeerInfo, buffer: seq[byte]): ProtoResult[T] = ok(rpc) +proc parse*(T: type PeerExchangeResponseStatusCode, status: uint32): T = + case status + of 200, 400, 429, 503: + PeerExchangeResponseStatusCode(status) + else: + PeerExchangeResponseStatusCode.UNKNOWN + proc encode*(rpc: PeerExchangeResponse): ProtoBuffer = var pb = initProtoBuffer() for pi in rpc.peerInfos: pb.write3(1, pi.encode()) + pb.write3(10, rpc.status_code.uint32) + pb.write3(11, rpc.status_desc) pb.finish3() pb -proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtobufResult[T] = let pb = initProtoBuffer(buffer) var rpc = PeerExchangeResponse(peerInfos: @[]) @@ -58,52 +67,25 @@ proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] = for pib in peerInfoBuffers: rpc.peerInfos.add(?PeerExchangePeerInfo.decode(pib)) - ok(rpc) - -proc parse*(T: type PeerExchangeResponseStatusCode, status: uint32): T = - case status - of 200, 400, 429, 503: - PeerExchangeResponseStatusCode(status) - else: - PeerExchangeResponseStatusCode.UNKNOWN - -proc encode*(rpc: PeerExchangeResponseStatus): ProtoBuffer = - var pb = initProtoBuffer() - - pb.write3(1, rpc.status.uint32) - pb.write3(2, rpc.desc) - - pb.finish3() - - pb - -proc decode*(T: type PeerExchangeResponseStatus, buffer: seq[byte]): ProtobufResult[T] = - var pb = initProtoBuffer(buffer) - var rpc = PeerExchangeResponseStatus(status: PeerExchangeResponseStatusCode.UNKNOWN) - - var status: uint32 - if ?pb.getField(1, status): - rpc.status = PeerExchangeResponseStatusCode.parse(status) + var status_code: uint32 + if ?pb.getField(10, status_code): + rpc.status_code = PeerExchangeResponseStatusCode.parse(status_code) else: - return err(ProtobufError.missingRequiredField("status")) + return err(ProtobufError.missingRequiredField("status_code")) - var desc: string - if ?pb.getField(2, desc): - rpc.desc = some(desc) + var status_desc: string + if ?pb.getField(11, status_desc): + rpc.status_desc = some(status_desc) else: - rpc.desc = none(string) + rpc.status_desc = none(string) ok(rpc) proc encode*(rpc: PeerExchangeRpc): ProtoBuffer = var pb = initProtoBuffer() - if rpc.request.isSome(): - pb.write3(1, rpc.request.get().encode()) - if rpc.response.isSome(): - pb.write3(2, rpc.response.get().encode()) - if rpc.responseStatus.isSome(): - pb.write3(10, rpc.responseStatus.get().encode()) + pb.write3(1, rpc.request.encode()) + pb.write3(2, rpc.response.encode()) pb.finish3() @@ -114,27 +96,15 @@ proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtobufResult[T] = var rpc = PeerExchangeRpc() var requestBuffer: seq[byte] - let isRequest = ?pb.getField(1, requestBuffer) + if not ?pb.getField(1, requestBuffer): + return err(ProtobufError.missingRequiredField("request")) - var responseBuffer: seq[byte] - let isResponse = ?pb.getField(2, responseBuffer) + rpc.request = ?PeerExchangeRequest.decode(requestBuffer) - if isRequest and isResponse: - return err(ProtobufError.missingRequiredField("request and response are exclusive")) - - if isRequest: - rpc.request = some(?PeerExchangeRequest.decode(requestBuffer)) - - if isResponse: - rpc.response = some(?PeerExchangeResponse.decode(responseBuffer)) + var responseBuffer: seq[byte] + if not ?pb.getField(2, responseBuffer): + return err(ProtobufError.missingRequiredField("response")) - var status: seq[byte] - if ?pb.getField(10, status): - rpc.responseStatus = some(?PeerExchangeResponseStatus.decode(status)) - if rpc.responseStatus.get().status == PeerExchangeResponseStatusCode.SUCCESS and - not isResponse: - return err(ProtobufError.missingRequiredField("response")) - elif not isRequest: - return err(ProtobufError.missingRequiredField("responseStatus")) + rpc.response = ?PeerExchangeResponse.decode(responseBuffer) ok(rpc)