From ea4d84b4574beab4e3c665db5a5be3b8c802383b Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Thu, 29 Sep 2022 15:14:02 +0800 Subject: [PATCH 1/4] fix(processor): try fix cannot kill optimize --- src/common/base/src/base/runtime.rs | 5 +++-- src/query/catalog/src/table_context.rs | 2 ++ .../transforms/transform_block_compact.rs | 22 ++++++++++++++++++- .../transforms/transform_compact.rs | 18 ++++++++++++--- .../transforms/transform_sort_merge.rs | 3 ++- .../transforms/hash_join/hash_join_state.rs | 5 +++-- .../transforms/hash_join/join_hash_table.rs | 5 +++-- .../transforms/transform_mark_join.rs | 5 +++-- .../transforms/transform_right_join.rs | 5 +++-- src/query/service/src/sessions/query_ctx.rs | 6 +++++ .../service/src/sessions/query_ctx_shared.rs | 9 ++++++++ .../src/sql/executor/pipeline_builder.rs | 4 ++++ .../storages/fuse/src/operations/append.rs | 1 + .../storages/fuse/src/operations/compact.rs | 1 + .../storages/fuse/src/operations/recluster.rs | 3 +++ 15 files changed, 79 insertions(+), 15 deletions(-) diff --git a/src/common/base/src/base/runtime.rs b/src/common/base/src/base/runtime.rs index f5683c8bc6d8f..9788073212fce 100644 --- a/src/common/base/src/base/runtime.rs +++ b/src/common/base/src/base/runtime.rs @@ -103,9 +103,10 @@ impl Runtime { let _ = runtime.block_on(recv_stop); let instant = Instant::now(); // We wait up to 3 seconds to complete the runtime shutdown. - runtime.shutdown_timeout(Duration::from_secs(3)); + // runtime.shutdown_timeout(Duration::from_secs(3)); - instant.elapsed() >= Duration::from_secs(3) + false + // instant.elapsed() >= Duration::from_secs(3) }); Ok(Runtime { diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 3a81d103b0c11..b56d0d22b7913 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::net::SocketAddr; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use common_base::base::Progress; @@ -72,6 +73,7 @@ pub trait TableContext: Send + Sync { fn get_catalog(&self, catalog_name: &str) -> Result>; fn get_id(&self) -> String; fn get_current_catalog(&self) -> String; + fn get_aborting(&self) -> Arc; fn get_current_database(&self) -> String; fn get_config(&self) -> Config; fn get_current_user(&self) -> Result; diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_block_compact.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_block_compact.rs index 1f98c231d492b..be0fb636bf17a 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_block_compact.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_block_compact.rs @@ -13,10 +13,12 @@ // limitations under the License. use common_datablocks::DataBlock; +use common_exception::ErrorCode; use common_exception::Result; use super::Compactor; use super::TransformCompact; +use crate::processors::transforms::Aborting; pub struct BlockCompactor { max_rows_per_block: usize, @@ -106,12 +108,18 @@ impl Compactor for BlockCompactor { Ok(res) } - fn compact_final(&self, blocks: &[DataBlock]) -> Result> { + fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result> { let mut res = Vec::with_capacity(blocks.len()); let mut temp_blocks = vec![]; let mut accumulated_rows = 0; for block in blocks.iter() { + if aborting() { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + // Perfect block, no need to compact if block.num_rows() <= self.max_rows_per_block && (block.num_rows() >= self.min_rows_per_block @@ -134,6 +142,12 @@ impl Compactor for BlockCompactor { temp_blocks.push(block); while accumulated_rows >= self.max_rows_per_block { + if aborting() { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + let block = DataBlock::concat_blocks(&temp_blocks)?; res.push(block.slice(0, self.max_rows_per_block)); accumulated_rows -= self.max_rows_per_block; @@ -150,6 +164,12 @@ impl Compactor for BlockCompactor { } if accumulated_rows != 0 { + if aborting() { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + let block = DataBlock::concat_blocks(&temp_blocks)?; res.push(block); } diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs index e0ca23dafe065..79fd2f3118541 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact.rs @@ -14,8 +14,10 @@ use std::any::Any; use std::collections::VecDeque; +use std::sync::atomic::Ordering; use std::sync::Arc; +use common_catalog::table_context::TableContext; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; @@ -28,8 +30,11 @@ use common_pipeline_core::processors::Processor; pub struct TransformCompact { state: ProcessorState, compactor: T, + aborting: Aborting, } +pub type Aborting = Arc bool + Send + Sync + 'static>>; + /// Compactor is a trait that defines how to compact blocks. pub trait Compactor { fn name() -> &'static str; @@ -45,15 +50,17 @@ pub trait Compactor { } /// `compact_final` is called when all the blocks are pushed to finish the compaction - fn compact_final(&self, blocks: &[DataBlock]) -> Result>; + fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result>; } impl TransformCompact { pub fn try_create( + ctx: Arc, input_port: Arc, output_port: Arc, compactor: T, ) -> Result { + let aborting = ctx.get_aborting(); let state = ProcessorState::Consume(ConsumeState { input_port, output_port, @@ -61,7 +68,11 @@ impl TransformCompact { output_data_blocks: VecDeque::new(), }); - Ok(ProcessorPtr::create(Box::new(Self { state, compactor }))) + Ok(ProcessorPtr::create(Box::new(Self { + state, + compactor, + aborting: Arc::new(Box::new(move || aborting.load(Ordering::Relaxed))), + }))) } #[inline(always)] @@ -154,7 +165,8 @@ impl Processor for TransformCompact { Ok(()) } ProcessorState::Compacting(state) => { - let compacted_blocks = self.compactor.compact_final(&state.blocks)?; + let aborting = self.aborting.clone(); + let compacted_blocks = self.compactor.compact_final(&state.blocks, aborting)?; let mut temp_state = ProcessorState::Finished; std::mem::swap(&mut self.state, &mut temp_state); diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index 0300ec000251d..89c57a73912ac 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -18,6 +18,7 @@ use common_exception::Result; use super::Compactor; use super::TransformCompact; +use crate::processors::transforms::Aborting; pub struct SortMergeCompactor { limit: Option, @@ -41,7 +42,7 @@ impl Compactor for SortMergeCompactor { "SortMergeTransform" } - fn compact_final(&self, blocks: &[DataBlock]) -> Result> { + fn compact_final(&self, blocks: &[DataBlock], _aborting: Aborting) -> Result> { if blocks.is_empty() { Ok(vec![]) } else { diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index 9ea1cfe78e851..13d6e1c35cd24 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -14,6 +14,7 @@ use common_datablocks::DataBlock; use common_exception::Result; +use common_pipeline_transforms::processors::transforms::Aborting; use super::ProbeState; @@ -43,8 +44,8 @@ pub trait HashJoinState: Send + Sync { async fn wait_finish(&self) -> Result<()>; /// Get mark join results - fn mark_join_blocks(&self) -> Result>; + fn mark_join_blocks(&self, flag: Aborting) -> Result>; /// Get right join results - fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result>; + fn right_join_blocks(&self, blocks: &[DataBlock], flag: Aborting) -> Result>; } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs index bd46015d07097..192eae786575c 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs @@ -44,6 +44,7 @@ use common_datavalues::NullableType; use common_exception::ErrorCode; use common_exception::Result; use common_hashtable::HashMap; +use common_pipeline_transforms::processors::transforms::Aborting; use common_planner::IndexType; use parking_lot::RwLock; use primitive_types::U256; @@ -677,7 +678,7 @@ impl HashJoinState for JoinHashTable { Ok(()) } - fn mark_join_blocks(&self) -> Result> { + fn mark_join_blocks(&self, _aborting: Aborting) -> Result> { let mut row_ptrs = self.row_ptrs.write(); let has_null = self.hash_join_desc.marker_join_desc.has_null.read(); let mut validity = MutableBitmap::with_capacity(row_ptrs.len()); @@ -716,7 +717,7 @@ impl HashJoinState for JoinHashTable { Ok(vec![self.merge_eq_block(&marker_block, &build_block)?]) } - fn right_join_blocks(&self, blocks: &[DataBlock]) -> Result> { + fn right_join_blocks(&self, blocks: &[DataBlock], _flag: Aborting) -> Result> { let unmatched_build_indexes = self.find_unmatched_build_indexes()?; if unmatched_build_indexes.is_empty() && self.hash_join_desc.other_predicate.is_none() { return Ok(blocks.to_vec()); diff --git a/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs index efcd117811360..f749a233e415b 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_mark_join.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use common_datablocks::DataBlock; use common_exception::Result; +use common_pipeline_transforms::processors::transforms::Aborting; use crate::pipelines::processors::transforms::Compactor; use crate::pipelines::processors::HashJoinState; @@ -37,8 +38,8 @@ impl Compactor for MarkJoinCompactor { } // `compact_final` is called when all the blocks are pushed - fn compact_final(&self, _blocks: &[DataBlock]) -> Result> { - self.hash_join_state.mark_join_blocks() + fn compact_final(&self, _blocks: &[DataBlock], aborting: Aborting) -> Result> { + self.hash_join_state.mark_join_blocks(aborting) } } diff --git a/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs index 8af9dfd65c792..2422c194e15e1 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_right_join.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use common_datablocks::DataBlock; use common_exception::Result; +use common_pipeline_transforms::processors::transforms::Aborting; use crate::pipelines::processors::transforms::Compactor; use crate::pipelines::processors::HashJoinState; @@ -37,8 +38,8 @@ impl Compactor for RightJoinCompactor { } // `compact_final` is called when all the blocks are pushed - fn compact_final(&self, blocks: &[DataBlock]) -> Result> { - self.hash_join_state.right_join_blocks(blocks) + fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result> { + self.hash_join_state.right_join_blocks(blocks, aborting) } } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 8511ebdfcd711..1be878dca2840 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::future::Future; use std::net::SocketAddr; +use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -266,6 +267,11 @@ impl TableContext for QueryContext { fn get_current_catalog(&self) -> String { self.shared.get_current_catalog() } + + fn get_aborting(&self) -> Arc { + self.shared.get_aborting() + } + fn get_current_database(&self) -> String { self.shared.get_current_database() } diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 1d745e942ef5d..120a0e71c30dd 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -14,6 +14,8 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Weak; @@ -69,6 +71,7 @@ pub struct QueryContextShared { pub(in crate::sessions) running_query: Arc>>, pub(in crate::sessions) running_query_kind: Arc>>, pub(in crate::sessions) http_query: Arc>>, + pub(in crate::sessions) aborting: Arc, pub(in crate::sessions) tables_refs: Arc>>>, pub(in crate::sessions) dal_ctx: Arc, pub(in crate::sessions) auth_manager: Arc, @@ -99,6 +102,7 @@ impl QueryContextShared { running_query: Arc::new(RwLock::new(None)), running_query_kind: Arc::new(RwLock::new(None)), http_query: Arc::new(RwLock::new(None)), + aborting: Arc::new(AtomicBool::new(false)), tables_refs: Arc::new(Mutex::new(HashMap::new())), dal_ctx: Arc::new(Default::default()), auth_manager: AuthMgr::create(config).await?, @@ -114,6 +118,7 @@ impl QueryContextShared { pub fn kill(&self, cause: ErrorCode) { self.set_error(cause.clone()); + self.aborting.store(true, Ordering::Release); if let Some(executor) = self.executor.read().upgrade() { executor.finish(Some(cause)); @@ -130,6 +135,10 @@ impl QueryContextShared { self.session.get_current_catalog() } + pub fn get_aborting(&self) -> Arc { + self.aborting.clone() + } + pub fn get_current_database(&self) -> String { self.session.get_current_database() } diff --git a/src/query/service/src/sql/executor/pipeline_builder.rs b/src/query/service/src/sql/executor/pipeline_builder.rs index 4943a50e3a460..e98ca32981e17 100644 --- a/src/query/service/src/sql/executor/pipeline_builder.rs +++ b/src/query/service/src/sql/executor/pipeline_builder.rs @@ -451,6 +451,7 @@ impl PipelineBuilder { // Merge self.main_pipeline.add_transform(|input, output| { TransformSortMerge::try_create( + self.ctx.clone(), input, output, SortMergeCompactor::new(sort.limit, sort_desc.clone()), @@ -462,6 +463,7 @@ impl PipelineBuilder { // Concat merge in single thread self.main_pipeline.add_transform(|input, output| { TransformSortMerge::try_create( + self.ctx.clone(), input, output, SortMergeCompactor::new(sort.limit, sort_desc.clone()), @@ -495,6 +497,7 @@ impl PipelineBuilder { self.main_pipeline.resize(1)?; self.main_pipeline.add_transform(|input, output| { TransformMarkJoin::try_create( + self.ctx.clone(), input, output, MarkJoinCompactor::create(state.clone()), @@ -506,6 +509,7 @@ impl PipelineBuilder { self.main_pipeline.resize(1)?; self.main_pipeline.add_transform(|input, output| { TransformRightJoin::try_create( + self.ctx.clone(), input, output, RightJoinCompactor::create(state.clone()), diff --git a/src/query/storages/fuse/src/operations/append.rs b/src/query/storages/fuse/src/operations/append.rs index c58c468d4bc45..2d718190a8ff3 100644 --- a/src/query/storages/fuse/src/operations/append.rs +++ b/src/query/storages/fuse/src/operations/append.rs @@ -48,6 +48,7 @@ impl FuseTable { let block_compactor = self.get_block_compactor(); pipeline.add_transform(|transform_input_port, transform_output_port| { TransformCompact::try_create( + ctx.clone(), transform_input_port, transform_output_port, block_compactor.to_compactor(false), diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index b2e37afd316d4..34dc78085b647 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -92,6 +92,7 @@ impl FuseTable { pipeline.add_transform(|transform_input_port, transform_output_port| { TransformCompact::try_create( + ctx.clone(), transform_input_port, transform_output_port, block_compactor.to_compactor(false), diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index cd6e8240ad880..8269708b2d16e 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -155,6 +155,7 @@ impl FuseTable { })?; pipeline.add_transform(|transform_input_port, transform_output_port| { TransformSortMerge::try_create( + ctx.clone(), transform_input_port, transform_output_port, SortMergeCompactor::new(None, sort_descs.clone()), @@ -163,6 +164,7 @@ impl FuseTable { pipeline.resize(1)?; pipeline.add_transform(|transform_input_port, transform_output_port| { TransformSortMerge::try_create( + ctx.clone(), transform_input_port, transform_output_port, SortMergeCompactor::new(None, sort_descs.clone()), @@ -171,6 +173,7 @@ impl FuseTable { pipeline.add_transform(|transform_input_port, transform_output_port| { TransformCompact::try_create( + ctx.clone(), transform_input_port, transform_output_port, block_compactor.to_compactor(true), From d967a87924f8773227157102c46ba543168f6723 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Thu, 29 Sep 2022 15:21:00 +0800 Subject: [PATCH 2/4] fix(processor): make lint --- .../processors/transforms/hash_join/hash_join_state.rs | 6 +++++- .../processors/transforms/hash_join/join_hash_table.rs | 6 +++++- .../processors/transforms/transform_right_semi_anti_join.rs | 3 ++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs index 7e8ad921326e4..91087ee3310c9 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/hash_join_state.rs @@ -50,5 +50,9 @@ pub trait HashJoinState: Send + Sync { fn right_join_blocks(&self, blocks: &[DataBlock], flag: Aborting) -> Result>; /// Get right semi/anti join results - fn right_anti_semi_join_blocks(&self, blocks: &[DataBlock], flag: Aborting) -> Result>; + fn right_anti_semi_join_blocks( + &self, + blocks: &[DataBlock], + flag: Aborting, + ) -> Result>; } diff --git a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs index e381c086ebc7d..15cc8bddc5f49 100644 --- a/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs +++ b/src/query/service/src/pipelines/processors/transforms/hash_join/join_hash_table.rs @@ -804,7 +804,11 @@ impl HashJoinState for JoinHashTable { Ok(vec![merged_block]) } - fn right_anti_semi_join_blocks(&self, blocks: &[DataBlock], _flag: Aborting) -> Result> { + fn right_anti_semi_join_blocks( + &self, + blocks: &[DataBlock], + _flag: Aborting, + ) -> Result> { // Fast path for right anti join with non-equi conditions if self.hash_join_desc.other_predicate.is_none() && self.hash_join_desc.join_type == JoinType::RightAnti diff --git a/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs b/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs index e94034336a860..fa22bec7ff06c 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_right_semi_anti_join.rs @@ -39,7 +39,8 @@ impl Compactor for RightSemiAntiJoinCompactor { // `compact_final` is called when all the blocks are pushed fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result> { - self.hash_join_state.right_anti_semi_join_blocks(blocks, aborting) + self.hash_join_state + .right_anti_semi_join_blocks(blocks, aborting) } } From 6bd43f1e42201f2b2b5252cddd9861166dc64b0c Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Thu, 29 Sep 2022 15:32:04 +0800 Subject: [PATCH 3/4] fix(processor): try fix order by and recluster cannot kill --- .../datablocks/src/kernels/data_block_sort.rs | 46 ++++++++++++++++--- .../pipeline/transforms/src/processors/mod.rs | 1 + .../transforms/transform_sort_merge.rs | 10 ++-- 3 files changed, 48 insertions(+), 9 deletions(-) diff --git a/src/query/datablocks/src/kernels/data_block_sort.rs b/src/query/datablocks/src/kernels/data_block_sort.rs index 1942722a947b6..c63454dbfacb9 100644 --- a/src/query/datablocks/src/kernels/data_block_sort.rs +++ b/src/query/datablocks/src/kernels/data_block_sort.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::iter::once; +use std::sync::Arc; use common_arrow::arrow::array::ord as arrow_ord; use common_arrow::arrow::array::ord::DynComparator; @@ -29,6 +30,8 @@ use common_exception::Result; use crate::DataBlock; +pub type Aborting = Arc bool + Send + Sync + 'static>>; + #[derive(Clone)] pub struct SortColumnDescription { pub column_name: String, @@ -146,27 +149,58 @@ impl DataBlock { blocks: &[DataBlock], sort_columns_descriptions: &[SortColumnDescription], limit: Option, + aborting: Aborting, ) -> Result { match blocks.len() { 0 => Result::Err(ErrorCode::EmptyData("Can't merge empty blocks")), 1 => Ok(blocks[0].clone()), - 2 => DataBlock::merge_sort_block( - &blocks[0], - &blocks[1], - sort_columns_descriptions, - limit, - ), + 2 => { + if aborting() { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + + DataBlock::merge_sort_block( + &blocks[0], + &blocks[1], + sort_columns_descriptions, + limit, + ) + } _ => { + if aborting() { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + let left = DataBlock::merge_sort_blocks( &blocks[0..blocks.len() / 2], sort_columns_descriptions, limit, + aborting.clone(), )?; + + if aborting() { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + let right = DataBlock::merge_sort_blocks( &blocks[blocks.len() / 2..blocks.len()], sort_columns_descriptions, limit, + aborting, )?; + + if aborting() { + return Err(ErrorCode::AbortedQuery( + "Aborted query, because the server is shutting down or the query was killed.", + )); + } + DataBlock::merge_sort_block(&left, &right, sort_columns_descriptions, limit) } } diff --git a/src/query/pipeline/transforms/src/processors/mod.rs b/src/query/pipeline/transforms/src/processors/mod.rs index 748d4f9bb6af2..0080140ed6c4c 100644 --- a/src/query/pipeline/transforms/src/processors/mod.rs +++ b/src/query/pipeline/transforms/src/processors/mod.rs @@ -13,4 +13,5 @@ // limitations under the License. pub mod transforms; +pub use transforms::Aborting; pub use transforms::ExpressionExecutor; diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index 89c57a73912ac..b1e432ce78a20 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -42,12 +42,16 @@ impl Compactor for SortMergeCompactor { "SortMergeTransform" } - fn compact_final(&self, blocks: &[DataBlock], _aborting: Aborting) -> Result> { + fn compact_final(&self, blocks: &[DataBlock], aborting: Aborting) -> Result> { if blocks.is_empty() { Ok(vec![]) } else { - let block = - DataBlock::merge_sort_blocks(blocks, &self.sort_columns_descriptions, self.limit)?; + let block = DataBlock::merge_sort_blocks( + blocks, + &self.sort_columns_descriptions, + self.limit, + aborting, + )?; Ok(vec![block]) } } From 30e36ccc85456d7599e401fd8e8cc848df22af60 Mon Sep 17 00:00:00 2001 From: zhang2014 Date: Thu, 29 Sep 2022 15:34:36 +0800 Subject: [PATCH 4/4] fix(processor): try fix compile failure --- src/query/datablocks/src/kernels/data_block_sort.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/datablocks/src/kernels/data_block_sort.rs b/src/query/datablocks/src/kernels/data_block_sort.rs index c63454dbfacb9..9127dc9bee06e 100644 --- a/src/query/datablocks/src/kernels/data_block_sort.rs +++ b/src/query/datablocks/src/kernels/data_block_sort.rs @@ -192,7 +192,7 @@ impl DataBlock { &blocks[blocks.len() / 2..blocks.len()], sort_columns_descriptions, limit, - aborting, + aborting.clone(), )?; if aborting() {