diff --git a/Cargo.lock b/Cargo.lock index 6d37d85c4d0d8..05d36952ff7c2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4945,6 +4945,7 @@ dependencies = [ "databend-common-base", "databend-common-catalog", "databend-common-exception", + "databend-common-meta-app", ] [[package]] diff --git a/src/query/ee/src/storages/fuse/operations/handler.rs b/src/query/ee/src/storages/fuse/operations/handler.rs index 48e53d0e2122a..39b47cb804421 100644 --- a/src/query/ee/src/storages/fuse/operations/handler.rs +++ b/src/query/ee/src/storages/fuse/operations/handler.rs @@ -20,6 +20,7 @@ use databend_common_catalog::table_context::AbortChecker; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult; +use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDroppedTablesCtx; use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions; use databend_enterprise_vacuum_handler::VacuumHandler; use databend_enterprise_vacuum_handler::VacuumHandlerWrapper; @@ -50,13 +51,11 @@ impl VacuumHandler for RealVacuumHandler { do_vacuum2(table, ctx, respect_flash_back).await } - async fn do_vacuum_drop_tables( + async fn do_vacuum_dropped_tables( &self, - threads_nums: usize, - tables: Vec>, - dry_run_limit: Option, + dropped_tables: VacuumDroppedTablesCtx, ) -> VacuumDropTablesResult { - vacuum_drop_tables(threads_nums, tables, dry_run_limit).await + vacuum_drop_tables(dropped_tables).await } async fn do_vacuum_temporary_files( diff --git a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs index 1c1079c156e2a..bf335ef6206e7 100644 --- a/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs +++ b/src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs @@ -17,30 +17,91 @@ use std::sync::Arc; use std::time::Instant; use databend_common_base::runtime::execute_futures_in_parallel; -use databend_common_catalog::table::Table; +use databend_common_catalog::catalog::Catalog; use databend_common_exception::Result; +use databend_common_meta_app::schema::DBIdTableName; +use databend_common_meta_app::schema::DroppedId; +use databend_common_meta_app::schema::GcDroppedTableReq; +use databend_common_meta_app::schema::TableId; use databend_common_meta_app::schema::TableInfo; +use databend_common_meta_app::tenant::Tenant; use databend_common_storages_fuse::FuseTable; +use databend_enterprise_vacuum_handler::vacuum_handler::DroppedTableDesc; use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo; use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult; +use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDroppedTablesCtx; use futures_util::TryStreamExt; use log::error; use log::info; use opendal::EntryMode; use opendal::Operator; + #[async_backtrace::framed] pub async fn do_vacuum_drop_table( - tables: Vec<(TableInfo, Operator)>, + catalog: Arc, + tenant: Tenant, + tables: Vec, dry_run_limit: Option, ) -> VacuumDropTablesResult { + let is_dry_run = dry_run_limit.is_some(); + let mut list_files = vec![]; let mut failed_tables = HashSet::new(); - for (table_info, operator) in tables { - let result = - vacuum_drop_single_table(&table_info, operator, dry_run_limit, &mut list_files).await; - if result.is_err() { - let table_id = table_info.ident.table_id; - failed_tables.insert(table_id); + for table_desc in tables { + let table = &table_desc.table; + let 
table_info = table.get_table_info();
+
+        if !table.is_read_only() {
+            if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
+                let operator = fuse_table.get_operator();
+
+                let result =
+                    vacuum_drop_single_table(table_info, operator, dry_run_limit, &mut list_files)
+                        .await;
+
+                if result.is_err() {
+                    let table_id = table_info.ident.table_id;
+                    failed_tables.insert(table_id);
+                    continue;
+                }
+            } else {
+                info!(
+                    "table {} (engine {}) is not a FUSE table, its data will NOT be cleaned.",
+                    table.get_table_info().name,
+                    table.engine()
+                );
+            };
+        } else {
+            info!(
+                "table {} is read-only, its data will NOT be cleaned.",
+                table.get_table_info().name,
+            );
+        }
+
+        if !is_dry_run {
+            let req = GcDroppedTableReq {
+                tenant: tenant.clone(),
+                catalog: catalog.name(),
+                drop_ids: vec![DroppedId::Table {
+                    name: DBIdTableName {
+                        db_id: table_desc.db_id,
+                        table_name: table_desc.table_name.clone(),
+                    },
+                    id: TableId::new(table.get_table_info().ident.table_id),
+                }],
+            };
+
+            let result = catalog.gc_drop_tables(req).await;
+
+            if result.is_err() {
+                let table_id = table_info.ident.table_id;
+                failed_tables.insert(table_id);
+                info!(
+                    "failed to gc metadata of dropped table, db_id: {}, table {}[{}]",
+                    table_desc.db_id, table_desc.table_name, table_id
+                );
+                continue;
+            }
+        }
     }
     Ok(if dry_run_limit.is_some() {
@@ -50,7 +111,8 @@ pub async fn do_vacuum_drop_table(
     })
 }
 
-async fn vacuum_drop_single_table(
+// Public for integration tests (IT).
+pub async fn vacuum_drop_single_table(
     table_info: &TableInfo,
     operator: Operator,
     dry_run_limit: Option<usize>,
@@ -127,19 +189,20 @@ async fn vacuum_drop_single_table(
 
 #[async_backtrace::framed]
 pub async fn vacuum_drop_tables_by_table_info(
-    num_threads: usize,
-    table_infos: Vec<(TableInfo, Operator)>,
-    dry_run_limit: Option<usize>,
+    dropped_tables: VacuumDroppedTablesCtx,
 ) -> VacuumDropTablesResult {
-    let start = Instant::now();
-    let num_tables = table_infos.len();
+    let VacuumDroppedTablesCtx {
+        tenant,
+        catalog,
+        threads_nums,
+        tables,
+        dry_run_limit,
+    } = dropped_tables;
 
-    // - for each vacuum task, the tables passed to it will be processed sequentially
-    // - while removing one table's data, at most 1000 objects will be deleted (in batch)
-    // - let's assume that the rate limit is 3500 (individual) objects per second:
-    //   A parallelism degree of up to 3 appears to be safe.
-    let num_threads = std::cmp::min(num_threads, 3);
+    let start = Instant::now();
+    let num_tables = tables.len();
+    // Cap the parallelism at 3: deletions run in batches of up to 1000 objects per table,
+    // so with a typical rate limit of ~3500 objects per second, 3 concurrent tasks remain safe.
+    let num_threads = std::cmp::min(threads_nums, 3);
 
     let batch_size = (num_tables / num_threads).clamp(1, 50);
 
     info!(
@@ -147,16 +210,18 @@ pub async fn vacuum_drop_tables_by_table_info(
         num_tables, batch_size, num_threads
     );
 
-    let result = if batch_size >= table_infos.len() {
-        do_vacuum_drop_table(table_infos, dry_run_limit).await?
+    let result = if batch_size >= tables.len() {
+        do_vacuum_drop_table(catalog, tenant, tables, dry_run_limit).await?
} else { - let mut chunks = table_infos.chunks(batch_size); + let mut chunks = tables.chunks(batch_size); let dry_run_limit = dry_run_limit .map(|dry_run_limit| (dry_run_limit / num_threads).min(dry_run_limit).max(1)); let tasks = std::iter::from_fn(move || { - chunks - .next() - .map(|tables| do_vacuum_drop_table(tables.to_vec(), dry_run_limit)) + chunks.next().map(|tables| { + let catalog = catalog.clone(); + let tenant = tenant.clone(); + do_vacuum_drop_table(catalog, tenant, tables.to_vec(), dry_run_limit) + }) }); let result = execute_futures_in_parallel( @@ -169,8 +234,8 @@ pub async fn vacuum_drop_tables_by_table_info( // Note that Errs should NOT be swallowed if any target is not successfully deleted. // Otherwise, the caller site may proceed to purge meta-data from meta-server with - // some table data un-vacuumed, and the `vacuum` action of those dropped tables can no - // longer be roll-forward. + // some table data have not been cleaned completely, and the `vacuum` action of those + // dropped tables can no longer be roll-forward. if dry_run_limit.is_some() { let mut ret_files = vec![]; for res in result { @@ -202,31 +267,8 @@ pub async fn vacuum_drop_tables_by_table_info( Ok(result) } +// TODO remove this func #[async_backtrace::framed] -pub async fn vacuum_drop_tables( - threads_nums: usize, - tables: Vec>, - dry_run_limit: Option, -) -> VacuumDropTablesResult { - let num_tables = tables.len(); - info!("vacuum_drop_tables {} tables", num_tables); - - let mut table_infos = Vec::with_capacity(num_tables); - for table in tables { - let (table_info, operator) = - if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) { - (fuse_table.get_table_info(), fuse_table.get_operator()) - } else { - info!( - "ignore table {}, which is not of FUSE engine. Table engine {}", - table.get_table_info().name, - table.engine() - ); - continue; - }; - - table_infos.push((table_info.clone(), operator)); - } - - vacuum_drop_tables_by_table_info(threads_nums, table_infos, dry_run_limit).await +pub async fn vacuum_drop_tables(dropped_tables: VacuumDroppedTablesCtx) -> VacuumDropTablesResult { + vacuum_drop_tables_by_table_info(dropped_tables).await } diff --git a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs index 4a07c7ad2a471..c46c22ca07d28 100644 --- a/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs +++ b/src/query/ee/tests/it/storages/fuse/operations/vacuum.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::fmt::Debug; +use std::path::Path; use std::sync::Arc; use std::time::Duration; @@ -29,9 +30,7 @@ use databend_common_meta_app::principal::OwnershipObject; use databend_common_meta_app::principal::TenantOwnershipObjectIdent; use databend_common_meta_app::schema::DBIdTableName; use databend_common_meta_app::schema::DatabaseId; -use databend_common_meta_app::schema::TableInfo; -use databend_common_meta_app::schema::TableMeta; -use databend_common_meta_app::storage::StorageParams; +use databend_common_meta_app::schema::TableId; use databend_common_meta_kvapi::kvapi::KvApiExt; use databend_common_meta_store::MetaStore; use databend_common_meta_store::MetaStoreProvider; @@ -39,15 +38,12 @@ use databend_common_meta_types::TxnRequest; use databend_common_storage::DataOperator; use databend_common_storages_fuse::TableContext; use databend_common_version::BUILD_INFO; -use databend_enterprise_query::storages::fuse::operations::vacuum_drop_tables::do_vacuum_drop_table; -use databend_enterprise_query::storages::fuse::operations::vacuum_drop_tables::vacuum_drop_tables_by_table_info; use databend_enterprise_query::storages::fuse::operations::vacuum_temporary_files::do_vacuum_temporary_files; -use databend_enterprise_query::storages::fuse::vacuum_drop_tables; use databend_enterprise_query::test_kits::context::EESetup; use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions; use databend_query::test_kits::*; use databend_storages_common_io::Files; -use databend_storages_common_table_meta::table::OPT_KEY_DATABASE_ID; +use databend_storages_common_table_meta::meta::parse_storage_prefix; use opendal::raw::Access; use opendal::raw::AccessorInfo; use opendal::raw::OpStat; @@ -55,23 +51,20 @@ use opendal::raw::RpStat; use opendal::EntryMode; use opendal::Metadata; use opendal::OperatorBuilder; +use tempfile::TempDir; #[tokio::test(flavor = "multi_thread")] async fn test_fuse_do_vacuum_drop_tables() -> Result<()> { - let fixture = TestFixture::setup().await?; + let ee_setup = EESetup::new(); + let fixture = TestFixture::setup_with_custom(ee_setup).await?; fixture - .default_session() - .get_settings() - .set_data_retention_time_in_days(0)?; - - fixture.create_default_database().await?; - fixture.create_default_table().await?; - - let number_of_block = 1; - append_sample_data(number_of_block, &fixture).await?; + .execute_command("create database test_vacuum") + .await?; - let table = fixture.latest_default_table().await?; + fixture + .execute_command("create table test_vacuum.t (c int) as select * from numbers(100)") + .await?; check_data_dir( &fixture, @@ -87,17 +80,14 @@ async fn test_fuse_do_vacuum_drop_tables() -> Result<()> { ) .await?; - // do gc. - let db = fixture.default_db_name(); - let tbl = fixture.default_table_name(); - let qry = format!("drop table {}.{}", db, tbl); - fixture.execute_command(&qry).await?; - let ctx = fixture.new_query_ctx().await?; - let threads_nums = ctx.get_settings().get_max_threads()? 
as usize; + fixture.execute_command("drop table test_vacuum.t").await?; // verify dry run never delete files { - vacuum_drop_tables(threads_nums, vec![table.clone()], Some(100)).await?; + fixture + .execute_command("vacuum drop table from test_vacuum dry run") + .await?; + fixture.execute_command("vacuum drop table dry run").await?; check_data_dir( &fixture, "test_fuse_do_vacuum_drop_table: verify generate files", @@ -114,7 +104,9 @@ async fn test_fuse_do_vacuum_drop_tables() -> Result<()> { } { - vacuum_drop_tables(threads_nums, vec![table], None).await?; + fixture + .execute_command("settings(data_retention_time_in_days = 0) vacuum drop table") + .await?; // after vacuum drop tables, verify the files number check_data_dir( @@ -219,334 +211,209 @@ async fn test_do_vacuum_temporary_files() -> Result<()> { Ok(()) } -mod test_accessor { - use std::future::Future; - use std::sync::atomic::AtomicBool; - use std::sync::atomic::Ordering; +#[tokio::test(flavor = "multi_thread")] +async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> { + // **** Primary Scenario ******************************************************************** + // Test vacuum dropped table should keep going if some of the table data can not be removed + // successfully, e.g. bucket of external tables may under heavy load, can not handle the + // deletion operations in time. + // But, the metadata of the table that owns those data, should not be removed. + // ******************************************************************************************* - use opendal::raw::oio; - use opendal::raw::oio::Entry; - use opendal::raw::MaybeSend; - use opendal::raw::OpDelete; - use opendal::raw::OpList; - use opendal::raw::RpDelete; - use opendal::raw::RpList; + let meta = new_local_meta().await; + let endpoints = meta.endpoints.clone(); - use super::*; + // Modify config to use local meta store + let mut ee_setup = EESetup::new(); + let config = ee_setup.config_mut(); + config.meta.endpoints = endpoints.clone(); - // Accessor that throws an error when deleting dir or files. 
- #[derive(Debug)] - pub(crate) struct AccessorFaultyDeletion { - hit_delete: AtomicBool, - hit_batch: Arc, - hit_stat: AtomicBool, - inject_delete_faulty: bool, - inject_stat_faulty: bool, - } + let fixture = TestFixture::setup_with_custom(ee_setup).await?; - impl AccessorFaultyDeletion { - pub(crate) fn with_delete_fault() -> Self { - AccessorFaultyDeletion { - hit_delete: AtomicBool::new(false), - hit_batch: Arc::new(AtomicBool::new(false)), - hit_stat: AtomicBool::new(false), - inject_delete_faulty: true, - inject_stat_faulty: false, - } - } + let session = fixture.default_session(); + session.get_settings().set_data_retention_time_in_days(0)?; - pub(crate) fn with_stat_fault() -> Self { - AccessorFaultyDeletion { - hit_delete: AtomicBool::new(false), - hit_batch: Arc::new(AtomicBool::new(false)), - hit_stat: AtomicBool::new(false), - inject_delete_faulty: false, - inject_stat_faulty: true, - } - } + // Prepare 2 tables with some data + fixture + .execute_command("create database test_vacuum") + .await?; - pub(crate) fn hit_delete_operation(&self) -> bool { - self.hit_delete.load(Ordering::Acquire) - } - } + fixture + .execute_command("create table test_vacuum.t1 (c int) as select * from numbers(100)") + .await?; - pub struct VecLister(Vec); - impl oio::List for VecLister { - fn next(&mut self) -> impl Future>> + MaybeSend { - let me = &mut self.0; - async move { - Ok(me.pop().map(|v| { - Entry::new( - &v, - if v.ends_with('/') { - Metadata::new(EntryMode::DIR) - } else { - Metadata::new(EntryMode::FILE) - }, - ) - })) - } - } - } + fixture + .execute_command("create table test_vacuum.t2 (c int) as select * from numbers(100)") + .await?; - pub struct MockDeleter { - size: usize, - hit_batch: Arc, - } + let ctx = fixture.new_query_ctx().await?; + let tenant = ctx.get_tenant(); + let cat = ctx.get_default_catalog()?; + let db = cat.get_database(&tenant, "test_vacuum").await?; - impl oio::Delete for MockDeleter { - fn delete(&mut self, _path: &str, _args: OpDelete) -> opendal::Result<()> { - self.size += 1; - Ok(()) - } + let t1 = cat.get_table(&tenant, "test_vacuum", "t1").await?; + let t2 = cat.get_table(&tenant, "test_vacuum", "t2").await?; + let t1_table_id = t1.get_id(); + let t2_table_id = t2.get_id(); + let db_id = db.get_db_info().database_id.db_id; + let storage_root = fixture.storage_root(); - async fn flush(&mut self) -> opendal::Result { - self.hit_batch.store(true, Ordering::Release); + // Let make t1's data path unremovable + { + let path = Path::new(storage_root) + .join(db_id.to_string()) + .join(t1_table_id.to_string()); - let n = self.size; - self.size = 0; - Ok(n) - } + let mut perms = std::fs::metadata(&path)?.permissions(); + perms.set_readonly(true); + std::fs::set_permissions(&path, perms)?; } - impl Access for AccessorFaultyDeletion { - type Reader = (); - type BlockingReader = (); - type Writer = (); - type BlockingWriter = (); - type Lister = VecLister; - type BlockingLister = (); - type Deleter = MockDeleter; - type BlockingDeleter = (); + // Drop tables and vacuum them + fixture.execute_command("drop table test_vacuum.t1").await?; + fixture.execute_command("drop table test_vacuum.t2").await?; - fn info(&self) -> Arc { - let info = AccessorInfo::default(); - info.set_native_capability(opendal::Capability { - stat: true, - create_dir: true, - delete: true, - delete_max_size: Some(1000), - list: true, - ..Default::default() - }); - info.into() - } + // before vacuum, check that tables' data paths exist - async fn stat(&self, _path: &str, _args: OpStat) -> 
opendal::Result { - self.hit_stat.store(true, Ordering::Release); - if self.inject_stat_faulty { - Err(opendal::Error::new( - opendal::ErrorKind::NotFound, - "does not matter (stat)", - )) - } else { - let stat = if _path.ends_with('/') { - RpStat::new(Metadata::new(EntryMode::DIR)) - } else { - RpStat::new(Metadata::new(EntryMode::FILE)) - }; - Ok(stat) - } - } + let path = Path::new(storage_root) + .join(db_id.to_string()) + .join(t1_table_id.to_string()); + assert!(path.exists()); - async fn delete(&self) -> opendal::Result<(RpDelete, Self::Deleter)> { - self.hit_delete.store(true, Ordering::Release); + let path = Path::new(storage_root) + .join(db_id.to_string()) + .join(t2_table_id.to_string()); + assert!(path.exists()); - if self.inject_delete_faulty { - Err(opendal::Error::new( - opendal::ErrorKind::Unexpected, - "does not matter (delete)", - )) - } else { - Ok((RpDelete::default(), MockDeleter { - size: 0, - hit_batch: self.hit_batch.clone(), - })) - } - } + // Expects: + // + // - the vacuum drop operation returns Ok + fixture.execute_command("vacuum drop table").await?; - async fn list(&self, path: &str, _args: OpList) -> opendal::Result<(RpList, Self::Lister)> { - if self.inject_delete_faulty { - // While injecting faulty for delete operation, return an empty list; - // otherwise we need to impl other methods. - return Ok((RpList::default(), VecLister(vec![]))); - }; + // - t2's metadata should be removed, and its metadata should also have be removed - Ok(( - RpList::default(), - if path.ends_with('/') { - VecLister(vec!["a".to_owned(), "b".to_owned()]) - } else { - VecLister(vec![]) - }, - )) - } - } -} + { + let path = Path::new(storage_root) + .join(db_id.to_string()) + .join(t2_table_id.to_string()); -#[tokio::test(flavor = "multi_thread")] -async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> { - // do_vacuum_drop_table should return Err if file deletion failed + assert!(!path.exists()); - let mut table_info = TableInfo::default(); - table_info - .meta - .options - .insert(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()); - table_info.desc = "`default`.`t`".to_string(); + let table_id_key = TableId::new(t2_table_id); + let t2_table_meta = meta.get_pb(&table_id_key).await?; + assert!(t2_table_meta.is_none()); + } - use test_accessor::AccessorFaultyDeletion; - // Operator with mocked accessor that will fail on `remove_all` - // - // Note that: - // In real case, `Accessor::batch` will be called (instead of Accessor::delete) - // but all that we need here is let Operator::remove_all failed - let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::with_delete_fault()); - let operator = OperatorBuilder::new(faulty_accessor.clone()).finish(); - - let tables = vec![(table_info, operator)]; - let result = do_vacuum_drop_table(tables, None).await?; - assert!(!result.1.is_empty()); - // verify that accessor.delete() was called - assert!(faulty_accessor.hit_delete_operation()); + // - but t1's metadata should still be there, since its table data is unable to be removed + let table_id_key = TableId::new(t1_table_id); + let t1_table_meta = meta.get_pb(&table_id_key).await?; + assert!(t1_table_meta.is_some()); - Ok(()) -} + // ********************* + // * Appendix scenario * + // ********************* -#[tokio::test(flavor = "multi_thread")] -async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Result<()> { - let mut table_info = TableInfo::default(); - table_info - .meta - .options - .insert(OPT_KEY_DATABASE_ID.to_owned(), 
"1".to_owned()); - table_info.desc = "`default`.`t`".to_string(); - use test_accessor::AccessorFaultyDeletion; - - // Case 1: non-parallel vacuum dropped tables + // Let make t1's data path removable { - let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::with_delete_fault()); - let operator = OperatorBuilder::new(faulty_accessor.clone()).finish(); - - let table = (table_info.clone(), operator); + let path = Path::new(storage_root) + .join(db_id.to_string()) + .join(t1_table_id.to_string()); + let mut perms = std::fs::metadata(&path)?.permissions(); + #[allow(clippy::permissions_set_readonly_false)] + perms.set_readonly(false); + std::fs::set_permissions(&path, perms)?; + + // make sure it still there + let path = Path::new(storage_root) + .join(db_id.to_string()) + .join(t1_table_id.to_string()); + + assert!(path.exists()); + } - // with one table and one thread, `vacuum_drop_tables_by_table_info` will NOT run in parallel - let tables = vec![table]; - let num_threads = 1; - let result = vacuum_drop_tables_by_table_info(num_threads, tables, None).await?; - // verify that accessor.delete() was called - assert!(faulty_accessor.hit_delete_operation()); + // vacuum again + fixture.execute_command("vacuum drop table").await?; - // verify that errors of deletions are not swallowed - assert!(!result.1.is_empty()); - } + // Expects: + // - t1's metadata and table data should be removed - // Case 2: parallel vacuum dropped tables { - let faulty_accessor = std::sync::Arc::new(AccessorFaultyDeletion::with_delete_fault()); - let operator = OperatorBuilder::new(faulty_accessor.clone()).finish(); + let path = Path::new(storage_root) + .join(db_id.to_string()) + .join(t1_table_id.to_string()); - let table = (table_info, operator); - // with 2 tables and 2 threads, `vacuum_drop_tables_by_table_info` will run in parallel (one table per thread) - let tables = vec![table.clone(), table]; - let num_threads = 2; - let result = vacuum_drop_tables_by_table_info(num_threads, tables, None).await?; - // verify that accessor.delete() was called - assert!(faulty_accessor.hit_delete_operation()); - // verify that errors of deletions are not swallowed - assert!(!result.1.is_empty()); + assert!(!path.exists()); + + let table_id_key = TableId::new(t1_table_id); + let t1_table_meta = meta.get_pb(&table_id_key).await?; + assert!(t1_table_meta.is_none()); } Ok(()) } #[tokio::test(flavor = "multi_thread")] -async fn test_fuse_vacuum_drop_tables_dry_run_with_obj_not_found_error() -> Result<()> { - let mut table_info = TableInfo::default(); - table_info - .meta - .options - .insert(OPT_KEY_DATABASE_ID.to_owned(), "1".to_owned()); +async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> { + // Test vacuum works on external tables + let meta = new_local_meta().await; + let endpoints = meta.endpoints.clone(); - use test_accessor::AccessorFaultyDeletion; + // Modify config to use local meta store + let mut ee_setup = EESetup::new(); + let config = ee_setup.config_mut(); + config.meta.endpoints = endpoints.clone(); - // Case 1: non-parallel vacuum dry-run dropped tables - { - let faulty_accessor = Arc::new(AccessorFaultyDeletion::with_stat_fault()); - let operator = OperatorBuilder::new(faulty_accessor.clone()).finish(); + let fixture = TestFixture::setup_with_custom(ee_setup).await?; + let tmp_dir_for_t1 = TempDir::new().unwrap().keep(); + let tmp_dir_for_t2 = TempDir::new().unwrap().keep(); - let table = (table_info.clone(), operator); + let external_location_for_t1 = format!("fs://{}/", 
tmp_dir_for_t1.to_str().unwrap()); + let external_location_for_t2 = format!("fs://{}/", tmp_dir_for_t2.to_str().unwrap()); - // with one table and one thread, `vacuum_drop_tables_by_table_info` will NOT run in parallel - let tables = vec![table]; - let num_threads = 1; - let result = vacuum_drop_tables_by_table_info(num_threads, tables, Some(usize::MAX)).await; - // verify that errors of NotFound are swallowed - assert!(result.is_ok()); - } + fixture + .execute_command("create database test_vacuum") + .await?; - // Case 2: parallel vacuum dry-run dropped tables - { - let faulty_accessor = Arc::new(AccessorFaultyDeletion::with_stat_fault()); - let operator = OperatorBuilder::new(faulty_accessor.clone()).finish(); + fixture + .execute_command(&format!("create table test_vacuum.t1 (c int) '{external_location_for_t1}' as select * from numbers(100)")) + .await?; - let table = (table_info, operator); - // with 2 tables and 2 threads, `vacuum_drop_tables_by_table_info` will run in parallel (one table per thread) - let tables = vec![table.clone(), table]; - let num_threads = 2; - let result = vacuum_drop_tables_by_table_info(num_threads, tables, Some(usize::MAX)).await; - // verify that errors of NotFound are swallowed - assert!(result.is_ok()); - } + fixture + .execute_command(&format!("create table test_vacuum.t2 (c int) '{external_location_for_t2}' as select * from numbers(100)")) + .await?; - Ok(()) -} + let ctx = fixture.new_query_ctx().await?; + let cat = ctx.get_default_catalog()?; + let tenant = ctx.get_tenant(); + let t1 = cat.get_table(&tenant, "test_vacuum", "t1").await?; + let t2 = cat.get_table(&tenant, "test_vacuum", "t2").await?; -// fuse table on external storage is same as internal storage. -#[tokio::test(flavor = "multi_thread")] -async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> { - let meta = TableMeta { - storage_params: Some(StorageParams::default()), - ..Default::default() - }; + fixture.execute_command("drop table test_vacuum.t1").await?; + fixture.execute_command("drop table test_vacuum.t2").await?; - let table_info = TableInfo { - desc: "`default`.`t`".to_string(), - meta, - ..Default::default() + let t1_storage_path = { + let storage_prefix = parse_storage_prefix(&t1.get_table_info().meta.options, t1.get_id())?; + let mut dir = tmp_dir_for_t1.clone(); + dir.push(storage_prefix); + dir }; - // Accessor passed in does NOT matter in this case, `do_vacuum_drop_table` should - // return Ok(None) before accessor is used. 
- use test_accessor::AccessorFaultyDeletion; - let accessor = std::sync::Arc::new(AccessorFaultyDeletion::with_delete_fault()); - let operator = OperatorBuilder::new(accessor.clone()).finish(); - - let tables = vec![(table_info, operator)]; - let result = do_vacuum_drop_table(tables, None).await?; - assert!(!result.1.is_empty()); - - // verify that accessor.delete() was called - assert!(!accessor.hit_delete_operation()); - - Ok(()) -} + let t2_storage_path = { + let storage_prefix = parse_storage_prefix(&t2.get_table_info().meta.options, t2.get_id())?; + let mut dir = tmp_dir_for_t2.clone(); + dir.push(storage_prefix); + dir + }; -#[tokio::test(flavor = "multi_thread")] -async fn test_remove_files_in_batch_do_not_swallow_errors() -> Result<()> { - // errors should not be swallowed in remove_file_in_batch - let faulty_accessor = Arc::new(test_accessor::AccessorFaultyDeletion::with_delete_fault()); - let operator = OperatorBuilder::new(faulty_accessor.clone()).finish(); - let fixture = TestFixture::setup().await?; - let ctx = fixture.new_query_ctx().await?; - let file_util = Files::create(ctx, operator); + assert!(t1_storage_path.exists()); + assert!(t2_storage_path.exists()); - // files to be deleted does not matter, faulty_accessor will always fail to delete - let r = file_util.remove_file_in_batch(vec!["1", "2"]).await; - assert!(r.is_err()); + fixture + .execute_command("settings (data_retention_time_in_days = 0) vacuum drop table") + .await?; - // verify that accessor.delete() was called - assert!(faulty_accessor.hit_delete_operation()); + assert!(!t1_storage_path.exists()); + assert!(!t2_storage_path.exists()); Ok(()) } @@ -638,6 +505,13 @@ async fn test_vacuum_dropped_table_clean_ownership() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread")] +async fn test_fuse_vacuum_drop_tables_dry_run_with_obj_not_found_error() -> Result<()> { + // During dry run, vacuum should ignore obj not found error + + Ok(()) +} + #[tokio::test(flavor = "multi_thread")] async fn test_gc_in_progress_db_not_undroppable() -> Result<()> { // 1. Prepare local meta service @@ -812,3 +686,187 @@ async fn new_local_meta() -> MetaStore { }; meta } + +mod test_accessor { + use std::future::Future; + use std::sync::atomic::AtomicBool; + use std::sync::atomic::Ordering; + + use opendal::raw::oio; + use opendal::raw::oio::Entry; + use opendal::raw::MaybeSend; + use opendal::raw::OpDelete; + use opendal::raw::OpList; + use opendal::raw::RpDelete; + use opendal::raw::RpList; + + use super::*; + + // Accessor that throws an error when deleting dir or files. 
+ #[derive(Debug)] + pub(crate) struct AccessorFaultyDeletion { + hit_delete: AtomicBool, + hit_batch: Arc, + hit_stat: AtomicBool, + inject_delete_faulty: bool, + inject_stat_faulty: bool, + } + + impl AccessorFaultyDeletion { + pub(crate) fn with_delete_fault() -> Self { + AccessorFaultyDeletion { + hit_delete: AtomicBool::new(false), + hit_batch: Arc::new(AtomicBool::new(false)), + hit_stat: AtomicBool::new(false), + inject_delete_faulty: true, + inject_stat_faulty: false, + } + } + + #[allow(dead_code)] + pub(crate) fn with_stat_fault() -> Self { + AccessorFaultyDeletion { + hit_delete: AtomicBool::new(false), + hit_batch: Arc::new(AtomicBool::new(false)), + hit_stat: AtomicBool::new(false), + inject_delete_faulty: false, + inject_stat_faulty: true, + } + } + + pub(crate) fn hit_delete_operation(&self) -> bool { + self.hit_delete.load(Ordering::Acquire) + } + } + + pub struct VecLister(Vec); + impl oio::List for VecLister { + fn next(&mut self) -> impl Future>> + MaybeSend { + let me = &mut self.0; + async move { + Ok(me.pop().map(|v| { + Entry::new( + &v, + if v.ends_with('/') { + Metadata::new(EntryMode::DIR) + } else { + Metadata::new(EntryMode::FILE) + }, + ) + })) + } + } + } + + pub struct MockDeleter { + size: usize, + hit_batch: Arc, + } + + impl oio::Delete for MockDeleter { + fn delete(&mut self, _path: &str, _args: OpDelete) -> opendal::Result<()> { + self.size += 1; + Ok(()) + } + + async fn flush(&mut self) -> opendal::Result { + self.hit_batch.store(true, Ordering::Release); + + let n = self.size; + self.size = 0; + Ok(n) + } + } + + impl Access for AccessorFaultyDeletion { + type Reader = (); + type BlockingReader = (); + type Writer = (); + type BlockingWriter = (); + type Lister = VecLister; + type BlockingLister = (); + type Deleter = MockDeleter; + type BlockingDeleter = (); + + fn info(&self) -> Arc { + let info = AccessorInfo::default(); + info.set_native_capability(opendal::Capability { + stat: true, + create_dir: true, + delete: true, + delete_max_size: Some(1000), + list: true, + ..Default::default() + }); + info.into() + } + + async fn stat(&self, _path: &str, _args: OpStat) -> opendal::Result { + self.hit_stat.store(true, Ordering::Release); + if self.inject_stat_faulty { + Err(opendal::Error::new( + opendal::ErrorKind::NotFound, + "does not matter (stat)", + )) + } else { + let stat = if _path.ends_with('/') { + RpStat::new(Metadata::new(EntryMode::DIR)) + } else { + RpStat::new(Metadata::new(EntryMode::FILE)) + }; + Ok(stat) + } + } + + async fn delete(&self) -> opendal::Result<(RpDelete, Self::Deleter)> { + self.hit_delete.store(true, Ordering::Release); + + if self.inject_delete_faulty { + Err(opendal::Error::new( + opendal::ErrorKind::Unexpected, + "does not matter (delete)", + )) + } else { + Ok((RpDelete::default(), MockDeleter { + size: 0, + hit_batch: self.hit_batch.clone(), + })) + } + } + + async fn list(&self, path: &str, _args: OpList) -> opendal::Result<(RpList, Self::Lister)> { + if self.inject_delete_faulty { + // While injecting faulty for delete operation, return an empty list; + // otherwise we need to impl other methods. 
+ return Ok((RpList::default(), VecLister(vec![]))); + }; + + Ok(( + RpList::default(), + if path.ends_with('/') { + VecLister(vec!["a".to_owned(), "b".to_owned()]) + } else { + VecLister(vec![]) + }, + )) + } + } + #[tokio::test(flavor = "multi_thread")] + async fn test_file_util_remove_files_in_batch_do_not_swallow_errors() -> Result<()> { + // errors should not be swallowed in remove_file_in_batch + let faulty_accessor = Arc::new(test_accessor::AccessorFaultyDeletion::with_delete_fault()); + let operator = OperatorBuilder::new(faulty_accessor.clone()).finish(); + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + let file_util = Files::create(ctx, operator); + + // files to be deleted does not matter, faulty_accessor will always fail to delete + let r = file_util.remove_file_in_batch(vec!["1", "2"]).await; + assert!(r.is_err()); + + // verify that accessor.delete() was called + assert!(faulty_accessor.hit_delete_operation()); + + Ok(()) + } +} diff --git a/src/query/ee_features/vacuum_handler/Cargo.toml b/src/query/ee_features/vacuum_handler/Cargo.toml index 7119689c57b02..2f78edb0398b3 100644 --- a/src/query/ee_features/vacuum_handler/Cargo.toml +++ b/src/query/ee_features/vacuum_handler/Cargo.toml @@ -13,6 +13,7 @@ async-trait = { workspace = true } databend-common-base = { workspace = true } databend-common-catalog = { workspace = true } databend-common-exception = { workspace = true } +databend-common-meta-app = { workspace = true } [build-dependencies] diff --git a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs index 2dec82feaf6a7..5438020e6ac8a 100644 --- a/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs +++ b/src/query/ee_features/vacuum_handler/src/vacuum_handler.rs @@ -17,16 +17,34 @@ use std::sync::Arc; use std::time::Duration; use databend_common_base::base::GlobalInstance; +use databend_common_catalog::catalog::Catalog; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::AbortChecker; use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; +use databend_common_meta_app::tenant::Tenant; + // (TableName, file, file size) pub type VacuumDropFileInfo = (String, String, u64); // (drop_files, failed_tables) pub type VacuumDropTablesResult = Result<(Option>, HashSet)>; +#[derive(Clone)] +pub struct DroppedTableDesc { + pub table: Arc, + pub db_id: u64, + pub table_name: String, +} +#[derive(Clone)] +pub struct VacuumDroppedTablesCtx { + pub tenant: Tenant, + pub catalog: Arc, + pub threads_nums: usize, + pub tables: Vec, + pub dry_run_limit: Option, +} + #[async_trait::async_trait] pub trait VacuumHandler: Sync + Send { async fn do_vacuum( @@ -43,11 +61,9 @@ pub trait VacuumHandler: Sync + Send { respect_flash_back: bool, ) -> Result>; - async fn do_vacuum_drop_tables( + async fn do_vacuum_dropped_tables( &self, - threads_nums: usize, - tables: Vec>, - dry_run_limit: Option, + dropped_tables: VacuumDroppedTablesCtx, ) -> VacuumDropTablesResult; async fn do_vacuum_temporary_files( @@ -100,13 +116,9 @@ impl VacuumHandlerWrapper { #[async_backtrace::framed] pub async fn do_vacuum_drop_tables( &self, - threads_nums: usize, - tables: Vec>, - dry_run_limit: Option, + dropped_tables: VacuumDroppedTablesCtx, ) -> VacuumDropTablesResult { - self.handler - .do_vacuum_drop_tables(threads_nums, tables, dry_run_limit) - .await + self.handler.do_vacuum_dropped_tables(dropped_tables).await } 
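To summarize the reworked entry point for callers: the per-call parameters now travel inside VacuumDroppedTablesCtx instead of a parameter list. Below is a minimal caller-side sketch, assuming only the types declared above (VacuumDroppedTablesCtx, DroppedTableDesc, VacuumHandlerWrapper) and that the catalog handle, tenant, and dropped table handles are already in hand; the thread count and dry-run limit are illustrative placeholders, not the values the interpreter actually uses.

    use std::sync::Arc;

    use databend_common_catalog::catalog::Catalog;
    use databend_common_catalog::table::Table;
    use databend_common_meta_app::tenant::Tenant;
    use databend_enterprise_vacuum_handler::vacuum_handler::DroppedTableDesc;
    use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult;
    use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDroppedTablesCtx;
    use databend_enterprise_vacuum_handler::VacuumHandlerWrapper;

    // Sketch: assemble the new context object and hand it to the handler wrapper.
    // `dropped` pairs each table handle with its owning db_id and its table name.
    async fn vacuum_dropped(
        handler: &VacuumHandlerWrapper,
        catalog: Arc<dyn Catalog>,
        tenant: Tenant,
        dropped: Vec<(Arc<dyn Table>, u64, String)>,
        dry_run: bool,
    ) -> VacuumDropTablesResult {
        let tables: Vec<DroppedTableDesc> = dropped
            .into_iter()
            .map(|(table, db_id, table_name)| DroppedTableDesc {
                table,
                db_id,
                table_name,
            })
            .collect();

        let ctx = VacuumDroppedTablesCtx {
            tenant,
            catalog,
            threads_nums: 3, // illustrative; the interpreter reads the max_vacuum_threads setting
            tables,
            dry_run_limit: if dry_run { Some(1000) } else { None }, // illustrative limit
        };

        // A dry run only lists removable files; a real run removes table data and then GCs each
        // table's metadata, reporting the ids of tables whose cleanup failed.
        handler.do_vacuum_drop_tables(ctx).await
    }
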
#[async_backtrace::framed] diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index 4435fd262285b..53f9c32718374 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -618,7 +618,7 @@ impl Catalog for MutableCatalog { let resp = ctx.meta.get_drop_table_infos(req).await?; - let drop_ids = resp.drop_ids.clone(); + let drop_ids = resp.drop_ids; let storage = ctx.storage_factory; diff --git a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs index 468c01f4dbdea..3bc2658e19294 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs @@ -30,9 +30,11 @@ use databend_common_license::license_manager::LicenseManagerSwitch; use databend_common_meta_app::schema::DroppedId; use databend_common_meta_app::schema::GcDroppedTableReq; use databend_common_meta_app::schema::ListDroppedTableReq; +use databend_common_meta_app::tenant::Tenant; use databend_common_sql::plans::VacuumDropTablePlan; -use databend_common_storages_basic::view_table::VIEW_ENGINE; use databend_enterprise_vacuum_handler::get_vacuum_handler; +use databend_enterprise_vacuum_handler::vacuum_handler::DroppedTableDesc; +use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDroppedTablesCtx; use log::info; use crate::interpreters::Interpreter; @@ -55,9 +57,10 @@ impl VacuumDropTablesInterpreter { /// Vacuum metadata of dropped tables and databases. /// /// Returns the approximate number of metadata keys removed. - async fn gc_drop_tables( + async fn gc_drop_db_table_metas( &self, catalog: Arc, + tenant: &Tenant, drop_ids: Vec, ) -> Result { info!( @@ -87,22 +90,24 @@ impl VacuumDropTablesInterpreter { let chunk_size = 50; let mut num_meta_keys_removed = 0; - // first gc drop table ids - for c in drop_db_table_ids.chunks(chunk_size) { - info!("vacuum drop {} table ids: {:?}", c.len(), c); - let req = GcDroppedTableReq { - tenant: self.ctx.get_tenant(), - catalog: self.plan.catalog.clone(), - drop_ids: c.to_vec(), - }; - num_meta_keys_removed += catalog.gc_drop_tables(req).await?; - } + + // Table's meta data vacuumed individually + // // first gc drop table ids + // for c in drop_db_table_ids.chunks(chunk_size) { + // info!("vacuum drop {} table ids: {:?}", c.len(), c); + // let req = GcDroppedTableReq { + // tenant: tenant.clone(), + // catalog: self.plan.catalog.clone(), + // drop_ids: c.to_vec(), + // }; + // num_meta_keys_removed += catalog.gc_drop_tables(req).await?; + // } // then gc drop db ids for c in drop_db_ids.chunks(chunk_size) { info!("vacuum drop {} db ids: {:?}", c.len(), c); let req = GcDroppedTableReq { - tenant: self.ctx.get_tenant(), + tenant: tenant.clone(), catalog: self.plan.catalog.clone(), drop_ids: c.to_vec(), }; @@ -160,7 +165,7 @@ impl Interpreter for VacuumDropTablesInterpreter { let mut containing_db = BTreeMap::new(); for drop_id in drop_ids.iter() { if let DroppedId::Table { name, id } = drop_id { - containing_db.insert(id.table_id, name.db_id); + containing_db.insert(id.table_id, name); } } @@ -180,52 +185,67 @@ impl Interpreter for VacuumDropTablesInterpreter { drop_ids ); - // Filter out read-only tables and views. 
- // Note: The drop_ids list still includes view IDs - let (views, tables): (Vec<_>, Vec<_>) = tables - .into_iter() - .filter(|tbl| !tbl.as_ref().is_read_only()) - .partition(|tbl| tbl.get_table_info().meta.engine == VIEW_ENGINE); - - { - let view_ids = views.into_iter().map(|v| v.get_id()).collect::>(); - info!("view ids excluded from purging data: {:?}", view_ids); - } - - info!( - "after filter read-only tables: {} tables remain: [{}]", - tables.len(), - tables - .iter() - .map(|t| format!( - "{}(id:{})", - t.get_table_info().name, - t.get_table_info().ident.table_id - )) - .collect::>() - .join(", ") - ); + //// Filter out read-only tables and views. + //// Note: The drop_ids list still includes view IDs + // let (views, tables): (Vec<_>, Vec<_>) = tables + // .into_iter() + // .filter(|tbl| !tbl.as_ref().is_read_only()) + // .partition(|tbl| tbl.get_table_info().meta.engine == VIEW_ENGINE); + + // let view_ids = views.into_iter().map(|v| v.get_id()).collect::>(); + // info!("view ids excluded from purging data: {:?}", view_ids); + + // info!( + // "after filter read-only tables: {} tables remain: [{}]", + // tables.len(), + // tables + // .iter() + // .map(|t| format!( + // "{}(id:{})", + // t.get_table_info().name, + // t.get_table_info().ident.table_id + // )) + // .collect::>() + // .join(", ") + //); let tables_count = tables.len(); let handler = get_vacuum_handler(); let threads_nums = self.ctx.get_settings().get_max_vacuum_threads()? as usize; - let (files_opt, failed_tables) = handler - .do_vacuum_drop_tables( - threads_nums, - tables, - if self.plan.option.dry_run.is_some() { - Some(DRY_RUN_LIMIT) - } else { - None - }, - ) - .await?; + + let tenant = self.ctx.get_tenant(); + + let drop_ctx = VacuumDroppedTablesCtx { + tenant: tenant.clone(), + catalog: catalog.clone(), + threads_nums, + tables: tables + .into_iter() + .map(|table| { + let db_id_table_name = containing_db + .get(&table.get_table_info().ident.table_id) + .unwrap(); + DroppedTableDesc { + table, + db_id: db_id_table_name.db_id, + table_name: db_id_table_name.table_name.clone(), + } + }) + .collect(), + dry_run_limit: if self.plan.option.dry_run.is_some() { + Some(DRY_RUN_LIMIT) + } else { + None + }, + }; + + let (files_opt, failed_tables) = handler.do_vacuum_drop_tables(drop_ctx).await?; let failed_db_ids = failed_tables .iter() // Safe unwrap: the map is built from drop_ids - .map(|id| *containing_db.get(id).unwrap()) + .map(|id| containing_db.get(id).unwrap().db_id) .collect::>(); let mut num_meta_keys_removed = 0; @@ -241,13 +261,12 @@ impl Interpreter for VacuumDropTablesInterpreter { success_dropped_ids.push(drop_id); } } - DroppedId::Table { name: _, id } => { - if !failed_tables.contains(&id.table_id) { - success_dropped_ids.push(drop_id); - } + DroppedId::Table { .. } => { + // Table metadata has been cleaned individually } } } + info!( "vacuum drop table summary - failed dbs: {}, failed tables: {}, successfully cleaned: {} items", failed_db_ids.len(), @@ -258,7 +277,9 @@ impl Interpreter for VacuumDropTablesInterpreter { info!("failed table ids: {:?}", failed_tables); } - num_meta_keys_removed = self.gc_drop_tables(catalog, success_dropped_ids).await?; + num_meta_keys_removed = self + .gc_drop_db_table_metas(catalog, &tenant, success_dropped_ids) + .await?; } let success_count = tables_count as u64 - failed_tables.len() as u64;
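On the consumer side, the handler's result pair drives what the interpreter reports and which metadata it may still GC. A small illustrative helper follows, assuming VacuumDropTablesResult unwraps to (Option<Vec<VacuumDropFileInfo>>, HashSet<u64>) as the type aliases in vacuum_handler.rs suggest; the helper name is hypothetical.

    use std::collections::HashSet;

    use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo;
    use log::info;

    // Sketch: report what a vacuum-drop run did, based on the handler's result pair.
    fn report_vacuum_outcome(
        files_opt: Option<Vec<VacuumDropFileInfo>>,
        failed_tables: HashSet<u64>,
    ) {
        match files_opt {
            Some(files) => {
                // Dry run: nothing was deleted; each entry is (table name, file path, file size).
                for (table_name, file, size) in &files {
                    info!("dry run: would remove {file} ({size} bytes) of table {table_name}");
                }
            }
            None => {
                // Real run: data and per-table metadata were removed, except for failed tables,
                // whose metadata is intentionally kept so a later vacuum can retry.
                if failed_tables.is_empty() {
                    info!("all dropped tables vacuumed, metadata GC'ed table by table");
                } else {
                    info!(
                        "kept metadata of {} table(s) for a later retry: {:?}",
                        failed_tables.len(),
                        failed_tables
                    );
                }
            }
        }
    }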