diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index baee83c32279..3fb7dbc81c07 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -176,8 +176,12 @@ pub trait TableContext: Send + Sync { fn set_can_scan_from_agg_index(&self, enable: bool); fn get_enable_sort_spill(&self) -> bool; fn set_enable_sort_spill(&self, enable: bool); - fn set_compaction_num_block_hint(&self, hint: u64); - fn get_compaction_num_block_hint(&self) -> u64; + fn set_compaction_num_block_hint(&self, _table_name: &str, _hint: u64) { + unimplemented!() + } + fn get_compaction_num_block_hint(&self, _table_name: &str) -> u64 { + unimplemented!() + } fn set_table_snapshot(&self, snapshot: Arc); fn get_table_snapshot(&self) -> Option>; fn set_lazy_mutation_delete(&self, lazy: bool); diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index a291c7610b06..04f17281ca1b 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -80,8 +80,8 @@ async fn do_hook_compact( pipeline.set_on_finished(move |info: &ExecutionInfo| { let compaction_limits = match compact_target.mutation_kind { MutationKind::Insert => { - let compaction_num_block_hint = ctx.get_compaction_num_block_hint(); - info!("hint number of blocks need to be compacted {}", compaction_num_block_hint); + let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table); + info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint); if compaction_num_block_hint == 0 { return Ok(()); } diff --git a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs index e1147d11d2b0..11d566a34371 100644 --- 
a/src/query/service/src/interpreters/interpreter_insert_multi_table.rs +++ b/src/query/service/src/interpreters/interpreter_insert_multi_table.rs @@ -14,6 +14,8 @@ use std::sync::Arc; +use databend_common_catalog::catalog::CATALOG_DEFAULT; +use databend_common_catalog::lock::LockTableOption; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::UInt64Type; @@ -34,6 +36,7 @@ use databend_common_sql::executor::physical_plans::ChunkFillAndReorder; use databend_common_sql::executor::physical_plans::ChunkMerge; use databend_common_sql::executor::physical_plans::FillAndReorder; use databend_common_sql::executor::physical_plans::MultiInsertEvalScalar; +use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_sql::executor::physical_plans::SerializableTable; use databend_common_sql::executor::physical_plans::ShuffleStrategy; use databend_common_sql::executor::PhysicalPlan; @@ -46,6 +49,7 @@ use databend_common_sql::plans::Plan; use databend_common_sql::MetadataRef; use databend_common_sql::ScalarExpr; +use super::HookOperator; use crate::interpreters::common::dml_build_update_stream_req; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterPtr; @@ -87,8 +91,27 @@ impl Interpreter for InsertMultiTableInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { let physical_plan = self.build_physical_plan().await?; - let build_res = + let mut build_res = build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?; + // Execute hook. + if self + .ctx + .get_settings() + .get_enable_compact_after_multi_table_insert()? 
+ { + for (_, (db, tbl)) in &self.plan.target_tables { + let hook_operator = HookOperator::create( + self.ctx.clone(), + // multi table insert only support default catalog + CATALOG_DEFAULT.to_string(), + db.to_string(), + tbl.to_string(), + MutationKind::Insert, + LockTableOption::LockNoRetry, + ); + hook_operator.execute(&mut build_res.main_pipeline).await; + } + } Ok(build_res) } diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs index 9048ab7581ab..020aa7afd9ac 100644 --- a/src/query/service/src/pipelines/builders/builder_recluster.rs +++ b/src/query/service/src/pipelines/builders/builder_recluster.rs @@ -84,7 +84,6 @@ impl PipelineBuilder { self.ctx.set_partitions(plan.parts.clone())?; - // ReadDataKind to avoid OOM. table.read_data(self.ctx.clone(), &plan, &mut self.main_pipeline, false)?; let num_input_columns = schema.fields().len(); diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index f839a243c321..6a7a46cc05c6 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -592,17 +592,26 @@ impl TableContext for QueryContext { } // get a hint at the number of blocks that need to be compacted. - fn get_compaction_num_block_hint(&self) -> u64 { + fn get_compaction_num_block_hint(&self, table_name: &str) -> u64 { self.shared .num_fragmented_block_hint - .load(Ordering::Acquire) + .lock() + .get(table_name) + .copied() + .unwrap_or_default() } // set a hint at the number of blocks that need to be compacted. 
- fn set_compaction_num_block_hint(&self, hint: u64) { - self.shared + fn set_compaction_num_block_hint(&self, table_name: &str, hint: u64) { + let old = self + .shared .num_fragmented_block_hint - .store(hint, Ordering::Release); + .lock() + .insert(table_name.to_string(), hint); + info!( + "set_compaction_num_block_hint: table_name {} old hint {:?}, new hint {}", + table_name, old, hint + ); } fn attach_query_str(&self, kind: QueryKind, query: String) { diff --git a/src/query/service/src/sessions/query_ctx_shared.rs b/src/query/service/src/sessions/query_ctx_shared.rs index 960e9771d4cd..55c1f44bdd7a 100644 --- a/src/query/service/src/sessions/query_ctx_shared.rs +++ b/src/query/service/src/sessions/query_ctx_shared.rs @@ -15,7 +15,6 @@ use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::atomic::AtomicBool; -use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; use std::sync::Weak; @@ -117,7 +116,7 @@ pub struct QueryContextShared { pub(in crate::sessions) partitions_shas: Arc>>, pub(in crate::sessions) cacheable: Arc, pub(in crate::sessions) can_scan_from_agg_index: Arc, - pub(in crate::sessions) num_fragmented_block_hint: Arc, + pub(in crate::sessions) num_fragmented_block_hint: Arc>>, pub(in crate::sessions) enable_sort_spill: Arc, // Status info. 
pub(in crate::sessions) status: Arc>, @@ -175,7 +174,7 @@ impl QueryContextShared { partitions_shas: Arc::new(RwLock::new(vec![])), cacheable: Arc::new(AtomicBool::new(true)), can_scan_from_agg_index: Arc::new(AtomicBool::new(true)), - num_fragmented_block_hint: Arc::new(AtomicU64::new(0)), + num_fragmented_block_hint: Default::default(), enable_sort_spill: Arc::new(AtomicBool::new(true)), status: Arc::new(RwLock::new("null".to_string())), user_agent: Arc::new(RwLock::new("null".to_string())), diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index cd6f3cd9e7a4..1a46ec641255 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -872,14 +872,6 @@ impl TableContext for CtxDelegation { todo!() } - fn set_compaction_num_block_hint(&self, _enable: u64) { - todo!() - } - - fn get_compaction_num_block_hint(&self) -> u64 { - todo!() - } - fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> { todo!() } diff --git a/src/query/service/tests/it/storages/fuse/conflict.rs b/src/query/service/tests/it/storages/fuse/conflict.rs index 746e9ebc1b09..b615a9a56667 100644 --- a/src/query/service/tests/it/storages/fuse/conflict.rs +++ b/src/query/service/tests/it/storages/fuse/conflict.rs @@ -65,6 +65,7 @@ fn test_unresolvable_delete_conflict() { None, TxnManager::init(), 0, + "test", ); assert!(result.is_err()); } @@ -156,6 +157,7 @@ fn test_resolvable_delete_conflict() { None, TxnManager::init(), 0, + "test", ); let snapshot = result.unwrap(); let expected = vec![("8".to_string(), 1), ("4".to_string(), 1)]; @@ -263,6 +265,7 @@ fn test_resolvable_replace_conflict() { None, TxnManager::init(), 0, + "test", ); let snapshot = result.unwrap(); let expected = vec![ diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs 
b/src/query/service/tests/it/storages/fuse/operations/commit.rs index 7acaf862daa2..492d1200934b 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -767,14 +767,6 @@ impl TableContext for CtxDelegation { todo!() } - fn set_compaction_num_block_hint(&self, _enable: u64) { - todo!() - } - - fn get_compaction_num_block_hint(&self) -> u64 { - todo!() - } - fn add_file_status(&self, _file_path: &str, _file_status: FileStatus) -> Result<()> { todo!() } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 2c189bb5681f..26944906f790 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -590,6 +590,15 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ( + "enable_compact_after_multi_table_insert", + DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Enables recluster and compact after multi-table insert.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + } + ), ("auto_compaction_imperfect_blocks_threshold", DefaultSettingValue { value: UserSettingValue::UInt64(25), desc: "Threshold for triggering auto compaction. This occurs when the number of imperfect blocks in a snapshot exceeds this value after write operations.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 866e56772072..7a71ede7e73a 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -508,6 +508,10 @@ impl Settings { Ok(self.try_get_u64("enable_compact_after_write")? != 0) } + pub fn get_enable_compact_after_multi_table_insert(&self) -> Result { + Ok(self.try_get_u64("enable_compact_after_multi_table_insert")? 
!= 0) + } + pub fn get_auto_compaction_imperfect_blocks_threshold(&self) -> Result { self.try_get_u64("auto_compaction_imperfect_blocks_threshold") } diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index 60103da7ad93..015621e92c65 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -119,6 +119,7 @@ impl SnapshotGenerator for AppendGenerator { cluster_key_meta: Option, previous: &Option>, prev_table_seq: Option, + table_name: &str, ) -> Result { let (snapshot_merged, expected_schema) = self.conflict_resolve_ctx()?; if is_column_type_modified(&schema, expected_schema) { @@ -223,7 +224,7 @@ impl SnapshotGenerator for AppendGenerator { ) + 1; info!("set compact_num_block_hint to {compact_num_block_hint }"); self.ctx - .set_compaction_num_block_hint(compact_num_block_hint); + .set_compaction_num_block_hint(table_name, compact_num_block_hint); } Ok(TableSnapshot::new( diff --git a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs index ac9ba16286a1..ecc768366d3b 100644 --- a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs @@ -63,6 +63,7 @@ impl SnapshotGenerator for MutationGenerator { cluster_key_meta: Option, previous: &Option>, prev_table_seq: Option, + _table_name: &str, ) -> Result { let default_cluster_key_id = cluster_key_meta.clone().map(|v| v.0); match &self.conflict_resolve_ctx { diff --git a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs index bf031d62fbf2..9310931dc6c6 100644 --- 
a/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/snapshot_generator.rs @@ -46,9 +46,15 @@ pub trait SnapshotGenerator { prev_table_seq: Option, txn_mgr: TxnManagerRef, table_id: u64, + table_name: &str, ) -> Result { - let mut snapshot = - self.do_generate_new_snapshot(schema, cluster_key_meta, &previous, prev_table_seq)?; + let mut snapshot = self.do_generate_new_snapshot( + schema, + cluster_key_meta, + &previous, + prev_table_seq, + table_name, + )?; let has_pending_transactional_mutations = { let guard = txn_mgr.lock(); @@ -73,5 +79,6 @@ pub trait SnapshotGenerator { cluster_key_meta: Option, previous: &Option>, prev_table_seq: Option, + table_name: &str, ) -> Result; } diff --git a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs index 750043cebb0b..c6c7834e2943 100644 --- a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs @@ -60,6 +60,7 @@ impl SnapshotGenerator for TruncateGenerator { cluster_key_meta: Option, previous: &Option>, prev_table_seq: Option, + _table_name: &str, ) -> Result { let (prev_timestamp, prev_snapshot_id) = if let Some(prev_snapshot) = previous { ( diff --git a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs index e5f49738920b..49e76d5332dd 100644 --- a/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/multi_table_insert_commit.rs @@ -255,6 +255,7 @@ async fn build_update_table_meta_req( Some(fuse_table.table_info.ident.seq), txn_mgr, table.get_id(), + table.name(), )?; // write snapshot diff --git 
a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 795fa51f680e..dd1a23cad263 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -288,6 +288,7 @@ where F: SnapshotGenerator + Send + 'static Some(table_info.ident.seq), self.ctx.txn_mgr(), table_info.ident.table_id, + table_info.name.as_str(), ) { Ok(snapshot) => { self.state = State::TryCommit { diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql new file mode 100644 index 000000000000..8cb157927424 --- /dev/null +++ b/tests/sqllogictests/suites/base/09_fuse_engine/090101_multi_table_insert_auto_compact.sql @@ -0,0 +1,99 @@ +statement ok +set enable_compact_after_multi_table_insert = 1; + +statement ok +create or replace database multi_table_insert_auto_compact; + +statement ok +use multi_table_insert_auto_compact; + +statement ok +set auto_compaction_imperfect_blocks_threshold = 3; + +statement ok +create or replace table t1 (c int) block_per_segment = 10 row_per_block = 3; + +statement ok +create or replace table t2 (c int) block_per_segment = 10 row_per_block = 3; + +statement ok +create or replace table t3 (c int) block_per_segment = 10 row_per_block = 3; + +# first block (after compaction) +# t3 gets one row fewer: this first insert targets only t1 and t2 +statement ok +insert all into t1 into t2 select 1; + +statement ok +insert all into t1 into t2 into t3 select 1; + +statement ok +insert all into t1 into t2 into t3 select 1; + + +# second block (after compaction) +statement ok +insert all into t1 into t2 into t3 select 1; + +statement ok +insert all into t1 into t2 into t3 select 1; + +statement ok +insert all into t1 into t2 into t3 select 1; + + +# third block (after compaction) 
+statement ok +insert all into t1 into t2 into t3 select 1; + +statement ok +insert all into t1 into t2 into t3 select 1; + +statement ok +insert all into t1 into t2 into t3 select 1; + +query III +select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't1') limit 20; +---- +1 3 9 +4 5 9 +3 4 8 +2 3 7 +1 2 6 +4 4 6 +3 3 5 +2 2 4 +1 1 3 +3 3 3 +2 2 2 +1 1 1 + +query III +select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't2') limit 20; +---- +1 3 9 +4 5 9 +3 4 8 +2 3 7 +1 2 6 +4 4 6 +3 3 5 +2 2 4 +1 1 3 +3 3 3 +2 2 2 +1 1 1 + +query III +select segment_count , block_count , row_count from fuse_snapshot('multi_table_insert_auto_compact', 't3') limit 20; +---- +3 4 8 +2 3 7 +1 2 6 +4 4 6 +3 3 5 +2 2 4 +1 1 3 +3 3 3 +2 2 2 +1 1 1