From 0449c5eca6e4a9d0e15cc8700e457425629838c0 Mon Sep 17 00:00:00 2001 From: kaliubuntu0206 <139627505+kaliubuntu0206@users.noreply.github.com> Date: Thu, 2 Nov 2023 23:29:03 +0900 Subject: [PATCH] feat(rpc): limit block_range by 100_000 per eth_getLogs request (#5243) Co-authored-by: Matthias Seitz Co-authored-by: Alexey Shekhirin --- bin/reth/src/args/mod.rs | 4 +- bin/reth/src/args/rpc_server_args.rs | 47 +++++++++++++-- bin/reth/src/args/types.rs | 49 +++++++++++++++ crates/rpc/rpc-builder/src/auth.rs | 17 +++--- crates/rpc/rpc-builder/src/constants.rs | 3 + crates/rpc/rpc-builder/src/eth.rs | 25 +++++++- crates/rpc/rpc-builder/src/lib.rs | 3 +- crates/rpc/rpc/src/eth/filter.rs | 79 ++++++++++++++++++++++--- crates/rpc/rpc/src/eth/mod.rs | 2 +- 9 files changed, 200 insertions(+), 29 deletions(-) create mode 100644 bin/reth/src/args/types.rs diff --git a/bin/reth/src/args/mod.rs b/bin/reth/src/args/mod.rs index 0710176a0d59..b6a68eef1043 100644 --- a/bin/reth/src/args/mod.rs +++ b/bin/reth/src/args/mod.rs @@ -31,7 +31,7 @@ pub use stage_args::StageEnum; mod gas_price_oracle_args; pub use gas_price_oracle_args::GasPriceOracleArgs; -/// TxPoolArgs for congiguring the transaction pool +/// TxPoolArgs for configuring the transaction pool mod txpool_args; pub use txpool_args::TxPoolArgs; @@ -44,3 +44,5 @@ mod pruning_args; pub use pruning_args::PruningArgs; pub mod utils; + +pub mod types; diff --git a/bin/reth/src/args/rpc_server_args.rs b/bin/reth/src/args/rpc_server_args.rs index 75f7c0a54066..567e00d1f2a5 100644 --- a/bin/reth/src/args/rpc_server_args.rs +++ b/bin/reth/src/args/rpc_server_args.rs @@ -1,7 +1,7 @@ //! clap [Args](clap::Args) for RPC related arguments. use crate::{ - args::GasPriceOracleArgs, + args::{types::ZeroAsNone, GasPriceOracleArgs}, cli::{ components::{RethNodeComponents, RethRpcComponents, RethRpcServerHandles}, config::RethRpcConfig, @@ -140,9 +140,13 @@ pub struct RpcServerArgs { #[arg(long, value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_TRACING_REQUESTS)] pub rpc_max_tracing_requests: u32, - /// Maximum number of logs that can be returned in a single response. - #[arg(long, value_name = "COUNT", default_value_t = constants::DEFAULT_MAX_LOGS_PER_RESPONSE)] - pub rpc_max_logs_per_response: usize, + /// Maximum number of blocks that could be scanned per filter request. (0 = entire chain) + #[arg(long, value_name = "COUNT", default_value_t = ZeroAsNone::new(constants::DEFAULT_MAX_BLOCKS_PER_FILTER))] + pub rpc_max_blocks_per_filter: ZeroAsNone, + + /// Maximum number of logs that can be returned in a single response. (0 = no limit) + #[arg(long, value_name = "COUNT", default_value_t = ZeroAsNone::new(constants::DEFAULT_MAX_LOGS_PER_RESPONSE as u64))] + pub rpc_max_logs_per_response: ZeroAsNone, /// Maximum gas limit for `eth_call` and call tracing RPC methods. #[arg( @@ -326,7 +330,8 @@ impl RethRpcConfig for RpcServerArgs { fn eth_config(&self) -> EthConfig { EthConfig::default() .max_tracing_requests(self.rpc_max_tracing_requests) - .max_logs_per_response(self.rpc_max_logs_per_response) + .max_blocks_per_filter(self.rpc_max_blocks_per_filter.unwrap_or_max()) + .max_logs_per_response(self.rpc_max_logs_per_response.unwrap_or_max() as usize) .rpc_gas_cap(self.rpc_gas_cap) .gpo_config(self.gas_price_oracle_config()) } @@ -598,4 +603,36 @@ mod tests { ); assert_eq!(config.ipc_endpoint().unwrap().path(), constants::DEFAULT_IPC_ENDPOINT); } + + #[test] + fn test_zero_filter_limits() { + let args = CommandParser::::parse_from([ + "reth", + "--rpc-max-blocks-per-filter", + "0", + "--rpc-max-logs-per-response", + "0", + ]) + .args; + + let config = args.eth_config().filter_config(); + assert_eq!(config.max_blocks_per_filter, Some(u64::MAX)); + assert_eq!(config.max_logs_per_response, Some(usize::MAX)); + } + + #[test] + fn test_custom_filter_limits() { + let args = CommandParser::::parse_from([ + "reth", + "--rpc-max-blocks-per-filter", + "100", + "--rpc-max-logs-per-response", + "200", + ]) + .args; + + let config = args.eth_config().filter_config(); + assert_eq!(config.max_blocks_per_filter, Some(100)); + assert_eq!(config.max_logs_per_response, Some(200)); + } } diff --git a/bin/reth/src/args/types.rs b/bin/reth/src/args/types.rs new file mode 100644 index 000000000000..d193e2dffeae --- /dev/null +++ b/bin/reth/src/args/types.rs @@ -0,0 +1,49 @@ +//! Additional helper types for CLI parsing. + +use std::{fmt, str::FromStr}; + +/// A helper type that maps `0` to `None` when parsing CLI arguments. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct ZeroAsNone(pub Option); + +impl ZeroAsNone { + /// Returns the inner value. + pub const fn new(value: u64) -> Self { + Self(Some(value)) + } + + /// Returns the inner value or `u64::MAX` if `None`. + pub fn unwrap_or_max(self) -> u64 { + self.0.unwrap_or(u64::MAX) + } +} + +impl fmt::Display for ZeroAsNone { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0 { + Some(value) => write!(f, "{}", value), + None => write!(f, "0"), + } + } +} + +impl FromStr for ZeroAsNone { + type Err = std::num::ParseIntError; + + fn from_str(s: &str) -> Result { + let value = s.parse::()?; + Ok(Self(if value == 0 { None } else { Some(value) })) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_zero_parse() { + let val = "0".parse::().unwrap(); + assert_eq!(val, ZeroAsNone(None)); + assert_eq!(val.unwrap_or_max(), u64::MAX); + } +} diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 396b2a9fad87..c95f3bc00880 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -1,6 +1,6 @@ use crate::{ constants, - constants::DEFAULT_MAX_LOGS_PER_RESPONSE, + constants::{DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE}, error::{RpcError, ServerKind}, EthConfig, }; @@ -16,7 +16,7 @@ use reth_provider::{ StateProviderFactory, }; use reth_rpc::{ - eth::{cache::EthStateCache, gas_oracle::GasPriceOracle}, + eth::{cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig}, AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, }; @@ -68,14 +68,11 @@ where Box::new(executor.clone()), BlockingTaskPool::build().expect("failed to build tracing pool"), ); - let eth_filter = EthFilter::new( - provider, - pool, - eth_cache.clone(), - DEFAULT_MAX_LOGS_PER_RESPONSE, - Box::new(executor.clone()), - EthConfig::default().stale_filter_ttl, - ); + let config = EthFilterConfig::default() + .max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE) + .max_blocks_per_filter(DEFAULT_MAX_BLOCKS_PER_FILTER); + let eth_filter = + EthFilter::new(provider, pool, eth_cache.clone(), config, Box::new(executor.clone())); launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await } diff --git a/crates/rpc/rpc-builder/src/constants.rs b/crates/rpc/rpc-builder/src/constants.rs index cbc051730c8b..6659174123fc 100644 --- a/crates/rpc/rpc-builder/src/constants.rs +++ b/crates/rpc/rpc-builder/src/constants.rs @@ -7,6 +7,9 @@ pub const DEFAULT_WS_RPC_PORT: u16 = 8546; /// The default port for the auth server. pub const DEFAULT_AUTH_PORT: u16 = 8551; +/// The default maximum block range allowed to filter +pub const DEFAULT_MAX_BLOCKS_PER_FILTER: u64 = 100_000; + /// The default maximum of logs in a single response. pub const DEFAULT_MAX_LOGS_PER_RESPONSE: usize = 20_000; diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index e3b9d4dcc735..c40226d6adab 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -1,9 +1,11 @@ -use crate::constants::{DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_TRACING_REQUESTS}; +use crate::constants::{ + DEFAULT_MAX_BLOCKS_PER_FILTER, DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_MAX_TRACING_REQUESTS, +}; use reth_rpc::{ eth::{ cache::{EthStateCache, EthStateCacheConfig}, gas_oracle::GasPriceOracleConfig, - RPC_DEFAULT_GAS_CAP, + EthFilterConfig, RPC_DEFAULT_GAS_CAP, }, BlockingTaskPool, EthApi, EthFilter, EthPubSub, }; @@ -33,6 +35,8 @@ pub struct EthConfig { pub gas_oracle: GasPriceOracleConfig, /// The maximum number of tracing calls that can be executed in concurrently. pub max_tracing_requests: u32, + /// Maximum number of blocks that could be scanned per filter request in `eth_getLogs` calls. + pub max_blocks_per_filter: u64, /// Maximum number of logs that can be returned in a single response in `eth_getLogs` calls. pub max_logs_per_response: usize, /// Gas limit for `eth_call` and call tracing RPC methods. @@ -44,6 +48,16 @@ pub struct EthConfig { pub stale_filter_ttl: std::time::Duration, } +impl EthConfig { + /// Returns the filter config for the `eth_filter` handler. + pub fn filter_config(&self) -> EthFilterConfig { + EthFilterConfig::default() + .max_blocks_per_filter(self.max_blocks_per_filter) + .max_logs_per_response(self.max_logs_per_response) + .stale_filter_ttl(self.stale_filter_ttl) + } +} + /// Default value for stale filter ttl const DEFAULT_STALE_FILTER_TTL: std::time::Duration = std::time::Duration::from_secs(5 * 60); @@ -53,6 +67,7 @@ impl Default for EthConfig { cache: EthStateCacheConfig::default(), gas_oracle: GasPriceOracleConfig::default(), max_tracing_requests: DEFAULT_MAX_TRACING_REQUESTS, + max_blocks_per_filter: DEFAULT_MAX_BLOCKS_PER_FILTER, max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE, rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(), stale_filter_ttl: DEFAULT_STALE_FILTER_TTL, @@ -79,6 +94,12 @@ impl EthConfig { self } + /// Configures the maximum block length to scan per `eth_getLogs` request + pub fn max_blocks_per_filter(mut self, max_blocks: u64) -> Self { + self.max_blocks_per_filter = max_blocks; + self + } + /// Configures the maximum number of logs per response pub fn max_logs_per_response(mut self, max_logs: usize) -> Self { self.max_logs_per_response = max_logs; diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index ada2b22910ef..cf6ffe875819 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -1043,9 +1043,8 @@ where self.provider.clone(), self.pool.clone(), cache.clone(), - self.config.eth.max_logs_per_response, + self.config.eth.filter_config(), executor.clone(), - self.config.eth.stale_filter_ttl, ); let pubsub = EthPubSub::with_spawner( diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index e8e2f4a92d12..3427a06fe301 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -51,29 +51,32 @@ where /// Creates a new, shareable instance. /// /// This uses the given pool to get notified about new transactions, the provider to interact - /// with the blockchain, the cache to fetch cacheable data, like the logs and the - /// max_logs_per_response to limit the amount of logs returned in a single response - /// `eth_getLogs` + /// with the blockchain, the cache to fetch cacheable data, like the logs. + /// + /// See also [EthFilterConfig]. /// /// This also spawns a task that periodically clears stale filters. pub fn new( provider: Provider, pool: Pool, eth_cache: EthStateCache, - max_logs_per_response: usize, + config: EthFilterConfig, task_spawner: Box, - stale_filter_ttl: Duration, ) -> Self { + let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } = + config; let inner = EthFilterInner { provider, active_filters: Default::default(), pool, id_provider: Arc::new(EthSubscriptionIdProvider::default()), - max_logs_per_response, eth_cache, max_headers_range: MAX_HEADERS_RANGE, task_spawner, stale_filter_ttl, + // if not set, use the max value, which is effectively no limit + max_blocks_per_filter: max_blocks_per_filter.unwrap_or(u64::MAX), + max_logs_per_response: max_logs_per_response.unwrap_or(usize::MAX), }; let eth_filter = Self { inner: Arc::new(inner) }; @@ -324,6 +327,8 @@ struct EthFilterInner { active_filters: ActiveFilters, /// Provides ids to identify filters id_provider: Arc, + /// Maximum number of blocks that could be scanned per filter + max_blocks_per_filter: u64, /// Maximum number of logs that can be returned in a response max_logs_per_response: usize, /// The async cache frontend for eth related data @@ -424,6 +429,10 @@ where ) -> Result, FilterError> { trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range"); + if to_block - from_block > self.max_blocks_per_filter { + return Err(FilterError::QueryExceedsMaxBlocks(self.max_blocks_per_filter)) + } + let mut all_logs = Vec::new(); let filter_params = FilteredParams::new(Some(filter.clone())); @@ -431,8 +440,6 @@ where let address_filter = FilteredParams::address_filter(&filter.address); let topics_filter = FilteredParams::topics_filter(&filter.topics); - let is_multi_block_range = from_block != to_block; - // loop over the range of new blocks and check logs if the filter matches the log's bloom // filter for (from, to) in @@ -467,6 +474,7 @@ where // size check but only if range is multiple blocks, so we always return all // logs of a single block + let is_multi_block_range = from_block != to_block; if is_multi_block_range && all_logs.len() > self.max_logs_per_response { return Err(FilterError::QueryExceedsMaxResults( self.max_logs_per_response, @@ -481,6 +489,56 @@ where } } +/// Config for the filter +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct EthFilterConfig { + /// Maximum number of blocks that a filter can scan for logs. + /// + /// If `None` then no limit is enforced. + pub max_blocks_per_filter: Option, + /// Maximum number of logs that can be returned in a single response in `eth_getLogs` calls. + /// + /// If `None` then no limit is enforced. + pub max_logs_per_response: Option, + /// How long a filter remains valid after the last poll. + /// + /// A filter is considered stale if it has not been polled for longer than this duration and + /// will be removed. + pub stale_filter_ttl: Duration, +} + +impl EthFilterConfig { + /// Sets the maximum number of blocks that a filter can scan for logs. + pub fn max_blocks_per_filter(mut self, num: u64) -> Self { + self.max_blocks_per_filter = Some(num); + self + } + + /// Sets the maximum number of logs that can be returned in a single response in `eth_getLogs` + /// calls. + pub fn max_logs_per_response(mut self, num: usize) -> Self { + self.max_logs_per_response = Some(num); + self + } + + /// Sets how long a filter remains valid after the last poll before it will be removed. + pub fn stale_filter_ttl(mut self, duration: Duration) -> Self { + self.stale_filter_ttl = duration; + self + } +} + +impl Default for EthFilterConfig { + fn default() -> Self { + Self { + max_blocks_per_filter: None, + max_logs_per_response: None, + // 5min + stale_filter_ttl: Duration::from_secs(5 * 60), + } + } +} + /// All active filters #[derive(Debug, Clone, Default)] pub struct ActiveFilters { @@ -599,6 +657,8 @@ enum FilterKind { pub enum FilterError { #[error("filter not found")] FilterNotFound(FilterId), + #[error("query exceeds max block range {0}")] + QueryExceedsMaxBlocks(u64), #[error("query exceeds max results {0}")] QueryExceedsMaxResults(usize), #[error(transparent)] @@ -620,6 +680,9 @@ impl From for jsonrpsee::types::error::ErrorObject<'static> { rpc_error_with_code(jsonrpsee::types::error::INTERNAL_ERROR_CODE, err.to_string()) } FilterError::EthAPIError(err) => err.into(), + err @ FilterError::QueryExceedsMaxBlocks(_) => { + rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string()) + } err @ FilterError::QueryExceedsMaxResults(_) => { rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string()) } diff --git a/crates/rpc/rpc/src/eth/mod.rs b/crates/rpc/rpc/src/eth/mod.rs index 77baa36e32dd..90ee0e1042e8 100644 --- a/crates/rpc/rpc/src/eth/mod.rs +++ b/crates/rpc/rpc/src/eth/mod.rs @@ -15,6 +15,6 @@ pub(crate) mod utils; pub use api::{EthApi, EthApiSpec, EthTransactions, TransactionSource, RPC_DEFAULT_GAS_CAP}; pub use bundle::EthBundle; -pub use filter::EthFilter; +pub use filter::{EthFilter, EthFilterConfig}; pub use id_provider::EthSubscriptionIdProvider; pub use pubsub::EthPubSub;