Skip to content

Commit

Permalink
recluster: compact after merge sort
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Sep 6, 2024
1 parent 7720978 commit 125bf50
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfo;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::Processor;
Expand All @@ -28,6 +29,8 @@ use crate::processors::AsyncAccumulatingTransform;
use crate::processors::AsyncAccumulatingTransformer;
use crate::processors::AsyncTransform;
use crate::processors::AsyncTransformer;
use crate::processors::BlockMetaTransform;
use crate::processors::BlockMetaTransformer;
use crate::processors::Transform;
use crate::processors::Transformer;

Expand Down Expand Up @@ -94,6 +97,25 @@ pub trait TransformPipelineHelper {
self.try_add_accumulating_transformer(|| Ok(f())).unwrap()
}

/// Appends a [`BlockMetaTransformer`] stage to the pipeline, constructed from
/// the fallible factory `f`.
///
/// Returns an error if the factory fails or the pipeline cannot be extended.
fn try_add_block_meta_transformer<F, B, R>(&mut self, f: F) -> Result<()>
where
    F: Fn() -> Result<R>,
    B: BlockMetaInfo,
    R: BlockMetaTransform<B> + 'static,
{
    // Delegate to the generic builder-based helper; each transform produced
    // by `f` is wrapped in a `BlockMetaTransformer` processor.
    let build = BlockMetaTransformer::<B, R>::create;
    self.try_add_transform_with_builder(f, build)
}

/// Appends a [`BlockMetaTransformer`] stage built from the infallible
/// factory `f`.
fn add_block_meta_transformer<F, B, R>(&mut self, f: F)
where
    F: Fn() -> R,
    B: BlockMetaInfo,
    R: BlockMetaTransform<B> + 'static,
{
    let wrapped = || Ok(f());
    // The wrapped closure always returns Ok(_), so the unwrap cannot fail.
    self.try_add_block_meta_transformer(wrapped).unwrap()
}

fn try_add_async_accumulating_transformer<F, R>(
&mut self,
f: F,
Expand Down
14 changes: 9 additions & 5 deletions src/query/service/src/pipelines/builders/builder_recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;

use crate::pipelines::builders::SortPipelineBuilder;
use crate::pipelines::processors::BlockCompactBuilder;
use crate::pipelines::processors::TransformAddStreamColumns;
use crate::pipelines::processors::TransformBlockConcat;
use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
Expand Down Expand Up @@ -150,12 +152,14 @@ impl PipelineBuilder {
.remove_order_col_at_last();
sort_pipeline_builder.build_merge_sort_pipeline(&mut self.main_pipeline, false)?;

let output_block_num = task.total_rows.div_ceil(final_block_size);
let max_threads = std::cmp::min(
self.ctx.get_settings().get_max_threads()? as usize,
output_block_num,
);
// Compact after merge sort.
self.main_pipeline
.add_accumulating_transformer(|| BlockCompactBuilder::new(block_thresholds));
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
self.main_pipeline.try_resize(max_threads)?;
self.main_pipeline
.add_block_meta_transformer(|| TransformBlockConcat {});

self.main_pipeline
.add_transform(|transform_input_port, transform_output_port| {
let proc = TransformSerializeBlock::try_create(
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
pub use databend_common_pipeline_core::processors::*;
pub(crate) mod transforms;

pub use transforms::BlockCompactBuilder;
pub use transforms::HashJoinBuildState;
pub use transforms::HashJoinDesc;
pub use transforms::HashJoinState;
pub use transforms::TransformAddStreamColumns;
pub use transforms::TransformBlockConcat;
pub use transforms::TransformCastSchema;
pub use transforms::TransformCreateSets;
pub use transforms::TransformLimit;
Expand Down
3 changes: 3 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod transform_add_const_columns;
mod transform_add_internal_columns;
mod transform_add_stream_columns;
mod transform_async_function;
mod transform_block_compact_no_split;
mod transform_cache_scan;
mod transform_cast_schema;
mod transform_create_sets;
Expand All @@ -46,6 +47,8 @@ pub use transform_add_const_columns::TransformAddConstColumns;
pub use transform_add_internal_columns::TransformAddInternalColumns;
pub use transform_add_stream_columns::TransformAddStreamColumns;
pub use transform_async_function::TransformAsyncFunction;
pub use transform_block_compact_no_split::BlockCompactBuilder;
pub use transform_block_compact_no_split::TransformBlockConcat;
pub use transform_cache_scan::CacheSourceState;
pub use transform_cache_scan::HashJoinCacheState;
pub use transform_cache_scan::TransformCacheScan;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
use databend_common_expression::DataBlock;
use databend_common_pipeline_transforms::processors::AccumulatingTransform;
use databend_common_pipeline_transforms::processors::BlockMetaTransform;
use databend_common_pipeline_transforms::processors::UnknownMode;

use crate::servers::flight::v1::exchange::ExchangeShuffleMeta;

/// Accumulating stage that groups incoming blocks into batches sized for
/// compaction. Batches are emitted as empty blocks carrying an
/// `ExchangeShuffleMeta`, to be concatenated downstream (see
/// `TransformBlockConcat` in this file). Added to the recluster pipeline
/// after merge sort — TODO confirm other call sites.
pub struct BlockCompactBuilder {
    // Row/byte thresholds deciding when a batch is large enough to emit.
    thresholds: BlockThresholds,
    // Holds blocks that exceeded the threshold and are waiting to be compacted.
    staged_blocks: Vec<DataBlock>,
    // Holds blocks that are partially accumulated but haven't reached the threshold.
    pending_blocks: Vec<DataBlock>,
    // Running row count over `pending_blocks` (plus the block being processed).
    accumulated_rows: usize,
    // Running byte count over `pending_blocks` (plus the block being processed).
    accumulated_bytes: usize,
}

impl BlockCompactBuilder {
    /// Creates a builder with the given compaction thresholds and empty buffers.
    pub fn new(thresholds: BlockThresholds) -> Self {
        Self {
            thresholds,
            staged_blocks: Vec::new(),
            pending_blocks: Vec::new(),
            accumulated_rows: 0,
            accumulated_bytes: 0,
        }
    }

    /// Drains `blocks` and packs them into an empty block carrying an
    /// `ExchangeShuffleMeta`, ready to be emitted downstream.
    fn create_output_data(blocks: &mut Vec<DataBlock>) -> DataBlock {
        let drained = std::mem::take(blocks);
        let meta = ExchangeShuffleMeta::create(drained);
        DataBlock::empty_with_meta(meta)
    }

    /// Clears the running row/byte counters.
    fn reset_accumulated(&mut self) {
        self.accumulated_rows = 0;
        self.accumulated_bytes = 0;
    }

    /// Whether the accumulated rows/bytes satisfy the compaction thresholds.
    fn check_for_compact(&self) -> bool {
        let rows = self.accumulated_rows;
        let bytes = self.accumulated_bytes;
        self.thresholds.check_for_compact(rows, bytes)
    }
}

impl AccumulatingTransform for BlockCompactBuilder {
    const NAME: &'static str = "BlockCompactBuilder";

    /// Accumulates `data` and decides whether a batch is ready to emit.
    ///
    /// `accumulated_rows`/`accumulated_bytes` count the pending blocks plus
    /// the current `data`. Until they are "large enough" per the thresholds,
    /// `data` is buffered and nothing is emitted. Once large enough:
    /// - any previously staged batch is flushed first;
    /// - if the accumulated set is too big for a single compact output
    ///   (`check_for_compact` fails), the pending blocks are flushed as their
    ///   own batch and `data` alone becomes the new staged batch;
    /// - otherwise pending blocks plus `data` become the new staged batch,
    ///   held back so `on_finish` can still merge a small trailing batch.
    fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
        self.accumulated_rows += data.num_rows();
        self.accumulated_bytes += data.memory_size();
        if !self
            .thresholds
            .check_large_enough(self.accumulated_rows, self.accumulated_bytes)
        {
            // blocks < N: not enough data yet, keep buffering.
            self.pending_blocks.push(data);
            return Ok(vec![]);
        }

        // At most two batches can be emitted from a single call.
        let mut res = Vec::with_capacity(2);
        if !self.staged_blocks.is_empty() {
            res.push(Self::create_output_data(&mut self.staged_blocks));
        }

        if !self.check_for_compact() && !self.pending_blocks.is_empty() {
            // blocks > 2N: too large for one compact target, flush pending alone.
            res.push(Self::create_output_data(&mut self.pending_blocks));
        } else {
            // N <= blocks < 2N: promote pending (if any) to the staged batch.
            // `staged_blocks` is empty here (drained above), so after the swap
            // `pending_blocks` is empty again.
            std::mem::swap(&mut self.staged_blocks, &mut self.pending_blocks);
        }
        // `data` joins the staged batch; counters restart for the next batch
        // (staged rows/bytes are re-derived in `on_finish` when needed).
        self.staged_blocks.push(data);
        self.reset_accumulated();
        Ok(res)
    }

    /// Flushes whatever is still buffered once the input is exhausted.
    ///
    /// If both pending and staged batches remain, they are merged into one
    /// output when their combined size still fits a compact target;
    /// otherwise each is flushed as its own batch.
    fn on_finish(&mut self, _output: bool) -> Result<Vec<DataBlock>> {
        match (
            self.pending_blocks.is_empty(),
            self.staged_blocks.is_empty(),
        ) {
            (true, true) => Ok(vec![]),
            (true, false) => Ok(vec![Self::create_output_data(&mut self.staged_blocks)]),
            (false, true) => Ok(vec![Self::create_output_data(&mut self.pending_blocks)]),
            (false, false) => {
                // `accumulated_*` already count the pending blocks; add the
                // staged blocks to size the combined batch.
                for block in &self.staged_blocks {
                    self.accumulated_rows += block.num_rows();
                    self.accumulated_bytes += block.memory_size();
                }
                if self.check_for_compact() {
                    // Combined batch still fits one compact target: merge.
                    self.staged_blocks.append(&mut self.pending_blocks);
                    Ok(vec![Self::create_output_data(&mut self.staged_blocks)])
                } else {
                    // blocks > 2N: flush the two batches separately.
                    Ok(vec![
                        Self::create_output_data(&mut self.staged_blocks),
                        Self::create_output_data(&mut self.pending_blocks),
                    ])
                }
            }
        }
    }
}

/// Concatenates the blocks carried by an [`ExchangeShuffleMeta`] into a
/// single block; metas holding zero or one block pass through unchanged.
pub struct TransformBlockConcat;

#[async_trait::async_trait]
impl BlockMetaTransform<ExchangeShuffleMeta> for TransformBlockConcat {
    const UNKNOWN_MODE: UnknownMode = UnknownMode::Pass;
    const NAME: &'static str = "TransformBlockConcat";

    fn transform(&mut self, meta: ExchangeShuffleMeta) -> Result<Vec<DataBlock>> {
        let blocks = meta.blocks;
        match blocks.len() {
            // Nothing to merge: forward zero or one block as-is.
            0 | 1 => Ok(blocks),
            // Multiple blocks: merge them into one output block.
            _ => Ok(vec![DataBlock::concat(&blocks)?]),
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ statement ok
ALTER TABLE t15 cluster by(abs(a))

statement ok
insert into t15 values(2),(5),(-7)
insert into t15 values(2),(-7)

query TTIIFFT
select * exclude(timestamp) from clustering_information('db_09_0008','t15')
Expand All @@ -841,9 +841,9 @@ select * exclude(timestamp) from clustering_information('db_09_0008','t15')
query III
select segment_count, block_count, row_count from fuse_snapshot('db_09_0008','t15') limit 3
----
1 3 9
2 3 9
4 4 9
1 3 8
2 3 8
4 4 8


statement ok
Expand Down

0 comments on commit 125bf50

Please sign in to comment.