Skip to content

Commit

Permalink
refactor: DroppedId for listing db/tables for gc (#16542)
Browse files Browse the repository at this point in the history
There is no need to store tables inside `DroppedId::Db`:
the tables belonging to a DB for gc can still be stored in `DroppedId::Table`.
  • Loading branch information
drmingdrmer authored Sep 29, 2024
1 parent 5bfafbe commit de5a3d6
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 139 deletions.
44 changes: 18 additions & 26 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2678,31 +2678,25 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
drop_time_range.clone()
};

let db_filter = (table_drop_time_range, db_info.clone());

let capacity = the_limit - vacuum_table_infos.len();
let table_infos = do_get_table_history(self, db_filter, capacity).await?;
let table_infos =
do_get_table_history(self, table_drop_time_range, db_info.clone(), capacity)
.await?;

for (table_info, db_id) in table_infos.iter() {
vacuum_ids.push(DroppedId::new_table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
));
}

// A DB can be removed only when all its tables are removed.
if vacuum_db && capacity > table_infos.len() {
vacuum_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
tables: table_infos
.iter()
.map(|(table_info, _)| {
(table_info.ident.table_id, table_info.name.clone())
})
.collect(),
});
} else {
for (table_info, db_id) in table_infos.iter().take(capacity) {
vacuum_ids.push(DroppedId::new_table(
*db_id,
table_info.ident.table_id,
table_info.name.clone(),
));
}
}

vacuum_table_infos.extend(
Expand Down Expand Up @@ -2742,8 +2736,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
name_ident: tenant_dbname.clone(),
meta: db_meta,
});
let db_filter = (drop_time_range.clone(), db_info);
let table_infos = do_get_table_history(self, db_filter, the_limit).await?;
let table_infos =
do_get_table_history(self, drop_time_range.clone(), db_info, the_limit).await?;
let mut drop_ids = vec![];
let mut drop_table_infos = vec![];

Expand All @@ -2766,11 +2760,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
async fn gc_drop_tables(&self, req: GcDroppedTableReq) -> Result<(), KVAppError> {
for drop_id in req.drop_ids {
match drop_id {
DroppedId::Db {
db_id,
db_name,
tables: _,
} => gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?,
DroppedId::Db { db_id, db_name } => {
gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?
}
DroppedId::Table { name, id } => {
gc_dropped_table_by_id(self, &req.tenant, &name, &id).await?
}
Expand Down Expand Up @@ -3532,10 +3524,10 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
#[fastrace::trace]
async fn do_get_table_history(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
db_filter: (Range<Option<DateTime<Utc>>>, Arc<DatabaseInfo>),
drop_time_range: Range<Option<DateTime<Utc>>>,
db_info: Arc<DatabaseInfo>,
limit: usize,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let (drop_time_range, db_info) = db_filter;
let db_id = db_info.database_id.db_id;

let dbid_tbname_idlist = TableIdHistoryIdent {
Expand Down
145 changes: 56 additions & 89 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4071,8 +4071,9 @@ impl SchemaApiTestSuite {
};
let created_on = Utc::now();

let mut drop_ids_1 = vec![];
let mut drop_ids_2 = vec![];
// The expected drop_ids built with and without retention time boundary
let mut drop_ids_boundary = vec![];
let mut drop_ids_no_boundary = vec![];

// first create a database drop within filter time
info!("--- create db1");
Expand All @@ -4081,42 +4082,38 @@ impl SchemaApiTestSuite {
let req = CreateDatabaseReq {
create_option: CreateOption::Create,
name_ident: db_name.clone(),
meta: DatabaseMeta {
engine: "".to_string(),
..DatabaseMeta::default()
},
meta: DatabaseMeta::default(),
};

let res = mt.create_database(req).await?;
drop_ids_1.push(DroppedId::Db {
db_id: *res.db_id,
db_name: db_name.database_name().to_string(),
tables: vec![],
});
drop_ids_2.push(DroppedId::Db {
db_id: *res.db_id,
db_name: db_name.database_name().to_string(),
tables: vec![],
});
let db1_id = res.db_id.db_id;

let req = CreateTableReq {
create_option: CreateOption::Create,
name_ident: TableNameIdent {
tenant: Tenant::new_or_err(tenant_name, func_name!())?,
db_name: "db1".to_string(),
table_name: "tb1".to_string(),
},

name_ident: TableNameIdent::new(&tenant, "db1", "tb1"),
table_meta: table_meta(created_on),
as_dropped: false,
};
let _resp = mt.create_table(req.clone()).await?;
let resp = mt.create_table(req.clone()).await?;
let db1_tb1_id = resp.table_id;

mt.drop_database(DropDatabaseReq {
if_exists: false,
name_ident: DatabaseNameIdent::new(&tenant, "db1"),
})
.await?;

drop_ids_boundary.push(DroppedId::new_table(db1_id, db1_tb1_id, "tb1"));
drop_ids_boundary.push(DroppedId::Db {
db_id: db1_id,
db_name: db_name.database_name().to_string(),
});

drop_ids_no_boundary.push(DroppedId::new_table(db1_id, db1_tb1_id, "tb1"));
drop_ids_no_boundary.push(DroppedId::Db {
db_id: db1_id,
db_name: db_name.database_name().to_string(),
});
}

// second, create a database dropped out of the filter time, but which has a table dropped within the filter time
Expand All @@ -4125,18 +4122,14 @@ impl SchemaApiTestSuite {
let create_db_req = CreateDatabaseReq {
create_option: CreateOption::Create,
name_ident: DatabaseNameIdent::new(&tenant, "db2"),
meta: DatabaseMeta {
engine: "".to_string(),
..DatabaseMeta::default()
},
meta: Default::default(),
};

let res = mt.create_database(create_db_req.clone()).await?;
let db_id = res.db_id;
drop_ids_2.push(DroppedId::Db {
db_id: *db_id,
let db2_id = res.db_id;
drop_ids_no_boundary.push(DroppedId::Db {
db_id: *db2_id,
db_name: "db2".to_string(),
tables: vec![],
});

info!("--- create and drop db2.tb1");
Expand All @@ -4153,16 +4146,21 @@ impl SchemaApiTestSuite {
as_dropped: false,
};
let resp = mt.create_table(req.clone()).await?;
drop_ids_1.push(DroppedId::new_table(
drop_ids_boundary.push(DroppedId::new_table(
*res.db_id,
resp.table_id,
table_name.table_name.clone(),
));
drop_ids_no_boundary.push(DroppedId::new_table(
*db2_id,
resp.table_id,
table_name.table_name.clone(),
));

mt.drop_table_by_id(DropTableByIdReq {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
db_id: *db_id,
db_id: *db2_id,
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
Expand All @@ -4176,20 +4174,15 @@ impl SchemaApiTestSuite {
let mut table_meta = table_meta(created_on);
let req = CreateTableReq {
create_option: CreateOption::Create,
name_ident: TableNameIdent {
tenant: Tenant::new_or_err(tenant_name, func_name!())?,
db_name: "db2".to_string(),
table_name: "tb2".to_string(),
},

name_ident: TableNameIdent::new(&tenant, "db2", "tb2"),
table_meta: table_meta.clone(),
as_dropped: false,
};
let resp = mt.create_table(req.clone()).await?;
mt.drop_table_by_id(DropTableByIdReq {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
db_id: *db_id,
db_id: *db2_id,
table_name: req.name_ident.table_name.clone(),
tb_id: resp.table_id,
engine: "FUSE".to_string(),
Expand All @@ -4201,22 +4194,25 @@ impl SchemaApiTestSuite {
table_meta.drop_on = Some(created_on + Duration::seconds(100));
let data = serialize_struct(&table_meta)?;
upsert_test_data(mt.as_kv_api(), &id_key, data).await?;

drop_ids_no_boundary.push(DroppedId::new_table(
*db2_id,
resp.table_id,
"tb2".to_string(),
));
}

info!("--- create db2.tb3");
let db2_tb3_id;
{
let req = CreateTableReq {
create_option: CreateOption::Create,
name_ident: TableNameIdent {
tenant: Tenant::new_or_err(tenant_name, func_name!())?,
db_name: "db2".to_string(),
table_name: "tb3".to_string(),
},

name_ident: TableNameIdent::new(&tenant, "db2", "tb3"),
table_meta: table_meta(created_on),
as_dropped: false,
};
let _resp = mt.create_table(req.clone()).await?;
let resp = mt.create_table(req.clone()).await?;
db2_tb3_id = resp.table_id;
}

mt.drop_database(DropDatabaseReq {
Expand All @@ -4227,20 +4223,19 @@ impl SchemaApiTestSuite {
// change db meta to move this db's drop time out of the filter time
let mut drop_db_meta = create_db_req.meta.clone();
drop_db_meta.drop_on = Some(created_on + Duration::seconds(100));
let id_key = db_id;
let id_key = db2_id;
let data = serialize_struct(&drop_db_meta)?;
upsert_test_data(mt.as_kv_api(), &id_key, data).await?;

drop_ids_no_boundary.push(DroppedId::new_table(*db2_id, db2_tb3_id, "tb3".to_string()));
}

// third, create a database that is not dropped, but which has a table dropped within the filter time
{
let create_db_req = CreateDatabaseReq {
create_option: CreateOption::Create,
name_ident: DatabaseNameIdent::new(&tenant, "db3"),
meta: DatabaseMeta {
engine: "".to_string(),
..DatabaseMeta::default()
},
meta: Default::default(),
};

let res = mt.create_database(create_db_req.clone()).await?;
Expand All @@ -4250,26 +4245,13 @@ impl SchemaApiTestSuite {
{
let req = CreateTableReq {
create_option: CreateOption::Create,
name_ident: TableNameIdent {
tenant: Tenant::new_or_err(tenant_name, func_name!())?,
db_name: "db3".to_string(),
table_name: "tb1".to_string(),
},

name_ident: TableNameIdent::new(&tenant, "db3", "tb1"),
table_meta: table_meta(created_on),
as_dropped: false,
};
let resp = mt.create_table(req.clone()).await?;
drop_ids_1.push(DroppedId::new_table(
*db_id,
resp.table_id,
"tb1".to_string(),
));
drop_ids_2.push(DroppedId::new_table(
*db_id,
resp.table_id,
"tb1".to_string(),
));
drop_ids_boundary.push(DroppedId::new_table(*db_id, resp.table_id, "tb1"));
drop_ids_no_boundary.push(DroppedId::new_table(*db_id, resp.table_id, "tb1"));
mt.drop_table_by_id(DropTableByIdReq {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
Expand All @@ -4287,21 +4269,12 @@ impl SchemaApiTestSuite {
let mut table_meta = table_meta(created_on);
let req = CreateTableReq {
create_option: CreateOption::Create,
name_ident: TableNameIdent {
tenant: Tenant::new_or_err(tenant_name, func_name!())?,
db_name: "db3".to_string(),
table_name: "tb2".to_string(),
},

name_ident: TableNameIdent::new(&tenant, "db3", "tb2"),
table_meta: table_meta.clone(),
as_dropped: false,
};
let resp = mt.create_table(req.clone()).await?;
drop_ids_2.push(DroppedId::new_table(
*db_id,
resp.table_id,
"tb2".to_string(),
));
drop_ids_no_boundary.push(DroppedId::new_table(*db_id, resp.table_id, "tb2"));
mt.drop_table_by_id(DropTableByIdReq {
if_exists: false,
tenant: req.name_ident.tenant.clone(),
Expand All @@ -4323,12 +4296,7 @@ impl SchemaApiTestSuite {
{
let req = CreateTableReq {
create_option: CreateOption::Create,
name_ident: TableNameIdent {
tenant: Tenant::new_or_err(tenant_name, func_name!())?,
db_name: "db3".to_string(),
table_name: "tb3".to_string(),
},

name_ident: TableNameIdent::new(&tenant, "db3", "tb3"),
table_meta: table_meta(created_on),
as_dropped: false,
};
Expand All @@ -4348,7 +4316,7 @@ impl SchemaApiTestSuite {
.map(|x| x.cmp_key())
.collect::<BTreeSet<_>>();

let want = drop_ids_1
let want = drop_ids_boundary
.iter()
.map(|x| x.cmp_key())
.collect::<BTreeSet<_>>();
Expand All @@ -4360,8 +4328,7 @@ impl SchemaApiTestSuite {
"'db2'.'tb1'".to_string(),
"'db3'.'tb1'".to_string(),
]
.iter()
.cloned()
.into_iter()
.collect();
let actual: BTreeSet<String> = resp
.drop_table_infos
Expand All @@ -4382,7 +4349,7 @@ impl SchemaApiTestSuite {
.map(|x| x.cmp_key())
.collect::<BTreeSet<_>>();

let want = drop_ids_2
let want = drop_ids_no_boundary
.iter()
.map(|x| x.cmp_key())
.collect::<BTreeSet<_>>();
Expand Down
11 changes: 2 additions & 9 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,15 +984,8 @@ impl ListDroppedTableReq {

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DroppedId {
Db {
db_id: u64,
db_name: String,
tables: Vec<(u64, String)>,
},
Table {
name: DBIdTableName,
id: TableId,
},
Db { db_id: u64, db_name: String },
Table { name: DBIdTableName, id: TableId },
}

impl DroppedId {
Expand Down
Loading

0 comments on commit de5a3d6

Please sign in to comment.