feat: compact after merge sort #16401

Merged
merged 2 commits on Sep 7, 2024
Changes from all commits
4 changes: 3 additions & 1 deletion .cargo/audit.toml
@@ -44,5 +44,7 @@ ignore = [
# gix-index: Refs and paths with reserved Windows device names access the devices
"RUSTSEC-2024-0352",
# gix-ref: Refs and paths with reserved Windows device names access the devices
"RUSTSEC-2024-0351"
"RUSTSEC-2024-0351",
# gix-path: improperly resolves configuration path reported by Git
"RUSTSEC-2024-0371",
]
@@ -15,6 +15,7 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_expression::BlockMetaInfo;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
use databend_common_pipeline_core::processors::Processor;
@@ -28,6 +29,8 @@ use crate::processors::AsyncAccumulatingTransform;
use crate::processors::AsyncAccumulatingTransformer;
use crate::processors::AsyncTransform;
use crate::processors::AsyncTransformer;
use crate::processors::BlockMetaTransform;
use crate::processors::BlockMetaTransformer;
use crate::processors::Transform;
use crate::processors::Transformer;

@@ -94,6 +97,25 @@ pub trait TransformPipelineHelper {
        self.try_add_accumulating_transformer(|| Ok(f())).unwrap()
    }

    fn try_add_block_meta_transformer<F, B, R>(&mut self, f: F) -> Result<()>
    where
        F: Fn() -> Result<R>,
        B: BlockMetaInfo,
        R: BlockMetaTransform<B> + 'static,
    {
        self.try_add_transform_with_builder(f, BlockMetaTransformer::<B, R>::create)
    }

    fn add_block_meta_transformer<F, B, R>(&mut self, f: F)
    where
        F: Fn() -> R,
        B: BlockMetaInfo,
        R: BlockMetaTransform<B> + 'static,
    {
        // Safe to unwrap, since the closure always returns Ok(_).
        self.try_add_block_meta_transformer(|| Ok(f())).unwrap()
    }

    fn try_add_async_accumulating_transformer<F, R>(
        &mut self,
        f: F,
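Note: the new pair mirrors the existing accumulating helpers — try_add_block_meta_transformer wraps a BlockMetaTransform<B> in a BlockMetaTransformer (one instance per pipe) via try_add_transform_with_builder. A minimal usage sketch; the call mirrors the builder_recluster.rs hunk below, and `pipeline` stands in for whatever Pipeline is being built:

// Usage sketch only: `TransformBlockConcat` is the BlockMetaTransform
// added later in this PR; the closure builds one instance per output port.
pipeline.add_block_meta_transformer(|| TransformBlockConcat {});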
14 changes: 9 additions & 5 deletions src/query/service/src/pipelines/builders/builder_recluster.rs
Expand Up @@ -35,7 +35,9 @@ use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::TableContext;

use crate::pipelines::builders::SortPipelineBuilder;
use crate::pipelines::processors::BlockCompactBuilder;
use crate::pipelines::processors::TransformAddStreamColumns;
use crate::pipelines::processors::TransformBlockConcat;
use crate::pipelines::PipelineBuilder;

impl PipelineBuilder {
@@ -150,12 +152,14 @@ impl PipelineBuilder {
            .remove_order_col_at_last();
        sort_pipeline_builder.build_merge_sort_pipeline(&mut self.main_pipeline, false)?;

        let output_block_num = task.total_rows.div_ceil(final_block_size);
        let max_threads = std::cmp::min(
            self.ctx.get_settings().get_max_threads()? as usize,
            output_block_num,
        );
        // Compact after merge sort.
        self.main_pipeline
            .add_accumulating_transformer(|| BlockCompactBuilder::new(block_thresholds));
        let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
        self.main_pipeline.try_resize(max_threads)?;
        self.main_pipeline
            .add_block_meta_transformer(|| TransformBlockConcat {});

        self.main_pipeline
            .add_transform(|transform_input_port, transform_output_port| {
                let proc = TransformSerializeBlock::try_create(
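Taken together, the hunk above reworks the tail of the recluster pipeline. Roughly, as a sketch of the stage order added in this PR (not exact code):

// merge sort output
//   -> AccumulatingTransformer<BlockCompactBuilder>   // group sorted blocks into threshold-sized batches
//   -> try_resize(max_threads)                        // fan the batches out across worker threads
//   -> BlockMetaTransformer<ExchangeShuffleMeta, TransformBlockConcat>
//                                                     // concatenate each batch into a single block
//   -> TransformSerializeBlock                        // serialize blocks for write-back

Note that the old cap of max_threads by output_block_num (task.total_rows.div_ceil(final_block_size)) is dropped: the resize now targets get_max_threads() directly, and block sizing is handled upstream by BlockCompactBuilder.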
2 changes: 2 additions & 0 deletions src/query/service/src/pipelines/processors/mod.rs
@@ -15,10 +15,12 @@
pub use databend_common_pipeline_core::processors::*;
pub(crate) mod transforms;

pub use transforms::BlockCompactBuilder;
pub use transforms::HashJoinBuildState;
pub use transforms::HashJoinDesc;
pub use transforms::HashJoinState;
pub use transforms::TransformAddStreamColumns;
pub use transforms::TransformBlockConcat;
pub use transforms::TransformCastSchema;
pub use transforms::TransformCreateSets;
pub use transforms::TransformLimit;
3 changes: 3 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -21,6 +21,7 @@ mod transform_add_const_columns;
mod transform_add_internal_columns;
mod transform_add_stream_columns;
mod transform_async_function;
mod transform_block_compact_no_split;
mod transform_cache_scan;
mod transform_cast_schema;
mod transform_create_sets;
@@ -46,6 +47,8 @@ pub use transform_add_const_columns::TransformAddConstColumns;
pub use transform_add_internal_columns::TransformAddInternalColumns;
pub use transform_add_stream_columns::TransformAddStreamColumns;
pub use transform_async_function::TransformAsyncFunction;
pub use transform_block_compact_no_split::BlockCompactBuilder;
pub use transform_block_compact_no_split::TransformBlockConcat;
pub use transform_cache_scan::CacheSourceState;
pub use transform_cache_scan::HashJoinCacheState;
pub use transform_cache_scan::TransformCacheScan;
139 changes: 139 additions & 0 deletions src/query/service/src/pipelines/processors/transforms/transform_block_compact_no_split.rs
@@ -0,0 +1,139 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
use databend_common_expression::DataBlock;
use databend_common_pipeline_transforms::processors::AccumulatingTransform;
use databend_common_pipeline_transforms::processors::BlockMetaTransform;
use databend_common_pipeline_transforms::processors::UnknownMode;

use crate::servers::flight::v1::exchange::ExchangeShuffleMeta;

pub struct BlockCompactBuilder {
    thresholds: BlockThresholds,
    // Holds blocks that exceeded the threshold and are waiting to be compacted.
    staged_blocks: Vec<DataBlock>,
    // Holds blocks that are partially accumulated but haven't reached the threshold.
    pending_blocks: Vec<DataBlock>,
    accumulated_rows: usize,
    accumulated_bytes: usize,
}

impl BlockCompactBuilder {
    pub fn new(thresholds: BlockThresholds) -> Self {
        Self {
            thresholds,
            staged_blocks: vec![],
            pending_blocks: vec![],
            accumulated_rows: 0,
            accumulated_bytes: 0,
        }
    }

    fn create_output_data(blocks: &mut Vec<DataBlock>) -> DataBlock {
        // This function creates a DataBlock with ExchangeShuffleMeta,
        // but the metadata is only used internally and is not intended
        // for inter-node communication. The ExchangeShuffleMeta is simply
        // being reused here for its structure, and no data will be transferred
        // between nodes.
        DataBlock::empty_with_meta(ExchangeShuffleMeta::create(std::mem::take(blocks)))
    }

    fn reset_accumulated(&mut self) {
        self.accumulated_rows = 0;
        self.accumulated_bytes = 0;
    }

    fn check_for_compact(&self) -> bool {
        self.thresholds
            .check_for_compact(self.accumulated_rows, self.accumulated_bytes)
    }
}

impl AccumulatingTransform for BlockCompactBuilder {
    const NAME: &'static str = "BlockCompactBuilder";

    fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
        self.accumulated_rows += data.num_rows();
        self.accumulated_bytes += data.memory_size();
        if !self
            .thresholds
            .check_large_enough(self.accumulated_rows, self.accumulated_bytes)
        {
            // blocks < N
            self.pending_blocks.push(data);
            return Ok(vec![]);
        }

        let mut res = Vec::with_capacity(2);
        if !self.staged_blocks.is_empty() {
            res.push(Self::create_output_data(&mut self.staged_blocks));
        }

        if !self.check_for_compact() && !self.pending_blocks.is_empty() {
            // blocks > 2N
            res.push(Self::create_output_data(&mut self.pending_blocks));
        } else {
            // N <= blocks < 2N
            std::mem::swap(&mut self.staged_blocks, &mut self.pending_blocks);
        }
        self.staged_blocks.push(data);
        self.reset_accumulated();
        Ok(res)
    }

    fn on_finish(&mut self, _output: bool) -> Result<Vec<DataBlock>> {
        match (
            self.pending_blocks.is_empty(),
            self.staged_blocks.is_empty(),
        ) {
            (true, true) => Ok(vec![]),
            (true, false) => Ok(vec![Self::create_output_data(&mut self.staged_blocks)]),
            (false, true) => Ok(vec![Self::create_output_data(&mut self.pending_blocks)]),
            (false, false) => {
                for block in &self.staged_blocks {
                    self.accumulated_rows += block.num_rows();
                    self.accumulated_bytes += block.memory_size();
                }
                if self.check_for_compact() {
                    self.staged_blocks.append(&mut self.pending_blocks);
                    Ok(vec![Self::create_output_data(&mut self.staged_blocks)])
                } else {
                    // blocks > 2N
                    Ok(vec![
                        Self::create_output_data(&mut self.staged_blocks),
                        Self::create_output_data(&mut self.pending_blocks),
                    ])
                }
            }
        }
    }
}

pub struct TransformBlockConcat;

#[async_trait::async_trait]
impl BlockMetaTransform<ExchangeShuffleMeta> for TransformBlockConcat {
    const UNKNOWN_MODE: UnknownMode = UnknownMode::Pass;
    const NAME: &'static str = "TransformBlockConcat";

    fn transform(&mut self, meta: ExchangeShuffleMeta) -> Result<Vec<DataBlock>> {
        if meta.blocks.len() > 1 {
            Ok(vec![DataBlock::concat(&meta.blocks)?])
        } else {
            Ok(meta.blocks)
        }
    }
}
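To make the batching policy above concrete, here is a small self-contained sketch of the same accumulate/stage/flush rules, with plain row counts standing in for DataBlocks and a single row threshold N standing in for BlockThresholds (both simplifications are mine; the real code also tracks bytes via check_large_enough and check_for_compact):

// Toy model of BlockCompactBuilder's policy. `n` plays the role of the
// "good batch size" N; output batches are Vec<usize> of block row counts.
struct ToyCompactor {
    n: usize,
    staged: Vec<usize>,  // a batch already big enough, awaiting output
    pending: Vec<usize>, // blocks still below the threshold
    acc: usize,          // rows accumulated since the last stage/flush
}

impl ToyCompactor {
    fn push(&mut self, rows: usize) -> Vec<Vec<usize>> {
        self.acc += rows;
        if self.acc < self.n {
            // blocks < N: keep accumulating.
            self.pending.push(rows);
            return vec![];
        }
        let mut out = vec![];
        if !self.staged.is_empty() {
            // Emit the previously staged batch first.
            out.push(std::mem::take(&mut self.staged));
        }
        if self.acc >= 2 * self.n && !self.pending.is_empty() {
            // blocks >= 2N: flush pending on its own rather than merging
            // it with the (large) incoming block.
            out.push(std::mem::take(&mut self.pending));
        } else {
            // N <= blocks < 2N: pending plus the new block form the next
            // staged batch.
            std::mem::swap(&mut self.staged, &mut self.pending);
        }
        self.staged.push(rows);
        self.acc = 0;
        out
    }
}

fn main() {
    // With N = 100: three 40-row blocks stage a batch; a 500-row block then
    // flushes that batch and is itself staged as the next one.
    let mut c = ToyCompactor { n: 100, staged: vec![], pending: vec![], acc: 0 };
    for rows in [40, 40, 40, 500] {
        println!("{:?}", c.push(rows));
    }
}

The on_finish handling (not modeled here) decides whether the leftover pending and staged batches can be combined into one compactable batch or must be emitted separately.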
@@ -815,7 +815,7 @@ statement ok
ALTER TABLE t15 cluster by(abs(a))

statement ok
insert into t15 values(2),(5),(-7)
insert into t15 values(2),(-7)

query TTIIFFT
select * exclude(timestamp) from clustering_information('db_09_0008','t15')
@@ -841,9 +841,9 @@ select * exclude(timestamp) from clustering_information('db_09_0008','t15')
query III
select segment_count, block_count, row_count from fuse_snapshot('db_09_0008','t15') limit 3
----
1 3 9
2 3 9
4 4 9
1 3 8
2 3 8
4 4 8


statement ok