1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 4 additions & 5 deletions src/query/ee/src/storages/fuse/operations/handler.rs
@@ -20,6 +20,7 @@ use databend_common_catalog::table_context::AbortChecker;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::Result;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDroppedTablesCtx;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumTempOptions;
use databend_enterprise_vacuum_handler::VacuumHandler;
use databend_enterprise_vacuum_handler::VacuumHandlerWrapper;
@@ -50,13 +51,11 @@ impl VacuumHandler for RealVacuumHandler {
do_vacuum2(table, ctx, respect_flash_back).await
}

async fn do_vacuum_drop_tables(
async fn do_vacuum_dropped_tables(
&self,
threads_nums: usize,
tables: Vec<Arc<dyn Table>>,
dry_run_limit: Option<usize>,
dropped_tables: VacuumDroppedTablesCtx,
) -> VacuumDropTablesResult {
vacuum_drop_tables(threads_nums, tables, dry_run_limit).await
vacuum_drop_tables(dropped_tables).await
}

async fn do_vacuum_temporary_files(
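For reference, the refactor replaces the handler's three separate arguments with a single `VacuumDroppedTablesCtx`. Below is a minimal sketch of the shape this PR appears to rely on, inferred from the destructuring and field accesses in vacuum_drop_tables.rs further down; the authoritative definition lives in databend_enterprise_vacuum_handler::vacuum_handler, and the exact types and derives may differ.

use std::sync::Arc;

use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::table::Table;
use databend_common_meta_app::tenant::Tenant;

// Sketch only: bundles what used to be passed as (threads_nums, tables, dry_run_limit),
// plus the catalog and tenant now needed for per-table metadata GC.
pub struct VacuumDroppedTablesCtx {
    pub tenant: Tenant,
    pub catalog: Arc<dyn Catalog>,
    pub threads_nums: usize,
    pub tables: Vec<DroppedTableDesc>,
    pub dry_run_limit: Option<usize>,
}

// Sketch only: Clone is assumed because the tables are later split with
// `chunks(..)` and `to_vec()`; `db_id` is assumed u64, matching DBIdTableName::db_id.
#[derive(Clone)]
pub struct DroppedTableDesc {
    pub table: Arc<dyn Table>,
    pub db_id: u64,
    pub table_name: String,
}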
148 changes: 95 additions & 53 deletions src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
@@ -17,30 +17,91 @@ use std::sync::Arc;
use std::time::Instant;

use databend_common_base::runtime::execute_futures_in_parallel;
use databend_common_catalog::table::Table;
use databend_common_catalog::catalog::Catalog;
use databend_common_exception::Result;
use databend_common_meta_app::schema::DBIdTableName;
use databend_common_meta_app::schema::DroppedId;
use databend_common_meta_app::schema::GcDroppedTableReq;
use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::tenant::Tenant;
use databend_common_storages_fuse::FuseTable;
use databend_enterprise_vacuum_handler::vacuum_handler::DroppedTableDesc;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropFileInfo;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDropTablesResult;
use databend_enterprise_vacuum_handler::vacuum_handler::VacuumDroppedTablesCtx;
use futures_util::TryStreamExt;
use log::error;
use log::info;
use opendal::EntryMode;
use opendal::Operator;

#[async_backtrace::framed]
pub async fn do_vacuum_drop_table(
tables: Vec<(TableInfo, Operator)>,
catalog: Arc<dyn Catalog>,
tenant: Tenant,
tables: Vec<DroppedTableDesc>,
dry_run_limit: Option<usize>,
) -> VacuumDropTablesResult {
let is_dry_run = dry_run_limit.is_some();

let mut list_files = vec![];
let mut failed_tables = HashSet::new();
for (table_info, operator) in tables {
let result =
vacuum_drop_single_table(&table_info, operator, dry_run_limit, &mut list_files).await;
if result.is_err() {
let table_id = table_info.ident.table_id;
failed_tables.insert(table_id);
for table_desc in tables {
let table = &table_desc.table;
let table_info = table.get_table_info();

if !table.is_read_only() {
if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
let operator = fuse_table.get_operator();

let result =
vacuum_drop_single_table(table_info, operator, dry_run_limit, &mut list_files)
.await;

if result.is_err() {
let table_id = table_info.ident.table_id;
failed_tables.insert(table_id);
continue;
}
} else {
info!(
"table {} is not of engine {}, it's data will NOT be cleaned.",
table.get_table_info().name,
table.engine()
);
};
} else {
info!(
"table {} is read-only, it's data will NOT be cleaned.",
table.get_table_info().name,
);
}

if !is_dry_run {
let req = GcDroppedTableReq {
tenant: tenant.clone(),
catalog: catalog.name(),
drop_ids: vec![DroppedId::Table {
name: DBIdTableName {
db_id: table_desc.db_id,
table_name: table_desc.table_name.clone(),
},
id: TableId::new(table.get_table_info().ident.table_id),
}],
};

let result = catalog.gc_drop_tables(req).await;

if result.is_err() {
let table_id = table_info.ident.table_id;
failed_tables.insert(table_id);
info!(
"failed to drop table table db_id: {}, table {}[{:}]",
table_desc.db_id, table_desc.table_name, table_id
);
continue;
}
}
}
Ok(if dry_run_limit.is_some() {
Expand All @@ -50,7 +111,8 @@ pub async fn do_vacuum_drop_table(
})
}

async fn vacuum_drop_single_table(
// public for integration tests
pub async fn vacuum_drop_single_table(
table_info: &TableInfo,
operator: Operator,
dry_run_limit: Option<usize>,
@@ -127,36 +189,39 @@ async fn vacuum_drop_single_table(

#[async_backtrace::framed]
pub async fn vacuum_drop_tables_by_table_info(
num_threads: usize,
table_infos: Vec<(TableInfo, Operator)>,
dry_run_limit: Option<usize>,
dropped_tables: VacuumDroppedTablesCtx,
) -> VacuumDropTablesResult {
let start = Instant::now();
let num_tables = table_infos.len();
let VacuumDroppedTablesCtx {
tenant,
catalog,
threads_nums,
tables,
dry_run_limit,
} = dropped_tables;

// - for each vacuum task, the tables passed to it will be processed sequentially
// - while removing one table's data, at most 1000 objects will be deleted (in batch)
// - let's assume that the rate limit is 3500 (individual) objects per second:
// A parallelism degree of up to 3 appears to be safe.
let num_threads = std::cmp::min(num_threads, 3);
let start = Instant::now();
let num_tables = tables.len();

let num_threads = std::cmp::min(threads_nums, 3);
let batch_size = (num_tables / num_threads).clamp(1, 50);

info!(
"vacuum dropped tables, number of tables: {}, batch_size: {}, parallelism degree: {}",
num_tables, batch_size, num_threads
);

let result = if batch_size >= table_infos.len() {
do_vacuum_drop_table(table_infos, dry_run_limit).await?
let result = if batch_size >= tables.len() {
do_vacuum_drop_table(catalog, tenant, tables, dry_run_limit).await?
} else {
let mut chunks = table_infos.chunks(batch_size);
let mut chunks = tables.chunks(batch_size);
let dry_run_limit = dry_run_limit
.map(|dry_run_limit| (dry_run_limit / num_threads).min(dry_run_limit).max(1));
let tasks = std::iter::from_fn(move || {
chunks
.next()
.map(|tables| do_vacuum_drop_table(tables.to_vec(), dry_run_limit))
chunks.next().map(|tables| {
let catalog = catalog.clone();
let tenant = tenant.clone();
do_vacuum_drop_table(catalog, tenant, tables.to_vec(), dry_run_limit)
})
});

let result = execute_futures_in_parallel(
Expand All @@ -169,8 +234,8 @@ pub async fn vacuum_drop_tables_by_table_info(

// Note that Errs should NOT be swallowed if any target is not successfully deleted.
// Otherwise, the caller site may proceed to purge meta-data from meta-server with
// some table data un-vacuumed, and the `vacuum` action of those dropped tables can no
// longer be roll-forward.
// some table data not completely cleaned, and the `vacuum` action on those dropped
// tables can no longer be rolled forward.
if dry_run_limit.is_some() {
let mut ret_files = vec![];
for res in result {
@@ -202,31 +267,8 @@ pub async fn vacuum_drop_tables_by_table_info(
Ok(result)
}

// TODO remove this func
#[async_backtrace::framed]
pub async fn vacuum_drop_tables(
threads_nums: usize,
tables: Vec<Arc<dyn Table>>,
dry_run_limit: Option<usize>,
) -> VacuumDropTablesResult {
let num_tables = tables.len();
info!("vacuum_drop_tables {} tables", num_tables);

let mut table_infos = Vec::with_capacity(num_tables);
for table in tables {
let (table_info, operator) =
if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
(fuse_table.get_table_info(), fuse_table.get_operator())
} else {
info!(
"ignore table {}, which is not of FUSE engine. Table engine {}",
table.get_table_info().name,
table.engine()
);
continue;
};

table_infos.push((table_info.clone(), operator));
}

vacuum_drop_tables_by_table_info(threads_nums, table_infos, dry_run_limit).await
pub async fn vacuum_drop_tables(dropped_tables: VacuumDroppedTablesCtx) -> VacuumDropTablesResult {
vacuum_drop_tables_by_table_info(dropped_tables).await
}
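To make the sizing logic in vacuum_drop_tables_by_table_info concrete, here is a small self-contained sketch of the same arithmetic with illustrative values (the numbers are examples, not from this PR):

fn main() {
    let threads_nums = 8; // illustrative: requested parallelism
    let num_tables = 120; // illustrative: number of dropped tables

    // Cap at 3: each task deletes at most 1000 objects per batch, and the
    // assumed rate limit is ~3500 objects/s, so a degree of 3 stays safe.
    let num_threads = std::cmp::min(threads_nums, 3);
    let batch_size = (num_tables / num_threads).clamp(1, 50);

    assert_eq!((num_threads, batch_size), (3, 40)); // 8 -> 3 threads; 120 / 3 = 40
}

With fewer tables than one batch, the code takes the single-task path (do_vacuum_drop_table directly); otherwise the chunks are fanned out via execute_futures_in_parallel, with dry_run_limit likewise divided per task.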
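From the caller's perspective, the new entry point collapses to building one context value. A hypothetical invocation, assuming a RealVacuumHandler in scope and an already-collected list of dropped-table descriptors (variable names are illustrative, not from this PR):

let dropped_tables = VacuumDroppedTablesCtx {
    tenant: tenant.clone(),
    catalog: catalog.clone(),
    threads_nums: 8,
    tables: table_descs,  // Vec<DroppedTableDesc> gathered by the caller
    dry_run_limit: None,  // Some(n) caps the number of files listed in dry-run mode
};
let result = handler.do_vacuum_dropped_tables(dropped_tables).await?;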