feat: protect datanode with concurrency limit. #4699

Open · wants to merge 1 commit into `main`
1 change: 1 addition & 0 deletions config/config.md
@@ -335,6 +335,7 @@
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
| `rpc_addr` | String | Unset | Deprecated, use `grpc.addr` instead. |
| `rpc_hostname` | String | Unset | Deprecated, use `grpc.hostname` instead. |
| `rpc_runtime_size` | Integer | Unset | Deprecated, use `grpc.runtime_size` instead. |
3 changes: 3 additions & 0 deletions config/datanode.example.toml
@@ -19,6 +19,9 @@ enable_telemetry = true
## Parallelism of initializing regions.
init_regions_parallelism = 16

## The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0

## Deprecated, use `grpc.addr` instead.
## @toml2docs:none-default
rpc_addr = "127.0.0.1:3001"
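For context on the knob above: a value of `0` disables limiting entirely rather than allowing zero queries. A minimal sketch of that interpretation, mirroring the `RegionServerParallelism::from_opts` logic that appears later in this diff:

```rust
use tokio::sync::Semaphore;

// Zero means "no limiter at all"; any positive value caps in-flight queries.
fn limiter(max_concurrent_queries: usize) -> Option<Semaphore> {
    (max_concurrent_queries > 0).then(|| Semaphore::new(max_concurrent_queries))
}
```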
2 changes: 2 additions & 0 deletions src/datanode/src/config.rs
@@ -305,6 +305,7 @@ pub struct DatanodeOptions {
pub meta_client: Option<MetaClientOptions>,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub max_concurrent_queries: usize,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub logging: LoggingOptions,
@@ -339,6 +340,7 @@ impl Default for DatanodeOptions {
meta_client: None,
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
max_concurrent_queries: 0,
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
5 changes: 4 additions & 1 deletion src/datanode/src/datanode.rs
@@ -314,7 +314,7 @@ impl DatanodeBuilder {
&self,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
let opts = &self.opts;
let opts: &DatanodeOptions = &self.opts;

let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
@@ -334,6 +334,9 @@
common_runtime::global_runtime(),
event_listener,
table_provider_factory,
opts.max_concurrent_queries,
// TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
Duration::from_millis(100),
);

let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
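Note that the builder wires in a hardcoded 100 ms acquisition timeout (flagged by the TODO above). A small, self-contained sketch of what that timeout does once the semaphore is exhausted, using the same tokio primitives the PR uses:

```rust
use std::time::Duration;
use tokio::{sync::Semaphore, time::timeout};

#[tokio::main]
async fn main() {
    let sem = Semaphore::new(1);
    let _held = sem.acquire().await.unwrap(); // the only permit is taken
    // A second caller waits at most 100 ms instead of queueing indefinitely;
    // the resulting Elapsed error becomes ConcurrentQueryLimiterTimeout in this PR.
    let second = timeout(Duration::from_millis(100), sem.acquire()).await;
    assert!(second.is_err());
}
```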
20 changes: 20 additions & 0 deletions src/datanode/src/error.rs
@@ -22,6 +22,7 @@ use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
use tokio::time::error::Elapsed;

/// Business error of datanode.
#[derive(Snafu)]
@@ -347,6 +348,22 @@
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to acquire permit, source closed"))]
ConcurrentQueryLimiterClosed {
#[snafu(source)]
error: tokio::sync::AcquireError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to acquire permit under timeouts"))]
ConcurrentQueryLimiterTimeout {
#[snafu(source)]
error: Elapsed,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -411,6 +428,9 @@ impl ErrorExt for Error {

FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
StatusCode::RegionBusy
}
}
}

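Both limiter errors map to `StatusCode::RegionBusy` (see the `ErrorExt` arm above), which signals a transient, retryable condition. A hypothetical caller-side pattern is sketched below; `is_busy` stands in for checking `err.status_code() == StatusCode::RegionBusy` and is not part of this PR:

```rust
use std::{future::Future, time::Duration};

// Illustrative only: retry an operation a few times when it reports "busy",
// backing off exponentially between attempts.
async fn retry_on_busy<T, E, F, Fut>(mut op: F, is_busy: impl Fn(&E) -> bool) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let mut delay = Duration::from_millis(50);
    for _ in 0..3 {
        match op().await {
            Err(e) if is_busy(&e) => {
                tokio::time::sleep(delay).await;
                delay *= 2; // exponential backoff
            }
            other => return other,
        }
    }
    op().await // final attempt; propagate whatever it returns
}
```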
93 changes: 88 additions & 5 deletions src/datanode/src/region_server.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use api::region::RegionResponse;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1};
@@ -58,10 +59,13 @@ use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::time::timeout;
use tonic::{Request, Response, Result as TonicResult};

use crate::error::{
self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
@@ -90,6 +94,8 @@ impl RegionServer {
runtime,
event_listener,
Arc::new(DummyTableProviderFactory),
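// Concurrency limiting disabled here: a zero limit yields no limiter, so the timeout is irrelevant.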
0,
Duration::from_millis(0),
)
}

@@ -98,13 +104,19 @@
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
max_concurrent_queries: usize,
concurrent_query_limiter_timeout: Duration,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
query_engine,
runtime,
event_listener,
table_provider_factory,
RegionServerParallelism::from_opts(
max_concurrent_queries,
concurrent_query_limiter_timeout,
),
)),
}
}
@@ -167,6 +179,11 @@
&self,
request: api::v1::region::QueryRequest,
) -> Result<SendableRecordBatchStream> {
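// Admission control: when a limit is configured, acquire a permit before
// doing any work, so an overloaded datanode rejects queries early.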
let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
} else {
None
};
let region_id = RegionId::from_u64(request.region_id);
let provider = self.table_provider(region_id).await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
@@ -189,17 +206,27 @@
.await
.context(DecodeLogicalPlanSnafu)?;

self.inner
let res = self
.inner
.handle_read(QueryRequest {
header: request.header,
region_id,
plan,
})
.await
.await;
drop(permit); // explicitly release the concurrency permit, if one was held
res
}

#[tracing::instrument(skip_all)]
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
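// Same admission control as handle_remote_read: take a permit (if limiting
// is enabled) before planning and executing the query.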
let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
} else {
None
};
let provider = self.table_provider(request.region_id).await?;

struct RegionDataSourceInjector {
@@ -228,9 +255,14 @@
.context(DataFusionSnafu)?
.data;

self.inner
let res = self
.inner
.handle_read(QueryRequest { plan, ..request })
.await
.await;
drop(permit); // explicitly release the concurrency permit, if one was held
res
}

/// Returns all opened and reportable regions.
@@ -450,6 +482,36 @@ struct RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
// The maximum number of queries allowed to execute concurrently.
// Acts as the datanode's last line of defense against query overload.
parallelism: Option<RegionServerParallelism>,
}

struct RegionServerParallelism {
semaphore: Semaphore,
timeout: Duration,
}

impl RegionServerParallelism {
pub fn from_opts(
max_concurrent_queries: usize,
concurrent_query_limiter_timeout: Duration,
) -> Option<Self> {
if max_concurrent_queries == 0 {
return None;
}
Some(RegionServerParallelism {
semaphore: Semaphore::new(max_concurrent_queries),
timeout: concurrent_query_limiter_timeout,
})
}

pub async fn acquire(&self) -> Result<SemaphorePermit> {
timeout(self.timeout, self.semaphore.acquire())
.await
.context(ConcurrentQueryLimiterTimeoutSnafu)?
.context(ConcurrentQueryLimiterClosedSnafu)
}
}
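
A note on `acquire`'s return value: `SemaphorePermit` is an RAII guard, which is why the read paths above hold the permit across the inner `handle_read` call and release it by dropping it afterwards. A minimal demonstration of the drop-to-release behavior:

```rust
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let sem = Semaphore::new(1);
    let permit = sem.acquire().await.unwrap();
    assert_eq!(sem.available_permits(), 0); // the single query slot is in use
    drop(permit); // releasing the guard returns the permit to the semaphore
    assert_eq!(sem.available_permits(), 1); // the next query can now proceed
}
```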

enum CurrentEngine {
@@ -478,6 +540,7 @@ impl RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
parallelism: Option<RegionServerParallelism>,
) -> Self {
Self {
engines: RwLock::new(HashMap::new()),
Expand All @@ -486,6 +549,7 @@ impl RegionServerInner {
runtime,
event_listener,
table_provider_factory,
parallelism,
}
}

@@ -1284,4 +1348,23 @@ mod tests {
assert(result);
}
}

#[tokio::test]
async fn test_region_server_parallelism() {
let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
let first_query = p.acquire().await;
assert!(first_query.is_ok());
let second_query = p.acquire().await;
assert!(second_query.is_ok());
let third_query = p.acquire().await;
assert!(third_query.is_err());
let err = third_query.unwrap_err();
assert_eq!(
err.output_msg(),
"Failed to acquire permit within the timeout: deadline has elapsed".to_string()
);
drop(first_query);
let fourth_query = p.acquire().await;
assert!(fourth_query.is_ok());
}
}