diff --git a/Cargo.lock b/Cargo.lock index 07a476125133e..725a2c376bbb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4861,6 +4861,7 @@ dependencies = [ "derive-visitor", "futures", "futures-util", + "goldenfile", "jsonb", "jwt-simple", "log", diff --git a/src/query/ast/src/ast/statements/table.rs b/src/query/ast/src/ast/statements/table.rs index 4e7f8fa8e8684..d168d1dcf547d 100644 --- a/src/query/ast/src/ast/statements/table.rs +++ b/src/query/ast/src/ast/statements/table.rs @@ -667,23 +667,42 @@ impl Display for TruncateTableStmt { } #[derive(Debug, Clone, PartialEq, Drive, DriveMut)] -pub struct VacuumTableStmt { +pub struct VacuumTargetTable { pub catalog: Option<Identifier>, pub database: Option<Identifier>, pub table: Identifier, +} + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub enum VacuumTarget { + Table(VacuumTargetTable), + All, +} + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct VacuumTableStmt { + pub target: VacuumTarget, pub option: VacuumTableOption, } impl Display for VacuumTableStmt { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { write!(f, "VACUUM TABLE ")?; - write_dot_separated_list( - f, - self.catalog - .iter() - .chain(&self.database) - .chain(Some(&self.table)), - )?; + match &self.target { + VacuumTarget::Table(target) => { + write_dot_separated_list( + f, + target + .catalog + .iter() + .chain(&target.database) + .chain(Some(&target.table)), + )?; + } + VacuumTarget::All => { + write!(f, "ALL")?; + } + } write!(f, " {}", &self.option)?; Ok(()) diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index b5268ec985362..5823580725d2c 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -1163,13 +1163,28 @@ pub fn statement_body(i: Input) -> IResult<Statement> { }, |(_, _, (catalog, database, table), option)| { Statement::VacuumTable(VacuumTableStmt { - catalog, - database, - table, + target: VacuumTarget::Table(VacuumTargetTable { + catalog, + database, + table, + }), option, }) }, ); + + let vacuum_all_table = map( + rule! { + VACUUM ~ TABLE ~ ALL ~ #vacuum_table_option + }, + |(_, _, _, option)| { + Statement::VacuumTable(VacuumTableStmt { + target: VacuumTarget::All, + option, + }) + }, + ); + let vacuum_drop_table = map( rule! { VACUUM ~ DROP ~ TABLE ~ (FROM ~ ^#dot_separated_idents_1_to_2)? ~ #vacuum_drop_table_option @@ -1186,6 +1201,7 @@ }) }, ); + let analyze_table = map( rule! { ANALYZE ~ TABLE ~ #dot_separated_idents_1_to_3 ~ NOSCAN? @@ -2572,7 +2588,6 @@ pub fn statement_body(i: Input) -> IResult<Statement> { | #show_indexes : "`SHOW INDEXES`" | #show_locks : "`SHOW LOCKS [IN ACCOUNT] [WHERE ...]`" | #kill_stmt : "`KILL (QUERY | CONNECTION) <object id>`" - | #vacuum_temp_files : "VACUUM TEMPORARY FILES [RETAIN number SECONDS|DAYS] [LIMIT number]" | #set_priority: "`SET PRIORITY (HIGH | MEDIUM | LOW) <object id>`" | #system_action: "`SYSTEM (ENABLE | DISABLE) EXCEPTION_BACKTRACE`" ), @@ -2676,8 +2691,6 @@ pub fn statement_body(i: Input) -> IResult<Statement> { | #rename_table : "`RENAME TABLE [<database>.]<table> TO <new_table>`" | #truncate_table : "`TRUNCATE TABLE [<database>.]<table>`" | #optimize_table : "`OPTIMIZE TABLE [<database>.]<table> (ALL | PURGE | COMPACT [SEGMENT])`" - | #vacuum_table : "`VACUUM TABLE [<database>.]<table> [RETAIN number HOURS] [DRY RUN | DRY RUN SUMMARY]`" - | #vacuum_drop_table : "`VACUUM DROP TABLE [FROM [<catalog>.]<database>] [RETAIN number HOURS] [DRY RUN | DRY RUN SUMMARY]`" - | #analyze_table : "`ANALYZE TABLE [<database>.]<table>`" | #exists_table : "`EXISTS TABLE [<database>.]<table>`" | #show_table_functions : "`SHOW TABLE_FUNCTIONS [<show_limit>]`" @@ -2801,7 +2814,14 @@ AS | #call_procedure : "`CALL PROCEDURE <procedure_name>()`" ), rule!(#comment), - rule!(#vacuum_temporary_tables), + // Vacuum + rule!( + #vacuum_temporary_tables + | #vacuum_temp_files : "VACUUM TEMPORARY FILES [RETAIN number SECONDS|DAYS] [LIMIT number]" + | #vacuum_table : "`VACUUM TABLE [<database>.]<table>
[DRY RUN | DRY RUN SUMMARY]`" + | #vacuum_all_table : "`VACUUM TABLE ALL [DRY RUN | DRY RUN SUMMARY]`" + | #vacuum_drop_table : "`VACUUM DROP TABLE [FROM [<catalog>.]<database>] [RETAIN number HOURS] [DRY RUN | DRY RUN SUMMARY]`" + ), ))(i) } diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 83aed60f959e9..09f0e59213534 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -15459,16 +15459,20 @@ VACUUM TABLE t ---------- AST ------------ VacuumTable( VacuumTableStmt { - catalog: None, - database: None, - table: Identifier { - span: Some( - 13..14, - ), - name: "t", - quote: None, - ident_type: None, - }, + target: Table( + VacuumTargetTable { + catalog: None, + database: None, + table: Identifier { + span: Some( + 13..14, + ), + name: "t", + quote: None, + ident_type: None, + }, + }, + ), option: VacuumTableOption { dry_run: None, }, @@ -15483,16 +15487,20 @@ VACUUM TABLE t DRY RUN ---------- AST ------------ VacuumTable( VacuumTableStmt { - catalog: None, - database: None, - table: Identifier { - span: Some( - 13..14, - ), - name: "t", - quote: None, - ident_type: None, - }, + target: Table( + VacuumTargetTable { + catalog: None, + database: None, + table: Identifier { + span: Some( + 13..14, + ), + name: "t", + quote: None, + ident_type: None, + }, + }, + ), option: VacuumTableOption { dry_run: Some( false, @@ -15509,16 +15517,20 @@ VACUUM TABLE t DRY RUN SUMMARY ---------- AST ------------ VacuumTable( VacuumTableStmt { - catalog: None, - database: None, - table: Identifier { - span: Some( - 13..14, - ), - name: "t", - quote: None, - ident_type: None, - }, + target: Table( + VacuumTargetTable { + catalog: None, + database: None, + table: Identifier { + span: Some( + 13..14, + ), + name: "t", + quote: None, + ident_type: None, + }, + }, + ), option: VacuumTableOption { dry_run: Some( true, diff --git a/src/query/ee/Cargo.toml b/src/query/ee/Cargo.toml index c3ed823e7435f..a87f8985f68fe 100644 --- a/src/query/ee/Cargo.toml +++ b/src/query/ee/Cargo.toml @@ -66,6 +66,7 @@ uuid = { workspace = true } [dev-dependencies] databend-common-functions = { workspace = true } +goldenfile = { workspace = true } jsonb = { workspace = true } tantivy = { workspace = true } walkdir = { workspace = true } diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs index f799d0464846a..7284ffd841cd0 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum2.rs @@ -17,23 +17,82 @@ use std::path::Path; use databend_common_base::base::tokio; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::block_debug::pretty_format_blocks; +use databend_common_expression::DataBlock; use databend_enterprise_query::test_kits::context::EESetup; use databend_query::sessions::QueryContext; use databend_query::sessions::TableContext; use databend_query::test_kits::TestFixture; +use futures_util::TryStreamExt; +use goldenfile::Mint; // TODO investigate this // NOTE: SHOULD specify flavor = "multi_thread", otherwise query execution might be hanged #[tokio::test(flavor = "multi_thread")] -async fn test_vacuum2_all() -> Result<()> { +async fn test_table_function_fuse_vacuum2_all() -> Result<()> { let ee_setup = EESetup::new(); let fixture = TestFixture::setup_with_custom(ee_setup).await?; + + setup(&fixture).await?; + + // vacuum them all + let res
= fixture.execute_command("call system$fuse_vacuum2()").await; + + // Check that: + + // 1. non-fuse tables should not stop us + + assert!(res.is_ok()); + + // 2. fuse table data should be vacuumed + + let storage_root = fixture.storage_root(); + + let ctx = fixture.new_query_ctx().await?; + check_files_left(&ctx, storage_root, "db1", "t1").await?; + check_files_left(&ctx, storage_root, "default", "t1").await?; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_vacuum_all_stmt() -> Result<()> { + let ee_setup = EESetup::new(); + let fixture = TestFixture::setup_with_custom(ee_setup).await?; + + setup(&fixture).await?; + + let mut mint = Mint::new("tests/it/testdata"); + let file = &mut mint.new_goldenfile("vacuum_all_stmt.txt").unwrap(); + + // execute the VACUUM TABLE ALL + let res_block = fixture + .execute_query("vacuum table all") + .await? + .try_collect::<Vec<DataBlock>>() + .await?; + + let block_string = pretty_format_blocks(&res_block).unwrap(); + use std::io::Write; + writeln!(file, "{}", block_string).unwrap(); + + // Check: fuse table data should be vacuumed + + let storage_root = fixture.storage_root(); + + let ctx = fixture.new_query_ctx().await?; + check_files_left(&ctx, storage_root, "db1", "t1").await?; + check_files_left(&ctx, storage_root, "default", "t1").await?; + + Ok(()) +} + +async fn setup(fixture: &TestFixture) -> Result<()> { // Adjust retention period to 0, so that dropped tables will be vacuumed immediately let session = fixture.default_session(); session.get_settings().set_data_retention_time_in_days(0)?; - let ctx = fixture.new_query_ctx().await?; - + // Prepare test db / tables let setup_statements = vec![ // create non-system db1, create fuse and non-fuse table in it. "create database db1", @@ -53,67 +112,49 @@ async fn test_vacuum2_all() -> Result<()> { for stmt in setup_statements { fixture.execute_command(stmt).await?; } - - // vacuum them all - let res = fixture.execute_command("call system$fuse_vacuum2()").await; - - // Check that: - - // 1. non-fuse tables should not stop us - - assert!(res.is_ok()); - - // 2. fuse table data should be vacuumed - - let storage_root = fixture.storage_root(); - - async fn check_files_left( - ctx: &QueryContext, - storage_root: &str, - db_name: &str, - tbl_name: &str, - ) -> Result<()> { - let tenant = ctx.get_tenant(); - let table = ctx - .get_default_catalog()? - .get_table(&tenant, db_name, tbl_name) - .await?; - - let db = ctx - .get_default_catalog()? - .get_database(&tenant, db_name) - .await?; - - let path = Path::new(storage_root) - .join(db.get_db_info().database_id.db_id.to_string()) - .join(table.get_id().to_string()); - - let walker = walkdir::WalkDir::new(path).into_iter(); - - let mut files_left = Vec::new(); - for entry in walker { - let entry = entry.unwrap(); - if entry.file_type().is_file() { - files_left.push(entry); - } + Ok(()) +} +async fn check_files_left( + ctx: &QueryContext, + storage_root: &str, + db_name: &str, + tbl_name: &str, +) -> Result<()> { + let tenant = ctx.get_tenant(); + let table = ctx + .get_default_catalog()? + .get_table(&tenant, db_name, tbl_name) + .await?; + + let db = ctx + .get_default_catalog()?
+ .get_database(&tenant, db_name) + .await?; + + let path = Path::new(storage_root) + .join(db.get_db_info().database_id.db_id.to_string()) + .join(table.get_id().to_string()); + + let walker = walkdir::WalkDir::new(path).into_iter(); + + let mut files_left = Vec::new(); + for entry in walker { + let entry = entry.unwrap(); + if entry.file_type().is_file() { + files_left.push(entry); } - - check_files_left(&ctx, storage_root, "db1", "t1").await?; - check_files_left(&ctx, storage_root, "default", "t1").await?; - - Ok(()) + // There should be one snapshot file and one snapshot hint file left + assert_eq!(files_left.len(), 2); + + files_left.sort_by(|a, b| a.file_name().cmp(b.file_name())); + // First is the only snapshot left + assert!(files_left[0].path().to_string_lossy().contains("/_ss/")); + // Second one is the last snapshot location hint + assert!(files_left[1] + .path() + .to_string_lossy() + .contains("last_snapshot_location_hint_v2")); + Ok::<(), ErrorCode>(()) } diff --git a/src/query/ee/tests/it/testdata/vacuum_all_stmt.txt b/src/query/ee/tests/it/testdata/vacuum_all_stmt.txt new file mode 100644 index 0000000000000..7e9094bd36643 --- /dev/null +++ b/src/query/ee/tests/it/testdata/vacuum_all_stmt.txt @@ -0,0 +1,5 @@ ++----------+ +| Column 0 | ++----------+ +| 20 | ++----------+ diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index c839d99b832fe..7d4804920a62c 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -43,6 +43,7 @@ use databend_common_sql::plans::Mutation; use databend_common_sql::plans::OptimizeCompactBlock; use databend_common_sql::plans::PresignAction; use databend_common_sql::plans::RewriteKind; +use databend_common_sql::plans::VacuumTarget; use databend_common_sql::Planner; use databend_common_users::RoleCacheManager; use databend_common_users::UserApiProvider; @@ -1204,7 +1205,14 @@ impl AccessChecker for PrivilegeAccess { self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false, false).await? } Plan::VacuumTable(plan) => { - self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Super, false, false).await? + match &plan.target { + VacuumTarget::Table(tgt) => { + self.validate_table_access(&tgt.catalog, &tgt.database, &tgt.table, UserPrivilegeType::Super, false, false).await? + } + VacuumTarget::All => { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false).await? + } + } } Plan::VacuumDropTable(plan) => { self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Super, false).await?
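Note on the access rules above (illustrative SQL, not part of the diff; the statements themselves come from this patch's tests):

  VACUUM TABLE db1.t1;          -- per-table form: requires Super on the target table
  VACUUM TABLE ALL;             -- new form: requires Super on the global grant object
  CALL SYSTEM$FUSE_VACUUM2();   -- table-function form exercised by the ee tests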
diff --git a/src/query/service/src/interpreters/interpreter_table_vacuum.rs b/src/query/service/src/interpreters/interpreter_table_vacuum.rs index 78a05c04c683a..80fc2ff8ebe98 100644 --- a/src/query/service/src/interpreters/interpreter_table_vacuum.rs +++ b/src/query/service/src/interpreters/interpreter_table_vacuum.rs @@ -23,6 +23,9 @@ use databend_common_expression::FromData; use databend_common_license::license::Feature::Vacuum; use databend_common_license::license_manager::LicenseManagerSwitch; use databend_common_sql::plans::VacuumTablePlan; +use databend_common_sql::plans::VacuumTarget; +use databend_common_sql::plans::VacuumTargetTable; +use databend_common_storages_fuse::operations::vacuum_all_tables; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::FUSE_TBL_BLOCK_PREFIX; use databend_common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX; @@ -95,26 +98,37 @@ impl VacuumTableInterpreter { index_files, }) } -} -#[async_trait::async_trait] -impl Interpreter for VacuumTableInterpreter { - fn name(&self) -> &str { - "VacuumTableInterpreter" + async fn vacuum_table(&self, target: &VacuumTargetTable) -> Result<PipelineBuildResult> { + let handler = get_vacuum_handler(); + let table = self + .ctx + .get_table(&target.catalog, &target.database, &target.table) + .await?; + let fuse_table = FuseTable::try_from_table(table.as_ref())?; + let obj_removed = handler + .do_vacuum2(fuse_table, self.ctx.clone(), false) + .await?; + let num_obj_removed = obj_removed.len() as u64; + let res_block = + DataBlock::new_from_columns(vec![UInt64Type::from_data(vec![num_obj_removed])]); + PipelineBuildResult::from_blocks(vec![res_block]) } - fn is_ddl(&self) -> bool { - true + async fn vacuum_all(&self) -> Result<PipelineBuildResult> { + let handler = get_vacuum_handler(); + let catalog = self.ctx.get_default_catalog()?; + let ctx: Arc<dyn TableContext> = self.ctx.clone() as _; + let num_obj_removed = vacuum_all_tables(&ctx, &handler, catalog.as_ref()).await?; + let res_block = + DataBlock::new_from_columns(vec![UInt64Type::from_data(vec![num_obj_removed])]); + PipelineBuildResult::from_blocks(vec![res_block]) } - #[async_backtrace::framed] - async fn execute2(&self) -> Result<PipelineBuildResult> { - LicenseManagerSwitch::instance() - .check_enterprise_enabled(self.ctx.get_license_key(), Vacuum)?; - - let catalog_name = self.plan.catalog.clone(); - let db_name = self.plan.database.clone(); - let tbl_name = self.plan.table.clone(); + async fn legacy_vacuum_table(&self, target: &VacuumTargetTable) -> Result<PipelineBuildResult> { + let catalog_name = target.catalog.clone(); + let db_name = target.database.clone(); + let tbl_name = target.table.clone(); let table = self .ctx .get_table(&catalog_name, &db_name, &tbl_name) @@ -136,29 +150,27 @@ impl Interpreter for VacuumTableInterpreter { match purge_files_opt { None => { - return { - let stat = self.get_statistics(fuse_table).await?; - let total_files = stat.snapshot_files.0 - + stat.segment_files.0 - + stat.block_files.0 - + stat.index_files.0; - let total_size = stat.snapshot_files.1 - + stat.segment_files.1 - + stat.block_files.1 - + stat.index_files.1; - PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ - UInt64Type::from_data(vec![stat.snapshot_files.0]), - UInt64Type::from_data(vec![stat.snapshot_files.1]), - UInt64Type::from_data(vec![stat.segment_files.0]), - UInt64Type::from_data(vec![stat.segment_files.1]), - UInt64Type::from_data(vec![stat.block_files.0]), - UInt64Type::from_data(vec![stat.block_files.1]), - UInt64Type::from_data(vec![stat.index_files.0]), -
UInt64Type::from_data(vec![stat.index_files.1]), - UInt64Type::from_data(vec![total_files]), - UInt64Type::from_data(vec![total_size]), - ])]) - }; + let stat = self.get_statistics(fuse_table).await?; + let total_files = stat.snapshot_files.0 + + stat.segment_files.0 + + stat.block_files.0 + + stat.index_files.0; + let total_size = stat.snapshot_files.1 + + stat.segment_files.1 + + stat.block_files.1 + + stat.index_files.1; + PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![ + UInt64Type::from_data(vec![stat.snapshot_files.0]), + UInt64Type::from_data(vec![stat.snapshot_files.1]), + UInt64Type::from_data(vec![stat.segment_files.0]), + UInt64Type::from_data(vec![stat.segment_files.1]), + UInt64Type::from_data(vec![stat.block_files.0]), + UInt64Type::from_data(vec![stat.block_files.1]), + UInt64Type::from_data(vec![stat.index_files.0]), + UInt64Type::from_data(vec![stat.index_files.1]), + UInt64Type::from_data(vec![total_files]), + UInt64Type::from_data(vec![total_size]), + ])]) } Some(purge_files) => { let mut file_sizes = vec![]; @@ -183,3 +195,31 @@ } } } + +#[async_trait::async_trait] +impl Interpreter for VacuumTableInterpreter { + fn name(&self) -> &str { + "VacuumTableInterpreter" + } + + fn is_ddl(&self) -> bool { + true + } + + #[async_backtrace::framed] + async fn execute2(&self) -> Result<PipelineBuildResult> { + LicenseManagerSwitch::instance() + .check_enterprise_enabled(self.ctx.get_license_key(), Vacuum)?; + + match &self.plan.target { + VacuumTarget::Table(tgt_table) => { + if self.plan.use_legacy_vacuum { + self.legacy_vacuum_table(tgt_table).await + } else { + self.vacuum_table(tgt_table).await + } + } + VacuumTarget::All => self.vacuum_all().await, + } + } +} diff --git a/src/query/service/src/interpreters/interpreter_txn_commit.rs b/src/query/service/src/interpreters/interpreter_txn_commit.rs index 6f13fd58647f4..f8af9c3adabc7 100644 --- a/src/query/service/src/interpreters/interpreter_txn_commit.rs +++ b/src/query/service/src/interpreters/interpreter_txn_commit.rs @@ -103,7 +103,7 @@ pub async fn execute_commit_statement(ctx: Arc) -> Result<()> if let Err(e) = vacuum_tables_from_info(tables_need_purge, ctx.clone(), handler).await { - warn!( "Failed to vacuum tables after transaction commit (best-effort operation): {e}"); + warn!("Failed to vacuum tables after transaction commit: {e}"); } else { info!( "{num_tables} tables vacuumed after transaction commit in a best-effort manner" ); } diff --git a/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs b/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs index be84dff9d7ae2..1f7e7baa72a8b 100644 --- a/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs +++ b/src/query/service/src/table_functions/fuse_vacuum2/fuse_vacuum2_table.rs @@ -17,12 +17,13 @@ use
std::sync::Arc; use databend_common_catalog::catalog::Catalog; use databend_common_catalog::catalog_kind::CATALOG_DEFAULT; use databend_common_catalog::plan::DataSourcePlan; -use databend_common_catalog::table::Table; use databend_common_catalog::table::TableExt; use databend_common_catalog::table_args::TableArgs; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::types::NumberDataType; use databend_common_expression::types::StringType; +use databend_common_expression::types::UInt64Type; use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_expression::TableDataType; @@ -40,8 +41,6 @@ use databend_common_storages_fuse::table_functions::SimpleTableFunc; use databend_common_storages_fuse::FuseTable; use databend_enterprise_vacuum_handler::get_vacuum_handler; use databend_enterprise_vacuum_handler::VacuumHandlerWrapper; -use log::info; -use log::warn; use crate::sessions::TableContext; @@ -83,7 +82,15 @@ impl SimpleTableFunc for FuseVacuum2Table { } fn schema(&self) -> TableSchemaRef { - TableSchemaRefExt::create(vec![TableField::new("vacuumed", TableDataType::String)]) + match &self.args { + Vacuum2TableArgs::SingleTable { .. } => { + TableSchemaRefExt::create(vec![TableField::new("vacuumed", TableDataType::String)]) + } + Vacuum2TableArgs::All => TableSchemaRefExt::create(vec![TableField::new( + "num_object_removed", + TableDataType::Number(NumberDataType::UInt64), + )]), + } } async fn apply( @@ -100,20 +107,23 @@ impl SimpleTableFunc for FuseVacuum2Table { arg_table_name, respect_flash_back, } => { - self.apply_single_table( - ctx, - catalog.as_ref(), - arg_database_name, - arg_table_name, - respect_flash_back.unwrap_or_default(), - ) - .await? 
+ let obj_removed = self + .apply_single_table( + ctx, + catalog.as_ref(), + arg_database_name, + arg_table_name, + respect_flash_back.unwrap_or_default(), + ) + .await?; + DataBlock::new_from_columns(vec![StringType::from_data(obj_removed)]) + } + Vacuum2TableArgs::All => { + let num_obj_removed = self.apply_all_tables(ctx, catalog.as_ref()).await?; + DataBlock::new_from_columns(vec![UInt64Type::from_data(vec![num_obj_removed])]) } - Vacuum2TableArgs::All => self.apply_all_tables(ctx, catalog.as_ref()).await?, }; - Ok(Some(DataBlock::new_from_columns(vec![ - StringType::from_data(res), - ]))) + Ok(Some(res)) } fn create(func_name: &str, table_args: TableArgs) -> Result<Self> @@ -181,67 +191,12 @@ impl FuseVacuum2Table { &self, ctx: &Arc<dyn TableContext>, catalog: &dyn Catalog, - ) -> Result<Vec<String>> { - let tenant_id = ctx.get_tenant(); - let dbs = catalog.list_databases(&tenant_id).await?; - let num_db = dbs.len(); - - for (idx_db, db) in dbs.iter().enumerate() { - if db.engine().to_uppercase() == "SYSTEM" { - info!("Bypass system database [{}]", db.name()); - continue; - } - - info!( - "Processing db {}, progress: {}/{}", - db.name(), - idx_db + 1, - num_db - ); - let tables = catalog.list_tables(&tenant_id, db.name()).await?; - info!("Found {} tables in db {}", tables.len(), db.name()); - - let num_tbl = tables.len(); - for (idx_tbl, table) in tables.iter().enumerate() { - info!( - "Processing table {}.{}, db level progress: {}/{}", - db.name(), - table.get_table_info().name, - idx_tbl + 1, - num_tbl - ); - - let Ok(tbl) = FuseTable::try_from_table(table.as_ref()) else { - info!( - "Bypass non-fuse table {}.{}", - db.name(), - table.get_table_info().name - ); - continue; - }; - - if tbl.is_read_only() { - info!( - "Bypass read only table {}.{}", - db.name(), - table.get_table_info().name - ); - continue; - } - - let res = self.handler.do_vacuum2(tbl, ctx.clone(), false).await; - - if let Err(e) = res { - warn!( - "vacuum2 table {}.{} failed: {}", - db.name(), - table.get_table_info().name, - e - ); - }; - } - } - - Ok(vec![]) + ) -> Result<u64> { + databend_common_storages_fuse::operations::vacuum_all_tables( + ctx, + self.handler.as_ref(), + catalog, + ) + .await } } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 1fba150428237..0501cd2e4746b 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1480,6 +1480,17 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), + + // Note: + // - This is a temporary flag that will be removed once the legacy vacuum implementation is removed from the codebase. + // - This flag has no effect on `VACUUM TABLE ALL`, which uses the vacuum2 implementation unconditionally.
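+ // Example (hypothetical session, in the spirit of this patch's test updates): + // settings (fallback_to_legacy_vacuum=1) vacuum table t1; -- legacy code path + // vacuum table t1; -- default: vacuum2 + // vacuum table all; -- always vacuum2, regardless of this flag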
+ ("fallback_to_legacy_vacuum", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Using legacy vacuum implementation when vacuum a specific table", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 3ba9d9b0fa2d0..637d9fd20f960 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -1087,4 +1087,8 @@ impl Settings { pub fn get_queries_queue_retry_timeout(&self) -> Result { self.try_get_u64("queries_queue_retry_timeout") } + + pub fn get_fallback_to_legacy_vacuum(&self) -> Result { + Ok(self.try_get_u64("fallback_to_legacy_vacuum")? == 1) + } } diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index 4e6652443f34c..cae70b425e087 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -329,6 +329,7 @@ impl Binder { Statement::VacuumTemporaryFiles(stmt) => { self.bind_vacuum_temporary_files(bind_context, stmt).await? } + Statement::AnalyzeTable(stmt) => self.bind_analyze_table(stmt).await?, Statement::ExistsTable(stmt) => self.bind_exists_table(stmt).await?, // Dictionaries diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 453d77ec18504..87e61c0221727 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -158,6 +158,8 @@ use crate::plans::VacuumDropTableOption; use crate::plans::VacuumDropTablePlan; use crate::plans::VacuumTableOption; use crate::plans::VacuumTablePlan; +use crate::plans::VacuumTarget; +use crate::plans::VacuumTargetTable; use crate::plans::VacuumTemporaryFilesPlan; use crate::BindContext; use crate::DefaultExprBinder; @@ -1503,24 +1505,31 @@ impl Binder { _bind_context: &mut BindContext, stmt: &VacuumTableStmt, ) -> Result { - let VacuumTableStmt { - catalog, - database, - table, - option, - } = stmt; - - let (catalog, database, table) = - self.normalize_object_identifier_triple(catalog, database, table); + let use_legacy_vacuum = self.ctx.get_settings().get_fallback_to_legacy_vacuum()?; + + let target = match &stmt.target { + databend_common_ast::ast::VacuumTarget::Table(tgt) => { + let (catalog, database, table) = self.normalize_object_identifier_triple( + &tgt.catalog, + &tgt.database, + &tgt.table, + ); + VacuumTarget::Table(VacuumTargetTable { + catalog, + database, + table, + }) + } + databend_common_ast::ast::VacuumTarget::All => VacuumTarget::All, + }; let option = VacuumTableOption { - dry_run: option.dry_run, + dry_run: stmt.option.dry_run, }; Ok(Plan::VacuumTable(Box::new(VacuumTablePlan { - catalog, - database, - table, + target, option, + use_legacy_vacuum, }))) } diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index ef024a42a84bc..ac2f0d3d8d529 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -105,17 +105,36 @@ impl DropTablePlan { } } -/// Vacuum #[derive(Clone, Debug)] -pub struct VacuumTablePlan { +pub struct VacuumTargetTable { pub catalog: String, pub database: String, pub table: String, +} + +#[derive(Clone, Debug)] +pub enum VacuumTarget { + Table(VacuumTargetTable), + All, +} + +/// Vacuum +#[derive(Clone, Debug)] +pub struct 
VacuumTablePlan { + pub target: VacuumTarget, pub option: VacuumTableOption, + pub use_legacy_vacuum: bool, } impl VacuumTablePlan { pub fn schema(&self) -> DataSchemaRef { + if !self.use_legacy_vacuum { + return Arc::new(DataSchema::new(vec![DataField::new( + "num_of_objects_removed", + DataType::Number(NumberDataType::UInt64), + )])); + } + if let Some(summary) = self.option.dry_run { if summary { Arc::new(DataSchema::new(vec![ diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index 911ab15637abe..96551fb38ee83 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -56,4 +56,6 @@ pub use util::acquire_task_permit; pub use util::column_parquet_metas; pub use util::read_block; pub use util::set_backoff; +pub use vacuum::vacuum_all_tables; +pub use vacuum::vacuum_table; pub use vacuum::vacuum_tables_from_info; diff --git a/src/query/storages/fuse/src/operations/vacuum.rs b/src/query/storages/fuse/src/operations/vacuum.rs index 506bebf56a078..4ac56c8feeeff 100644 --- a/src/query/storages/fuse/src/operations/vacuum.rs +++ b/src/query/storages/fuse/src/operations/vacuum.rs @@ -12,10 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -// src/query/storages/fuse/src/vacuum/mod.rs - use std::sync::Arc; +use databend_common_catalog::catalog::Catalog; +use databend_common_catalog::table::Table; use databend_common_catalog::table::TableExt; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; @@ -66,3 +66,78 @@ pub async fn vacuum_tables_from_info( Ok(()) } + +pub async fn vacuum_all_tables( + ctx: &Arc<dyn TableContext>, + handler: &VacuumHandlerWrapper, + catalog: &dyn Catalog, +) -> Result<u64> { + let tenant_id = ctx.get_tenant(); + let dbs = catalog.list_databases(&tenant_id).await?; + let num_db = dbs.len(); + + let mut num_obj_removed = 0; + + for (idx_db, db) in dbs.iter().enumerate() { + if db.engine().to_uppercase() == "SYSTEM" { + info!("Bypass system database [{}]", db.name()); + continue; + } + + info!( + "Processing db {}, progress: {}/{}", + db.name(), + idx_db + 1, + num_db + ); + let tables = catalog.list_tables(&tenant_id, db.name()).await?; + info!("Found {} tables in db {}", tables.len(), db.name()); + + let num_tbl = tables.len(); + for (idx_tbl, table) in tables.iter().enumerate() { + info!( + "Processing table {}.{}, progress of db {}: {}/{}", + db.name(), + table.get_table_info().name, + db.name(), + idx_tbl + 1, + num_tbl + ); + + let Ok(tbl) = FuseTable::try_from_table(table.as_ref()) else { + info!( + "Bypass non-fuse table {}.{}", + db.name(), + table.get_table_info().name + ); + continue; + }; + + if tbl.is_read_only() { + info!( + "Bypass read only table {}.{}", + db.name(), + table.get_table_info().name + ); + continue; + } + + let res = handler.do_vacuum2(tbl, ctx.clone(), false).await; + + match res { + Ok(removed) => { + num_obj_removed += removed.len() as u64; + } + Err(e) => { + warn!( + "vacuum2 table {}.{} failed: {}", + db.name(), + table.get_table_info().name, + e + ); + } + } + } + } + Ok(num_obj_removed) } diff --git a/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py b/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py index 0ed3b191b2ce6..3afe8c1a08931 100755 --- a/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py +++ b/tests/suites/5_ee/01_vacuum/01_0000_ee_vacuum.py @@ -66,7 +66,7 @@ def compact_data(name): mycursor.execute("select a from gc_test order by a;")
old_datas = mycursor.fetchall() - mycursor.execute("vacuum table gc_test dry run;") + mycursor.execute("settings (fallback_to_legacy_vacuum=1) vacuum table gc_test dry run;") datas = mycursor.fetchall() print(datas) @@ -76,7 +76,7 @@ def compact_data(name): if old_datas != datas: print("vacuum dry run lose data: %s : %s" % (old_datas, datas)) - client1.send("vacuum table gc_test;") + client1.send("settings (fallback_to_legacy_vacuum=1) vacuum table gc_test;") client1.expect(prompt) mycursor.execute("select a from gc_test order by a;") diff --git a/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh b/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh index 0f4fb8b2453b8..4fdf37bc8a0e5 100755 --- a/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh +++ b/tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh @@ -9,12 +9,12 @@ echo "CREATE DATABASE test_vacuum_drop_dry_run" | $BENDSQL_CLIENT_CONNECT echo "create table test_vacuum_drop_dry_run.a(c int)" | $BENDSQL_CLIENT_CONNECT echo "INSERT INTO test_vacuum_drop_dry_run.a VALUES (1)" | $BENDSQL_CLIENT_OUTPUT_NULL echo "drop table test_vacuum_drop_dry_run.a" | $BENDSQL_CLIENT_CONNECT -count=$(echo "set data_retention_time_in_days=0; vacuum drop table dry run" | $BENDSQL_CLIENT_CONNECT | wc -l) +count=$(echo "set data_retention_time_in_days=0; set fallback_to_legacy_vacuum=1; vacuum drop table dry run" | $BENDSQL_CLIENT_CONNECT | wc -l) if [[ ! "$count" ]]; then echo "vacuum drop table dry run, count:$count" exit 1 fi -count=$(echo "set data_retention_time_in_days=0; vacuum drop table dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) +count=$(echo "set data_retention_time_in_days=0; set fallback_to_legacy_vacuum=1; vacuum drop table dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) if [[ ! 
"$count" ]]; then echo "vacuum drop table dry run summary, count:$count" exit 1 @@ -44,7 +44,7 @@ echo "INSERT INTO test_vacuum_drop.b VALUES (2)" | $BENDSQL_CLIENT_OUTPUT_NULL echo "drop table test_vacuum_drop.b" | $BENDSQL_CLIENT_CONNECT -echo "vacuum drop table from test_vacuum_drop" | $BENDSQL_CLIENT_CONNECT > /dev/null +echo "set fallback_to_legacy_vacuum=1; vacuum drop table from test_vacuum_drop" | $BENDSQL_CLIENT_CONNECT > /dev/null echo "undrop table test_vacuum_drop.b" | $BENDSQL_CLIENT_CONNECT @@ -120,12 +120,12 @@ echo "select * from test_vacuum_drop_4.b" | $BENDSQL_CLIENT_CONNECT ## test vacuum table output echo "create table test_vacuum_drop_4.c(c int)" | $BENDSQL_CLIENT_CONNECT echo "INSERT INTO test_vacuum_drop_4.c VALUES (1),(2)" | $BENDSQL_CLIENT_OUTPUT_NULL -count=$(echo "set data_retention_time_in_days=0; vacuum table test_vacuum_drop_4.c" | $BENDSQL_CLIENT_CONNECT | awk '{print $9}') +count=$(echo "set data_retention_time_in_days=0; set fallback_to_legacy_vacuum = 1; vacuum table test_vacuum_drop_4.c" | $BENDSQL_CLIENT_CONNECT | awk '{print $9}') if [[ "$count" != "4" ]]; then echo "vacuum table, count:$count" exit 1 fi -count=$(echo "set data_retention_time_in_days=0; vacuum table test_vacuum_drop_4.c dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) +count=$(echo "set data_retention_time_in_days=0; set fallback_to_legacy_vacuum = 1; vacuum table test_vacuum_drop_4.c dry run summary" | $BENDSQL_CLIENT_CONNECT | wc -l) if [[ "$count" != "1" ]]; then echo "vacuum table dry run summary, count:$count" exit 1 diff --git a/tests/suites/5_ee/01_vacuum/01_003_vacuum_table_only_orphans.sh b/tests/suites/5_ee/01_vacuum/01_003_vacuum_table_only_orphans.sh index f86d0cd356963..6ad630c23d7e1 100755 --- a/tests/suites/5_ee/01_vacuum/01_003_vacuum_table_only_orphans.sh +++ b/tests/suites/5_ee/01_vacuum/01_003_vacuum_table_only_orphans.sh @@ -57,7 +57,7 @@ ls -l /tmp/test_vacuum_table_only_orphans/"$PREFIX"/_sg/ | wc -l ls -l /tmp/test_vacuum_table_only_orphans/"$PREFIX"/_i_b_v2/ | wc -l -stmt "set data_retention_time_in_days=0; vacuum table test_vacuum_table_only_orphans.a" > /dev/null +stmt "set data_retention_time_in_days=0; settings (fallback_to_legacy_vacuum=1) vacuum table test_vacuum_table_only_orphans.a" > /dev/null echo "after vacuum" diff --git a/tests/suites/5_ee/04_attach_read_only/04_0001_check_mutations.sh b/tests/suites/5_ee/04_attach_read_only/04_0001_check_mutations.sh index e797fa52e6f06..e25ef3012ec6e 100755 --- a/tests/suites/5_ee/04_attach_read_only/04_0001_check_mutations.sh +++ b/tests/suites/5_ee/04_attach_read_only/04_0001_check_mutations.sh @@ -20,12 +20,12 @@ echo "CREATE INVERTED INDEX IF NOT EXISTS idx1 ON test_attach_only.test_json_rea echo "vacuum table" echo "vacuum table should fail" -echo "VACUUM TABLE test_attach_only.test_json_read_only;" | $BENDSQL_CLIENT_CONNECT +echo "settings (fallback_to_legacy_vacuum=1) VACUUM TABLE test_attach_only.test_json_read_only;" | $BENDSQL_CLIENT_CONNECT echo "vacuum drop table from db should not include the read_only attach table" # drop & vacuum echo "drop table test_attach_only.test_json_read_only" | $BENDSQL_CLIENT_CONNECT -echo "vacuum drop table from test_attach_only" | $BENDSQL_CLIENT_CONNECT > /dev/null +echo "settings (fallback_to_legacy_vacuum=1) vacuum drop table from test_attach_only" | $BENDSQL_CLIENT_CONNECT > /dev/null # attach it back echo "attach table test_attach_only.test_json_read_only 's3://testbucket/data/$storage_prefix' connection=(access_key_id ='minioadmin' secret_access_key ='minioadmin' 
endpoint_url='${STORAGE_S3_ENDPOINT_URL}')" | $BENDSQL_CLIENT_CONNECT echo "expect table data still there"
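A minimal end-to-end sketch of the new statement (bendsql session; the table and the reported count are illustrative only, not from this change's fixtures):

  set data_retention_time_in_days = 0;
  create table t1 (c int);
  insert into t1 values (1);
  insert into t1 values (2);
  vacuum table all;
  -- one UInt64 column (num_of_objects_removed), printed like the goldenfile above:
  -- +----------+
  -- | Column 0 |
  -- +----------+
  -- |        4 |
  -- +----------+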