From 75c340ad2a96768cbf8289eaf1e0b16aeeb322ff Mon Sep 17 00:00:00 2001
From: lyang24
Date: Sun, 8 Sep 2024 22:08:00 -0700
Subject: [PATCH] Add parallelism limit in region server to protect datanode
 from query overload.

---
 config/config.md                  |  1 +
 config/datanode.example.toml      |  3 +
 src/datanode/src/config.rs        |  2 +
 src/datanode/src/datanode.rs      |  5 +-
 src/datanode/src/error.rs         | 20 +++++++
 src/datanode/src/region_server.rs | 93 +++++++++++++++++++++++++++++--
 6 files changed, 118 insertions(+), 6 deletions(-)

diff --git a/config/config.md b/config/config.md
index 235b1f86fd61..385d3eb282f1 100644
--- a/config/config.md
+++ b/config/config.md
@@ -335,6 +335,7 @@
 | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
 | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
 | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
+| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
 | `rpc_addr` | String | Unset | Deprecated, use `grpc.addr` instead. |
 | `rpc_hostname` | String | Unset | Deprecated, use `grpc.hostname` instead. |
 | `rpc_runtime_size` | Integer | Unset | Deprecated, use `grpc.runtime_size` instead. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 4388c4420f8b..e4a3dca6d328 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -19,6 +19,9 @@ enable_telemetry = true
 ## Parallelism of initializing regions.
 init_regions_parallelism = 16
 
+## The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
+max_concurrent_queries = 0
+
 ## Deprecated, use `grpc.addr` instead.
 ## @toml2docs:none-default
 rpc_addr = "127.0.0.1:3001"
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 7cb18131ed3e..70be2513b2e4 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -305,6 +305,7 @@ pub struct DatanodeOptions {
     pub meta_client: Option<MetaClientOptions>,
     pub wal: DatanodeWalConfig,
     pub storage: StorageConfig,
+    pub max_concurrent_queries: usize,
     /// Options for different store engines.
     pub region_engine: Vec<RegionEngineConfig>,
     pub logging: LoggingOptions,
@@ -339,6 +340,7 @@ impl Default for DatanodeOptions {
             meta_client: None,
             wal: DatanodeWalConfig::default(),
             storage: StorageConfig::default(),
+            max_concurrent_queries: 0,
             region_engine: vec![
                 RegionEngineConfig::Mito(MitoConfig::default()),
                 RegionEngineConfig::File(FileEngineConfig::default()),
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 149aa44ebe34..f5d7bd9fc627 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -314,7 +314,7 @@ impl DatanodeBuilder {
         &self,
         event_listener: RegionServerEventListenerRef,
     ) -> Result<RegionServer> {
-        let opts = &self.opts;
+        let opts: &DatanodeOptions = &self.opts;
 
         let query_engine_factory = QueryEngineFactory::new_with_plugins(
             // query engine in datanode only executes plan with resolved table source.
@@ -334,6 +334,9 @@
             common_runtime::global_runtime(),
             event_listener,
             table_provider_factory,
+            opts.max_concurrent_queries,
+            // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
+            Duration::from_millis(100),
         );
 
         let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 5717d0a40389..0b36245924b4 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -22,6 +22,7 @@ use common_macro::stack_trace_debug;
 use snafu::{Location, Snafu};
 use store_api::storage::RegionId;
 use table::error::Error as TableError;
+use tokio::time::error::Elapsed;
 
 /// Business error of datanode.
 #[derive(Snafu)]
@@ -347,6 +348,22 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to acquire permit, semaphore closed"))]
+    ConcurrentQueryLimiterClosed {
+        #[snafu(source)]
+        error: tokio::sync::AcquireError,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
+    #[snafu(display("Failed to acquire permit within timeout"))]
+    ConcurrentQueryLimiterTimeout {
+        #[snafu(source)]
+        error: Elapsed,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -411,6 +428,9 @@ impl ErrorExt for Error {
             FindLogicalRegions { source, .. } => source.status_code(),
             BuildMitoEngine { source, .. } => source.status_code(),
+            ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
+                StatusCode::RegionBusy
+            }
         }
     }
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 56068a38c3aa..f2858e5e5ee4 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
 use std::fmt::Debug;
 use std::ops::Deref;
 use std::sync::{Arc, RwLock};
+use std::time::Duration;
 
 use api::region::RegionResponse;
 use api::v1::region::{region_request, RegionResponse as RegionResponseV1};
@@ -58,10 +59,13 @@ use store_api::region_request::{
     AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
 };
 use store_api::storage::RegionId;
+use tokio::sync::{Semaphore, SemaphorePermit};
+use tokio::time::timeout;
 use tonic::{Request, Response, Result as TonicResult};
 
 use crate::error::{
-    self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
+    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
+    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
     ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
     HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
     RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
@@ -90,6 +94,8 @@ impl RegionServer {
             runtime,
             event_listener,
             Arc::new(DummyTableProviderFactory),
+            0,
+            Duration::from_millis(0),
         )
     }
@@ -98,6 +104,8 @@
         runtime: Runtime,
         event_listener: RegionServerEventListenerRef,
         table_provider_factory: TableProviderFactoryRef,
+        max_concurrent_queries: usize,
+        concurrent_query_limiter_timeout: Duration,
     ) -> Self {
         Self {
             inner: Arc::new(RegionServerInner::new(
@@ -105,6 +113,10 @@
                 runtime,
                 event_listener,
                 table_provider_factory,
+                RegionServerParallelism::from_opts(
+                    max_concurrent_queries,
+                    concurrent_query_limiter_timeout,
+                ),
             )),
         }
     }
@@ -167,6 +179,11 @@ impl RegionServer {
         &self,
         request: api::v1::region::QueryRequest,
     ) -> Result<SendableRecordBatchStream> {
+        let permit = if let Some(p) = &self.inner.parallelism {
+            Some(p.acquire().await?)
+        } else {
+            None
+        };
         let region_id = RegionId::from_u64(request.region_id);
         let provider = self.table_provider(region_id).await?;
         let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
@@ -189,17 +206,27 @@
             .await
             .context(DecodeLogicalPlanSnafu)?;
 
-        self.inner
+        let res = self
+            .inner
             .handle_read(QueryRequest {
                 header: request.header,
                 region_id,
                 plan,
             })
-            .await
+            .await;
+        if let Some(_permit) = permit {
+            drop(_permit)
+        };
+        res
     }
 
     #[tracing::instrument(skip_all)]
     pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
+        let permit = if let Some(p) = &self.inner.parallelism {
+            Some(p.acquire().await?)
+        } else {
+            None
+        };
         let provider = self.table_provider(request.region_id).await?;
 
         struct RegionDataSourceInjector {
@@ -228,9 +255,14 @@
             .context(DataFusionSnafu)?
             .data;
 
-        self.inner
+        let res = self
+            .inner
             .handle_read(QueryRequest { plan, ..request })
-            .await
+            .await;
+        if let Some(_permit) = permit {
+            drop(_permit)
+        };
+        res
     }
 
     /// Returns all opened and reportable regions.
@@ -450,6 +482,36 @@ struct RegionServerInner {
     runtime: Runtime,
     event_listener: RegionServerEventListenerRef,
     table_provider_factory: TableProviderFactoryRef,
+    // The maximum number of queries allowed to execute at the same time.
+    // Acts as the last line of defense on the datanode against query overload.
+    parallelism: Option<RegionServerParallelism>,
+}
+
+struct RegionServerParallelism {
+    semaphore: Semaphore,
+    timeout: Duration,
+}
+
+impl RegionServerParallelism {
+    pub fn from_opts(
+        max_concurrent_queries: usize,
+        concurrent_query_limiter_timeout: Duration,
+    ) -> Option<Self> {
+        if max_concurrent_queries == 0 {
+            return None;
+        }
+        Some(RegionServerParallelism {
+            semaphore: Semaphore::new(max_concurrent_queries),
+            timeout: concurrent_query_limiter_timeout,
+        })
+    }
+
+    pub async fn acquire(&self) -> Result<SemaphorePermit> {
+        timeout(self.timeout, self.semaphore.acquire())
+            .await
+            .context(ConcurrentQueryLimiterTimeoutSnafu)?
+            .context(ConcurrentQueryLimiterClosedSnafu)
+    }
 }
 
 enum CurrentEngine {
@@ -478,6 +540,7 @@ impl RegionServerInner {
         runtime: Runtime,
         event_listener: RegionServerEventListenerRef,
         table_provider_factory: TableProviderFactoryRef,
+        parallelism: Option<RegionServerParallelism>,
     ) -> Self {
         Self {
             engines: RwLock::new(HashMap::new()),
@@ -486,6 +549,7 @@
             runtime,
             event_listener,
             table_provider_factory,
+            parallelism,
         }
     }
@@ -1284,4 +1348,23 @@ mod tests {
             assert(result);
         }
     }
+
+    #[tokio::test]
+    async fn test_region_server_parallelism() {
+        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
+        let first_query = p.acquire().await;
+        assert!(first_query.is_ok());
+        let second_query = p.acquire().await;
+        assert!(second_query.is_ok());
+        let third_query = p.acquire().await;
+        assert!(third_query.is_err());
+        let err = third_query.unwrap_err();
+        assert_eq!(
+            err.output_msg(),
+            "Failed to acquire permit within timeout: deadline has elapsed".to_string()
+        );
+        drop(first_query);
+        let fourth_query = p.acquire().await;
+        assert!(fourth_query.is_ok());
+    }
 }
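
For reference, a minimal standalone sketch of the limiter pattern this patch introduces: a semaphore caps the number of in-flight queries, and waiting for a permit is bounded by a timeout so callers fail fast instead of queueing indefinitely. It is illustrative only; `QueryLimiter`, its error strings, and the constants below are not part of the patch, and it assumes the tokio crate with the "sync", "time", "macros", and "rt-multi-thread" features enabled.

    use std::time::Duration;

    use tokio::sync::{Semaphore, SemaphorePermit};
    use tokio::time::timeout;

    // Caps the number of in-flight queries; waiting for a permit is
    // bounded by a timeout so an overloaded server rejects quickly.
    struct QueryLimiter {
        semaphore: Semaphore,
        acquire_timeout: Duration,
    }

    impl QueryLimiter {
        fn new(max_concurrent: usize, acquire_timeout: Duration) -> Self {
            Self {
                semaphore: Semaphore::new(max_concurrent),
                acquire_timeout,
            }
        }

        // Returns a permit while capacity remains; errors if the wait
        // exceeds the timeout or the semaphore has been closed.
        async fn acquire(&self) -> Result<SemaphorePermit<'_>, String> {
            timeout(self.acquire_timeout, self.semaphore.acquire())
                .await
                .map_err(|e| format!("timed out acquiring permit: {e}"))?
                .map_err(|e| format!("limiter closed: {e}"))
        }
    }

    #[tokio::main]
    async fn main() {
        let limiter = QueryLimiter::new(1, Duration::from_millis(10));

        // The first caller takes the only permit.
        let first = limiter.acquire().await.expect("first acquire succeeds");

        // A second caller times out while the permit is still held.
        assert!(limiter.acquire().await.is_err());

        // Dropping the permit frees capacity for the next caller.
        drop(first);
        assert!(limiter.acquire().await.is_ok());
    }

As in the patch, a capacity of zero simply skips constructing the limiter (`RegionServerParallelism::from_opts` returns `None`), which is why the default `max_concurrent_queries = 0` means unlimited.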