From eda5cd2abe0d68d848bc0c33625a6665eb3d0ffc Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 12 Sep 2024 09:44:20 +0800 Subject: [PATCH 1/8] rm unused comment --- src/query/service/src/pipelines/builders/builder_recluster.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs index 9048ab7581ab6..020aa7afd9acb 100644 --- a/src/query/service/src/pipelines/builders/builder_recluster.rs +++ b/src/query/service/src/pipelines/builders/builder_recluster.rs @@ -84,7 +84,6 @@ impl PipelineBuilder { self.ctx.set_partitions(plan.parts.clone())?; - // ReadDataKind to avoid OOM. table.read_data(self.ctx.clone(), &plan, &mut self.main_pipeline, false)?; let num_input_columns = schema.fields().len(); From 890df1cf32f376e8275b4d6207d0eaf9fdae2816 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 12 Sep 2024 11:20:21 +0800 Subject: [PATCH 2/8] feat: Auto compact(re-cluster) for multiple table insertion statement --- .../interpreter_insert_multi_table.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs index e1147d11d2b00..e9427ef71db94 100644 --- a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs +++ b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_catalog::lock::LockTableOption; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::UInt64Type; @@ -34,6 +35,7 @@ use databend_common_sql::executor::physical_plans::ChunkFillAndReorder; use databend_common_sql::executor::physical_plans::ChunkMerge; use databend_common_sql::executor::physical_plans::FillAndReorder; use databend_common_sql::executor::physical_plans::MultiInsertEvalScalar; +use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_sql::executor::physical_plans::SerializableTable; use databend_common_sql::executor::physical_plans::ShuffleStrategy; use databend_common_sql::executor::PhysicalPlan; @@ -46,6 +48,7 @@ use databend_common_sql::plans::Plan; use databend_common_sql::MetadataRef; use databend_common_sql::ScalarExpr; +use super::HookOperator; use crate::interpreters::common::dml_build_update_stream_req; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterPtr; @@ -87,8 +90,21 @@ impl Interpreter for InsertMultiTableInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { let physical_plan = self.build_physical_plan().await?; - let build_res = + let mut build_res = build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?; + // Execute hook. + for (_, (db, tbl)) in &self.plan.target_tables { + let hook_operator = HookOperator::create( + self.ctx.clone(), + // multi table insert only support default catalog + "DEFAULT".to_string(), + db.to_string(), + tbl.to_string(), + MutationKind::Insert, + LockTableOption::LockNoRetry, + ); + hook_operator.execute(&mut build_res.main_pipeline).await; + } Ok(build_res) } From 0339c0ce283394d6df13f4bfd42345428217a5bb Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 12 Sep 2024 11:27:50 +0800 Subject: [PATCH 3/8] add setting --- .../interpreter_insert_multi_table.rs | 28 +++++++++++-------- src/query/settings/src/settings_default.rs | 9 ++++++ .../settings/src/settings_getter_setter.rs | 4 +++ 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs index e9427ef71db94..9117bb1a1442b 100644 --- a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs +++ b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs @@ -93,17 +93,23 @@ impl Interpreter for InsertMultiTableInterpreter { let mut build_res = build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?; // Execute hook. - for (_, (db, tbl)) in &self.plan.target_tables { - let hook_operator = HookOperator::create( - self.ctx.clone(), - // multi table insert only support default catalog - "DEFAULT".to_string(), - db.to_string(), - tbl.to_string(), - MutationKind::Insert, - LockTableOption::LockNoRetry, - ); - hook_operator.execute(&mut build_res.main_pipeline).await; + if self + .ctx + .get_settings() + .get_enable_compact_after_multi_table_insert()? + { + for (_, (db, tbl)) in &self.plan.target_tables { + let hook_operator = HookOperator::create( + self.ctx.clone(), + // multi table insert only support default catalog + "DEFAULT".to_string(), + db.to_string(), + tbl.to_string(), + MutationKind::Insert, + LockTableOption::LockNoRetry, + ); + hook_operator.execute(&mut build_res.main_pipeline).await; + } } Ok(build_res) } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 2c189bb5681f8..c5864a1692a94 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -590,6 +590,15 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ( + "enable_compact_after_multi_table_insert", + DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "Enables recluster and compact after multi-table insert.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + } + ), ("auto_compaction_imperfect_blocks_threshold", DefaultSettingValue { value: UserSettingValue::UInt64(25), desc: "Threshold for triggering auto compaction. This occurs when the number of imperfect blocks in a snapshot exceeds this value after write operations.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 866e56772072d..7a71ede7e73a8 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -508,6 +508,10 @@ impl Settings { Ok(self.try_get_u64("enable_compact_after_write")? != 0) } + pub fn get_enable_compact_after_multi_table_insert(&self) -> Result { + Ok(self.try_get_u64("enable_compact_after_multi_table_insert")? != 0) + } + pub fn get_auto_compaction_imperfect_blocks_threshold(&self) -> Result { self.try_get_u64("auto_compaction_imperfect_blocks_threshold") } From b577f9c12ce9172251fc99d0caff6233c25d14b0 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 12 Sep 2024 13:25:30 +0800 Subject: [PATCH 4/8] add logic test --- ...090101_multi_table_insert_auto_compact.sql | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql new file mode 100644 index 0000000000000..5ddc3ff2e836b --- /dev/null +++ b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql @@ -0,0 +1,95 @@ +statement ok +create or replace database multi_table_insert_auto_compact; + +statement ok +use multi_table_insert_auto_compact; + +statement ok +set auto_compaction_imperfect_blocks_threshold = 3; + +statement ok +create or replace table t1 (c int) block_per_segment = 10 row_per_block = 3; + +statement ok +create or replace table t2 (c int) block_per_segment = 10 row_per_block = 3; + +# first block (after compaction) +statement ok +insert all into t1 into t2 select 1; + +statement ok +insert all into t1 into t2 select 1; + +statement ok +insert all into t1 into t2 select 1; + + +# second block (after compaction) +statement ok +insert all into t1 into t2 select 1; + +statement ok +insert all into t1 into t2 select 1; + +statement ok +insert all into t1 into t2 select 1; + + +# third block (after compaction) +statement ok +insert all into t1 into t2 select 1; + +statement ok +insert all into t1 into t2 select 1; + +statement ok +insert all into t1 into t2 select 1; + + +# fourth block(after compaction) +statement ok +set auto_compaction_segments_limit = 2; + +statement ok +insert all into t1 into t2 select 1; + +statement ok +insert all into t1 into t2 select 2; + +query III +select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't1') limit 20; +---- +2 4 11 +3 5 11 +2 4 10 +1 3 9 +4 5 9 +3 4 8 +2 3 7 +1 2 6 +4 4 6 +3 3 5 +2 2 4 +1 1 3 +3 3 3 +2 2 2 +1 1 1 + +query III +select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't2') limit 20; +---- +2 4 11 +3 5 11 +2 4 10 +1 3 9 +4 5 9 +3 4 8 +2 3 7 +1 2 6 +4 4 6 +3 3 5 +2 2 4 +1 1 3 +3 3 3 +2 2 2 +1 1 1 From 8d895559ff11bdddeec8f76c287843362b63188f Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 12 Sep 2024 17:27:36 +0800 Subject: [PATCH 5/8] fix hint --- src/query/catalog/src/table_context.rs | 8 +++- .../src/interpreters/hook/compact_hook.rs | 4 +- .../interpreter_insert_multi_table.rs | 3 +- src/query/service/src/sessions/query_ctx.rs | 19 ++++++--- .../service/src/sessions/query_ctx_shared.rs | 5 +-- .../tests/it/sql/exec/get_table_bind_test.rs | 8 ---- .../tests/it/storages/fuse/conflict.rs | 3 ++ .../it/storages/fuse/operations/commit.rs | 8 ---- .../common/generators/append_generator.rs | 3 +- .../common/generators/mutation_generator.rs | 1 + .../common/generators/snapshot_generator.rs | 11 ++++- .../common/generators/truncate_generator.rs | 1 + .../processors/multi_table_insert_commit.rs | 1 + .../common/processors/sink_commit.rs | 1 + ...090101_multi_table_insert_auto_compact.sql | 41 ++++++++++++++----- 15 files changed, 75 insertions(+), 42 deletions(-) diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index baee83c322798..3fb7dbc81c07e 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -176,8 +176,12 @@ pub trait TableContext: Send + Sync { fn set_can_scan_from_agg_index(&self, enable: bool); fn get_enable_sort_spill(&self) -> bool; fn set_enable_sort_spill(&self, enable: bool); - fn set_compaction_num_block_hint(&self, hint: u64); - fn get_compaction_num_block_hint(&self) -> u64; + fn set_compaction_num_block_hint(&self, _table_name: &str, _hint: u64) { + unimplemented!() + } + fn get_compaction_num_block_hint(&self, _table_name: &str) -> u64 { + unimplemented!() + } fn set_table_snapshot(&self, snapshot: Arc); fn get_table_snapshot(&self) -> Option>; fn set_lazy_mutation_delete(&self, lazy: bool); diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index a291c7610b06d..04f17281ca1b3 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -80,8 +80,8 @@ async fn do_hook_compact( pipeline.set_on_finished(move |info: &ExecutionInfo| { let compaction_limits = match compact_target.mutation_kind { MutationKind::Insert => { - let compaction_num_block_hint = ctx.get_compaction_num_block_hint(); - info!("hint number of blocks need to be compacted {}", compaction_num_block_hint); + let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table); + info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint); if compaction_num_block_hint == 0 { return Ok(()); } diff --git a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs index 9117bb1a1442b..11d566a343716 100644 --- a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs +++ b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_catalog::catalog::CATALOG_DEFAULT; use databend_common_catalog::lock::LockTableOption; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -102,7 +103,7 @@ impl Interpreter for InsertMultiTableInterpreter { let hook_operator = HookOperator::create( self.ctx.clone(), // multi table insert only support default catalog - "DEFAULT".to_string(), + CATALOG_DEFAULT.to_string(), db.to_string(), tbl.to_string(), MutationKind::Insert, diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index f839a243c321c..6a7a46cc05c6f 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -592,17 +592,26 @@ impl TableContext for QueryContext { } // get a hint at the number of blocks that need to be compacted. - fn get_compaction_num_block_hint(&self) -> u64 { + fn get_compaction_num_block_hint(&self, table_name: &str) -> u64 { self.shared .num_fragmented_block_hint - .load(Ordering::Acquire) + .lock() + .get(table_name) + .copied() + .unwrap_or_default() } // set a hint at the number of blocks that need to be compacted. - fn set_compaction_num_block_hint(&self, hint: u64) { - self.shared + fn set_compaction_num_block_hint(&self, table_name: &str, hint: u64) { + let old = self + .shared .num_fragmented_block_hint - .store(hint, Ordering::Release); + .lock() + .insert(table_name.to_string(), hint); + info!( + "set_compaction_num_block_hint: table_name {} old hint {:?}, new hint {}", + table_name, old, hint + ); } fn attach_query_str(&self, kind: QueryKind, query: String) { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 960e9771d4cda..55c1f44bdd7af 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -15,7 +15,6 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::atomic::AtomicBool; -use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Weak; @@ -117,7 +116,7 @@ pub struct QueryContextShared { pub(in crate::sessions) partitions_shas: Arc>>, pub(in crate::sessions) cacheable: Arc, pub(in crate::sessions) can_scan_from_agg_index: Arc, - pub(in crate::sessions) num_fragmented_block_hint: Arc, + pub(in crate::sessions) num_fragmented_block_hint: Arc>>, pub(in crate::sessions) enable_sort_spill: Arc, // Status info. pub(in crate::sessions) status: Arc>, @@ -175,7 +174,7 @@ impl QueryContextShared { partitions_shas: Arc::new(RwLock::new(vec![])), cacheable: Arc::new(AtomicBool::new(true)), can_scan_from_agg_index: Arc::new(AtomicBool::new(true)), - num_fragmented_block_hint: Arc::new(AtomicU64::new(0)), + num_fragmented_block_hint: Default::default(), enable_sort_spill: Arc::new(AtomicBool::new(true)), status: Arc::new(RwLock::new("null".to_string())), user_agent: Arc::new(RwLock::new("null".to_string())), diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index cd6f3cd9e7a41..1a46ec6412550 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -872,14 +872,6 @@ impl TableContext for CtxDelegation { todo!() } - fn set_compaction_num_block_hint(&self, _enable: u64) { - todo!() - } - - fn get_compaction_num_block_hint(&self) -> u64 { - todo!() - } - fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/conflict.rs b/src/query/service/tests/it/storages/fuse/conflict.rs index 746e9ebc1b09e..b615a9a566677 100644 --- a/src/query/service/tests/it/storages/fuse/conflict.rs +++ b/src/query/service/tests/it/storages/fuse/conflict.rs @@ -65,6 +65,7 @@ fn test_unresolvable_delete_conflict() { None, TxnManager::init(), 0, + "test", ); assert!(result.is_err()); } @@ -156,6 +157,7 @@ fn test_resolvable_delete_conflict() { None, TxnManager::init(), 0, + "test", ); let snapshot = result.unwrap(); let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)]; @@ -263,6 +265,7 @@ fn test_resolvable_replace_conflict() { None, TxnManager::init(), 0, + "test", ); let snapshot = result.unwrap(); let expected = vec![ diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 7acaf862daa22..492d1200934be 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -767,14 +767,6 @@ impl TableContext for CtxDelegation { todo!() } - fn set_compaction_num_block_hint(&self, _enable: u64) { - todo!() - } - - fn get_compaction_num_block_hint(&self) -> u64 { - todo!() - } - fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> { todo!() } diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index 60103da7ad933..015621e92c652 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -119,6 +119,7 @@ impl SnapshotGenerator for AppendGenerator { cluster_key_meta: Option, previous: &Option>, prev_table_seq: Option, + table_name: &str, ) -> Result { let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?; if is_column_type_modified(&schema, expected_schema) { @@ -223,7 +224,7 @@ impl SnapshotGenerator for AppendGenerator { ) + 1; info!("set compact_num_block_hint to {compact_num_block_hint }"); self.ctx - .set_compaction_num_block_hint(compact_num_block_hint); + .set_compaction_num_block_hint(table_name, compact_num_block_hint); } Ok(TableSnapshot::new( diff --git a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs index ac9ba16286a11..ecc768366d3b4 100644 --- a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs @@ -63,6 +63,7 @@ impl SnapshotGenerator for MutationGenerator { cluster_key_meta: Option, previous: &Option>, prev_table_seq: Option, + _table_name: &str, ) -> Result { let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0); match &self.conflict_resolve_ctx { diff --git a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs index bf031d62fbf24..9310931dc6c6e 100644 --- a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs @@ -46,9 +46,15 @@ pub trait SnapshotGenerator { prev_table_seq: Option, txn_mgr: TxnManagerRef, table_id: u64, + table_name: &str, ) -> Result { - let mut snapshot = - self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?; + let mut snapshot = self.do_generate_new_snapshot( + schema, + cluster_key_meta, + &previous, + prev_table_seq, + table_name, + )?; let has_pending_transactional_mutations = { let guard = txn_mgr.lock(); @@ -73,5 +79,6 @@ pub trait SnapshotGenerator { cluster_key_meta: Option, previous: &Option>, prev_table_seq: Option, + table_name: &str, ) -> Result; } diff --git a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs index 750043cebb0b0..c6c7834e29435 100644 --- a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs @@ -60,6 +60,7 @@ impl SnapshotGenerator for TruncateGenerator { cluster_key_meta: Option, previous: &Option>, prev_table_seq: Option, + _table_name: &str, ) -> Result { let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous { ( diff --git a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs index e5f49738920bc..49e76d5332dd2 100644 --- a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs @@ -255,6 +255,7 @@ async fn build_update_table_meta_req( Some(fuse_table.table_info.ident.seq), txn_mgr, table.get_id(), + table.name(), )?; // write snapshot diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 795fa51f680e3..dd1a23cad2637 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -288,6 +288,7 @@ where F: SnapshotGenerator + Send + 'static Some(table_info.ident.seq), self.ctx.txn_mgr(), table_info.ident.table_id, + table_info.name.as_str(), ) { Ok(snapshot) => { self.state = State::TryCommit { diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql index 5ddc3ff2e836b..b31f3c16212f1 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql +++ b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql @@ -13,37 +13,41 @@ create or replace table t1 (c int) block_per_segment = 10 row_per_block = 3; statement ok create or replace table t2 (c int) block_per_segment = 10 row_per_block = 3; +statement ok +create or replace table t3 (c int) block_per_segment = 10 row_per_block = 3; + # first block (after compaction) +# There is less one rows in t3 statement ok insert all into t1 into t2 select 1; statement ok -insert all into t1 into t2 select 1; +insert all into t1 into t2 into t3 select 1; statement ok -insert all into t1 into t2 select 1; +insert all into t1 into t2 into t3 select 1; # second block (after compaction) statement ok -insert all into t1 into t2 select 1; +insert all into t1 into t2 into t3 select 1; statement ok -insert all into t1 into t2 select 1; +insert all into t1 into t2 into t3 select 1; statement ok -insert all into t1 into t2 select 1; +insert all into t1 into t2 into t3 select 1; # third block (after compaction) statement ok -insert all into t1 into t2 select 1; +insert all into t1 into t2 into t3 select 1; statement ok -insert all into t1 into t2 select 1; +insert all into t1 into t2 into t3 select 1; statement ok -insert all into t1 into t2 select 1; +insert all into t1 into t2 into t3 select 1; # fourth block(after compaction) @@ -51,10 +55,10 @@ statement ok set auto_compaction_segments_limit = 2; statement ok -insert all into t1 into t2 select 1; +insert all into t1 into t2 into t3 select 1; statement ok -insert all into t1 into t2 select 2; +insert all into t1 into t2 into t3 select 2; query III select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't1') limit 20; @@ -93,3 +97,20 @@ select segment_count , block_count , row_count from fuse_snapshot('multi_table_i 3 3 3 2 2 2 1 1 1 + +query III +select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't3') limit 20; +---- +2 4 10 +1 3 9 +4 5 9 +3 4 8 +2 3 7 +1 2 6 +4 4 6 +3 3 5 +2 2 4 +1 1 3 +3 3 3 +2 2 2 +1 1 1 From d402e49b768549e3be0acf208b42c1bbbb9250f6 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 12 Sep 2024 17:52:12 +0800 Subject: [PATCH 6/8] modify test --- .../common/generators/append_generator.rs | 6 ++++++ ...090101_multi_table_insert_auto_compact.sql | 20 ------------------- 2 files changed, 6 insertions(+), 20 deletions(-) diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index 015621e92c652..a972997ff0d59 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -209,6 +209,12 @@ impl SnapshotGenerator for AppendGenerator { .ctx .get_settings() .get_auto_compaction_imperfect_blocks_threshold()?; + println!("table_name: {}", table_name); + println!("imperfect_count: {}", imperfect_count); + println!( + "auto_compaction_imperfect_blocks_threshold: {}", + auto_compaction_imperfect_blocks_threshold + ); if imperfect_count >= auto_compaction_imperfect_blocks_threshold { // If imperfect_count is larger, SLIGHTLY increase the number of blocks diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql index b31f3c16212f1..f339f73df452f 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql +++ b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql @@ -49,23 +49,9 @@ insert all into t1 into t2 into t3 select 1; statement ok insert all into t1 into t2 into t3 select 1; - -# fourth block(after compaction) -statement ok -set auto_compaction_segments_limit = 2; - -statement ok -insert all into t1 into t2 into t3 select 1; - -statement ok -insert all into t1 into t2 into t3 select 2; - query III select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't1') limit 20; ---- -2 4 11 -3 5 11 -2 4 10 1 3 9 4 5 9 3 4 8 @@ -82,9 +68,6 @@ select segment_count , block_count , row_count from fuse_snapshot('multi_table_i query III select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't2') limit 20; ---- -2 4 11 -3 5 11 -2 4 10 1 3 9 4 5 9 3 4 8 @@ -101,9 +84,6 @@ select segment_count , block_count , row_count from fuse_snapshot('multi_table_i query III select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't3') limit 20; ---- -2 4 10 -1 3 9 -4 5 9 3 4 8 2 3 7 1 2 6 From 4032cb5e9cf1a55f606211e725521a919607736c Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 12 Sep 2024 18:27:36 +0800 Subject: [PATCH 7/8] rm log --- .../src/operations/common/generators/append_generator.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index a972997ff0d59..015621e92c652 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -209,12 +209,6 @@ impl SnapshotGenerator for AppendGenerator { .ctx .get_settings() .get_auto_compaction_imperfect_blocks_threshold()?; - println!("table_name: {}", table_name); - println!("imperfect_count: {}", imperfect_count); - println!( - "auto_compaction_imperfect_blocks_threshold: {}", - auto_compaction_imperfect_blocks_threshold - ); if imperfect_count >= auto_compaction_imperfect_blocks_threshold { // If imperfect_count is larger, SLIGHTLY increase the number of blocks From 8889fab9ad6bc1b14c0782a6382a82b1b924ed57 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 13 Sep 2024 14:13:53 +0800 Subject: [PATCH 8/8] disable by default --- src/query/settings/src/settings_default.rs | 2 +- .../09_fuse_engine/090101_multi_table_insert_auto_compact.sql | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index c5864a1692a94..26944906f790e 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -593,7 +593,7 @@ impl DefaultSettings { ( "enable_compact_after_multi_table_insert", DefaultSettingValue { - value: UserSettingValue::UInt64(1), + value: UserSettingValue::UInt64(0), desc: "Enables recluster and compact after multi-table insert.", mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql index f339f73df452f..8cb1579274243 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql +++ b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql @@ -1,3 +1,6 @@ +statement ok +set enable_compact_after_multi_table_insert = 1; + statement ok create or replace database multi_table_insert_auto_compact;