From 6f59660e5688996820b0a1f61e0affd364b8a757 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Mon, 8 May 2023 14:53:32 +0800 Subject: [PATCH] fix: enable kill query before pipeline started. --- src/query/catalog/src/table_context.rs | 4 +-- .../service/src/interpreters/interpreter.rs | 25 +++++++++------ .../interpreter_table_recluster.rs | 2 +- .../interpreter_user_stage_remove.rs | 3 +- .../pipelines/executor/pipeline_executor.rs | 11 +++---- src/query/service/src/sessions/query_ctx.rs | 12 ++++--- .../service/src/sessions/query_ctx_shared.rs | 32 +++++++++++++++++-- .../src/stream/table_read_block_stream.rs | 2 +- .../src/test_kits/table_test_fixture.rs | 2 +- .../it/storages/fuse/operations/commit.rs | 7 ++-- .../mutation/block_compact_mutator.rs | 2 +- .../fuse/operations/mutation/deletion.rs | 2 +- 12 files changed, 69 insertions(+), 35 deletions(-) diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index a8fd68153e7ff..cdb43d1574cfd 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -16,7 +16,6 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::collections::HashSet; use std::net::SocketAddr; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::SystemTime; @@ -108,7 +107,8 @@ pub trait TableContext: Send + Sync { fn get_catalog(&self, catalog_name: &str) -> Result>; fn get_id(&self) -> String; fn get_current_catalog(&self) -> String; - fn get_aborting(&self) -> Arc; + fn check_aborting(&self) -> Result<()>; + fn get_error(&self) -> Option; fn get_current_database(&self) -> String; fn get_current_user(&self) -> Result; fn get_current_role(&self) -> Option; diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 5051ea20d8ff2..4f5e57b14c0d9 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -53,6 +53,11 @@ pub trait Interpreter: Sync + Send { InterpreterMetrics::record_query_start(&ctx); log_query_start(&ctx); + if let Err(err) = ctx.check_aborting() { + log_query_finished(&ctx, Some(err.clone())); + return Err(err); + } + let mut build_res = match self.execute2().await { Ok(build_res) => build_res, Err(build_error) => { @@ -91,18 +96,18 @@ pub trait Interpreter: Sync + Send { let complete_executor = PipelineCompleteExecutor::from_pipelines(pipelines, settings)?; - ctx.set_executor(Arc::downgrade(&complete_executor.get_inner())); + ctx.set_executor(complete_executor.get_inner())?; complete_executor.execute()?; - return Ok(Box::pin(DataBlockStream::create(None, vec![]))); + Ok(Box::pin(DataBlockStream::create(None, vec![]))) + } else { + let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; + + ctx.set_executor(pulling_executor.get_inner())?; + Ok(Box::pin(ProgressStream::try_create( + Box::pin(PullingExecutorStream::create(pulling_executor)?), + ctx.get_result_progress(), + )?)) } - - let pulling_executor = PipelinePullingExecutor::from_pipelines(build_res, settings)?; - - ctx.set_executor(Arc::downgrade(&pulling_executor.get_inner())); - Ok(Box::pin(ProgressStream::try_create( - Box::pin(PullingExecutorStream::create(pulling_executor)?), - ctx.get_result_progress(), - )?)) } /// The core of the databend processor which will execute the logical plan and build the pipeline diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index 6c598590afc9e..73efc84963bab 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -91,7 +91,7 @@ impl Interpreter for ReclusterTableInterpreter { let executor_settings = ExecutorSettings::try_create(&settings, query_id)?; let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?; - ctx.set_executor(Arc::downgrade(&executor.get_inner())); + ctx.set_executor(executor.get_inner())?; executor.execute()?; drop(executor); diff --git a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs index 6fffb41ea2e26..20e3a7eede86f 100644 --- a/src/query/service/src/interpreters/interpreter_user_stage_remove.rs +++ b/src/query/service/src/interpreters/interpreter_user_stage_remove.rs @@ -75,8 +75,7 @@ impl Interpreter for RemoveUserStageInterpreter { error!("Failed to delete file: {:?}, error: {}", chunk, e); } - let aborting = self.ctx.get_aborting(); - if aborting.load(std::sync::atomic::Ordering::SeqCst) { + if self.ctx.check_aborting().is_err() { return Ok(PipelineBuildResult::create()); } } diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 44b441ab3a60e..e32c98a62923f 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -185,7 +185,6 @@ impl PipelineExecutor { drop(guard); catch_unwind(move || on_finished_callback(error))??; } - Ok(()) } @@ -282,7 +281,6 @@ impl PipelineExecutor { let sync_queue = std::mem::take(&mut init_schedule_queue.sync_queue); self.global_tasks_queue.init_sync_tasks(sync_queue); - Ok(()) } } @@ -400,10 +398,11 @@ impl Drop for PipelineExecutor { let mut guard = self.on_finished_callback.lock(); if let Some(on_finished_callback) = guard.take() { drop(guard); - let cause = Some(ErrorCode::Internal( - "Pipeline illegal state: not successfully shutdown.", - )); - if let Err(cause) = catch_unwind(move || on_finished_callback(&cause)).flatten() { + let cause = match self.finished_error.lock().as_ref() { + Some(cause) => cause.clone(), + None => ErrorCode::Internal("Pipeline illegal state: not successfully shutdown."), + }; + if let Err(cause) = catch_unwind(move || on_finished_callback(&Some(cause))).flatten() { warn!("Pipeline executor shutdown failure, {:?}", cause); } } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 6c7ff4c731753..c947dc1309fcd 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -19,11 +19,9 @@ use std::collections::VecDeque; use std::future::Future; use std::net::SocketAddr; use std::str::FromStr; -use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; -use std::sync::Weak; use std::time::SystemTime; use common_base::base::tokio::task::JoinHandle; @@ -205,7 +203,7 @@ impl QueryContext { *self.shared.init_query_id.write() = id; } - pub fn set_executor(&self, weak_ptr: Weak) { + pub fn set_executor(&self, weak_ptr: Arc) -> Result<()> { self.shared.set_executor(weak_ptr) } @@ -359,8 +357,12 @@ impl TableContext for QueryContext { self.shared.get_current_catalog() } - fn get_aborting(&self) -> Arc { - self.shared.get_aborting() + fn check_aborting(&self) -> Result<()> { + self.shared.check_aborting() + } + + fn get_error(&self) -> Option { + self.shared.get_error() } fn get_current_database(&self) -> String { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 28b98f9512a9f..0c44c8e994f97 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -134,6 +134,11 @@ impl QueryContextShared { *guard = Some(err); } + pub fn get_error(&self) -> Option { + let guard = self.error.lock(); + (*guard).clone() + } + pub fn set_on_error_map(&self, map: Arc>>) { let mut guard = self.on_error_map.write(); *guard = Some(map); @@ -174,6 +179,18 @@ impl QueryContextShared { self.aborting.clone() } + pub fn check_aborting(&self) -> Result<()> { + if self.aborting.load(Ordering::Acquire) { + Err(self.get_error().unwrap_or_else(|| { + ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + ) + })) + } else { + Ok(()) + } + } + pub fn get_current_database(&self) -> String { self.session.get_current_database() } @@ -328,9 +345,18 @@ impl QueryContextShared { *guard = Some(affect); } - pub fn set_executor(&self, weak_ptr: Weak) { - let mut executor = self.executor.write(); - *executor = weak_ptr; + pub fn set_executor(&self, executor: Arc) -> Result<()> { + let mut guard = self.executor.write(); + match self.check_aborting() { + Ok(_) => { + *guard = Arc::downgrade(&executor); + Ok(()) + } + Err(err) => { + executor.finish(Some(err.clone())); + Err(err) + } + } } pub fn push_precommit_block(&self, block: DataBlock) { diff --git a/src/query/service/src/stream/table_read_block_stream.rs b/src/query/service/src/stream/table_read_block_stream.rs index 108523dc692d1..858345a36918c 100644 --- a/src/query/service/src/stream/table_read_block_stream.rs +++ b/src/query/service/src/stream/table_read_block_stream.rs @@ -52,7 +52,7 @@ impl ReadDataBlockStream for T { let query_id = ctx.get_id(); let executor_settings = ExecutorSettings::try_create(&settings, query_id)?; let executor = PipelinePullingExecutor::try_create(pipeline, executor_settings)?; - ctx.set_executor(Arc::downgrade(&executor.get_inner())); + ctx.set_executor(executor.get_inner())?; Ok(Box::pin(PullingExecutorStream::create(executor)?)) } } diff --git a/src/query/service/src/test_kits/table_test_fixture.rs b/src/query/service/src/test_kits/table_test_fixture.rs index 6e4cbe7662f0d..bdee81625d08a 100644 --- a/src/query/service/src/test_kits/table_test_fixture.rs +++ b/src/query/service/src/test_kits/table_test_fixture.rs @@ -379,7 +379,7 @@ pub fn execute_pipeline(ctx: Arc, mut res: PipelineBuildResult) -> let mut pipelines = res.sources_pipelines; pipelines.push(res.main_pipeline); let executor = PipelineCompleteExecutor::from_pipelines(pipelines, executor_settings)?; - ctx.set_executor(Arc::downgrade(&executor.get_inner())); + ctx.set_executor(executor.get_inner())?; executor.execute() } diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index cc7bd18aee5b3..1092a146add52 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -14,7 +14,6 @@ use std::any::Any; use std::collections::HashMap; use std::collections::HashSet; -use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; @@ -465,7 +464,11 @@ impl TableContext for CtxDelegation { "default".to_owned() } - fn get_aborting(&self) -> Arc { + fn check_aborting(&self) -> Result<()> { + todo!() + } + + fn get_error(&self) -> Option { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index 44ffc2035194d..b1b3441e3a717 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -117,7 +117,7 @@ async fn do_compact(ctx: Arc, table: Arc) -> Result