Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pre check for rate limit #194

Merged
merged 7 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Context;
use anyhow::{bail, Context};
use regex::{Captures, Regex};
use std::env;
use std::fs;
Expand Down Expand Up @@ -208,6 +208,33 @@ pub async fn validate(config: &Config) -> Result<(), anyhow::Error> {

// validate use garde::Validate
config.validate(&())?;

if let Some(rate_limit) = config.extensions.rate_limit.as_ref() {
if let Some(ref rule) = rate_limit.ip {
for method in &config.rpcs.methods {
if method.rate_limit_weight > rule.burst {
bail!(
"`{}` rate_limit_weight is too big for ip: {}",
method.method,
method.rate_limit_weight,
);
}
}
}

if let Some(ref rule) = rate_limit.connection {
for method in &config.rpcs.methods {
if method.rate_limit_weight > rule.burst {
bail!(
"`{}` rate_limit_weight is too big for connection: {}",
method.method,
method.rate_limit_weight,
);
}
}
}
}

// since endpoints connection test is async
// we can't intergrate it into garde::Validate
// and it's not a static validation like format, length, .etc
Expand Down Expand Up @@ -301,4 +328,12 @@ mod tests {
.to_string()
.contains("Unable to connect to all endpoints"));
}

#[tokio::test]
async fn validate_config_fails_for_too_big_rate_limit_weight() {
let config = read_config("tests/configs/big_rate_limit_weight.yml").expect("Unable to read config file");
let result = validate(&config).await;
assert!(result.is_err());
assert!(result.err().unwrap().to_string().contains("rate_limit_weight"));
}
}
9 changes: 5 additions & 4 deletions src/extensions/rate_limit/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{extensions::rate_limit::MethodWeights, utils::errors};
use crate::extensions::rate_limit::MethodWeights;
use futures::{future::BoxFuture, FutureExt};
use governor::{DefaultDirectRateLimiter, Jitter, RateLimiter};
use jsonrpsee::{
Expand Down Expand Up @@ -75,9 +75,10 @@ where

async move {
if let Some(n) = NonZeroU32::new(weight) {
if limiter.until_n_ready_with_jitter(n, jitter).await.is_err() {
return MethodResponse::error(req.id, errors::failed("rate limit exceeded"));
}
limiter
.until_n_ready_with_jitter(n, jitter)
.await
.expect("check_n have been done during init");
}
service.call(req).await
}
Expand Down
9 changes: 3 additions & 6 deletions src/extensions/rate_limit/ip.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{extensions::rate_limit::MethodWeights, utils::errors};
use crate::extensions::rate_limit::MethodWeights;
use futures::{future::BoxFuture, FutureExt};
use governor::{DefaultKeyedRateLimiter, Jitter};
use jsonrpsee::{
Expand Down Expand Up @@ -86,13 +86,10 @@ where
let weight = self.method_weights.get(req.method_name());
async move {
if let Some(n) = NonZeroU32::new(weight) {
if limiter
limiter
.until_key_n_ready_with_jitter(&ip_addr, n, jitter)
.await
.is_err()
{
return MethodResponse::error(req.id, errors::failed("rate limit exceeded"));
}
.expect("check_n have been done during init");
}
service.call(req).await
}
Expand Down
1 change: 1 addition & 0 deletions src/extensions/rate_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ impl RateLimitBuilder {
}
}
}

pub fn connection_limit(&self, method_weights: MethodWeights) -> Option<ConnectionRateLimitLayer> {
if let Some(ref rule) = self.config.connection {
let burst = NonZeroU32::new(rule.burst).unwrap();
Expand Down
56 changes: 56 additions & 0 deletions tests/configs/big_rate_limit_weight.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
extensions:
client:
endpoints:
- wss://acala-rpc.dwellir.com
- wss://acala-rpc-0.aca-api.network
health_check:
interval_sec: 10 # check interval, default is 10s
healthy_response_time_ms: 500 # max response time to be considered healthy, default is 500ms
health_method: system_health
response: # response contains { isSyncing: false }
!contains
- - isSyncing
- !eq false
event_bus:
substrate_api:
stale_timeout_seconds: 180 # rotate endpoint if no new blocks for 3 minutes
telemetry:
provider: none
cache:
default_ttl_seconds: 60
default_size: 500
merge_subscription:
keep_alive_seconds: 60
server:
port: 9944
listen_address: '0.0.0.0'
max_connections: 2000
http_methods:
- path: /health
method: system_health
- path: /liveness
method: chain_getBlockHash
cors: all
rate_limit: # these are for demo purpose only, please adjust to your needs
connection: # 20 RPC requests per second per connection
burst: 20
period_secs: 1
ip: # 500 RPC requests per 10 seconds per ip
burst: 500
period_secs: 10
# use X-Forwarded-For header to get real ip, if available (e.g. behind a load balancer).
# WARNING: Use with caution, as this xff header can be forged.
use_xff: true # default is false

middlewares:
methods:
- delay
- response
- inject_params
- cache
- upstream
subscriptions:
- merge_subscription
- upstream

rpcs: tests/rpc_configs/big_rate_limit_weight.yml
5 changes: 5 additions & 0 deletions tests/rpc_configs/big_rate_limit_weight.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
methods:
- method: eth_chainId
cache:
size: 1
rate_limit_weight: 21
Loading