fix: split if block too big during append (#16435)
* split if block too big during append

* fix

* fix

* Update block_thresholds.rs

* Update block.rs

* fix

* refactor

* fix

* fix

* fix

* add test

* fix

* fix

* fix
zhyass authored Sep 14, 2024
1 parent 1ee6d66 commit 9f0b15b
Showing 33 changed files with 417 additions and 722 deletions.
16 changes: 2 additions & 14 deletions src/query/catalog/src/table.rs
@@ -215,13 +215,8 @@ pub trait Table: Sync + Send {
}

/// Assembly the pipeline of appending data to storage
fn append_data(
&self,
ctx: Arc<dyn TableContext>,
pipeline: &mut Pipeline,
append_mode: AppendMode,
) -> Result<()> {
let (_, _, _) = (ctx, pipeline, append_mode);
fn append_data(&self, ctx: Arc<dyn TableContext>, pipeline: &mut Pipeline) -> Result<()> {
let (_, _) = (ctx, pipeline);

Err(ErrorCode::Unimplemented(format!(
"The 'append_data' operation is not available for the table '{}'. Current table engine: '{}'.",
@@ -539,13 +534,6 @@ pub enum CompactTarget {
Segments,
}

pub enum AppendMode {
// From INSERT and RECUSTER operation
Normal,
// From COPY, Streaming load operation
Copy,
}

pub trait ColumnStatisticsProvider: Send {
// returns the statistics of the given column, if any.
// column_id is just the index of the column in table's schema
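For illustration, a self-contained Rust sketch of the simplified trait shape — the `Pipeline`, `TableContext`, and `NullTable` types below are stand-ins, not databend's real ones — showing that callers no longer pass an `AppendMode`:

use std::sync::Arc;

// Stand-in types so the sketch compiles on its own; the real trait uses
// databend's Pipeline, TableContext, and ErrorCode instead.
struct Pipeline;
struct TableContext;
type Result<T> = std::result::Result<T, String>;

trait Table {
    fn engine(&self) -> &str;

    // Default mirrors the new shape: no AppendMode argument; engines that
    // cannot append simply return an error.
    fn append_data(&self, ctx: Arc<TableContext>, pipeline: &mut Pipeline) -> Result<()> {
        let (_, _) = (ctx, pipeline);
        Err(format!(
            "The 'append_data' operation is not available for engine '{}'.",
            self.engine()
        ))
    }
}

struct NullTable;
impl Table for NullTable {
    fn engine(&self) -> &str {
        "NULL"
    }
}

fn main() {
    // Callers no longer choose Normal vs Copy; block sizing is decided
    // downstream by the compact transforms, not at the append entry point.
    let err = NullTable
        .append_data(Arc::new(TableContext), &mut Pipeline)
        .unwrap_err();
    println!("{err}");
}
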
13 changes: 8 additions & 5 deletions src/query/expression/src/block.rs
@@ -318,16 +318,19 @@ impl DataBlock {
res
}

pub fn split_by_rows_if_needed_no_tail(&self, min_rows_per_block: usize) -> Vec<Self> {
let max_rows_per_block = min_rows_per_block * 2;
pub fn split_by_rows_if_needed_no_tail(&self, rows_per_block: usize) -> Vec<Self> {
// Since rows_per_block represents the expected number of rows per block,
// and the minimum number of rows per block is 0.8 * rows_per_block,
// the maximum is taken as 1.8 * rows_per_block.
let max_rows_per_block = (rows_per_block * 9).div_ceil(5);
let mut res = vec![];
let mut offset = 0;
let mut remain_rows = self.num_rows;
while remain_rows >= max_rows_per_block {
let cut = self.slice(offset..(offset + min_rows_per_block));
let cut = self.slice(offset..(offset + rows_per_block));
res.push(cut);
offset += min_rows_per_block;
remain_rows -= min_rows_per_block;
offset += rows_per_block;
remain_rows -= rows_per_block;
}
res.push(self.slice(offset..(offset + remain_rows)));
res
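For illustration, a row-count-only sketch of the splitting rule above (the `split_rows_no_tail` helper is hypothetical, not part of this diff): with `rows_per_block` as the expected size, a block is only cut once it reaches ceil(1.8 × rows_per_block), so any block produced by an actual split ends up with at least roughly 0.8 × rows_per_block rows.

// Row-count-only model of split_by_rows_if_needed_no_tail; rows_per_block is
// the expected block size and the cutoff is ceil(1.8 * rows_per_block).
fn split_rows_no_tail(total_rows: usize, rows_per_block: usize) -> Vec<usize> {
    let max_rows_per_block = (rows_per_block * 9).div_ceil(5);
    let mut res = vec![];
    let mut remain_rows = total_rows;
    while remain_rows >= max_rows_per_block {
        res.push(rows_per_block);
        remain_rows -= rows_per_block;
    }
    res.push(remain_rows); // the tail keeps whatever is left (< 1.8x)
    res
}

fn main() {
    // Expected size 1_000 rows => cutoff 1_800 rows.
    assert_eq!(split_rows_no_tail(1_700, 1_000), vec![1_700]); // below the cutoff: kept whole
    assert_eq!(split_rows_no_tail(1_900, 1_000), vec![1_000, 900]); // split tail stays >= 0.8x
    assert_eq!(split_rows_no_tail(4_000, 1_000), vec![1_000, 1_000, 1_000, 1_000]);
}
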
49 changes: 31 additions & 18 deletions src/query/expression/src/utils/block_thresholds.rs
@@ -66,28 +66,41 @@ impl BlockThresholds {
total_rows <= self.max_rows_per_block && total_bytes <= self.max_bytes_per_block
}

#[inline]
pub fn calc_rows_per_block(&self, total_bytes: usize, total_rows: usize) -> usize {
let mut block_num = std::cmp::max(total_bytes / self.max_bytes_per_block, 1);
let mut rows_per_block = total_rows.div_ceil(block_num);
if self.check_for_compact(total_rows, total_bytes) {
return total_rows;
}

let max_bytes_per_block = if rows_per_block < self.max_rows_per_block / 10 {
// If block rows < 100_000, max_bytes_per_block set to 200M
2 * self.max_bytes_per_block
} else if rows_per_block < self.max_rows_per_block / 2 {
// If block rows < 500_000, max_bytes_per_block set to 150M
3 * self.max_bytes_per_block / 2
} else if rows_per_block < self.min_rows_per_block {
// If block rows < 800_000, max_bytes_per_block set to 125M
5 * self.max_bytes_per_block / 4
} else {
self.max_bytes_per_block
};
let block_num_by_size = std::cmp::max(total_bytes / self.max_bytes_per_block, 1);
let block_num_by_rows = std::cmp::max(total_rows / self.min_rows_per_block, 1);
if block_num_by_rows >= block_num_by_size {
return self.max_rows_per_block;
}

let mut rows_per_block = total_rows.div_ceil(block_num_by_size);
let max_bytes_per_block = match rows_per_block {
v if v < self.max_rows_per_block / 10 => {
// If block rows < 100_000, max_bytes_per_block set to 200M
2 * self.max_bytes_per_block
}
v if v < self.max_rows_per_block / 2 => {
// If block rows < 500_000, max_bytes_per_block set to 150M
3 * self.max_bytes_per_block / 2
}
v if v < self.min_rows_per_block => {
// If block rows < 800_000, max_bytes_per_block set to 125M
5 * self.max_bytes_per_block / 4
}
_ => self.max_bytes_per_block,
};

if block_num > 1 && max_bytes_per_block > self.max_bytes_per_block {
block_num = std::cmp::max(total_bytes / max_bytes_per_block, 1);
rows_per_block = total_rows.div_ceil(block_num);
if max_bytes_per_block > self.max_bytes_per_block {
rows_per_block = std::cmp::max(
total_rows / (std::cmp::max(total_bytes / max_bytes_per_block, 1)),
1,
);
}

rows_per_block.min(self.max_rows_per_block)
rows_per_block
}
}
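
For illustration, a standalone reimplementation of the sizing rule above with the thresholds passed in as parameters and the early already-compact return omitted; the 1_000_000 / 800_000 / 100 MiB numbers in `main` are assumed defaults used for the arithmetic, not values taken from this diff:

// Illustrative reimplementation of the new sizing logic; thresholds are
// explicit arguments and the check_for_compact early return is left out.
fn calc_rows_per_block(
    total_bytes: usize,
    total_rows: usize,
    max_rows_per_block: usize,
    min_rows_per_block: usize,
    max_bytes_per_block: usize,
) -> usize {
    let block_num_by_size = std::cmp::max(total_bytes / max_bytes_per_block, 1);
    let block_num_by_rows = std::cmp::max(total_rows / min_rows_per_block, 1);
    // Rows are the limiting factor: keep the plain row cap.
    if block_num_by_rows >= block_num_by_size {
        return max_rows_per_block;
    }

    let mut rows_per_block = total_rows.div_ceil(block_num_by_size);
    // Wide rows (few rows per byte budget) are allowed a larger byte cap.
    let relaxed_bytes_per_block = match rows_per_block {
        v if v < max_rows_per_block / 10 => 2 * max_bytes_per_block,
        v if v < max_rows_per_block / 2 => 3 * max_bytes_per_block / 2,
        v if v < min_rows_per_block => 5 * max_bytes_per_block / 4,
        _ => max_bytes_per_block,
    };

    if relaxed_bytes_per_block > max_bytes_per_block {
        rows_per_block = std::cmp::max(
            total_rows / std::cmp::max(total_bytes / relaxed_bytes_per_block, 1),
            1,
        );
    }
    rows_per_block
}

fn main() {
    const MIB: usize = 1024 * 1024;
    // 500_000 very wide rows over 1000 MiB: block_num_by_size = 10,
    // block_num_by_rows = 1, rows_per_block = 50_000 < max_rows / 10,
    // so the byte cap doubles to 200 MiB and the result becomes
    // 500_000 / 5 = 100_000 rows per (~200 MiB) block.
    let r = calc_rows_per_block(1000 * MIB, 500_000, 1_000_000, 800_000, 100 * MIB);
    assert_eq!(r, 100_000);

    // 10_000_000 small rows in 500 MiB: rows force more blocks than bytes do,
    // so the function just returns the row cap.
    let r = calc_rows_per_block(500 * MIB, 10_000_000, 1_000_000, 800_000, 100 * MIB);
    assert_eq!(r, 1_000_000);
}
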
12 changes: 6 additions & 6 deletions src/query/pipeline/transforms/src/processors/transforms/mod.rs
@@ -17,10 +17,10 @@ mod transform;
mod transform_accumulating;
mod transform_accumulating_async;
mod transform_async;
mod transform_block_compact;
mod transform_block_compact_for_copy;
mod transform_blocking;
mod transform_compact;
mod transform_compact_block;
mod transform_compact_builder;
mod transform_compact_no_split_builder;
mod transform_dummy;
mod transform_multi_sort_merge;
mod transform_pipeline_helper;
@@ -34,10 +34,10 @@ pub use transform::*;
pub use transform_accumulating::*;
pub use transform_accumulating_async::*;
pub use transform_async::*;
pub use transform_block_compact::*;
pub use transform_block_compact_for_copy::*;
pub use transform_blocking::*;
pub use transform_compact::*;
pub use transform_compact_block::*;
pub use transform_compact_builder::*;
pub use transform_compact_no_split_builder::*;
pub use transform_dummy::*;
pub use transform_multi_sort_merge::try_add_multi_sort_merge;
pub use transform_pipeline_helper::TransformPipelineHelper;
@@ -181,6 +181,8 @@ pub trait BlockMetaTransform<B: BlockMetaInfo>: Send + 'static {
fn on_finish(&mut self) -> Result<()> {
Ok(())
}

fn interrupt(&self) {}
}

pub struct BlockMetaTransformer<B: BlockMetaInfo, T: BlockMetaTransform<B>> {
@@ -315,4 +317,8 @@ impl<B: BlockMetaInfo, T: BlockMetaTransform<B>> Processor for BlockMetaTransformer

Ok(())
}

fn interrupt(&self) {
self.transform.interrupt();
}
}
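
For illustration, a toy sketch of the new `interrupt` hook: the wrapper forwards the processor-level interrupt to the inner transform, and a hypothetical `CancellableCompact` transform uses it to stop early. The types below are simplified stand-ins, not databend's actual `Processor` machinery.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Simplified stand-ins for the trait and its wrapper processor.
trait BlockMetaTransform: Send + 'static {
    fn transform(&mut self, input: Vec<u64>) -> Vec<u64>;

    // Default is a no-op, so existing transforms need no changes.
    fn interrupt(&self) {}
}

struct Transformer<T: BlockMetaTransform> {
    transform: T,
}

impl<T: BlockMetaTransform> Transformer<T> {
    // The wrapper forwards the processor-level interrupt to the inner
    // transform, mirroring the delegation added in this commit.
    fn interrupt(&self) {
        self.transform.interrupt();
    }
}

// A hypothetical transform that checks an abort flag while it works.
struct CancellableCompact {
    aborted: Arc<AtomicBool>,
}

impl BlockMetaTransform for CancellableCompact {
    fn transform(&mut self, input: Vec<u64>) -> Vec<u64> {
        if self.aborted.load(Ordering::Relaxed) {
            return vec![]; // stop early instead of finishing the batch
        }
        input
    }

    fn interrupt(&self) {
        self.aborted.store(true, Ordering::Relaxed);
    }
}

fn main() {
    let mut t = Transformer {
        transform: CancellableCompact { aborted: Arc::new(AtomicBool::new(false)) },
    };
    t.interrupt(); // e.g. the query was killed
    assert!(t.transform.transform(vec![1, 2, 3]).is_empty());
}
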

