From 125bf5014ac990a542b1ba7cb0130ee1b7697165 Mon Sep 17 00:00:00 2001
From: zhyass
Date: Thu, 5 Sep 2024 18:51:50 +0800
Subject: [PATCH] recluster: compact after merge sort

---
 .../transforms/transform_pipeline_helper.rs   |  22 +++
 .../pipelines/builders/builder_recluster.rs   |  14 +-
 .../service/src/pipelines/processors/mod.rs   |   2 +
 .../pipelines/processors/transforms/mod.rs    |   3 +
 .../transform_block_compact_no_split.rs       | 134 ++++++++++++++++++
 .../09_0008_fuse_optimize_table.test          |   8 +-
 6 files changed, 174 insertions(+), 9 deletions(-)
 create mode 100644 src/query/service/src/pipelines/processors/transforms/transform_block_compact_no_split.rs

diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_pipeline_helper.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_pipeline_helper.rs
index c1ead7ccd2b5f..3a987aaf05268 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_pipeline_helper.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_pipeline_helper.rs
@@ -15,6 +15,7 @@
 use std::sync::Arc;
 
 use databend_common_exception::Result;
+use databend_common_expression::BlockMetaInfo;
 use databend_common_pipeline_core::processors::InputPort;
 use databend_common_pipeline_core::processors::OutputPort;
 use databend_common_pipeline_core::processors::Processor;
@@ -28,6 +29,8 @@
 use crate::processors::AsyncAccumulatingTransform;
 use crate::processors::AsyncAccumulatingTransformer;
 use crate::processors::AsyncTransform;
 use crate::processors::AsyncTransformer;
+use crate::processors::BlockMetaTransform;
+use crate::processors::BlockMetaTransformer;
 use crate::processors::Transform;
 use crate::processors::Transformer;
@@ -94,6 +97,25 @@ pub trait TransformPipelineHelper {
         self.try_add_accumulating_transformer(|| Ok(f())).unwrap()
     }
 
+    fn try_add_block_meta_transformer<B, R, F>(&mut self, f: F) -> Result<()>
+    where
+        F: Fn() -> Result<R>,
+        B: BlockMetaInfo,
+        R: BlockMetaTransform<B> + 'static,
+    {
+        self.try_add_transform_with_builder(f, BlockMetaTransformer::<B, R>::create)
+    }
+
+    fn add_block_meta_transformer<B, R, F>(&mut self, f: F)
+    where
+        F: Fn() -> R,
+        B: BlockMetaInfo,
+        R: BlockMetaTransform<B> + 'static,
+    {
+        // Safe to unwrap, since the closure always return Ok(_).
+        self.try_add_block_meta_transformer(|| Ok(f())).unwrap()
+    }
+
     fn try_add_async_accumulating_transformer<T, F>(
         &mut self,
         f: F,
diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs
index f770dbfdf7665..9048ab7581ab6 100644
--- a/src/query/service/src/pipelines/builders/builder_recluster.rs
+++ b/src/query/service/src/pipelines/builders/builder_recluster.rs
@@ -35,7 +35,9 @@ use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::TableContext;
 
 use crate::pipelines::builders::SortPipelineBuilder;
+use crate::pipelines::processors::BlockCompactBuilder;
 use crate::pipelines::processors::TransformAddStreamColumns;
+use crate::pipelines::processors::TransformBlockConcat;
 use crate::pipelines::PipelineBuilder;
 
 impl PipelineBuilder {
@@ -150,12 +152,14 @@ impl PipelineBuilder {
                     .remove_order_col_at_last();
                 sort_pipeline_builder.build_merge_sort_pipeline(&mut self.main_pipeline, false)?;
 
-                let output_block_num = task.total_rows.div_ceil(final_block_size);
-                let max_threads = std::cmp::min(
-                    self.ctx.get_settings().get_max_threads()? as usize,
-                    output_block_num,
-                );
+                // Compact after merge sort.
+                self.main_pipeline
+                    .add_accumulating_transformer(|| BlockCompactBuilder::new(block_thresholds));
+                let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
                 self.main_pipeline.try_resize(max_threads)?;
+                self.main_pipeline
+                    .add_block_meta_transformer(|| TransformBlockConcat {});
+
                 self.main_pipeline
                     .add_transform(|transform_input_port, transform_output_port| {
                         let proc = TransformSerializeBlock::try_create(
diff --git a/src/query/service/src/pipelines/processors/mod.rs b/src/query/service/src/pipelines/processors/mod.rs
index 1d924d948156e..a920c18405639 100644
--- a/src/query/service/src/pipelines/processors/mod.rs
+++ b/src/query/service/src/pipelines/processors/mod.rs
@@ -15,10 +15,12 @@ pub use databend_common_pipeline_core::processors::*;
 
 pub(crate) mod transforms;
 
+pub use transforms::BlockCompactBuilder;
 pub use transforms::HashJoinBuildState;
 pub use transforms::HashJoinDesc;
 pub use transforms::HashJoinState;
 pub use transforms::TransformAddStreamColumns;
+pub use transforms::TransformBlockConcat;
 pub use transforms::TransformCastSchema;
 pub use transforms::TransformCreateSets;
 pub use transforms::TransformLimit;
diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs
index e729effe40f83..2b39227e30e8e 100644
--- a/src/query/service/src/pipelines/processors/transforms/mod.rs
+++ b/src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -21,6 +21,7 @@ mod transform_add_const_columns;
 mod transform_add_internal_columns;
 mod transform_add_stream_columns;
 mod transform_async_function;
+mod transform_block_compact_no_split;
 mod transform_cache_scan;
 mod transform_cast_schema;
 mod transform_create_sets;
@@ -46,6 +47,8 @@ pub use transform_add_const_columns::TransformAddConstColumns;
 pub use transform_add_internal_columns::TransformAddInternalColumns;
 pub use transform_add_stream_columns::TransformAddStreamColumns;
 pub use transform_async_function::TransformAsyncFunction;
+pub use transform_block_compact_no_split::BlockCompactBuilder;
+pub use transform_block_compact_no_split::TransformBlockConcat;
 pub use transform_cache_scan::CacheSourceState;
 pub use transform_cache_scan::HashJoinCacheState;
 pub use transform_cache_scan::TransformCacheScan;
diff --git a/src/query/service/src/pipelines/processors/transforms/transform_block_compact_no_split.rs b/src/query/service/src/pipelines/processors/transforms/transform_block_compact_no_split.rs
new file mode 100644
index 0000000000000..c81eecff4dab6
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/transform_block_compact_no_split.rs
@@ -0,0 +1,134 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use databend_common_exception::Result;
+use databend_common_expression::BlockThresholds;
+use databend_common_expression::DataBlock;
+use databend_common_pipeline_transforms::processors::AccumulatingTransform;
+use databend_common_pipeline_transforms::processors::BlockMetaTransform;
+use databend_common_pipeline_transforms::processors::UnknownMode;
+
+use crate::servers::flight::v1::exchange::ExchangeShuffleMeta;
+
+pub struct BlockCompactBuilder {
+    thresholds: BlockThresholds,
+    // Holds blocks that exceeded the threshold and are waiting to be compacted.
+    staged_blocks: Vec<DataBlock>,
+    // Holds blocks that are partially accumulated but haven't reached the threshold.
+    pending_blocks: Vec<DataBlock>,
+    accumulated_rows: usize,
+    accumulated_bytes: usize,
+}
+
+impl BlockCompactBuilder {
+    pub fn new(thresholds: BlockThresholds) -> Self {
+        Self {
+            thresholds,
+            staged_blocks: vec![],
+            pending_blocks: vec![],
+            accumulated_rows: 0,
+            accumulated_bytes: 0,
+        }
+    }
+
+    fn create_output_data(blocks: &mut Vec<DataBlock>) -> DataBlock {
+        DataBlock::empty_with_meta(ExchangeShuffleMeta::create(std::mem::take(blocks)))
+    }
+
+    fn reset_accumulated(&mut self) {
+        self.accumulated_rows = 0;
+        self.accumulated_bytes = 0;
+    }
+
+    fn check_for_compact(&self) -> bool {
+        self.thresholds
+            .check_for_compact(self.accumulated_rows, self.accumulated_bytes)
+    }
+}
+
+impl AccumulatingTransform for BlockCompactBuilder {
+    const NAME: &'static str = "BlockCompactBuilder";
+
+    fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
+        self.accumulated_rows += data.num_rows();
+        self.accumulated_bytes += data.memory_size();
+        if !self
+            .thresholds
+            .check_large_enough(self.accumulated_rows, self.accumulated_bytes)
+        {
+            // blocks < N
+            self.pending_blocks.push(data);
+            return Ok(vec![]);
+        }
+
+        let mut res = Vec::with_capacity(2);
+        if !self.staged_blocks.is_empty() {
+            res.push(Self::create_output_data(&mut self.staged_blocks));
+        }
+
+        if !self.check_for_compact() && !self.pending_blocks.is_empty() {
+            // blocks > 2N
+            res.push(Self::create_output_data(&mut self.pending_blocks));
+        } else {
+            // N <= blocks < 2N
+            std::mem::swap(&mut self.staged_blocks, &mut self.pending_blocks);
+        }
+        self.staged_blocks.push(data);
+        self.reset_accumulated();
+        Ok(res)
+    }
+
+    fn on_finish(&mut self, _output: bool) -> Result<Vec<DataBlock>> {
+        match (
+            self.pending_blocks.is_empty(),
+            self.staged_blocks.is_empty(),
+        ) {
+            (true, true) => Ok(vec![]),
+            (true, false) => Ok(vec![Self::create_output_data(&mut self.staged_blocks)]),
+            (false, true) => Ok(vec![Self::create_output_data(&mut self.pending_blocks)]),
+            (false, false) => {
+                for block in &self.staged_blocks {
+                    self.accumulated_rows += block.num_rows();
+                    self.accumulated_bytes += block.memory_size();
+                }
+                if self.check_for_compact() {
+                    self.staged_blocks.append(&mut self.pending_blocks);
+                    Ok(vec![Self::create_output_data(&mut self.staged_blocks)])
+                } else {
+                    // blocks > 2N
+                    Ok(vec![
+                        Self::create_output_data(&mut self.staged_blocks),
+                        Self::create_output_data(&mut self.pending_blocks),
+                    ])
+                }
+            }
+        }
+    }
+}
+
+pub struct TransformBlockConcat;
+
+#[async_trait::async_trait]
+impl BlockMetaTransform<ExchangeShuffleMeta> for TransformBlockConcat {
+    const UNKNOWN_MODE: UnknownMode = UnknownMode::Pass;
+    const NAME: &'static str = "TransformBlockConcat";
+
+    fn transform(&mut self, meta: ExchangeShuffleMeta) -> Result<Vec<DataBlock>> {
+        if meta.blocks.len() > 1 {
+            Ok(vec![DataBlock::concat(&meta.blocks)?])
+        } else {
+            Ok(meta.blocks)
+        }
+    }
+}
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test
index 4e0052edc5d26..36a0ea829df5b 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test
@@ -815,7 +815,7 @@ statement ok
 ALTER TABLE t15 cluster by(abs(a))
 
 statement ok
-insert into t15 values(2),(5),(-7)
+insert into t15 values(2),(-7)
 
 query TTIIFFT
 select * exclude(timestamp) from clustering_information('db_09_0008','t15')
@@ -841,9 +841,9 @@ select * exclude(timestamp) from clustering_information('db_09_0008','t15')
 
 query III
 select segment_count, block_count, row_count from fuse_snapshot('db_09_0008','t15') limit 3
 ----
-1 3 9
-2 3 9
-4 4 9
+1 3 8
+2 3 8
+4 4 8
 
 statement ok