Skip to content

Commit

Permalink
Adding parallelism in region server to protect datanode from query ov…
Browse files Browse the repository at this point in the history
…erload.
  • Loading branch information
lyang24 committed Sep 19, 2024
1 parent 67fb3d0 commit 75c340a
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 6 deletions.
1 change: 1 addition & 0 deletions config/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. |
| `rpc_addr` | String | Unset | Deprecated, use `grpc.addr` instead. |
| `rpc_hostname` | String | Unset | Deprecated, use `grpc.hostname` instead. |
| `rpc_runtime_size` | Integer | Unset | Deprecated, use `grpc.runtime_size` instead. |
Expand Down
3 changes: 3 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ enable_telemetry = true
## Parallelism of initializing regions.
init_regions_parallelism = 16

## The maximum current queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0

## Deprecated, use `grpc.addr` instead.
## @toml2docs:none-default
rpc_addr = "127.0.0.1:3001"
Expand Down
2 changes: 2 additions & 0 deletions src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ pub struct DatanodeOptions {
pub meta_client: Option<MetaClientOptions>,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub max_concurrent_queries: usize,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub logging: LoggingOptions,
Expand Down Expand Up @@ -339,6 +340,7 @@ impl Default for DatanodeOptions {
meta_client: None,
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
max_concurrent_queries: 0,
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
Expand Down
5 changes: 4 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl DatanodeBuilder {
&self,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
let opts = &self.opts;
let opts: &DatanodeOptions = &self.opts;

let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
Expand All @@ -334,6 +334,9 @@ impl DatanodeBuilder {
common_runtime::global_runtime(),
event_listener,
table_provider_factory,
opts.max_concurrent_queries,
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
Duration::from_millis(100),
);

let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
Expand Down
20 changes: 20 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
use tokio::time::error::Elapsed;

/// Business error of datanode.
#[derive(Snafu)]
Expand Down Expand Up @@ -347,6 +348,22 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to acquire permit, source closed"))]
ConcurrentQueryLimiterClosed {
#[snafu(source)]
error: tokio::sync::AcquireError,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to acquire permit under timeouts"))]
ConcurrentQueryLimiterTimeout {
#[snafu(source)]
error: Elapsed,
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -411,6 +428,9 @@ impl ErrorExt for Error {

FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
StatusCode::RegionBusy
}
}
}

Expand Down
93 changes: 88 additions & 5 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use api::region::RegionResponse;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1};
Expand Down Expand Up @@ -58,10 +59,13 @@ use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::time::timeout;
use tonic::{Request, Response, Result as TonicResult};

use crate::error::{
self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
Expand Down Expand Up @@ -90,6 +94,8 @@ impl RegionServer {
runtime,
event_listener,
Arc::new(DummyTableProviderFactory),
0,
Duration::from_millis(0),
)
}

Expand All @@ -98,13 +104,19 @@ impl RegionServer {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
max_concurrent_queries: usize,
concurrent_query_limiter_timeout: Duration,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
query_engine,
runtime,
event_listener,
table_provider_factory,
RegionServerParallelism::from_opts(
max_concurrent_queries,
concurrent_query_limiter_timeout,
),
)),
}
}
Expand Down Expand Up @@ -167,6 +179,11 @@ impl RegionServer {
&self,
request: api::v1::region::QueryRequest,
) -> Result<SendableRecordBatchStream> {
let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
} else {
None
};
let region_id = RegionId::from_u64(request.region_id);
let provider = self.table_provider(region_id).await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
Expand All @@ -189,17 +206,27 @@ impl RegionServer {
.await
.context(DecodeLogicalPlanSnafu)?;

self.inner
let res = self
.inner
.handle_read(QueryRequest {
header: request.header,
region_id,
plan,
})
.await
.await;
if let Some(_permit) = permit {
drop(_permit)
};
res
}

#[tracing::instrument(skip_all)]
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
} else {
None
};
let provider = self.table_provider(request.region_id).await?;

struct RegionDataSourceInjector {
Expand Down Expand Up @@ -228,9 +255,14 @@ impl RegionServer {
.context(DataFusionSnafu)?
.data;

self.inner
let res = self
.inner
.handle_read(QueryRequest { plan, ..request })
.await
.await;
if let Some(_permit) = permit {
drop(_permit)
};
res
}

/// Returns all opened and reportable regions.
Expand Down Expand Up @@ -450,6 +482,36 @@ struct RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
// The number of queries allowed to be executed at the same time.
// Act as last line of defense on datanode to prevent query overloading.
parallelism: Option<RegionServerParallelism>,
}

struct RegionServerParallelism {
semaphore: Semaphore,
timeout: Duration,
}

impl RegionServerParallelism {
pub fn from_opts(
max_concurrent_queries: usize,
concurrent_query_limiter_timeout: Duration,
) -> Option<Self> {
if max_concurrent_queries == 0 {
return None;
}
Some(RegionServerParallelism {
semaphore: Semaphore::new(max_concurrent_queries),
timeout: concurrent_query_limiter_timeout,
})
}

pub async fn acquire(&self) -> Result<SemaphorePermit> {
timeout(self.timeout, self.semaphore.acquire())
.await
.context(ConcurrentQueryLimiterTimeoutSnafu)?
.context(ConcurrentQueryLimiterClosedSnafu)
}
}

enum CurrentEngine {
Expand Down Expand Up @@ -478,6 +540,7 @@ impl RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
parallelism: Option<RegionServerParallelism>,
) -> Self {
Self {
engines: RwLock::new(HashMap::new()),
Expand All @@ -486,6 +549,7 @@ impl RegionServerInner {
runtime,
event_listener,
table_provider_factory,
parallelism,
}
}

Expand Down Expand Up @@ -1284,4 +1348,23 @@ mod tests {
assert(result);
}
}

#[tokio::test]
async fn test_region_server_parallism() {
let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
let first_query = p.acquire().await;
assert!(first_query.is_ok());
let second_query = p.acquire().await;
assert!(second_query.is_ok());
let third_query = p.acquire().await;
assert!(third_query.is_err());
let err = third_query.unwrap_err();
assert_eq!(
err.output_msg(),
"Failed to acquire permit under timeouts: deadline has elapsed".to_string()
);
drop(first_query);
let forth_query = p.acquire().await;
assert!(forth_query.is_ok());
}
}

0 comments on commit 75c340a

Please sign in to comment.