diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index c0ec16a86da5..7cdaa3643e02 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -1458,6 +1458,9 @@ pub struct QueryConfig { #[clap(long, value_name = "VALUE", default_value = "8")] pub max_running_queries: u64, + #[clap(long, value_name = "VALUE", default_value = "true")] + pub global_statement_queue: bool, + /// The max total memory in bytes that can be used by this process. #[clap(long, value_name = "VALUE", default_value = "0")] pub max_server_memory_usage: u64, @@ -1788,6 +1791,7 @@ impl TryInto for QueryConfig { mysql_tls_server_key: self.mysql_tls_server_key, max_active_sessions: self.max_active_sessions, max_running_queries: self.max_running_queries, + global_statement_queue: self.global_statement_queue, max_server_memory_usage: self.max_server_memory_usage, max_memory_limit_enabled: self.max_memory_limit_enabled, clickhouse_http_handler_host: self.clickhouse_http_handler_host, @@ -1880,6 +1884,7 @@ impl From for QueryConfig { mysql_tls_server_key: inner.mysql_tls_server_key, max_active_sessions: inner.max_active_sessions, max_running_queries: inner.max_running_queries, + global_statement_queue: inner.global_statement_queue, max_server_memory_usage: inner.max_server_memory_usage, max_memory_limit_enabled: inner.max_memory_limit_enabled, diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index 03dfbb265b99..5af4534f1853 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -169,6 +169,7 @@ pub struct QueryConfig { pub mysql_tls_server_key: String, pub max_active_sessions: u64, pub max_running_queries: u64, + pub global_statement_queue: bool, pub max_server_memory_usage: u64, pub max_memory_limit_enabled: bool, pub clickhouse_http_handler_host: String, @@ -271,6 +272,7 @@ impl Default for QueryConfig { mysql_tls_server_key: "".to_string(), max_active_sessions: 256, max_running_queries: 8, + global_statement_queue: true, max_server_memory_usage: 0, max_memory_limit_enabled: false, clickhouse_http_handler_host: "127.0.0.1".to_string(), diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 6d3e283f4f67..7669236d1ff0 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -257,7 +257,7 @@ async fn plan_sql( if !acquire_queue { // If queue guard is not required, plan the statement directly. let plan = planner.plan_stmt(&extras.statement).await?; - return Ok((plan, extras, AcquireQueueGuard::create(None))); + return Ok((plan, extras, AcquireQueueGuard::create_global(None))); } let need_acquire_lock = need_acquire_lock(ctx.clone(), &extras.statement); diff --git a/src/query/service/src/sessions/queue_mgr.rs b/src/query/service/src/sessions/queue_mgr.rs index 2d0c23bc1f65..4d0396b59586 100644 --- a/src/query/service/src/sessions/queue_mgr.rs +++ b/src/query/service/src/sessions/queue_mgr.rs @@ -53,6 +53,9 @@ use databend_common_sql::PlanExtras; use log::info; use parking_lot::Mutex; use pin_project_lite::pin_project; +use tokio::sync::AcquireError as TokioAcquireError; +use tokio::sync::OwnedSemaphorePermit; +use tokio::sync::Semaphore; use tokio::time::error::Elapsed; use crate::sessions::QueryContext; @@ -87,6 +90,8 @@ pub(crate) struct Inner { pub struct QueueManager { permits: usize, meta_store: MetaStore, + semaphore: Arc, + global_statement_queue: bool, queue: Mutex>>, } @@ -101,7 +106,11 @@ impl QueueManager { }; info!("queue manager permits: {:?}", permits); - GlobalInstance::set(Self::create(permits, metastore)); + GlobalInstance::set(Self::create( + permits, + metastore, + conf.query.global_statement_queue, + )); Ok(()) } @@ -109,7 +118,11 @@ impl QueueManager { GlobalInstance::get::>() } - pub fn create(mut permits: usize, meta_store: MetaStore) -> Arc> { + pub fn create( + mut permits: usize, + meta_store: MetaStore, + global_statement_queue: bool, + ) -> Arc> { if permits == 0 { permits = usize::MAX >> 4; } @@ -117,7 +130,9 @@ impl QueueManager { Arc::new(QueueManager { permits, meta_store, + global_statement_queue, queue: Mutex::new(HashMap::new()), + semaphore: Arc::new(Semaphore::new(permits)), }) } @@ -156,21 +171,35 @@ impl QueueManager { ); let timeout = data.timeout(); - let semaphore_acquire = self.meta_store.new_acquired( - data.get_lock_key(), - self.permits as u64, - data.get_key(), // ID of this acquirer - data.lock_ttl(), - ); - let future = AcquireQueueFuture::create( - Arc::new(data), - tokio::time::timeout(timeout, semaphore_acquire), - self.clone(), - ); let start_time = SystemTime::now(); + let acquire_res = match self.global_statement_queue { + true => { + let semaphore_acquire = self.meta_store.new_acquired( + data.get_lock_key(), + self.permits as u64, + data.get_key(), // ID of this acquirer + data.lock_ttl(), + ); - return match future.await { + AcquireQueueFuture::create( + Arc::new(data), + tokio::time::timeout(timeout, semaphore_acquire), + self.clone(), + ) + .await + } + false => { + AcquireQueueFuture::create( + Arc::new(data), + tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()), + self.clone(), + ) + .await + } + }; + + return match acquire_res { Ok(v) => { info!("finished acquiring from queue, length: {}", self.length()); @@ -197,7 +226,7 @@ impl QueueManager { }; } - Ok(AcquireQueueGuard::create(None)) + Ok(AcquireQueueGuard::create_global(None)) } pub(crate) fn add_entity(&self, inner: Inner) -> Data::Key { @@ -231,28 +260,35 @@ impl QueueManager { } } -pub struct AcquireQueueGuard { - #[allow(dead_code)] - permit: Option, +pub enum AcquireQueueGuard { + Global(Option), + Local(Option), } impl Drop for AcquireQueueGuard { fn drop(&mut self) { - if self.permit.is_some() { - dec_session_running_acquired_queries(); + match self { + AcquireQueueGuard::Local(Some(_)) | AcquireQueueGuard::Global(Some(_)) => { + dec_session_running_acquired_queries(); + } + _ => {} } } } impl AcquireQueueGuard { - pub fn create(permit: Option) -> Self { - AcquireQueueGuard { permit } + pub fn create_global(permit: Option) -> Self { + AcquireQueueGuard::Global(permit) + } + + pub fn create_local(permit: Option) -> Self { + AcquireQueueGuard::Local(permit) } } pin_project! { - pub struct AcquireQueueFuture -where T: Future, Elapsed>> +pub struct AcquireQueueFuture +where T: Future, Elapsed>> { #[pin] inner: T, @@ -266,8 +302,8 @@ where T: Future AcquireQueueFuture -where T: Future, Elapsed>> +impl AcquireQueueFuture +where T: Future, Elapsed>> { pub fn create(data: Arc, inner: T, mgr: Arc>) -> Self { AcquireQueueFuture { @@ -281,53 +317,60 @@ where T: Future Future for AcquireQueueFuture -where T: Future, Elapsed>> -{ - type Output = Result; +macro_rules! impl_acquire_queue_future { + ($Permit:ty, $fn_name:ident, $Error:ty) => { + impl Future for AcquireQueueFuture + where T: Future, Elapsed>> + { + type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); - if this.is_abort.load(Ordering::SeqCst) { - return Poll::Ready(Err(Data::remove_error_message(this.key.take()))); - } + if this.is_abort.load(Ordering::SeqCst) { + return Poll::Ready(Err(Data::remove_error_message(this.key.take()))); + } + + match this.inner.poll(cx) { + Poll::Ready(res) => { + if let Some(key) = this.key.take() { + if this.manager.remove_entity(&key).is_none() { + return Poll::Ready(Err(Data::remove_error_message(Some(key)))); + } + } - match this.inner.poll(cx) { - Poll::Ready(res) => { - if let Some(key) = this.key.take() { - if this.manager.remove_entity(&key).is_none() { - return Poll::Ready(Err(Data::remove_error_message(Some(key)))); + Poll::Ready(match res { + Ok(Ok(v)) => Ok(AcquireQueueGuard::$fn_name(Some(v))), + Ok(Err(_)) => Err(ErrorCode::TokioError("acquire queue failure.")), + Err(_elapsed) => Err(ErrorCode::Timeout("query queuing timeout")), + }) } - } + Poll::Pending => { + if !*this.has_pending { + *this.has_pending = true; + } - Poll::Ready(match res { - Ok(Ok(v)) => Ok(AcquireQueueGuard::create(Some(v))), - Ok(Err(_)) => Err(ErrorCode::TokioError("acquire queue failure.")), - Err(_elapsed) => Err(ErrorCode::Timeout("query queuing timeout")), - }) - } - Poll::Pending => { - if !*this.has_pending { - *this.has_pending = true; - } + if let Some(data) = this.data.take() { + let waker = cx.waker().clone(); + *this.key = Some(this.manager.add_entity(Inner { + data, + waker, + instant: Instant::now(), + is_abort: this.is_abort.clone(), + })); + } - if let Some(data) = this.data.take() { - let waker = cx.waker().clone(); - *this.key = Some(this.manager.add_entity(Inner { - data, - waker, - instant: Instant::now(), - is_abort: this.is_abort.clone(), - })); + Poll::Pending + } } - - Poll::Pending } } - } + }; } +impl_acquire_queue_future!(Permit, create_global, AcquireError); +impl_acquire_queue_future!(OwnedSemaphorePermit, create_local, TokioAcquireError); + pub struct QueryEntry { ctx: Arc, pub query_id: String, diff --git a/src/query/service/tests/it/sessions/queue_mgr.rs b/src/query/service/tests/it/sessions/queue_mgr.rs index dfea15e88748..9e3771c049bd 100644 --- a/src/query/service/tests/it/sessions/queue_mgr.rs +++ b/src/query/service/tests/it/sessions/queue_mgr.rs @@ -67,171 +67,178 @@ impl QueueData for TestData { #[tokio::test(flavor = "multi_thread")] async fn test_passed_acquire() -> Result<()> { - let metastore = create_meta_store().await?; - let test_count = (SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_nanos() - % 5) as usize - + 5; - - let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); - let queue = QueueManager::>::create(1, metastore); - let mut join_handles = Vec::with_capacity(test_count); - - let instant = Instant::now(); - for index in 0..test_count { - join_handles.push({ - let queue = queue.clone(); - let barrier = barrier.clone(); - databend_common_base::runtime::spawn(async move { - barrier.wait().await; - let _guard = queue - .acquire(TestData:: { - lock_id: String::from("test_passed_acquire"), - acquire_id: format!("TestData{}", index), - }) - .await?; - tokio::time::sleep(Duration::from_secs(1)).await; - Result::<()>::Ok(()) + for is_global in [true, false] { + let metastore = create_meta_store().await?; + let test_count = (SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + % 5) as usize + + 5; + + let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); + let queue = QueueManager::>::create(1, metastore, is_global); + let mut join_handles = Vec::with_capacity(test_count); + + let instant = Instant::now(); + for index in 0..test_count { + join_handles.push({ + let queue = queue.clone(); + let barrier = barrier.clone(); + databend_common_base::runtime::spawn(async move { + barrier.wait().await; + let _guard = queue + .acquire(TestData:: { + lock_id: String::from("test_passed_acquire"), + acquire_id: format!("TestData{}", index), + }) + .await?; + tokio::time::sleep(Duration::from_secs(1)).await; + Result::<()>::Ok(()) + }) }) - }) - } + } - for join_handle in join_handles { - let _ = join_handle.await; - } + for join_handle in join_handles { + let _ = join_handle.await; + } - assert!(instant.elapsed() < Duration::from_secs(test_count as u64)); - assert_eq!(queue.length(), 0); + assert!(instant.elapsed() < Duration::from_secs(test_count as u64)); + assert_eq!(queue.length(), 0); + } Ok(()) } #[tokio::test(flavor = "multi_thread")] async fn test_serial_acquire() -> Result<()> { - let metastore = create_meta_store().await?; - let test_count = (SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_nanos() - % 5) as usize - + 5; - - let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); - let queue = QueueManager::::create(1, metastore); - let mut join_handles = Vec::with_capacity(test_count); - - let instant = Instant::now(); - for index in 0..test_count { - join_handles.push({ - let queue = queue.clone(); - let barrier = barrier.clone(); - databend_common_base::runtime::spawn(async move { - barrier.wait().await; - let _guard = queue - .acquire(TestData { - lock_id: String::from("test_serial_acquire"), - acquire_id: format!("TestData{}", index), - }) - .await?; - tokio::time::sleep(Duration::from_secs(1)).await; - Result::<()>::Ok(()) + for is_global in [true, false] { + let metastore = create_meta_store().await?; + let test_count = (SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + % 5) as usize + + 5; + + let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); + let queue = QueueManager::::create(1, metastore, is_global); + let mut join_handles = Vec::with_capacity(test_count); + + let instant = Instant::now(); + for index in 0..test_count { + join_handles.push({ + let queue = queue.clone(); + let barrier = barrier.clone(); + databend_common_base::runtime::spawn(async move { + barrier.wait().await; + let _guard = queue + .acquire(TestData { + lock_id: String::from("test_serial_acquire"), + acquire_id: format!("TestData{}", index), + }) + .await?; + tokio::time::sleep(Duration::from_secs(1)).await; + Result::<()>::Ok(()) + }) }) - }) - } - - for join_handle in join_handles { - let _ = join_handle.await; - } + } - assert!(instant.elapsed() >= Duration::from_secs(test_count as u64)); - assert_eq!(queue.length(), 0); + for join_handle in join_handles { + let _ = join_handle.await; + } + assert!(instant.elapsed() >= Duration::from_secs(test_count as u64)); + assert_eq!(queue.length(), 0); + } Ok(()) } #[tokio::test(flavor = "multi_thread")] async fn test_concurrent_acquire() -> Result<()> { - let metastore = create_meta_store().await?; - let test_count = (SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_nanos() - % 5) as usize - + 5; - - let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); - let queue = QueueManager::::create(2, metastore); - let mut join_handles = Vec::with_capacity(test_count); - - let instant = Instant::now(); - for index in 0..test_count { - join_handles.push({ - let queue = queue.clone(); - let barrier = barrier.clone(); - databend_common_base::runtime::spawn(async move { - barrier.wait().await; - let _guard = queue - .acquire(TestData { - lock_id: String::from("test_concurrent_acquire"), - acquire_id: format!("TestData{}", index), - }) - .await?; - - tokio::time::sleep(Duration::from_secs(1)).await; - Result::<()>::Ok(()) + for is_global in [true, false] { + let metastore = create_meta_store().await?; + let test_count = (SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + % 5) as usize + + 5; + + let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); + let queue = QueueManager::::create(2, metastore, is_global); + let mut join_handles = Vec::with_capacity(test_count); + + let instant = Instant::now(); + for index in 0..test_count { + join_handles.push({ + let queue = queue.clone(); + let barrier = barrier.clone(); + databend_common_base::runtime::spawn(async move { + barrier.wait().await; + let _guard = queue + .acquire(TestData { + lock_id: String::from("test_concurrent_acquire"), + acquire_id: format!("TestData{}", index), + }) + .await?; + + tokio::time::sleep(Duration::from_secs(1)).await; + Result::<()>::Ok(()) + }) }) - }) - } + } - for join_handle in join_handles { - let _ = join_handle.await; - } + for join_handle in join_handles { + let _ = join_handle.await; + } - assert!(instant.elapsed() >= Duration::from_secs((test_count / 2) as u64)); - assert!(instant.elapsed() < Duration::from_secs((test_count) as u64)); + assert!(instant.elapsed() >= Duration::from_secs((test_count / 2) as u64)); + assert!(instant.elapsed() < Duration::from_secs((test_count) as u64)); - assert_eq!(queue.length(), 0); + assert_eq!(queue.length(), 0); + } Ok(()) } #[tokio::test(flavor = "multi_thread")] async fn test_list_acquire() -> Result<()> { - let metastore = create_meta_store().await?; - let test_count = (SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_nanos() - % 5) as usize - + 5; - - let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); - let queue = QueueManager::::create(1, metastore); - let mut join_handles = Vec::with_capacity(test_count); - - for index in 0..test_count { - join_handles.push({ - let queue = queue.clone(); - let barrier = barrier.clone(); - databend_common_base::runtime::spawn(async move { - barrier.wait().await; - let _guard = queue - .acquire(TestData { - lock_id: String::from("test_list_acquire"), - acquire_id: format!("TestData{}", index), - }) - .await?; - - tokio::time::sleep(Duration::from_secs(10)).await; - Result::<()>::Ok(()) + for is_global in [true, false] { + let metastore = create_meta_store().await?; + let test_count = (SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + % 5) as usize + + 5; + + let barrier = Arc::new(tokio::sync::Barrier::new(test_count)); + let queue = QueueManager::::create(1, metastore, is_global); + let mut join_handles = Vec::with_capacity(test_count); + + for index in 0..test_count { + join_handles.push({ + let queue = queue.clone(); + let barrier = barrier.clone(); + databend_common_base::runtime::spawn(async move { + barrier.wait().await; + let _guard = queue + .acquire(TestData { + lock_id: String::from("test_list_acquire"), + acquire_id: format!("TestData{}", index), + }) + .await?; + + tokio::time::sleep(Duration::from_secs(10)).await; + Result::<()>::Ok(()) + }) }) - }) - } + } - tokio::time::sleep(Duration::from_secs(5)).await; - assert_eq!(queue.length(), test_count - 1); + tokio::time::sleep(Duration::from_secs(5)).await; + assert_eq!(queue.length(), test_count - 1); + } Ok(()) } diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 65b68f34128a..c43f951d3539 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -106,6 +106,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'query' | 'flight_sql_handler_port' | '8900' | '' | | 'query' | 'flight_sql_tls_server_cert' | '' | '' | | 'query' | 'flight_sql_tls_server_key' | '' | '' | +| 'query' | 'global_statement_queue' | 'true' | '' | | 'query' | 'http_handler_host' | '127.0.0.1' | '' | | 'query' | 'http_handler_port' | '8000' | '' | | 'query' | 'http_handler_result_timeout_secs' | '60' | '' |