diff --git a/src/query/ast/src/ast/format/ast_format.rs b/src/query/ast/src/ast/format/ast_format.rs index 4ca91d1b27a97..61ba739c2b79d 100644 --- a/src/query/ast/src/ast/format/ast_format.rs +++ b/src/query/ast/src/ast/format/ast_format.rs @@ -1349,7 +1349,7 @@ impl<'ast> Visitor<'ast> for AstFormatVisitor { let mut children = Vec::new(); self.visit_table_ref(&stmt.catalog, &stmt.database, &stmt.table); children.push(self.children.pop().unwrap()); - if let Some(action) = stmt.action { + if let Some(action) = &stmt.action { let action_name = format!("Action {}", action); let action_format_ctx = AstFormatContext::new(action_name); children.push(FormatTreeNode::new(action_format_ctx)); diff --git a/src/query/ast/src/ast/statements/table.rs b/src/query/ast/src/ast/statements/table.rs index 2afee93ee93d3..194876ad290b8 100644 --- a/src/query/ast/src/ast/statements/table.rs +++ b/src/query/ast/src/ast/statements/table.rs @@ -382,7 +382,7 @@ pub struct OptimizeTableStmt<'a> { pub catalog: Option>, pub database: Option>, pub table: Identifier<'a>, - pub action: Option, + pub action: Option>, } impl Display for OptimizeTableStmt<'_> { @@ -444,32 +444,41 @@ impl Display for Engine { } } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum CompactTarget { Block, Segment, } -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum OptimizeTableAction { +#[derive(Debug, Clone, PartialEq)] +pub enum OptimizeTableAction<'a> { All, Purge, - Compact(CompactTarget), + Compact { + target: CompactTarget, + limit: Option>, + }, } -impl Display for OptimizeTableAction { +impl<'a> Display for OptimizeTableAction<'a> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { OptimizeTableAction::All => write!(f, "ALL"), OptimizeTableAction::Purge => write!(f, "PURGE"), - OptimizeTableAction::Compact(action) => match action { - CompactTarget::Block => { - write!(f, "COMPACT BLOCK") + OptimizeTableAction::Compact { target, limit } => { + match target { + CompactTarget::Block => { + write!(f, "COMPACT BLOCK")?; + } + CompactTarget::Segment => { + write!(f, "COMPACT SEGMENT")?; + } } - CompactTarget::Segment => { - write!(f, "COMPACT SEGMENT") + if let Some(limit) = limit { + write!(f, " LIMIT {limit}")?; } - }, + Ok(()) + } } } } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 42961868d0c1c..224df0a6fac79 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -1344,13 +1344,12 @@ pub fn optimize_table_action(i: Input) -> IResult { alt(( value(OptimizeTableAction::All, rule! { ALL }), value(OptimizeTableAction::Purge, rule! { PURGE }), - value( - OptimizeTableAction::Compact(CompactTarget::Segment), - rule! { COMPACT ~ SEGMENT}, - ), - value( - OptimizeTableAction::Compact(CompactTarget::Block), - rule! { COMPACT}, + map( + rule! { COMPACT ~ (SEGMENT)? ~ ( LIMIT ~ ^#expr )?}, + |(_, opt_segment, opt_limit)| OptimizeTableAction::Compact { + target: opt_segment.map_or(CompactTarget::Block, |_| CompactTarget::Segment), + limit: opt_limit.map(|(_, limit)| limit), + }, ), ))(i) } diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index 46e08e6feb86a..7dfbf760efbf9 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -233,9 +233,10 @@ pub trait Table: Sync + Send { &self, ctx: Arc, target: CompactTarget, + limit: Option, pipeline: &mut Pipeline, ) -> Result>> { - let (_, _, _) = (ctx, target, pipeline); + let (_, _, _, _) = (ctx, target, limit, pipeline); Err(ErrorCode::UnImplement(format!( "table {}, of engine type {}, does not support compact", diff --git a/src/query/planner/src/plans/optimize_table.rs b/src/query/planner/src/plans/optimize_table.rs index 3c51862aa9f32..e212f240146fe 100644 --- a/src/query/planner/src/plans/optimize_table.rs +++ b/src/query/planner/src/plans/optimize_table.rs @@ -34,6 +34,6 @@ impl OptimizeTablePlan { pub enum OptimizeTableAction { All, Purge, - CompactBlocks, - CompactSegments, + CompactBlocks(Option), + CompactSegments(Option), } diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index afe7be53bb8db..0cb58aeca5420 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -59,15 +59,26 @@ impl Interpreter for OptimizeTableInterpreter { ); let do_compact_blocks = matches!( action, - OptimizeTableAction::CompactBlocks | OptimizeTableAction::All + OptimizeTableAction::CompactBlocks(_) | OptimizeTableAction::All ); - let do_compact_segments_only = matches!(action, OptimizeTableAction::CompactSegments); + let do_compact_segments_only = matches!(action, OptimizeTableAction::CompactSegments(_)); + + let limit_opt = match action { + OptimizeTableAction::CompactBlocks(limit_opt) => *limit_opt, + OptimizeTableAction::CompactSegments(limit_opt) => *limit_opt, + _ => None, + }; if do_compact_segments_only { let mut pipeline = Pipeline::create(); if let Some(mutator) = table - .compact(ctx.clone(), CompactTarget::Segments, &mut pipeline) + .compact( + ctx.clone(), + CompactTarget::Segments, + limit_opt, + &mut pipeline, + ) .await? { mutator.try_commit(table).await?; @@ -78,7 +89,7 @@ impl Interpreter for OptimizeTableInterpreter { if do_compact_blocks { let mut pipeline = Pipeline::create(); let mutator = table - .compact(ctx.clone(), CompactTarget::Blocks, &mut pipeline) + .compact(ctx.clone(), CompactTarget::Blocks, limit_opt, &mut pipeline) .await?; if let Some(mutator) = mutator { diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/full_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/full_compact_mutator.rs index e6fd81c44d228..c9aa16f86a278 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/full_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/full_compact_mutator.rs @@ -176,7 +176,7 @@ async fn build_mutator( settings.set_max_threads(1)?; let mut pipeline = common_pipeline_core::Pipeline::create(); let mutator = fuse_table - .compact(ctx.clone(), CompactTarget::Blocks, &mut pipeline) + .compact(ctx.clone(), CompactTarget::Blocks, None, &mut pipeline) .await?; assert!(mutator.is_some()); let mutator = mutator.unwrap(); diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index d0bfa9fee550e..c881cbce088fa 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -57,7 +57,7 @@ async fn test_compact_segment_normal_case() -> Result<()> { let fuse_table = FuseTable::try_from_table(table.as_ref())?; let mut pipeline = common_pipeline_core::Pipeline::create(); let mutator = fuse_table - .compact(ctx.clone(), CompactTarget::Segments, &mut pipeline) + .compact(ctx.clone(), CompactTarget::Segments, None, &mut pipeline) .await?; assert!(mutator.is_some()); let mutator = mutator.unwrap(); @@ -102,7 +102,7 @@ async fn test_compact_segment_resolvable_conflict() -> Result<()> { let fuse_table = FuseTable::try_from_table(table.as_ref())?; let mut pipeline = common_pipeline_core::Pipeline::create(); let mutator = fuse_table - .compact(ctx.clone(), CompactTarget::Segments, &mut pipeline) + .compact(ctx.clone(), CompactTarget::Segments, None, &mut pipeline) .await?; assert!(mutator.is_some()); let mutator = mutator.unwrap(); @@ -165,7 +165,7 @@ async fn test_compact_segment_unresolvable_conflict() -> Result<()> { let fuse_table = FuseTable::try_from_table(table.as_ref())?; let mut pipeline = common_pipeline_core::Pipeline::create(); let mutator = fuse_table - .compact(ctx.clone(), CompactTarget::Segments, &mut pipeline) + .compact(ctx.clone(), CompactTarget::Segments, None, &mut pipeline) .await?; assert!(mutator.is_some()); let mutator = mutator.unwrap(); @@ -200,7 +200,7 @@ async fn compact_segment(ctx: Arc, table: &Arc) -> Resu let fuse_table = FuseTable::try_from_table(table.as_ref())?; let mut pipeline = common_pipeline_core::Pipeline::create(); let mutator = fuse_table - .compact(ctx, CompactTarget::Segments, &mut pipeline) + .compact(ctx, CompactTarget::Segments, None, &mut pipeline) .await? .unwrap(); mutator.try_commit(table.clone()).await diff --git a/src/query/service/tests/it/storages/fuse/operations/optimize.rs b/src/query/service/tests/it/storages/fuse/operations/optimize.rs index 955e91b50a57a..50062d9c8affb 100644 --- a/src/query/service/tests/it/storages/fuse/operations/optimize.rs +++ b/src/query/service/tests/it/storages/fuse/operations/optimize.rs @@ -14,14 +14,11 @@ use common_base::base::tokio; use common_exception::Result; -use futures::TryStreamExt; use crate::storages::fuse::table_test_fixture::append_sample_data; use crate::storages::fuse::table_test_fixture::append_sample_data_overwrite; use crate::storages::fuse::table_test_fixture::check_data_dir; use crate::storages::fuse::table_test_fixture::execute_command; -use crate::storages::fuse::table_test_fixture::execute_query; -use crate::storages::fuse::table_test_fixture::expects_ok; use crate::storages::fuse::table_test_fixture::history_should_have_only_one_item; use crate::storages::fuse::table_test_fixture::TestFixture; @@ -66,46 +63,3 @@ async fn do_insertions(fixture: &TestFixture) -> Result<()> { append_sample_data_overwrite(1, true, fixture).await?; Ok(()) } - -#[tokio::test] -async fn test_fuse_snapshot_optimize_compact() -> Result<()> { - let fixture = TestFixture::new().await; - let db = fixture.default_db_name(); - let tbl = fixture.default_table_name(); - fixture.create_default_table().await?; - - // insert 5 blocks - let n = 5; - for _ in 0..n { - let table = fixture.latest_default_table().await?; - let num_blocks = 1; - let stream = TestFixture::gen_sample_blocks_stream(num_blocks, 1); - - let blocks = stream.try_collect().await?; - fixture - .append_commit_blocks(table.clone(), blocks, false, true) - .await?; - } - - // optimize compact - let qry = format!("optimize table {}.{} compact", db, tbl); - execute_command(fixture.ctx(), qry.as_str()).await?; - - // optimize compact should keep the histories - // there should be 6 history items there, 5 for the above insertions, 1 for that compaction - let expected = vec![ - "+----------+", - "| count(*) |", - "+----------+", - "| 6 |", - "+----------+", - ]; - let qry = format!("select count(*) from fuse_snapshot('{}', '{}')", db, tbl); - - expects_ok( - "count_should_be_1", - execute_query(fixture.ctx(), qry.as_str()).await, - expected, - ) - .await -} diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 7fab7558e432e..26f52e60aa50f 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -778,14 +778,30 @@ impl<'a> Binder { .map(|ident| normalize_identifier(ident, &self.name_resolution_ctx).name) .unwrap_or_else(|| self.ctx.get_current_database()); let table = normalize_identifier(table, &self.name_resolution_ctx).name; - let action = action.map_or(OptimizeTableAction::Purge, |v| match v { - AstOptimizeTableAction::All => OptimizeTableAction::All, - AstOptimizeTableAction::Purge => OptimizeTableAction::Purge, - AstOptimizeTableAction::Compact(target) => match target { - CompactTarget::Block => OptimizeTableAction::CompactBlocks, - CompactTarget::Segment => OptimizeTableAction::CompactSegments, - }, - }); + let action = if let Some(ast_action) = action { + match ast_action { + AstOptimizeTableAction::All => OptimizeTableAction::All, + AstOptimizeTableAction::Purge => OptimizeTableAction::Purge, + AstOptimizeTableAction::Compact { target, limit } => { + let limit_cnt = match limit { + Some(Expr::Literal { + lit: Literal::Integer(uint), + .. + }) => Some(*uint as usize), + Some(_) => { + return Err(ErrorCode::IllegalDataType("Unsupported limit type")); + } + _ => None, + }; + match target { + CompactTarget::Block => OptimizeTableAction::CompactBlocks(limit_cnt), + CompactTarget::Segment => OptimizeTableAction::CompactSegments(limit_cnt), + } + } + } + } else { + OptimizeTableAction::Purge + }; Ok(Plan::OptimizeTable(Box::new(OptimizeTablePlan { catalog, diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 06dd8ebd41e61..cc2e3d8d88c42 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -456,9 +456,10 @@ impl Table for FuseTable { &self, ctx: Arc, target: CompactTarget, + limit: Option, pipeline: &mut Pipeline, ) -> Result>> { - self.do_compact(ctx, target, pipeline).await + self.do_compact(ctx, target, limit, pipeline).await } async fn recluster( diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index 2c28c27155a0f..c89a676cf7a01 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -33,9 +33,11 @@ use crate::TableMutator; use crate::DEFAULT_BLOCK_PER_SEGMENT; use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT; -struct CompactOptions { - base_snapshot: Arc, - block_per_seg: usize, +pub struct CompactOptions { + // the snapshot that compactor working on, it never changed during phases compaction. + pub base_snapshot: Arc, + pub block_per_seg: usize, + pub limit: Option, } impl FuseTable { @@ -43,6 +45,7 @@ impl FuseTable { &self, ctx: Arc, target: CompactTarget, + limit: Option, pipeline: &mut Pipeline, ) -> Result>> { let snapshot_opt = self.read_table_snapshot(ctx.clone()).await?; @@ -63,6 +66,7 @@ impl FuseTable { let compact_params = CompactOptions { base_snapshot, block_per_seg, + limit, }; match target { @@ -79,9 +83,8 @@ impl FuseTable { ) -> Result>> { let mut segment_mutator = SegmentCompactMutator::try_create( ctx.clone(), - options.base_snapshot, + options, self.meta_location_generator().clone(), - options.block_per_seg, self.operator.clone(), )?; @@ -103,10 +106,9 @@ impl FuseTable { let block_per_seg = options.block_per_seg; let mut mutator = FullCompactMutator::try_create( ctx.clone(), - options.base_snapshot, + options, block_compactor.clone(), self.meta_location_generator().clone(), - block_per_seg, self.cluster_key_meta.is_some(), self.operator.clone(), )?; diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index 1531df22f760e..548984401a3d8 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -28,6 +28,7 @@ mod truncate; pub mod util; +pub(crate) use compact::CompactOptions; pub use fuse_sink::FuseTableSink; pub use mutation::delete_from_block; pub use mutation::DeletionMutator; diff --git a/src/query/storages/fuse/src/operations/mutation/abort_operation.rs b/src/query/storages/fuse/src/operations/mutation/abort_operation.rs index 0c5052ea5057f..5d6794e83135b 100644 --- a/src/query/storages/fuse/src/operations/mutation/abort_operation.rs +++ b/src/query/storages/fuse/src/operations/mutation/abort_operation.rs @@ -21,7 +21,7 @@ use opendal::Operator; use crate::io::Files; -#[derive(Default, Clone, Debug)] +#[derive(Default)] pub struct AbortOperation { pub segments: Vec, pub blocks: Vec, diff --git a/src/query/storages/fuse/src/operations/mutation/compact_mutator/full_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact_mutator/full_compact_mutator.rs index ea03da7a809fd..e8867b53afc60 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact_mutator/full_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact_mutator/full_compact_mutator.rs @@ -20,7 +20,6 @@ use common_fuse_meta::meta::BlockMeta; use common_fuse_meta::meta::Location; use common_fuse_meta::meta::SegmentInfo; use common_fuse_meta::meta::Statistics; -use common_fuse_meta::meta::TableSnapshot; use common_fuse_meta::meta::Versioned; use opendal::Operator; @@ -30,27 +29,27 @@ use crate::io::SegmentsIO; use crate::io::TableMetaLocationGenerator; use crate::operations::mutation::AbortOperation; use crate::operations::AppendOperationLogEntry; +use crate::operations::CompactOptions; use crate::statistics::merge_statistics; use crate::statistics::reducers::reduce_block_metas; -use crate::statistics::reducers::reduce_statistics; use crate::FuseTable; use crate::Table; use crate::TableContext; use crate::TableMutator; -#[derive(Clone)] pub struct FullCompactMutator { ctx: Arc, - base_snapshot: Arc, + compact_params: CompactOptions, data_accessor: Operator, block_compactor: BlockCompactor, location_generator: TableMetaLocationGenerator, selected_blocks: Vec, - // Blocks that need to be reorganized into new segments. - remain_blocks: Vec, - segments: Vec, - summary: Statistics, - block_per_seg: usize, + // summarised statistics of all the accumulated segments(segment compacted, and unchanged) + merged_segment_statistics: Statistics, + // locations all the accumulated segments(segment compacted, and unchanged) + merged_segments_locations: Vec, + // paths the newly created segments (which are segment compacted) + new_segment_paths: Vec, // is_cluster indicates whether the table contains cluster key. is_cluster: bool, } @@ -58,30 +57,28 @@ pub struct FullCompactMutator { impl FullCompactMutator { pub fn try_create( ctx: Arc, - base_snapshot: Arc, + compact_params: CompactOptions, block_compactor: BlockCompactor, location_generator: TableMetaLocationGenerator, - block_per_seg: usize, is_cluster: bool, operator: Operator, ) -> Result { Ok(Self { ctx, - base_snapshot, + compact_params, data_accessor: operator, block_compactor, location_generator, selected_blocks: Vec::new(), - remain_blocks: Vec::new(), - segments: Vec::new(), - summary: Statistics::default(), - block_per_seg, + merged_segment_statistics: Statistics::default(), + merged_segments_locations: Vec::new(), + new_segment_paths: Vec::new(), is_cluster, }) } pub fn partitions_total(&self) -> usize { - self.base_snapshot.summary.block_count as usize + self.compact_params.base_snapshot.summary.block_count as usize } pub fn selected_blocks(&self) -> Vec { @@ -96,17 +93,32 @@ impl FullCompactMutator { #[async_trait::async_trait] impl TableMutator for FullCompactMutator { async fn target_select(&mut self) -> Result { - let snapshot = self.base_snapshot.clone(); + let snapshot = self.compact_params.base_snapshot.clone(); let segment_locations = &snapshot.segments; - let mut summarys = Vec::new(); + // Blocks that need to be reorganized into new segments. + let mut remain_blocks = Vec::new(); // Read all segments information in parallel. let segments_io = SegmentsIO::create(self.ctx.clone(), self.data_accessor.clone()); - let segments = segments_io.read_segments(segment_locations).await?; - for (idx, segment) in segments.iter().enumerate() { + let segments = segments_io + .read_segments(segment_locations) + .await? + .into_iter() + .collect::>>()?; + + let limit = self.compact_params.limit.unwrap_or(segments.len()); + if limit < segments.len() { + for i in limit..segments.len() { + self.merged_segments_locations + .push(segment_locations[i].clone()); + self.merged_segment_statistics = + merge_statistics(&self.merged_segment_statistics, &segments[i].summary)?; + } + } + + for (idx, segment) in segments.iter().take(limit).enumerate() { let mut need_merge = false; let mut remains = Vec::new(); - let segment = segment.clone()?; segment.blocks.iter().for_each(|b| { if self.is_cluster @@ -123,33 +135,32 @@ impl TableMutator for FullCompactMutator { // If the number of blocks of segment meets block_per_seg, and the blocks in segments donot need to be compacted, // then record the segment information. - if !need_merge && segment.blocks.len() == self.block_per_seg { - let location = segment_locations[idx].clone(); - self.segments.push(location); - summarys.push(segment.summary.clone()); + if !need_merge && segment.blocks.len() == self.compact_params.block_per_seg { + self.merged_segments_locations + .push(segment_locations[idx].clone()); + self.merged_segment_statistics = + merge_statistics(&self.merged_segment_statistics, &segment.summary)?; continue; } - self.remain_blocks.append(&mut remains); + remain_blocks.append(&mut remains); + } + + let max_threads = self.ctx.get_settings().get_max_threads()? as usize; + // Because the pipeline is used, a threshold for the number of blocks is added to resolve + // https://github.com/datafuselabs/databend/issues/8316 + if self.selected_blocks.len() < max_threads * 2 { + remain_blocks.append(&mut self.selected_blocks); + self.selected_blocks.clear(); } if self.selected_blocks.is_empty() - && (self.remain_blocks.is_empty() || snapshot.segments.len() <= self.segments.len() + 1) + && (remain_blocks.is_empty() + || snapshot.segments.len() - self.merged_segments_locations.len() <= 1) { return Ok(false); } - // update the summary of new snapshot - self.summary = reduce_statistics(&summarys)?; - Ok(true) - } - - async fn try_commit(self: Box, table: Arc) -> Result<()> { - let ctx = self.ctx.clone(); - let mut segments = self.segments; - let mut summary = self.summary; - let mut abort_operation = AbortOperation::default(); - // Create new segments. let segment_info_cache = CacheManager::instance().get_table_segment_cache(); let seg_writer = SegmentWriter::new( @@ -157,18 +168,30 @@ impl TableMutator for FullCompactMutator { &self.location_generator, &segment_info_cache, ); - let chunks = self.remain_blocks.chunks(self.block_per_seg); + let chunks = remain_blocks.chunks(self.compact_params.block_per_seg); for chunk in chunks { let new_summary = reduce_block_metas(chunk)?; - let new_segment = SegmentInfo::new(chunk.to_vec(), new_summary.clone()); + self.merged_segment_statistics = + merge_statistics(&self.merged_segment_statistics, &new_summary)?; + let new_segment = SegmentInfo::new(chunk.to_vec(), new_summary); let new_segment_location = seg_writer.write_segment(new_segment).await?; - segments.push(new_segment_location.clone()); - summary = merge_statistics(&summary, &new_summary)?; - - abort_operation = abort_operation.add_segment(new_segment_location.0); + self.merged_segments_locations + .push(new_segment_location.clone()); + self.new_segment_paths.push(new_segment_location.0); } - let append_entries = ctx.consume_precommit_blocks(); + Ok(true) + } + + async fn try_commit(self: Box, table: Arc) -> Result<()> { + let mut segments = self.merged_segments_locations; + let mut summary = self.merged_segment_statistics; + let mut abort_operation = AbortOperation { + segments: self.new_segment_paths, + ..Default::default() + }; + + let append_entries = self.ctx.consume_precommit_blocks(); let append_log_entries = append_entries .iter() .map(AppendOperationLogEntry::try_from) @@ -193,7 +216,13 @@ impl TableMutator for FullCompactMutator { let table = FuseTable::try_from_table(table.as_ref())?; table - .commit_mutation(&ctx, self.base_snapshot, segments, summary, abort_operation) + .commit_mutation( + &self.ctx, + self.compact_params.base_snapshot, + segments, + summary, + abort_operation, + ) .await } } diff --git a/src/query/storages/fuse/src/operations/mutation/compact_mutator/segment_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact_mutator/segment_compact_mutator.rs index 202d78cce04b2..ecac2a8985ff1 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact_mutator/segment_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact_mutator/segment_compact_mutator.rs @@ -22,7 +22,6 @@ use common_fuse_meta::meta::BlockMeta; use common_fuse_meta::meta::Location; use common_fuse_meta::meta::SegmentInfo; use common_fuse_meta::meta::Statistics; -use common_fuse_meta::meta::TableSnapshot; use metrics::gauge; use opendal::Operator; @@ -30,6 +29,7 @@ use crate::io::SegmentWriter; use crate::io::SegmentsIO; use crate::io::TableMetaLocationGenerator; use crate::operations::mutation::AbortOperation; +use crate::operations::CompactOptions; use crate::statistics::merge_statistics; use crate::statistics::reducers::reduce_block_metas; use crate::FuseTable; @@ -71,11 +71,9 @@ use crate::TableMutator; pub struct SegmentCompactMutator { ctx: Arc, - // the snapshot that compactor working on, it never changed during phases compaction. - base_snapshot: Arc, + compact_params: CompactOptions, data_accessor: Operator, location_generator: TableMetaLocationGenerator, - blocks_per_seg: usize, // summarised statistics of all the accumulated segments(compacted, and unchanged) merged_segment_statistics: Statistics, @@ -88,17 +86,15 @@ pub struct SegmentCompactMutator { impl SegmentCompactMutator { pub fn try_create( ctx: Arc, - base_snapshot: Arc, + compact_params: CompactOptions, location_generator: TableMetaLocationGenerator, - blocks_per_seg: usize, operator: Operator, ) -> Result { Ok(Self { ctx, - base_snapshot, + compact_params, data_accessor: operator, location_generator, - blocks_per_seg, merged_segment_statistics: Statistics::default(), merged_segments_locations: vec![], new_segment_paths: vec![], @@ -116,21 +112,31 @@ impl TableMutator for SegmentCompactMutator { let select_begin = Instant::now(); let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), self.data_accessor.clone()); - let base_segment_locations = &self.base_snapshot.segments; + let base_segment_locations = &self.compact_params.base_snapshot.segments; let base_segments = fuse_segment_io - .read_segments(&self.base_snapshot.segments) + .read_segments(&self.compact_params.base_snapshot.segments) .await? .into_iter() .collect::>>()?; - let blocks_per_segment_threshold = self.blocks_per_seg; + let blocks_per_segment_threshold = self.compact_params.block_per_seg; let mut segments_tobe_compacted = Vec::with_capacity(base_segments.len() / 2); let mut unchanged_segment_locations = Vec::with_capacity(base_segments.len() / 2); let mut unchanged_segment_statistics = Statistics::default(); - for (idx, segment) in base_segments.iter().enumerate() { + + let limit = self.compact_params.limit.unwrap_or(base_segments.len()); + if limit < base_segments.len() { + for i in limit..base_segments.len() { + unchanged_segment_locations.push(base_segment_locations[i].clone()); + unchanged_segment_statistics = + merge_statistics(&unchanged_segment_statistics, &base_segments[i].summary)?; + } + } + + for (idx, segment) in base_segments.iter().take(limit).enumerate() { let number_blocks = segment.blocks.len(); if number_blocks >= blocks_per_segment_threshold { // skip if current segment is large enough, mark it as unchanged @@ -157,8 +163,8 @@ impl TableMutator for SegmentCompactMutator { } } - // split the block metas into chunks of blocks, with chunk size set to blocks_per_seg - let chunk_of_blocks = blocks_of_new_segments.chunks(self.blocks_per_seg); + // split the block metas into chunks of blocks, with chunk size set to block_per_seg + let chunk_of_blocks = blocks_of_new_segments.chunks(blocks_per_segment_threshold); // Build new segments which are compacted according to the setting of `block_per_seg` // note that the newly segments will be persistent into storage, such that if retry // happens during the later `try_commit` phase, they do not need to be written again. @@ -216,7 +222,7 @@ impl TableMutator for SegmentCompactMutator { fuse_table .commit_mutation( &self.ctx, - self.base_snapshot, + self.compact_params.base_snapshot, self.merged_segments_locations, self.merged_segment_statistics, abort_action, diff --git a/tests/logictest/suites/base/09_fuse_engine/09_0008_fuse_optimize_table b/tests/logictest/suites/base/09_fuse_engine/09_0008_fuse_optimize_table index 0e49c78de809a..2a41b7e323033 100644 --- a/tests/logictest/suites/base/09_fuse_engine/09_0008_fuse_optimize_table +++ b/tests/logictest/suites/base/09_fuse_engine/09_0008_fuse_optimize_table @@ -27,6 +27,21 @@ select * from t order by a; 6 7 +statement ok +set max_threads=2; + +statement ok +optimize table t compact; + +statement query II +select segment_count,block_count from fuse_snapshot('db_09_0008', 't') order by to_uint64(timestamp) desc limit 1; + +---- +1 3 + +statement ok +set max_threads=1; + statement ok optimize table t compact; @@ -38,11 +53,11 @@ select * from t order by a; 6 7 -statement query B -select count(*)=4 from fuse_snapshot('db_09_0008', 't'); +statement query II +select segment_count,block_count from fuse_snapshot('db_09_0008', 't') order by to_uint64(timestamp) desc limit 1; ---- -1 +1 1 @@ -188,6 +203,51 @@ select segment_count, block_count from fuse_snapshot('db_09_0008', 't1') limit 1 ---- 1 3 + +statement ok +create table t2(a uint64); + +statement ok +insert into t2 values (5); + +statement ok +insert into t2 values (6); + +statement ok +insert into t2 values (7); + +statement query II +select segment_count, block_count from fuse_snapshot('db_09_0008', 't2') limit 1; + +---- +3 3 + +statement ok +set max_threads=1; + +statement ok +optimize table t2 compact limit 2; + +statement query II +select segment_count, block_count from fuse_snapshot('db_09_0008', 't2') limit 1; + +---- +2 2 + +statement ok +insert into t2 values (8); + +statement ok +optimize table t2 compact segment limit 2; + +statement query II +select segment_count, block_count from fuse_snapshot('db_09_0008', 't2') limit 1; + +---- +2 3 + + + statement ok DROP TABLE m; @@ -197,6 +257,9 @@ DROP TABLE t; statement ok DROP TABLE t1; +statement ok +DROP TABLE t2; + statement ok DROP DATABASE db_09_0008;