diff --git a/src/config/mod.rs b/src/config/mod.rs index 5424157..b51bd3b 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,4 +1,4 @@ -use anyhow::Context; +use anyhow::{bail, Context}; use regex::{Captures, Regex}; use std::env; use std::fs; @@ -208,6 +208,33 @@ pub async fn validate(config: &Config) -> Result<(), anyhow::Error> { // validate use garde::Validate config.validate(&())?; + + if let Some(rate_limit) = config.extensions.rate_limit.as_ref() { + if let Some(ref rule) = rate_limit.ip { + for method in &config.rpcs.methods { + if method.rate_limit_weight > rule.burst { + bail!( + "`{}` rate_limit_weight is too big for ip: {}", + method.method, + method.rate_limit_weight, + ); + } + } + } + + if let Some(ref rule) = rate_limit.connection { + for method in &config.rpcs.methods { + if method.rate_limit_weight > rule.burst { + bail!( + "`{}` rate_limit_weight is too big for connection: {}", + method.method, + method.rate_limit_weight, + ); + } + } + } + } + // since endpoints connection test is async // we can't intergrate it into garde::Validate // and it's not a static validation like format, length, .etc @@ -301,4 +328,12 @@ mod tests { .to_string() .contains("Unable to connect to all endpoints")); } + + #[tokio::test] + async fn validate_config_fails_for_too_big_rate_limit_weight() { + let config = read_config("tests/configs/big_rate_limit_weight.yml").expect("Unable to read config file"); + let result = validate(&config).await; + assert!(result.is_err()); + assert!(result.err().unwrap().to_string().contains("rate_limit_weight")); + } } diff --git a/src/extensions/rate_limit/connection.rs b/src/extensions/rate_limit/connection.rs index ccc2f3e..bc3cd55 100644 --- a/src/extensions/rate_limit/connection.rs +++ b/src/extensions/rate_limit/connection.rs @@ -1,4 +1,4 @@ -use crate::{extensions::rate_limit::MethodWeights, utils::errors}; +use crate::extensions::rate_limit::MethodWeights; use futures::{future::BoxFuture, FutureExt}; use governor::{DefaultDirectRateLimiter, Jitter, RateLimiter}; use jsonrpsee::{ @@ -75,9 +75,10 @@ where async move { if let Some(n) = NonZeroU32::new(weight) { - if limiter.until_n_ready_with_jitter(n, jitter).await.is_err() { - return MethodResponse::error(req.id, errors::failed("rate limit exceeded")); - } + limiter + .until_n_ready_with_jitter(n, jitter) + .await + .expect("check_n have been done during init"); } service.call(req).await } diff --git a/src/extensions/rate_limit/ip.rs b/src/extensions/rate_limit/ip.rs index 4274e02..caeb92c 100644 --- a/src/extensions/rate_limit/ip.rs +++ b/src/extensions/rate_limit/ip.rs @@ -1,4 +1,4 @@ -use crate::{extensions::rate_limit::MethodWeights, utils::errors}; +use crate::extensions::rate_limit::MethodWeights; use futures::{future::BoxFuture, FutureExt}; use governor::{DefaultKeyedRateLimiter, Jitter}; use jsonrpsee::{ @@ -86,13 +86,10 @@ where let weight = self.method_weights.get(req.method_name()); async move { if let Some(n) = NonZeroU32::new(weight) { - if limiter + limiter .until_key_n_ready_with_jitter(&ip_addr, n, jitter) .await - .is_err() - { - return MethodResponse::error(req.id, errors::failed("rate limit exceeded")); - } + .expect("check_n have been done during init"); } service.call(req).await } diff --git a/src/extensions/rate_limit/mod.rs b/src/extensions/rate_limit/mod.rs index ba987b9..61dd2a1 100644 --- a/src/extensions/rate_limit/mod.rs +++ b/src/extensions/rate_limit/mod.rs @@ -90,6 +90,7 @@ impl RateLimitBuilder { } } } + pub fn connection_limit(&self, method_weights: MethodWeights) -> Option { if let Some(ref rule) = self.config.connection { let burst = NonZeroU32::new(rule.burst).unwrap(); diff --git a/tests/configs/big_rate_limit_weight.yml b/tests/configs/big_rate_limit_weight.yml new file mode 100644 index 0000000..0e784d0 --- /dev/null +++ b/tests/configs/big_rate_limit_weight.yml @@ -0,0 +1,56 @@ +extensions: + client: + endpoints: + - wss://acala-rpc.dwellir.com + - wss://acala-rpc-0.aca-api.network + health_check: + interval_sec: 10 # check interval, default is 10s + healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms + health_method: system_health + response: # response contains { isSyncing: false } + !contains + - - isSyncing + - !eq false + event_bus: + substrate_api: + stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes + telemetry: + provider: none + cache: + default_ttl_seconds: 60 + default_size: 500 + merge_subscription: + keep_alive_seconds: 60 + server: + port: 9944 + listen_address: '0.0.0.0' + max_connections: 2000 + http_methods: + - path: /health + method: system_health + - path: /liveness + method: chain_getBlockHash + cors: all + rate_limit: # these are for demo purpose only, please adjust to your needs + connection: # 20 RPC requests per second per connection + burst: 20 + period_secs: 1 + ip: # 500 RPC requests per 10 seconds per ip + burst: 500 + period_secs: 10 + # use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer). + # WARNING: Use with caution, as this xff header can be forged. + use_xff: true # default is false + +middlewares: + methods: + - delay + - response + - inject_params + - cache + - upstream + subscriptions: + - merge_subscription + - upstream + +rpcs: tests/rpc_configs/big_rate_limit_weight.yml diff --git a/tests/rpc_configs/big_rate_limit_weight.yml b/tests/rpc_configs/big_rate_limit_weight.yml new file mode 100644 index 0000000..e0c011a --- /dev/null +++ b/tests/rpc_configs/big_rate_limit_weight.yml @@ -0,0 +1,5 @@ +methods: + - method: eth_chainId + cache: + size: 1 + rate_limit_weight: 21 \ No newline at end of file