From 122fb02ee54fd804d3e8820ca49a07557400d88a Mon Sep 17 00:00:00 2001 From: yjhmelody Date: Mon, 24 Jun 2024 16:28:26 +0800 Subject: [PATCH 1/6] feat: pre-check for ip/connection rate limit --- src/extensions/rate_limit/connection.rs | 9 +++--- src/extensions/rate_limit/ip.rs | 9 ++---- src/extensions/rate_limit/mod.rs | 40 +++++++++++++++++++++++++ src/extensions/rate_limit/weight.rs | 2 +- src/extensions/server/mod.rs | 2 ++ src/server.rs | 6 ++++ 6 files changed, 57 insertions(+), 11 deletions(-) diff --git a/src/extensions/rate_limit/connection.rs b/src/extensions/rate_limit/connection.rs index ccc2f3e..bc3cd55 100644 --- a/src/extensions/rate_limit/connection.rs +++ b/src/extensions/rate_limit/connection.rs @@ -1,4 +1,4 @@ -use crate::{extensions::rate_limit::MethodWeights, utils::errors}; +use crate::extensions::rate_limit::MethodWeights; use futures::{future::BoxFuture, FutureExt}; use governor::{DefaultDirectRateLimiter, Jitter, RateLimiter}; use jsonrpsee::{ @@ -75,9 +75,10 @@ where async move { if let Some(n) = NonZeroU32::new(weight) { - if limiter.until_n_ready_with_jitter(n, jitter).await.is_err() { - return MethodResponse::error(req.id, errors::failed("rate limit exceeded")); - } + limiter + .until_n_ready_with_jitter(n, jitter) + .await + .expect("check_n have been done during init"); } service.call(req).await } diff --git a/src/extensions/rate_limit/ip.rs b/src/extensions/rate_limit/ip.rs index 4274e02..caeb92c 100644 --- a/src/extensions/rate_limit/ip.rs +++ b/src/extensions/rate_limit/ip.rs @@ -1,4 +1,4 @@ -use crate::{extensions::rate_limit::MethodWeights, utils::errors}; +use crate::extensions::rate_limit::MethodWeights; use futures::{future::BoxFuture, FutureExt}; use governor::{DefaultKeyedRateLimiter, Jitter}; use jsonrpsee::{ @@ -86,13 +86,10 @@ where let weight = self.method_weights.get(req.method_name()); async move { if let Some(n) = NonZeroU32::new(weight) { - if limiter + limiter .until_key_n_ready_with_jitter(&ip_addr, n, jitter) .await - .is_err() - { - return MethodResponse::error(req.id, errors::failed("rate limit exceeded")); - } + .expect("check_n have been done during init"); } service.call(req).await } diff --git a/src/extensions/rate_limit/mod.rs b/src/extensions/rate_limit/mod.rs index ba987b9..7d24404 100644 --- a/src/extensions/rate_limit/mod.rs +++ b/src/extensions/rate_limit/mod.rs @@ -1,3 +1,4 @@ +use anyhow::bail; use governor::{DefaultKeyedRateLimiter, Jitter, Quota, RateLimiter}; use serde::Deserialize; use std::num::NonZeroU32; @@ -90,6 +91,45 @@ impl RateLimitBuilder { } } } + + pub fn pre_check_connection(&self, method_weights: &MethodWeights) -> anyhow::Result<()> { + if let Some(ref rule) = self.config.connection { + let burst = NonZeroU32::new(rule.burst).unwrap(); + let period = Duration::from_secs(rule.period_secs); + let quota = build_quota(burst, period); + let limiter = RateLimiter::direct(quota); + + for (method, weight) in &method_weights.0 { + if let Some(n) = NonZeroU32::new(*weight) { + if limiter.check_n(n).is_err() { + bail!("`{method}` weight config too big for connection rate limit: {}", n); + } + } + } + } + + Ok(()) + } + + pub fn pre_check_ip(&self, method_weights: &MethodWeights) -> anyhow::Result<()> { + if let Some(ref rule) = self.config.ip { + let burst = NonZeroU32::new(rule.burst).unwrap(); + let period = Duration::from_secs(rule.period_secs); + let quota = build_quota(burst, period); + let limiter = RateLimiter::direct(quota); + + for (method, weight) in &method_weights.0 { + if let Some(n) = NonZeroU32::new(*weight) { + if limiter.check_n(n).is_err() { + bail!("`{method}` weight config too big for ip rate limit: {}", n); + } + } + } + } + + Ok(()) + } + pub fn connection_limit(&self, method_weights: MethodWeights) -> Option { if let Some(ref rule) = self.config.connection { let burst = NonZeroU32::new(rule.burst).unwrap(); diff --git a/src/extensions/rate_limit/weight.rs b/src/extensions/rate_limit/weight.rs index bf7d33c..ddf5d04 100644 --- a/src/extensions/rate_limit/weight.rs +++ b/src/extensions/rate_limit/weight.rs @@ -2,7 +2,7 @@ use crate::config::RpcMethod; use std::collections::BTreeMap; #[derive(Clone, Debug, Default)] -pub struct MethodWeights(BTreeMap); +pub struct MethodWeights(pub(crate) BTreeMap); impl MethodWeights { pub fn add(&mut self, method: &str, weight: u32) { diff --git a/src/extensions/server/mod.rs b/src/extensions/server/mod.rs index 4e38e64..4167272 100644 --- a/src/extensions/server/mod.rs +++ b/src/extensions/server/mod.rs @@ -152,8 +152,10 @@ impl SubwayServerBuilder { methods: Methods, stop_handle: StopHandle, rpc_metrics: RpcMetrics, + // TODO: this is not cheap. svc_builder: TowerServiceBuilder, rate_limit_builder: Option>, + // TODO: this is not cheap. rpc_method_weights: MethodWeights, } diff --git a/src/server.rs b/src/server.rs index 6bf2f0c..e434975 100644 --- a/src/server.rs +++ b/src/server.rs @@ -51,6 +51,12 @@ pub async fn build(config: Config) -> anyhow::Result { let rpc_method_weights = MethodWeights::from_config(&config.rpcs.methods); + // pre-check stage + if let Some(r) = &rate_limit_builder { + r.pre_check_ip(&rpc_method_weights)?; + r.pre_check_connection(&rpc_method_weights)?; + } + let request_timeout_seconds = server_builder.config.request_timeout_seconds; let metrics = get_rpc_metrics(&extensions_registry).await; From 86c3dd21fde7d17a210fe1c1a9d705dba7983bb4 Mon Sep 17 00:00:00 2001 From: yjhmelody Date: Mon, 24 Jun 2024 17:15:33 +0800 Subject: [PATCH 2/6] clean up docs --- src/extensions/server/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/extensions/server/mod.rs b/src/extensions/server/mod.rs index 4167272..4e38e64 100644 --- a/src/extensions/server/mod.rs +++ b/src/extensions/server/mod.rs @@ -152,10 +152,8 @@ impl SubwayServerBuilder { methods: Methods, stop_handle: StopHandle, rpc_metrics: RpcMetrics, - // TODO: this is not cheap. svc_builder: TowerServiceBuilder, rate_limit_builder: Option>, - // TODO: this is not cheap. rpc_method_weights: MethodWeights, } From 4cb0bd4ace588d962f1c8c201e2b2e24f158a3a0 Mon Sep 17 00:00:00 2001 From: yjhmelody Date: Mon, 24 Jun 2024 17:46:14 +0800 Subject: [PATCH 3/6] fix --- src/extensions/rate_limit/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/extensions/rate_limit/mod.rs b/src/extensions/rate_limit/mod.rs index 7d24404..dc76806 100644 --- a/src/extensions/rate_limit/mod.rs +++ b/src/extensions/rate_limit/mod.rs @@ -99,7 +99,7 @@ impl RateLimitBuilder { let quota = build_quota(burst, period); let limiter = RateLimiter::direct(quota); - for (method, weight) in &method_weights.0 { + for (method, weight) in method_weights.0.as_ref() { if let Some(n) = NonZeroU32::new(*weight) { if limiter.check_n(n).is_err() { bail!("`{method}` weight config too big for connection rate limit: {}", n); @@ -118,7 +118,7 @@ impl RateLimitBuilder { let quota = build_quota(burst, period); let limiter = RateLimiter::direct(quota); - for (method, weight) in &method_weights.0 { + for (method, weight) in method_weights.0.as_ref() { if let Some(n) = NonZeroU32::new(*weight) { if limiter.check_n(n).is_err() { bail!("`{method}` weight config too big for ip rate limit: {}", n); From 7cc9fb24bd3f4decaf8708956d338d748389d569 Mon Sep 17 00:00:00 2001 From: yjhmelody Date: Mon, 24 Jun 2024 18:13:32 +0800 Subject: [PATCH 4/6] do pre-check in `validate` fn --- src/config/mod.rs | 43 ++++++++++++++++++++++++++++- src/extensions/rate_limit/mod.rs | 39 -------------------------- src/extensions/rate_limit/weight.rs | 2 +- src/server.rs | 6 ---- 4 files changed, 43 insertions(+), 47 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 5424157..2d62cc2 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,12 +1,16 @@ -use anyhow::Context; +use anyhow::{bail, Context}; use regex::{Captures, Regex}; use std::env; use std::fs; +use std::num::NonZeroU32; use std::path; +use std::time::Duration; use garde::Validate; +use governor::RateLimiter; use serde::Deserialize; +use crate::extensions::rate_limit::build_quota; use crate::extensions::ExtensionsConfig; pub use rpc::*; @@ -208,6 +212,43 @@ pub async fn validate(config: &Config) -> Result<(), anyhow::Error> { // validate use garde::Validate config.validate(&())?; + + if let Some(rate_limit) = config.extensions.rate_limit.as_ref() { + if let Some(ref rule) = rate_limit.ip { + let burst = NonZeroU32::new(rule.burst).unwrap(); + let period = Duration::from_secs(rule.period_secs); + let quota = build_quota(burst, period); + let limiter = RateLimiter::direct(quota); + + for method in &config.rpcs.methods { + if let Some(n) = NonZeroU32::new(method.rate_limit_weight) { + if limiter.check_n(n).is_err() { + bail!("`{}` weight config too big for ip rate limit: {}", method.method, n); + } + } + } + } + + if let Some(ref rule) = rate_limit.connection { + let burst = NonZeroU32::new(rule.burst).unwrap(); + let period = Duration::from_secs(rule.period_secs); + let quota = build_quota(burst, period); + let limiter = RateLimiter::direct(quota); + + for method in &config.rpcs.methods { + if let Some(n) = NonZeroU32::new(method.rate_limit_weight) { + if limiter.check_n(n).is_err() { + bail!( + "`{}` weight config too big for connection rate limit: {}", + method.method, + n + ); + } + } + } + } + } + // since endpoints connection test is async // we can't intergrate it into garde::Validate // and it's not a static validation like format, length, .etc diff --git a/src/extensions/rate_limit/mod.rs b/src/extensions/rate_limit/mod.rs index dc76806..61dd2a1 100644 --- a/src/extensions/rate_limit/mod.rs +++ b/src/extensions/rate_limit/mod.rs @@ -1,4 +1,3 @@ -use anyhow::bail; use governor::{DefaultKeyedRateLimiter, Jitter, Quota, RateLimiter}; use serde::Deserialize; use std::num::NonZeroU32; @@ -92,44 +91,6 @@ impl RateLimitBuilder { } } - pub fn pre_check_connection(&self, method_weights: &MethodWeights) -> anyhow::Result<()> { - if let Some(ref rule) = self.config.connection { - let burst = NonZeroU32::new(rule.burst).unwrap(); - let period = Duration::from_secs(rule.period_secs); - let quota = build_quota(burst, period); - let limiter = RateLimiter::direct(quota); - - for (method, weight) in method_weights.0.as_ref() { - if let Some(n) = NonZeroU32::new(*weight) { - if limiter.check_n(n).is_err() { - bail!("`{method}` weight config too big for connection rate limit: {}", n); - } - } - } - } - - Ok(()) - } - - pub fn pre_check_ip(&self, method_weights: &MethodWeights) -> anyhow::Result<()> { - if let Some(ref rule) = self.config.ip { - let burst = NonZeroU32::new(rule.burst).unwrap(); - let period = Duration::from_secs(rule.period_secs); - let quota = build_quota(burst, period); - let limiter = RateLimiter::direct(quota); - - for (method, weight) in method_weights.0.as_ref() { - if let Some(n) = NonZeroU32::new(*weight) { - if limiter.check_n(n).is_err() { - bail!("`{method}` weight config too big for ip rate limit: {}", n); - } - } - } - } - - Ok(()) - } - pub fn connection_limit(&self, method_weights: MethodWeights) -> Option { if let Some(ref rule) = self.config.connection { let burst = NonZeroU32::new(rule.burst).unwrap(); diff --git a/src/extensions/rate_limit/weight.rs b/src/extensions/rate_limit/weight.rs index e5829cc..06fedd0 100644 --- a/src/extensions/rate_limit/weight.rs +++ b/src/extensions/rate_limit/weight.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use std::sync::Arc; #[derive(Clone, Debug, Default)] -pub struct MethodWeights(pub(crate) Arc>); +pub struct MethodWeights(Arc>); impl MethodWeights { pub fn get(&self, method: &str) -> u32 { diff --git a/src/server.rs b/src/server.rs index e434975..6bf2f0c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -51,12 +51,6 @@ pub async fn build(config: Config) -> anyhow::Result { let rpc_method_weights = MethodWeights::from_config(&config.rpcs.methods); - // pre-check stage - if let Some(r) = &rate_limit_builder { - r.pre_check_ip(&rpc_method_weights)?; - r.pre_check_connection(&rpc_method_weights)?; - } - let request_timeout_seconds = server_builder.config.request_timeout_seconds; let metrics = get_rpc_metrics(&extensions_registry).await; From 3bfc662b4209fe21448aaa7bb1c74e44778512cc Mon Sep 17 00:00:00 2001 From: yjhmelody Date: Tue, 25 Jun 2024 10:38:47 +0800 Subject: [PATCH 5/6] ok_or --- src/config/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 2d62cc2..84564d9 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,4 +1,4 @@ -use anyhow::{bail, Context}; +use anyhow::{anyhow, bail, Context}; use regex::{Captures, Regex}; use std::env; use std::fs; @@ -215,7 +215,7 @@ pub async fn validate(config: &Config) -> Result<(), anyhow::Error> { if let Some(rate_limit) = config.extensions.rate_limit.as_ref() { if let Some(ref rule) = rate_limit.ip { - let burst = NonZeroU32::new(rule.burst).unwrap(); + let burst = NonZeroU32::new(rule.burst).ok_or(anyhow!("burst could not be zero"))?; let period = Duration::from_secs(rule.period_secs); let quota = build_quota(burst, period); let limiter = RateLimiter::direct(quota); @@ -230,7 +230,7 @@ pub async fn validate(config: &Config) -> Result<(), anyhow::Error> { } if let Some(ref rule) = rate_limit.connection { - let burst = NonZeroU32::new(rule.burst).unwrap(); + let burst = NonZeroU32::new(rule.burst).ok_or(anyhow!("burst could not be zero"))?; let period = Duration::from_secs(rule.period_secs); let quota = build_quota(burst, period); let limiter = RateLimiter::direct(quota); From 4f66b2b519e461505e6cd49d2e507566243a9a23 Mon Sep 17 00:00:00 2001 From: yjhmelody Date: Thu, 27 Jun 2024 11:10:48 +0800 Subject: [PATCH 6/6] add tests and simplify validate --- src/config/mod.rs | 48 ++++++++---------- tests/configs/big_rate_limit_weight.yml | 56 +++++++++++++++++++++ tests/rpc_configs/big_rate_limit_weight.yml | 5 ++ 3 files changed, 82 insertions(+), 27 deletions(-) create mode 100644 tests/configs/big_rate_limit_weight.yml create mode 100644 tests/rpc_configs/big_rate_limit_weight.yml diff --git a/src/config/mod.rs b/src/config/mod.rs index 84564d9..b51bd3b 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,16 +1,12 @@ -use anyhow::{anyhow, bail, Context}; +use anyhow::{bail, Context}; use regex::{Captures, Regex}; use std::env; use std::fs; -use std::num::NonZeroU32; use std::path; -use std::time::Duration; use garde::Validate; -use governor::RateLimiter; use serde::Deserialize; -use crate::extensions::rate_limit::build_quota; use crate::extensions::ExtensionsConfig; pub use rpc::*; @@ -215,35 +211,25 @@ pub async fn validate(config: &Config) -> Result<(), anyhow::Error> { if let Some(rate_limit) = config.extensions.rate_limit.as_ref() { if let Some(ref rule) = rate_limit.ip { - let burst = NonZeroU32::new(rule.burst).ok_or(anyhow!("burst could not be zero"))?; - let period = Duration::from_secs(rule.period_secs); - let quota = build_quota(burst, period); - let limiter = RateLimiter::direct(quota); - for method in &config.rpcs.methods { - if let Some(n) = NonZeroU32::new(method.rate_limit_weight) { - if limiter.check_n(n).is_err() { - bail!("`{}` weight config too big for ip rate limit: {}", method.method, n); - } + if method.rate_limit_weight > rule.burst { + bail!( + "`{}` rate_limit_weight is too big for ip: {}", + method.method, + method.rate_limit_weight, + ); } } } if let Some(ref rule) = rate_limit.connection { - let burst = NonZeroU32::new(rule.burst).ok_or(anyhow!("burst could not be zero"))?; - let period = Duration::from_secs(rule.period_secs); - let quota = build_quota(burst, period); - let limiter = RateLimiter::direct(quota); - for method in &config.rpcs.methods { - if let Some(n) = NonZeroU32::new(method.rate_limit_weight) { - if limiter.check_n(n).is_err() { - bail!( - "`{}` weight config too big for connection rate limit: {}", - method.method, - n - ); - } + if method.rate_limit_weight > rule.burst { + bail!( + "`{}` rate_limit_weight is too big for connection: {}", + method.method, + method.rate_limit_weight, + ); } } } @@ -342,4 +328,12 @@ mod tests { .to_string() .contains("Unable to connect to all endpoints")); } + + #[tokio::test] + async fn validate_config_fails_for_too_big_rate_limit_weight() { + let config = read_config("tests/configs/big_rate_limit_weight.yml").expect("Unable to read config file"); + let result = validate(&config).await; + assert!(result.is_err()); + assert!(result.err().unwrap().to_string().contains("rate_limit_weight")); + } } diff --git a/tests/configs/big_rate_limit_weight.yml b/tests/configs/big_rate_limit_weight.yml new file mode 100644 index 0000000..0e784d0 --- /dev/null +++ b/tests/configs/big_rate_limit_weight.yml @@ -0,0 +1,56 @@ +extensions: + client: + endpoints: + - wss://acala-rpc.dwellir.com + - wss://acala-rpc-0.aca-api.network + health_check: + interval_sec: 10 # check interval, default is 10s + healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms + health_method: system_health + response: # response contains { isSyncing: false } + !contains + - - isSyncing + - !eq false + event_bus: + substrate_api: + stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes + telemetry: + provider: none + cache: + default_ttl_seconds: 60 + default_size: 500 + merge_subscription: + keep_alive_seconds: 60 + server: + port: 9944 + listen_address: '0.0.0.0' + max_connections: 2000 + http_methods: + - path: /health + method: system_health + - path: /liveness + method: chain_getBlockHash + cors: all + rate_limit: # these are for demo purpose only, please adjust to your needs + connection: # 20 RPC requests per second per connection + burst: 20 + period_secs: 1 + ip: # 500 RPC requests per 10 seconds per ip + burst: 500 + period_secs: 10 + # use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer). + # WARNING: Use with caution, as this xff header can be forged. + use_xff: true # default is false + +middlewares: + methods: + - delay + - response + - inject_params + - cache + - upstream + subscriptions: + - merge_subscription + - upstream + +rpcs: tests/rpc_configs/big_rate_limit_weight.yml diff --git a/tests/rpc_configs/big_rate_limit_weight.yml b/tests/rpc_configs/big_rate_limit_weight.yml new file mode 100644 index 0000000..e0c011a --- /dev/null +++ b/tests/rpc_configs/big_rate_limit_weight.yml @@ -0,0 +1,5 @@ +methods: + - method: eth_chainId + cache: + size: 1 + rate_limit_weight: 21 \ No newline at end of file