diff --git a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim index 3cbf1de7c5..e23b9be26a 100644 --- a/library/waku_thread/inter_thread_communication/requests/discovery_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/discovery_request.nim @@ -104,7 +104,8 @@ proc updateDiscv5BootstrapNodes(nodes: string, waku: ptr Waku): Result[void, str proc performPeerExchangeRequestTo( numPeers: uint64, waku: ptr Waku ): Future[Result[int, string]] {.async.} = - return await waku.node.fetchPeerExchangePeers(numPeers) + return (await waku.node.fetchPeerExchangePeers(numPeers)).isOkOr: + return err($error) proc process*( self: ptr DiscoveryRequest, waku: ptr Waku diff --git a/tests/common/test_all.nim b/tests/common/test_all.nim index 1afef23a90..7756f23adf 100644 --- a/tests/common/test_all.nim +++ b/tests/common/test_all.nim @@ -8,4 +8,5 @@ import ./test_parse_size, ./test_tokenbucket, ./test_requestratelimiter, + ./test_ratelimit_setting, ./test_timed_map diff --git a/tests/common/test_ratelimit_setting.nim b/tests/common/test_ratelimit_setting.nim new file mode 100644 index 0000000000..6f6ac8d38e --- /dev/null +++ b/tests/common/test_ratelimit_setting.nim @@ -0,0 +1,165 @@ +# Chronos Test Suite +# (c) Copyright 2022-Present +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +{.used.} + +import testutils/unittests +import chronos, libp2p/stream/connection +import std/[sequtils, options, tables] + +import ../../waku/common/rate_limit/request_limiter +import ../../waku/common/rate_limit/timed_map + +let proto = "ProtocolDescriptor" + +let conn1 = Connection(peerId: PeerId.random().tryGet()) +let conn2 = Connection(peerId: PeerId.random().tryGet()) +let conn3 = Connection(peerId: PeerId.random().tryGet()) + +suite "RateLimitSetting": + test "Parse rate limit setting - ok": + let test1 = "10/2m" + let test2 = " store : 10 /1h" + let test2a = "storev2 : 10 /1h" + let test2b = "storeV3: 12 /1s" + let test3 = "LIGHTPUSH: 10/ 1m" + let test4 = "px:10/2 s " + let test5 = "filter:42/66ms" + + let expU = UnlimitedRateLimit + let exp1: RateLimitSetting = (10, 2.minutes) + let exp2: RateLimitSetting = (10, 1.hours) + let exp2a: RateLimitSetting = (10, 1.hours) + let exp2b: RateLimitSetting = (12, 1.seconds) + let exp3: RateLimitSetting = (10, 1.minutes) + let exp4: RateLimitSetting = (10, 2.seconds) + let exp5: RateLimitSetting = (42, 66.milliseconds) + + let res1 = ProtocolRateLimitSettings.parse(@[test1]) + let res2 = ProtocolRateLimitSettings.parse(@[test2]) + let res2a = ProtocolRateLimitSettings.parse(@[test2a]) + let res2b = ProtocolRateLimitSettings.parse(@[test2b]) + let res3 = ProtocolRateLimitSettings.parse(@[test3]) + let res4 = ProtocolRateLimitSettings.parse(@[test4]) + let res5 = ProtocolRateLimitSettings.parse(@[test5]) + + check: + res1.isOk() + res1.get() == {GLOBAL: exp1, FILTER: FilterDefaultPerPeerRateLimit}.toTable() + res2.isOk() + res2.get() == + { + GLOBAL: expU, + FILTER: FilterDefaultPerPeerRateLimit, + STOREV2: exp2, + STOREV3: exp2, + }.toTable() + res2a.isOk() + res2a.get() == + {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, STOREV2: exp2a}.toTable() + res2b.isOk() + res2b.get() == + {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, STOREV3: exp2b}.toTable() + res3.isOk() + res3.get() == + {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, LIGHTPUSH: exp3}.toTable() + res4.isOk() + res4.get() == + {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit, PEEREXCHG: exp4}.toTable() + res5.isOk() + res5.get() == {GLOBAL: expU, FILTER: exp5}.toTable() + + test "Parse rate limit setting - err": + let test1 = "10/2d" + let test2 = " stre : 10 /1h" + let test2a = "storev2 10 /1h" + let test2b = "storev3: 12 1s" + let test3 = "somethingelse: 10/ 1m" + let test4 = ":px:10/2 s " + let test5 = "filter:p42/66ms" + + let res1 = ProtocolRateLimitSettings.parse(@[test1]) + let res2 = ProtocolRateLimitSettings.parse(@[test2]) + let res2a = ProtocolRateLimitSettings.parse(@[test2a]) + let res2b = ProtocolRateLimitSettings.parse(@[test2b]) + let res3 = ProtocolRateLimitSettings.parse(@[test3]) + let res4 = ProtocolRateLimitSettings.parse(@[test4]) + let res5 = ProtocolRateLimitSettings.parse(@[test5]) + + check: + res1.isErr() + res2.isErr() + res2a.isErr() + res2b.isErr() + res3.isErr() + res4.isErr() + res5.isErr() + + test "Parse rate limit setting - complex": + let expU = UnlimitedRateLimit + + let test1 = @["lightpush:2/2ms", "10/2m", " store: 3/3s", " storev2:12/12s"] + let exp1 = { + GLOBAL: (10, 2.minutes), + FILTER: FilterDefaultPerPeerRateLimit, + LIGHTPUSH: (2, 2.milliseconds), + STOREV3: (3, 3.seconds), + STOREV2: (12, 12.seconds), + }.toTable() + + let res1 = ProtocolRateLimitSettings.parse(test1) + + check: + res1.isOk() + res1.get() == exp1 + res1.get().getSetting(PEEREXCHG) == (10, 2.minutes) + res1.get().getSetting(STOREV2) == (12, 12.seconds) + res1.get().getSetting(STOREV3) == (3, 3.seconds) + res1.get().getSetting(LIGHTPUSH) == (2, 2.milliseconds) + + let test2 = @["lightpush:2/2ms", " store: 3/3s", "px:10/10h", "filter:4/42ms"] + let exp2 = { + GLOBAL: expU, + LIGHTPUSH: (2, 2.milliseconds), + STOREV3: (3, 3.seconds), + STOREV2: (3, 3.seconds), + FILTER: (4, 42.milliseconds), + PEEREXCHG: (10, 10.hours), + }.toTable() + + let res2 = ProtocolRateLimitSettings.parse(test2) + + check: + res2.isOk() + res2.get() == exp2 + + let test3 = + @["storev2:1/1s", "store:3/3s", "storev3:4/42ms", "storev3:5/5s", "storev3:6/6s"] + let exp3 = { + GLOBAL: expU, + FILTER: FilterDefaultPerPeerRateLimit, + STOREV3: (6, 6.seconds), + STOREV2: (1, 1.seconds), + }.toTable() + + let res3 = ProtocolRateLimitSettings.parse(test3) + + check: + res3.isOk() + res3.get() == exp3 + res3.get().getSetting(LIGHTPUSH) == expU + + let test4 = newSeq[string](0) + let exp4 = {GLOBAL: expU, FILTER: FilterDefaultPerPeerRateLimit}.toTable() + + let res4 = ProtocolRateLimitSettings.parse(test4) + + check: + res4.isOk() + res4.get() == exp4 + res3.get().getSetting(LIGHTPUSH) == expU diff --git a/tests/common/test_requestratelimiter.nim b/tests/common/test_requestratelimiter.nim index 256e48118c..0b494c1bec 100644 --- a/tests/common/test_requestratelimiter.nim +++ b/tests/common/test_requestratelimiter.nim @@ -82,3 +82,17 @@ suite "RequestRateLimiter": # requests of other peers can also go check limiter.checkUsage(proto, conn2, now + 4100.milliseconds) == true check limiter.checkUsage(proto, conn3, now + 5.minutes) == true + + test "RequestRateLimiter lowest possible volume": + # keep limits low for easier calculation of ratios + let rateLimit: RateLimitSetting = (1, 1.seconds) + var limiter = newRequestRateLimiter(some(rateLimit)) + + let now = Moment.now() + # with first use we register the peer also and start its timer + check limiter.checkUsage(proto, conn1, now + 500.milliseconds) == true + + # run out of main tokens but still used one more token from the peer's bucket + check limiter.checkUsage(proto, conn1, now + 800.milliseconds) == false + check limiter.checkUsage(proto, conn1, now + 1499.milliseconds) == false + check limiter.checkUsage(proto, conn1, now + 1501.milliseconds) == true diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim index c2a235045e..49f61d2959 100644 --- a/tests/node/test_wakunode_peer_exchange.nim +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -84,7 +84,8 @@ suite "Waku Peer Exchange": # Then no peers are fetched check: node.peerManager.peerStore.peers.len == 0 - res.error == "PeerExchange is not mounted" + res.error.status_code == SERVICE_UNAVAILABLE + res.error.status_desc == some("PeerExchange is not mounted") asyncTest "Node fetches with mounted peer exchange, but no peers": # Given a node with peer exchange mounted @@ -92,7 +93,9 @@ suite "Waku Peer Exchange": # When a node fetches peers let res = await node.fetchPeerExchangePeers(1) - check res.error == "Peer exchange failure: peer_not_found_failure" + check: + res.error.status_code == SERVICE_UNAVAILABLE + res.error.status_desc == some("peer_not_found_failure") # Then no peers are fetched check node.peerManager.peerStore.peers.len == 0 diff --git a/tests/waku_filter_v2/test_waku_filter_dos_protection.nim b/tests/waku_filter_v2/test_waku_filter_dos_protection.nim index c584d22472..c751114c14 100644 --- a/tests/waku_filter_v2/test_waku_filter_dos_protection.nim +++ b/tests/waku_filter_v2/test_waku_filter_dos_protection.nim @@ -146,7 +146,7 @@ suite "Waku Filter - DOS protection": some(FilterSubscribeErrorKind.TOO_MANY_REQUESTS) # ensure period of time has passed and clients can again use the service - await sleepAsync(700.milliseconds) + await sleepAsync(1000.milliseconds) check client1.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) == none(FilterSubscribeErrorKind) check client2.subscribe(serverRemotePeerInfo, pubsubTopic, contentTopicSeq) == diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index e80386d5e5..6c044586a4 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -1,11 +1,10 @@ {.used.} import - std/[options, sequtils, tables], + std/[options, sequtils, tables, net], testutils/unittests, chronos, chronicles, - stew/shims/net, libp2p/[switch, peerId, crypto/crypto, multistream, muxers/muxer], eth/[keys, p2p/discoveryv5/enr] @@ -223,6 +222,7 @@ suite "Waku Peer Exchange": # Check that it failed gracefully check: response.isErr + response.error.status_code == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE asyncTest "Request 0 peers, with 0 peers in PeerExchange": # Given a disconnected PeerExchange @@ -237,7 +237,7 @@ suite "Waku Peer Exchange": # Then the response should be an error check: response.isErr - response.error == "peer_not_found_failure" + response.error.status_code == PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE asyncTest "Pool filtering": let @@ -331,7 +331,7 @@ suite "Waku Peer Exchange": # Then the response should be an error check: response.isErr - response.error == "dial_failure" + response.error.status_code == PeerExchangeResponseStatusCode.DIAL_FAILURE asyncTest "Connections are closed after response is sent": # Create 3 nodes @@ -385,7 +385,7 @@ suite "Waku Peer Exchange": let conn = connOpt.get() # Send bytes so that they directly hit the handler - let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: 1)) + let rpc = PeerExchangeRpc.makeRequest(1) var buffer: seq[byte] await conn.writeLP(rpc.encode().buffer) @@ -397,5 +397,62 @@ suite "Waku Peer Exchange": # Check we got back the enr we mocked check: + decodedBuff.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS decodedBuff.get().response.peerInfos.len == 1 decodedBuff.get().response.peerInfos[0].enr == enr1.raw + + asyncTest "RateLimit as expected": + let + node1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + node2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + + # Start and mount peer exchange + await allFutures([node1.start(), node2.start()]) + await allFutures( + [ + node1.mountPeerExchange(rateLimit = (1, 150.milliseconds)), + node2.mountPeerExchange(), + ] + ) + + # Create connection + let connOpt = await node2.peerManager.dialPeer( + node1.switch.peerInfo.toRemotePeerInfo(), WakuPeerExchangeCodec + ) + require: + connOpt.isSome + + # Create some enr and add to peer exchange (simulating disv5) + var enr1, enr2 = enr.Record() + check enr1.fromUri( + "enr:-Iu4QGNuTvNRulF3A4Kb9YHiIXLr0z_CpvWkWjWKU-o95zUPR_In02AWek4nsSk7G_-YDcaT4bDRPzt5JIWvFqkXSNcBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQKp9VzU2FAh7fwOwSpg1M_Ekz4zzl0Fpbg6po2ZwgVwQYN0Y3CC6mCFd2FrdTIB" + ) + check enr2.fromUri( + "enr:-Iu4QGJllOWlviPIh_SGR-VVm55nhnBIU5L-s3ran7ARz_4oDdtJPtUs3Bc5aqZHCiPQX6qzNYF2ARHER0JPX97TFbEBgmlkgnY0gmlwhE0WsGeJc2VjcDI1NmsxoQP3ULycvday4EkvtVu0VqbBdmOkbfVLJx8fPe0lE_dRkIN0Y3CC6mCFd2FrdTIB" + ) + + # Mock that we have discovered these enrs + node1.wakuPeerExchange.enrCache.add(enr1) + node1.wakuPeerExchange.enrCache.add(enr2) + + await sleepAsync(150.milliseconds) + + # Request 2 peer from px. Test all request variants + let response1 = await node2.wakuPeerExchange.request(1) + check: + response1.isOk + response1.get().peerInfos.len == 1 + + let response2 = + await node2.wakuPeerExchange.request(1, node1.peerInfo.toRemotePeerInfo()) + check: + response2.isErr + response2.error().status_code == PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS + + await sleepAsync(150.milliseconds) + let response3 = await node2.wakuPeerExchange.request(1, connOpt.get()) + check: + response3.isOk + response3.get().peerInfos.len == 1 diff --git a/tests/waku_peer_exchange/test_rpc_codec.nim b/tests/waku_peer_exchange/test_rpc_codec.nim index 0393fb4894..84aec7ec42 100644 --- a/tests/waku_peer_exchange/test_rpc_codec.nim +++ b/tests/waku_peer_exchange/test_rpc_codec.nim @@ -1,10 +1,9 @@ {.used.} import - std/[options], + std/[options, net], testutils/unittests, chronos, - stew/shims/net, libp2p/switch, libp2p/peerId, libp2p/crypto/crypto, @@ -23,6 +22,14 @@ import suite "Peer Exchange RPC": asyncTest "Encode - Decode": # Setup + let rpcReq = PeerExchangeRpc.makeRequest(2) + let rpcReqBuffer: seq[byte] = rpcReq.encode().buffer + let resReq = PeerExchangeRpc.decode(rpcReqBuffer) + + check: + resReq.isOk + resReq.get().request.numPeers == 2 + var enr1 = enr.Record(seqNum: 0, raw: @[]) enr2 = enr.Record(seqNum: 0, raw: @[]) @@ -35,19 +42,18 @@ suite "Peer Exchange RPC": "enr:-Iu4QK_T7kzAmewG92u1pr7o6St3sBqXaiIaWIsFNW53_maJEaOtGLSN2FUbm6LmVxSfb1WfC7Eyk-nFYI7Gs3SlchwBgmlkgnY0gmlwhI5d6VKJc2VjcDI1NmsxoQLPYQDvrrFdCrhqw3JuFaGD71I8PtPfk6e7TJ3pg_vFQYN0Y3CC6mKDdWRwgiMq" ) - let - peerInfos = - @[PeerExchangePeerInfo(enr: enr1.raw), PeerExchangePeerInfo(enr: enr2.raw)] - rpc = PeerExchangeRpc(response: PeerExchangeResponse(peerInfos: peerInfos)) + let peerInfos = + @[PeerExchangePeerInfo(enr: enr1.raw), PeerExchangePeerInfo(enr: enr2.raw)] + let rpc = PeerExchangeRpc.makeResponse(peerInfos) # When encoding and decoding - let - rpcBuffer: seq[byte] = rpc.encode().buffer - res = PeerExchangeRpc.decode(rpcBuffer) + let rpcBuffer: seq[byte] = rpc.encode().buffer + let res = PeerExchangeRpc.decode(rpcBuffer) # Then the peerInfos match the originals check: res.isOk + res.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS res.get().response.peerInfos == peerInfos # When using the decoded responses to create new enrs @@ -55,6 +61,9 @@ suite "Peer Exchange RPC": resEnr1 = enr.Record(seqNum: 0, raw: @[]) resEnr2 = enr.Record(seqNum: 0, raw: @[]) + check: + res.get().response.status_code == PeerExchangeResponseStatusCode.SUCCESS + discard resEnr1.fromBytes(res.get().response.peerInfos[0].enr) discard resEnr2.fromBytes(res.get().response.peerInfos[1].enr) diff --git a/waku/common/rate_limit/service_metrics.nim b/waku/common/rate_limit/service_metrics.nim index fe0bbf05c8..339bf7f38b 100644 --- a/waku/common/rate_limit/service_metrics.nim +++ b/waku/common/rate_limit/service_metrics.nim @@ -1,9 +1,19 @@ {.push raises: [].} -import metrics +import std/options +import metrics, setting + +declarePublicGauge waku_service_requests_limit, + "Applied rate limit of non-relay service", ["service"] declarePublicCounter waku_service_requests, "number of non-relay service requests received", ["service", "state"] declarePublicCounter waku_service_network_bytes, "total incoming traffic of specific waku services", labels = ["service", "direction"] + +proc setServiceLimitMetric*(service: string, limit: Option[RateLimitSetting]) = + if limit.isSome() and not limit.get().isUnlimited(): + waku_service_requests_limit.set( + limit.get().calculateLimitPerSecond(), labelValues = [service] + ) diff --git a/waku/common/rate_limit/setting.nim b/waku/common/rate_limit/setting.nim index 420be9f717..70f0ee7212 100644 --- a/waku/common/rate_limit/setting.nim +++ b/waku/common/rate_limit/setting.nim @@ -1,12 +1,34 @@ {.push raises: [].} -import chronos/timer +import chronos/timer, std/[tables, strutils, options], regex, results # Setting for TokenBucket defined as volume over period of time type RateLimitSetting* = tuple[volume: int, period: Duration] +type RateLimitedProtocol* = enum + GLOBAL + STOREV2 + STOREV3 + LIGHTPUSH + PEEREXCHG + FILTER + +type ProtocolRateLimitSettings* = Table[RateLimitedProtocol, RateLimitSetting] + # Set the default to switch off rate limiting for now let DefaultGlobalNonRelayRateLimit*: RateLimitSetting = (0, 0.minutes) +let UnlimitedRateLimit*: RateLimitSetting = (0, 0.seconds) + +# Acceptable call frequence from one peer using filter service +# Assumption is having to set up a subscription with max 30 calls than using ping in every min +# While subscribe/unsubscribe events are distributed in time among clients, pings will happen regularly from +# all subscribed peers +let FilterDefaultPerPeerRateLimit*: RateLimitSetting = (30, 1.minutes) + +# For being used under GC-safe condition must use threadvar +var DefaultProtocolRateLimit* {.threadvar.}: ProtocolRateLimitSettings +DefaultProtocolRateLimit = + {GLOBAL: UnlimitedRateLimit, FILTER: FilterDefaultPerPeerRateLimit}.toTable() proc isUnlimited*(t: RateLimitSetting): bool {.inline.} = return t.volume <= 0 or t.period <= 0.seconds @@ -17,3 +39,97 @@ func `$`*(t: RateLimitSetting): string {.inline.} = "no-limit" else: $t.volume & "/" & $t.period + +proc translate(sProtocol: string): RateLimitedProtocol {.raises: [ValueError].} = + if sProtocol.len == 0: + return GLOBAL + + case sProtocol + of "global": + return GLOBAL + of "storev2": + return STOREV2 + of "storev3": + return STOREV3 + of "lightpush": + return LIGHTPUSH + of "px": + return PEEREXCHG + of "filter": + return FILTER + else: + raise newException(ValueError, "Unknown protocol definition: " & sProtocol) + +proc fillSettingTable( + t: var ProtocolRateLimitSettings, sProtocol: var string, setting: RateLimitSetting +) {.raises: [ValueError].} = + if sProtocol == "store": + # generic store will only applies to version which is not listed directly + discard t.hasKeyOrPut(STOREV2, setting) + discard t.hasKeyOrPut(STOREV3, setting) + else: + let protocol = translate(sProtocol) + # always overrides, last one wins if same protocol duplicated + t[protocol] = setting + +proc parse*( + T: type ProtocolRateLimitSettings, settings: seq[string] +): Result[ProtocolRateLimitSettings, string] = + var settingsTable: ProtocolRateLimitSettings = + initTable[RateLimitedProtocol, RateLimitSetting]() + + ## Following regex can match the exact syntax of how rate limit can be set for different protocol or global. + ## It uses capture groups + ## group0: Will be check if protocol name is followed by a colon but only if protocol name is set. + ## group1: Protocol name, if empty we take it as "global" setting + ## group2: Volume of tokens - only integer + ## group3: Duration of period - only integer + ## group4: Unit of period - only h:hour, m:minute, s:second, ms:millisecond allowed + ## whitespaces are allowed lazily + const parseRegex = + """^\s*((store|storev2|storev3|lightpush|px|filter)\s*:)?\s*(\d+)\s*\/\s*(\d+)\s*(s|h|m|ms)\s*$""" + const regexParseSize = re2(parseRegex) + for settingStr in settings: + let aSetting = settingStr.toLower() + try: + var m: RegexMatch2 + if aSetting.match(regexParseSize, m) == false: + return err("Invalid rate-limit setting: " & settingStr) + + var sProtocol = aSetting[m.captures[1]] + let volume = aSetting[m.captures[2]].parseInt() + let duration = aSetting[m.captures[3]].parseInt() + let periodUnit = aSetting[m.captures[4]] + + var period = 0.seconds + case periodUnit + of "ms": + period = duration.milliseconds + of "s": + period = duration.seconds + of "m": + period = duration.minutes + of "h": + period = duration.hours + + fillSettingTable(settingsTable, sProtocol, (volume, period)) + except ValueError: + return err("Invalid rate-limit setting: " & settingStr) + + # If there were no global setting predefined, we set unlimited + # due it is taken for protocols not defined in the list - thus those will not apply accidentally wrong settings. + discard settingsTable.hasKeyOrPut(GLOBAL, UnlimitedRateLimit) + discard settingsTable.hasKeyOrPut(FILTER, FilterDefaultPerPeerRateLimit) + + return ok(settingsTable) + +proc getSetting*( + t: ProtocolRateLimitSettings, protocol: RateLimitedProtocol +): RateLimitSetting = + let default = t.getOrDefault(GLOBAL, UnlimitedRateLimit) + return t.getOrDefault(protocol, default) + +proc calculateLimitPerSecond*(setting: RateLimitSetting): float64 = + if setting.isUnlimited(): + return 0.float64 + return (setting.volume.float64 / setting.period.milliseconds.float64) * 1000.float64 diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index 1451a8a39b..7e203fe72b 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -8,7 +8,12 @@ import libp2p/builders, libp2p/nameresolving/nameresolver, libp2p/transports/wstransport -import ../waku_enr, ../discovery/waku_discv5, ../waku_node, ../node/peer_manager +import + ../waku_enr, + ../discovery/waku_discv5, + ../waku_node, + ../node/peer_manager, + ../common/rate_limit/setting type WakuNodeBuilder* = object # General @@ -34,6 +39,9 @@ type switchSslSecureCert: Option[string] switchSendSignedPeerRecord: Option[bool] + #Rate limit configs for non-relay req-resp protocols + rateLimitSettings: Option[seq[string]] + WakuNodeBuilderResult* = Result[void, string] ## Init @@ -105,6 +113,9 @@ proc withPeerManagerConfig*( proc withColocationLimit*(builder: var WakuNodeBuilder, colocationLimit: int) = builder.colocationLimit = colocationLimit +proc withRateLimit*(builder: var WakuNodeBuilder, limits: seq[string]) = + builder.rateLimitSettings = some(limits) + ## Waku switch proc withSwitchConfiguration*( @@ -184,4 +195,7 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = except Exception: return err("failed to build WakuNode instance: " & getCurrentExceptionMsg()) + if builder.rateLimitSettings.isSome(): + ?node.setRateLimits(builder.rateLimitSettings.get()) + ok(node) diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 532a657fd0..812cffb768 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -670,21 +670,18 @@ with the drawback of consuming some more bandwitdh.""", name: "websocket-secure-cert-path" .}: string - ## Rate limitation config - ## Currently default to switch of rate limit until become official - requestRateLimit* {. + ## Rate limitation config, if not set, rate limit checks will not be performed + rateLimits* {. desc: - "Number of requests to serve by each service in the specified period. Set it to 0 for unlimited", - defaultValue: 0, - name: "request-rate-limit" - .}: int - - ## Currently default to switch of rate limit until become official - requestRatePeriod* {. - desc: "Period of request rate limitation in seconds. Set it to 0 for unlimited", - defaultValue: 0, - name: "request-rate-period" - .}: int64 + "Rate limit settings for different protocols." & + "Format: protocol:volume/period" & + " Where 'protocol' can be one of: if not defined it means a global setting" & + " 'volume' and period must be an integer value. " & + " 'unit' must be one of - hours, minutes, seconds, milliseconds respectively. " & + "Argument may be repeated.", + defaultValue: newSeq[string](0), + name: "rate-limit" + .}: seq[string] ## Parsing @@ -850,6 +847,7 @@ proc load*(T: type WakuNodeConf, version = ""): ConfResult[T] = sources.addConfigFile(Toml, conf.configFile.get()) , ) + ok(conf) except CatchableError: err(getCurrentExceptionMsg()) diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index d6672f8a8a..dcd2179043 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -102,6 +102,7 @@ proc initNode( builder.withPeerManagerConfig( maxRelayPeers = conf.maxRelayPeers, shardAware = conf.relayShardedPeerManagement ) + builder.withRateLimit(conf.rateLimits) node = ?builder.build().mapErr( @@ -274,20 +275,17 @@ proc setupProtocols( if mountArcRes.isErr(): return err("failed to mount waku archive protocol: " & mountArcRes.error) - let rateLimitSetting: RateLimitSetting = - (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) - if conf.legacyStore: # Store legacy setup try: - await mountLegacyStore(node, rateLimitSetting) + await mountLegacyStore(node, node.rateLimitSettings.getSetting(STOREV2)) except CatchableError: return err("failed to mount waku legacy store protocol: " & getCurrentExceptionMsg()) # Store setup try: - await mountStore(node, rateLimitSetting) + await mountStore(node, node.rateLimitSettings.getSetting(STOREV3)) except CatchableError: return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) @@ -326,9 +324,7 @@ proc setupProtocols( # NOTE Must be mounted after relay if conf.lightpush: try: - let rateLimitSetting: RateLimitSetting = - (conf.requestRateLimit, chronos.seconds(conf.requestRatePeriod)) - await mountLightPush(node, rateLimitSetting) + await mountLightPush(node, node.rateLimitSettings.getSetting(LIGHTPUSH)) except CatchableError: return err("failed to mount waku lightpush protocol: " & getCurrentExceptionMsg()) @@ -348,6 +344,7 @@ proc setupProtocols( subscriptionTimeout = chronos.seconds(conf.filterSubscriptionTimeout), maxFilterPeers = conf.filterMaxPeersToServe, maxFilterCriteriaPerPeer = conf.filterMaxCriteria, + rateLimitSetting = node.rateLimitSettings.getSetting(FILTER), ) except CatchableError: return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) @@ -368,7 +365,9 @@ proc setupProtocols( # waku peer exchange setup if conf.peerExchange: try: - await mountPeerExchange(node, some(conf.clusterId)) + await mountPeerExchange( + node, some(conf.clusterId), node.rateLimitSettings.getSetting(PEEREXCHG) + ) except CatchableError: return err("failed to mount waku peer-exchange protocol: " & getCurrentExceptionMsg()) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 2bcb065940..e137f3ed07 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -114,6 +114,7 @@ type started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] contentTopicHandlers: Table[ContentTopic, TopicHandler] + rateLimitSettings*: ProtocolRateLimitSettings proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService = ## AutonatService request other peers to dial us back @@ -164,6 +165,7 @@ proc new*( enr: enr, announcedAddresses: netConfig.announcedAddresses, topicSubscriptionQueue: queue, + rateLimitSettings: DefaultProtocolRateLimit, ) return node @@ -481,7 +483,7 @@ proc mountFilter*( maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer, messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL, - rateLimitSetting: RateLimitSetting = FilterPerPeerRateLimit, + rateLimitSetting: RateLimitSetting = FilterDefaultPerPeerRateLimit, ) {.async: (raises: []).} = ## Mounting filter v2 protocol @@ -1144,11 +1146,14 @@ proc mountRlnRelay*( ## Waku peer-exchange proc mountPeerExchange*( - node: WakuNode, cluster: Option[uint16] = none(uint16) + node: WakuNode, + cluster: Option[uint16] = none(uint16), + rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit, ) {.async: (raises: []).} = info "mounting waku peer exchange" - node.wakuPeerExchange = WakuPeerExchange.new(node.peerManager, cluster) + node.wakuPeerExchange = + WakuPeerExchange.new(node.peerManager, cluster, some(rateLimit)) if node.started: try: @@ -1163,10 +1168,15 @@ proc mountPeerExchange*( proc fetchPeerExchangePeers*( node: Wakunode, amount: uint64 -): Future[Result[int, string]] {.async: (raises: []).} = +): Future[Result[int, PeerExchangeResponseStatus]] {.async: (raises: []).} = if node.wakuPeerExchange.isNil(): error "could not get peers from px, waku peer-exchange is nil" - return err("PeerExchange is not mounted") + return err( + ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some("PeerExchange is not mounted"), + ) + ) info "Retrieving peer info via peer exchange protocol" let pxPeersRes = await node.wakuPeerExchange.request(amount) @@ -1184,7 +1194,7 @@ proc fetchPeerExchangePeers*( else: warn "failed to retrieve peer info via peer exchange protocol", error = pxPeersRes.error - return err("Peer exchange failure: " & $pxPeersRes.error) + return err(pxPeersRes.error) # TODO: Move to application module (e.g., wakunode2.nim) proc setPeerExchangePeer*( @@ -1373,3 +1383,10 @@ proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} = return true return await node.wakuRlnRelay.isReady() ## TODO: add other protocol `isReady` checks + +proc setRateLimits*(node: WakuNode, limits: seq[string]): Result[void, string] = + let rateLimitConfig = ProtocolRateLimitSettings.parse(limits) + if rateLimitConfig.isErr(): + return err("invalid rate limit settings:" & rateLimitConfig.error) + node.rateLimitSettings = rateLimitConfig.get() + return ok() diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index 4c39a70a76..695093fe57 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -328,6 +328,7 @@ proc new*( ) wf.initProtocolHandler() + setServiceLimitMetric(WakuFilterSubscribeCodec, rateLimitSetting) return wf const MaintainSubscriptionsInterval* = 1.minutes diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index f8f2987081..7df21ea0f4 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -12,12 +12,6 @@ const DefaultSubscriptionTimeToLiveSec* = 5.minutes MessageCacheTTL* = 2.minutes - # Acceptable call frequence from one peer using filter service - # Assumption is having to set up a subscription with max 30 calls than using ping in every min - # While subscribe/unsubscribe events are distributed in time among clients, pings will happen regularly from - # all subscribed peers - FilterPerPeerRateLimit*: RateLimitSetting = (30, 1.minutes) - type # a single filter criterion is fully defined by a pubsub topic and content topic FilterCriterion* = tuple[pubsubTopic: PubsubTopic, contentTopic: ContentTopic] diff --git a/waku/waku_lightpush/protocol.nim b/waku/waku_lightpush/protocol.nim index b646a1571f..b47f6e7ad9 100644 --- a/waku/waku_lightpush/protocol.nim +++ b/waku/waku_lightpush/protocol.nim @@ -110,4 +110,5 @@ proc new*( requestRateLimiter: newRequestRateLimiter(rateLimitSetting), ) wl.initProtocolHandler() + setServiceLimitMetric(WakuLightpushCodec, rateLimitSetting) return wl diff --git a/waku/waku_peer_exchange.nim b/waku/waku_peer_exchange.nim index ca707f1621..994989df44 100644 --- a/waku/waku_peer_exchange.nim +++ b/waku/waku_peer_exchange.nim @@ -1,5 +1,5 @@ {.push raises: [].} -import ./waku_peer_exchange/protocol +import ./waku_peer_exchange/[protocol, rpc] -export protocol +export protocol, rpc diff --git a/waku/waku_peer_exchange/protocol.nim b/waku/waku_peer_exchange/protocol.nim index e64395dd41..0374e12772 100644 --- a/waku/waku_peer_exchange/protocol.nim +++ b/waku/waku_peer_exchange/protocol.nim @@ -13,7 +13,8 @@ import ../waku_core, ../discovery/waku_discv5, ./rpc, - ./rpc_codec + ./rpc_codec, + ../common/rate_limit/request_limiter declarePublicGauge waku_px_peers_received_total, "number of ENRs received via peer exchange" @@ -45,37 +46,55 @@ const pxFailure = "px_failure" type - WakuPeerExchangeResult*[T] = Result[T, string] + WakuPeerExchangeResult*[T] = Result[T, PeerExchangeResponseStatus] WakuPeerExchange* = ref object of LPProtocol peerManager*: PeerManager enrCache*: seq[enr.Record] cluster*: Option[uint16] # todo: next step: ring buffer; future: implement cache satisfying https://rfc.vac.dev/spec/34/ + requestRateLimiter*: RequestRateLimiter proc request*( wpx: WakuPeerExchange, numPeers: uint64, conn: Connection ): Future[WakuPeerExchangeResult[PeerExchangeResponse]] {.async: (raises: []).} = - let rpc = PeerExchangeRpc(request: PeerExchangeRequest(numPeers: numPeers)) + let rpc = PeerExchangeRpc.makeRequest(numPeers) var buffer: seq[byte] - var error: string + var callResult = + (status_code: PeerExchangeResponseStatusCode.SUCCESS, status_desc: none(string)) try: await conn.writeLP(rpc.encode().buffer) buffer = await conn.readLp(DefaultMaxRpcSize.int) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) - error = $exc.msg + callResult = ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some($exc.msg), + ) finally: # close, no more data is expected await conn.closeWithEof() - if error.len > 0: - return err("write/read failed: " & error) + if callResult.status_code != PeerExchangeResponseStatusCode.SUCCESS: + return err(callResult) let decodedBuff = PeerExchangeRpc.decode(buffer) if decodedBuff.isErr(): - return err("decode failed: " & $decodedBuff.error) + return err( + ( + status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE, + status_desc: some($decodedBuff.error), + ) + ) + if decodedBuff.get().response.status_code != PeerExchangeResponseStatusCode.SUCCESS: + return err( + ( + status_code: decodedBuff.get().response.status_code, + status_desc: decodedBuff.get().response.status_desc, + ) + ) + return ok(decodedBuff.get().response) proc request*( @@ -84,10 +103,20 @@ proc request*( try: let connOpt = await wpx.peerManager.dialPeer(peer, WakuPeerExchangeCodec) if connOpt.isNone(): - return err(dialFailure) + return err( + ( + status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE, + status_desc: some(dialFailure), + ) + ) return await wpx.request(numPeers, connOpt.get()) except CatchableError: - return err("exception dialing peer: " & getCurrentExceptionMsg()) + return err( + ( + status_code: PeerExchangeResponseStatusCode.BAD_RESPONSE, + status_desc: some("exception dialing peer: " & getCurrentExceptionMsg()), + ) + ) proc request*( wpx: WakuPeerExchange, numPeers: uint64 @@ -95,22 +124,50 @@ proc request*( let peerOpt = wpx.peerManager.selectPeer(WakuPeerExchangeCodec) if peerOpt.isNone(): waku_px_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) + return err( + ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some(peerNotFoundFailure), + ) + ) return await wpx.request(numPeers, peerOpt.get()) proc respond( wpx: WakuPeerExchange, enrs: seq[enr.Record], conn: Connection ): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = - let rpc = PeerExchangeRpc( - response: - PeerExchangeResponse(peerInfos: enrs.mapIt(PeerExchangePeerInfo(enr: it.raw))) - ) + let rpc = PeerExchangeRpc.makeResponse(enrs.mapIt(PeerExchangePeerInfo(enr: it.raw))) try: await conn.writeLP(rpc.encode().buffer) except CatchableError as exc: waku_px_errors.inc(labelValues = [exc.msg]) - return err(exc.msg) + return err( + ( + status_code: PeerExchangeResponseStatusCode.DIAL_FAILURE, + status_desc: some("exception dialing peer: " & exc.msg), + ) + ) + + return ok() + +proc respondError( + wpx: WakuPeerExchange, + status_code: PeerExchangeResponseStatusCode, + status_desc: Option[string], + conn: Connection, +): Future[WakuPeerExchangeResult[void]] {.async, gcsafe.} = + let rpc = PeerExchangeRpc.makeErrorResponse(status_code, status_desc) + + try: + await conn.writeLP(rpc.encode().buffer) + except CatchableError as exc: + waku_px_errors.inc(labelValues = [exc.msg]) + return err( + ( + status_code: PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE, + status_desc: some("exception dialing peer: " & exc.msg), + ) + ) return ok() @@ -169,26 +226,44 @@ proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} = proc initProtocolHandler(wpx: WakuPeerExchange) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = var buffer: seq[byte] - try: - buffer = await conn.readLp(DefaultMaxRpcSize.int) - except CatchableError as exc: - waku_px_errors.inc(labelValues = [exc.msg]) - return - - let decBuf = PeerExchangeRpc.decode(buffer) - if decBuf.isErr(): - waku_px_errors.inc(labelValues = [decodeRpcFailure]) - return - - let rpc = decBuf.get() - trace "peer exchange request received" - let enrs = wpx.getEnrsFromCache(rpc.request.numPeers) - let res = await wpx.respond(enrs, conn) - if res.isErr: - waku_px_errors.inc(labelValues = [res.error]) - else: - waku_px_peers_sent.inc(enrs.len().int64()) - + wpx.requestRateLimiter.checkUsageLimit(WakuPeerExchangeCodec, conn): + try: + buffer = await conn.readLp(DefaultMaxRpcSize.int) + except CatchableError as exc: + waku_px_errors.inc(labelValues = [exc.msg]) + + ( + await wpx.respondError( + PeerExchangeResponseStatusCode.BAD_REQUEST, some(exc.msg), conn + ) + ).isOkOr: + error "Failed to respond with BAD_REQUEST:", error = $error + return + + let decBuf = PeerExchangeRpc.decode(buffer) + if decBuf.isErr(): + waku_px_errors.inc(labelValues = [decodeRpcFailure]) + error "Failed to decode PeerExchange request", error = $decBuf.error + + ( + await wpx.respondError( + PeerExchangeResponseStatusCode.BAD_REQUEST, some($decBuf.error), conn + ) + ).isOkOr: + error "Failed to respond with BAD_REQUEST:", error = $error + return + + trace "peer exchange request received" + let enrs = wpx.getEnrsFromCache(decBuf.get().request.numPeers) + (await wpx.respond(enrs, conn)).isErrOr: + waku_px_peers_sent.inc(enrs.len().int64()) + do: + ( + await wpx.respondError( + PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS, none(string), conn + ) + ).isOkOr: + error "Failed to respond with TOO_MANY_REQUESTS:", error = $error # close, no data is expected await conn.closeWithEof() @@ -199,8 +274,14 @@ proc new*( T: type WakuPeerExchange, peerManager: PeerManager, cluster: Option[uint16] = none(uint16), + rateLimitSetting: Option[RateLimitSetting] = none[RateLimitSetting](), ): T = - let wpx = WakuPeerExchange(peerManager: peerManager, cluster: cluster) + let wpx = WakuPeerExchange( + peerManager: peerManager, + cluster: cluster, + requestRateLimiter: newRequestRateLimiter(rateLimitSetting), + ) wpx.initProtocolHandler() + setServiceLimitMetric(WakuPeerExchangeCodec, rateLimitSetting) asyncSpawn wpx.updatePxEnrCache() return wpx diff --git a/waku/waku_peer_exchange/rpc.nim b/waku/waku_peer_exchange/rpc.nim index 0b248d9356..4a91e8db1e 100644 --- a/waku/waku_peer_exchange/rpc.nim +++ b/waku/waku_peer_exchange/rpc.nim @@ -1,4 +1,15 @@ +import std/options + type + PeerExchangeResponseStatusCode* {.pure.} = enum + UNKNOWN = uint32(000) + SUCCESS = uint32(200) + BAD_REQUEST = uint32(400) + BAD_RESPONSE = uint32(401) + TOO_MANY_REQUESTS = uint32(429) + SERVICE_UNAVAILABLE = uint32(503) + DIAL_FAILURE = uint32(599) + PeerExchangePeerInfo* = object enr*: seq[byte] # RLP encoded ENR: https://eips.ethereum.org/EIPS/eip-778 @@ -7,7 +18,44 @@ type PeerExchangeResponse* = object peerInfos*: seq[PeerExchangePeerInfo] + status_code*: PeerExchangeResponseStatusCode + status_desc*: Option[string] + + PeerExchangeResponseStatus* = + tuple[status_code: PeerExchangeResponseStatusCode, status_desc: Option[string]] PeerExchangeRpc* = object request*: PeerExchangeRequest response*: PeerExchangeResponse + +proc makeRequest*(T: type PeerExchangeRpc, numPeers: uint64): T = + return T(request: PeerExchangeRequest(numPeers: numPeers)) + +proc makeResponse*(T: type PeerExchangeRpc, peerInfos: seq[PeerExchangePeerInfo]): T = + return T( + response: PeerExchangeResponse( + peerInfos: peerInfos, status_code: PeerExchangeResponseStatusCode.SUCCESS + ) + ) + +proc makeErrorResponse*( + T: type PeerExchangeRpc, + status_code: PeerExchangeResponseStatusCode, + status_desc: Option[string] = none(string), +): T = + return T( + response: PeerExchangeResponse(status_code: status_code, status_desc: status_desc) + ) + +proc `$`*(statusCode: PeerExchangeResponseStatusCode): string = + case statusCode + of PeerExchangeResponseStatusCode.UNKNOWN: "UNKNOWN" + of PeerExchangeResponseStatusCode.SUCCESS: "SUCCESS" + of PeerExchangeResponseStatusCode.BAD_REQUEST: "BAD_REQUEST" + of PeerExchangeResponseStatusCode.BAD_RESPONSE: "BAD_RESPONSE" + of PeerExchangeResponseStatusCode.TOO_MANY_REQUESTS: "TOO_MANY_REQUESTS" + of PeerExchangeResponseStatusCode.SERVICE_UNAVAILABLE: "SERVICE_UNAVAILABLE" + of PeerExchangeResponseStatusCode.DIAL_FAILURE: "DIAL_FAILURE" + +# proc `$`*(pxResponseStatus: PeerExchangeResponseStatus): string = +# return $pxResponseStatus.status & " - " & pxResponseStatus.desc.get("") diff --git a/waku/waku_peer_exchange/rpc_codec.nim b/waku/waku_peer_exchange/rpc_codec.nim index 92ebd70842..e5e982938f 100644 --- a/waku/waku_peer_exchange/rpc_codec.nim +++ b/waku/waku_peer_exchange/rpc_codec.nim @@ -1,5 +1,6 @@ {.push raises: [].} +import std/options import ../common/protobuf, ./rpc proc encode*(rpc: PeerExchangeRequest): ProtoBuffer = @@ -38,17 +39,26 @@ proc decode*(T: type PeerExchangePeerInfo, buffer: seq[byte]): ProtoResult[T] = ok(rpc) +proc parse*(T: type PeerExchangeResponseStatusCode, status: uint32): T = + case status + of 200, 400, 429, 503: + PeerExchangeResponseStatusCode(status) + else: + PeerExchangeResponseStatusCode.UNKNOWN + proc encode*(rpc: PeerExchangeResponse): ProtoBuffer = var pb = initProtoBuffer() for pi in rpc.peerInfos: pb.write3(1, pi.encode()) + pb.write3(10, rpc.status_code.uint32) + pb.write3(11, rpc.status_desc) pb.finish3() pb -proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtobufResult[T] = let pb = initProtoBuffer(buffer) var rpc = PeerExchangeResponse(peerInfos: @[]) @@ -57,6 +67,18 @@ proc decode*(T: type PeerExchangeResponse, buffer: seq[byte]): ProtoResult[T] = for pib in peerInfoBuffers: rpc.peerInfos.add(?PeerExchangePeerInfo.decode(pib)) + var status_code: uint32 + if ?pb.getField(10, status_code): + rpc.status_code = PeerExchangeResponseStatusCode.parse(status_code) + else: + return err(ProtobufError.missingRequiredField("status_code")) + + var status_desc: string + if ?pb.getField(11, status_desc): + rpc.status_desc = some(status_desc) + else: + rpc.status_desc = none(string) + ok(rpc) proc encode*(rpc: PeerExchangeRpc): ProtoBuffer = @@ -64,21 +86,25 @@ proc encode*(rpc: PeerExchangeRpc): ProtoBuffer = pb.write3(1, rpc.request.encode()) pb.write3(2, rpc.response.encode()) + pb.finish3() pb -proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtoResult[T] = +proc decode*(T: type PeerExchangeRpc, buffer: seq[byte]): ProtobufResult[T] = let pb = initProtoBuffer(buffer) var rpc = PeerExchangeRpc() var requestBuffer: seq[byte] if not ?pb.getField(1, requestBuffer): - return err(ProtoError.RequiredFieldMissing) + return err(ProtobufError.missingRequiredField("request")) + rpc.request = ?PeerExchangeRequest.decode(requestBuffer) var responseBuffer: seq[byte] - discard ?pb.getField(2, responseBuffer) + if not ?pb.getField(2, responseBuffer): + return err(ProtobufError.missingRequiredField("response")) + rpc.response = ?PeerExchangeResponse.decode(responseBuffer) ok(rpc) diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim index 4e94d4c489..c4e1cd36c6 100644 --- a/waku/waku_store/protocol.nim +++ b/waku/waku_store/protocol.nim @@ -154,5 +154,5 @@ proc new*( ) store.initProtocolHandler() - + setServiceLimitMetric(WakuStoreCodec, rateLimitSetting) return store diff --git a/waku/waku_store_legacy/protocol.nim b/waku/waku_store_legacy/protocol.nim index 11e3e9be9d..a7886165d9 100644 --- a/waku/waku_store_legacy/protocol.nim +++ b/waku/waku_store_legacy/protocol.nim @@ -153,4 +153,5 @@ proc new*( requestRateLimiter: newRequestRateLimiter(rateLimitSetting), ) ws.initProtocolHandler() + setServiceLimitMetric(WakuLegacyStoreCodec, rateLimitSetting) ws