diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 9c2a53defef34..7b44f6dbdc747 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -513,6 +513,8 @@ build_exceptions! { UndropDbHasNoHistory(2312), /// Undrop table with no drop time UndropTableWithNoDropTime(2313), + /// Undrop table blocked by vacuum retention guard + UndropTableRetentionGuard(2326), /// Drop table with drop time DropTableWithDropTime(2314), /// Drop database with drop time diff --git a/src/meta/api/src/garbage_collection_api.rs b/src/meta/api/src/garbage_collection_api.rs index 28cdfbba937a3..71d91c1df70b1 100644 --- a/src/meta/api/src/garbage_collection_api.rs +++ b/src/meta/api/src/garbage_collection_api.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashSet; +use std::convert::Infallible; use std::ops::Range; use chrono::DateTime; @@ -27,6 +28,7 @@ use databend_common_meta_app::principal::TenantOwnershipObjectIdent; use databend_common_meta_app::schema::index_id_ident::IndexIdIdent; use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent; use databend_common_meta_app::schema::table_niv::TableNIV; +use databend_common_meta_app::schema::vacuum_watermark_ident::VacuumWatermarkIdent; use databend_common_meta_app::schema::AutoIncrementStorageIdent; use databend_common_meta_app::schema::DBIdTableName; use databend_common_meta_app::schema::DatabaseId; @@ -40,6 +42,7 @@ use databend_common_meta_app::schema::TableCopiedFileNameIdent; use databend_common_meta_app::schema::TableId; use databend_common_meta_app::schema::TableIdHistoryIdent; use databend_common_meta_app::schema::TableIdToName; +use databend_common_meta_app::schema::VacuumWatermark; use databend_common_meta_app::tenant::Tenant; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; @@ -47,6 +50,7 @@ use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_types::txn_op::Request; use databend_common_meta_types::txn_op_response::Response; use databend_common_meta_types::MetaError; +use databend_common_meta_types::SeqV; use databend_common_meta_types::TxnRequest; use display_more::DisplaySliceExt; use fastrace::func_name; @@ -60,6 +64,7 @@ use log::warn; use crate::index_api::IndexApi; use crate::kv_app_error::KVAppError; use crate::kv_pb_api::KVPbApi; +use crate::kv_pb_crud_api::KVPbCrudApi; use crate::txn_backoff::txn_backoff; use crate::txn_condition_util::txn_cond_eq_seq; use crate::txn_core_util::send_txn; @@ -101,6 +106,48 @@ where } Ok(num_meta_key_removed) } + + /// Fetch and conditionally set the vacuum retention timestamp. + /// + /// This method implements the monotonic timestamp update semantics: + /// - Only updates the timestamp if the new value is greater than the current one + /// - Returns the OLD timestamp value + /// - Ensures atomicity using compare-and-swap operations + #[fastrace::trace] + async fn fetch_set_vacuum_timestamp( + &self, + tenant: &Tenant, + new_timestamp: DateTime, + ) -> Result, KVAppError> { + let ident = VacuumWatermarkIdent::new_global(tenant.clone()); + + // Use crud_upsert_with for atomic compare-and-swap semantics + let transition = self + .crud_upsert_with::(&ident, |current: Option>| { + let current_retention: Option = current.map(|v| v.data); + + // Check if we should update based on monotonic property + let should_update = match current_retention { + None => true, // Never set before, always update + Some(existing) => new_timestamp > existing.time, // Only update if new timestamp is greater + }; + + if should_update { + let new_retention = VacuumWatermark::new(new_timestamp); + Ok(Some(new_retention)) + } else { + // Return None to indicate no update needed + Ok(None) + } + }) + .await? + // Safe to unwrap: type of business logic error is `Infallible` + .unwrap(); + + // Extract the old value to return + let old_retention = transition.prev.map(|v| v.data); + Ok(old_retention) + } } #[async_trait::async_trait] diff --git a/src/meta/api/src/schema_api.rs b/src/meta/api/src/schema_api.rs index e4617c97c9747..f3595681531e0 100644 --- a/src/meta/api/src/schema_api.rs +++ b/src/meta/api/src/schema_api.rs @@ -21,6 +21,7 @@ use databend_common_meta_app::app_error::AppError; use databend_common_meta_app::app_error::DropTableWithDropTime; use databend_common_meta_app::app_error::UndropTableAlreadyExists; use databend_common_meta_app::app_error::UndropTableHasNoHistory; +use databend_common_meta_app::app_error::UndropTableRetentionGuard; use databend_common_meta_app::app_error::UnknownTable; use databend_common_meta_app::app_error::UnknownTableId; use databend_common_meta_app::principal::OwnershipObject; @@ -30,6 +31,7 @@ use databend_common_meta_app::schema::marked_deleted_index_id::MarkedDeletedInde use databend_common_meta_app::schema::marked_deleted_index_ident::MarkedDeletedIndexIdIdent; use databend_common_meta_app::schema::marked_deleted_table_index_id::MarkedDeletedTableIndexId; use databend_common_meta_app::schema::marked_deleted_table_index_ident::MarkedDeletedTableIndexIdIdent; +use databend_common_meta_app::schema::vacuum_watermark_ident::VacuumWatermarkIdent; use databend_common_meta_app::schema::DBIdTableName; use databend_common_meta_app::schema::DatabaseId; use databend_common_meta_app::schema::DatabaseMeta; @@ -476,10 +478,39 @@ pub async fn handle_undrop_table( "undrop table" ); + // Check vacuum retention guard before allowing undrop + let drop_marker = *seq_table_meta + .data + .drop_on + .as_ref() + .unwrap_or(&seq_table_meta.data.updated_on); + + // Read vacuum timestamp with seq for concurrent safety + let vacuum_ident = VacuumWatermarkIdent::new_global(tenant_dbname_tbname.tenant().clone()); + let seq_vacuum_retention = kv_api.get_pb(&vacuum_ident).await?; + + // Early retention guard check for fast failure + if let Some(ref sr) = seq_vacuum_retention { + let retention_time = sr.data.time; + + if drop_marker <= retention_time { + return Err(KVAppError::AppError(AppError::UndropTableRetentionGuard( + UndropTableRetentionGuard::new( + &tenant_dbname_tbname.table_name, + drop_marker, + retention_time, + ), + ))); + } + } + { // reset drop on time seq_table_meta.drop_on = None; + // Prepare conditions for concurrent safety + let vacuum_seq = seq_vacuum_retention.as_ref().map(|sr| sr.seq).unwrap_or(0); + let txn = TxnRequest::new( vec![ // db has not to change, i.e., no new table is created. @@ -489,6 +520,10 @@ pub async fn handle_undrop_table( txn_cond_eq_seq(&dbid_tbname, dbid_tbname_seq), // table is not changed txn_cond_eq_seq(&tbid, seq_table_meta.seq), + // Concurrent safety: vacuum timestamp seq must not change during undrop + // - If vacuum_retention exists: seq must remain the same (no update by vacuum) + // - If vacuum_retention is None: seq must remain 0 (no creation by vacuum) + txn_cond_eq_seq(&vacuum_ident, vacuum_seq), ], vec![ // Changing a table in a db has to update the seq of db_meta, diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index 3578aba8d0b88..eeffb0ad21130 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -56,6 +56,7 @@ use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent; use databend_common_meta_app::schema::sequence_storage::SequenceStorageIdent; use databend_common_meta_app::schema::table_niv::TableNIV; +use databend_common_meta_app::schema::vacuum_watermark_ident::VacuumWatermarkIdent; use databend_common_meta_app::schema::CatalogMeta; use databend_common_meta_app::schema::CatalogNameIdent; use databend_common_meta_app::schema::CatalogOption; @@ -351,6 +352,7 @@ impl SchemaApiTestSuite { suite.gc_dropped_db_after_undrop(&b.build().await).await?; suite.catalog_create_get_list_drop(&b.build().await).await?; suite.table_least_visible_time(&b.build().await).await?; + suite.vacuum_retention_timestamp(&b.build().await).await?; suite .drop_table_without_tableid_to_name(&b.build().await) .await?; @@ -1485,6 +1487,88 @@ impl SchemaApiTestSuite { Ok(()) } + #[fastrace::trace] + async fn vacuum_retention_timestamp>( + &self, + mt: &MT, + ) -> anyhow::Result<()> { + let tenant_name = "vacuum_retention_timestamp"; + let tenant = Tenant::new_or_err(tenant_name, func_name!())?; + + // Test basic timestamp operations - monotonic property + let first = DateTime::::from_timestamp(1_000, 0).unwrap(); + let earlier = DateTime::::from_timestamp(500, 0).unwrap(); + let later = DateTime::::from_timestamp(2_000, 0).unwrap(); + + // Test fetch_set_vacuum_timestamp with correct return semantics + let old_retention = mt.fetch_set_vacuum_timestamp(&tenant, first).await?; + // Should return None as old value since never set before + assert_eq!(old_retention, None); + + // Attempt to set earlier timestamp should return current value (first) unchanged + let old_retention = mt.fetch_set_vacuum_timestamp(&tenant, earlier).await?; + assert_eq!(old_retention.unwrap().time, first); // Should return the PREVIOUS value + + // Set later timestamp should work and return previous value (first) + let old_retention = mt.fetch_set_vacuum_timestamp(&tenant, later).await?; + assert_eq!(old_retention.unwrap().time, first); // Should return PREVIOUS value (first) + + // Verify current stored value + let vacuum_ident = VacuumWatermarkIdent::new_global(tenant.clone()); + let stored = mt.get_pb(&vacuum_ident).await?; + assert_eq!(stored.unwrap().data.time, later); + + // Test undrop retention guard behavior + { + let mut util = Util::new( + mt, + tenant_name, + "db_retention_guard", + "tbl_retention_guard", + "FUSE", + ); + util.create_db().await?; + let (table_id, _table_meta) = util.create_table().await?; + util.drop_table_by_id().await?; + + let table_meta = mt + .get_pb(&TableId::new(table_id)) + .await? + .expect("dropped table meta must exist"); + let drop_time = table_meta + .data + .drop_on + .expect("dropped table should carry drop_on timestamp"); + + // Set retention timestamp after drop time to block undrop + let retention_candidate = drop_time + chrono::Duration::seconds(1); + let old_retention = mt + .fetch_set_vacuum_timestamp(&tenant, retention_candidate) + .await? + .unwrap(); + // Should return the previous retention time (later) + assert_eq!(old_retention.time, later); + + // Undrop should now fail due to retention guard + let undrop_err = mt + .undrop_table(UndropTableReq { + name_ident: TableNameIdent::new(&tenant, util.db_name(), util.tbl_name()), + }) + .await + .expect_err("undrop must fail once vacuum retention blocks it"); + + match undrop_err { + KVAppError::AppError(AppError::UndropTableRetentionGuard(e)) => { + assert_eq!(e.drop_time(), drop_time); + assert_eq!(e.retention(), retention_candidate); + } + other => panic!("unexpected undrop error: {other:?}"), + } + } + + Ok(()) + } + #[fastrace::trace] async fn table_create_get_drop>( &self, diff --git a/src/meta/app/src/app_error.rs b/src/meta/app/src/app_error.rs index 620841d3a3f53..704f61c0da96c 100644 --- a/src/meta/app/src/app_error.rs +++ b/src/meta/app/src/app_error.rs @@ -14,6 +14,8 @@ use std::fmt::Display; +use chrono::DateTime; +use chrono::Utc; use databend_common_exception::ErrorCode; use databend_common_meta_types::MatchSeq; @@ -261,6 +263,36 @@ impl UndropTableHasNoHistory { } } +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[error("Cannot undrop table '{table_name}': table was dropped at {drop_time} before vacuum started at {retention}. Data may have been cleaned up.")] +pub struct UndropTableRetentionGuard { + table_name: String, + drop_time: DateTime, + retention: DateTime, +} + +impl UndropTableRetentionGuard { + pub fn new( + table_name: impl Into, + drop_time: DateTime, + retention: DateTime, + ) -> Self { + Self { + table_name: table_name.into(), + drop_time, + retention, + } + } + + pub fn drop_time(&self) -> DateTime { + self.drop_time + } + + pub fn retention(&self) -> DateTime { + self.retention + } +} + #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[error("TableVersionMismatched: {table_id} expect `{expect}` but `{curr}` while `{context}`")] pub struct TableVersionMismatched { @@ -1008,6 +1040,9 @@ pub enum AppError { #[error(transparent)] UndropTableHasNoHistory(#[from] UndropTableHasNoHistory), + #[error(transparent)] + UndropTableRetentionGuard(#[from] UndropTableRetentionGuard), + #[error(transparent)] DatabaseAlreadyExists(#[from] DatabaseAlreadyExists), @@ -1461,6 +1496,11 @@ impl AppErrorMessage for UndropTableWithNoDropTime { } } +impl AppErrorMessage for UndropTableRetentionGuard { + // Use default implementation that calls self.to_string() + // since there's no sensitive information to strip +} + impl AppErrorMessage for DropTableWithDropTime { fn message(&self) -> String { format!("Drop table '{}' with drop_on time", self.table_name) @@ -1590,6 +1630,9 @@ impl From for ErrorCode { AppError::UndropTableWithNoDropTime(err) => { ErrorCode::UndropTableWithNoDropTime(err.message()) } + AppError::UndropTableRetentionGuard(err) => { + ErrorCode::UndropTableRetentionGuard(err.message()) + } AppError::DropTableWithDropTime(err) => ErrorCode::DropTableWithDropTime(err.message()), AppError::DropDbWithDropTime(err) => ErrorCode::DropDbWithDropTime(err.message()), AppError::UndropDbWithNoDropTime(err) => { diff --git a/src/meta/app/src/schema/mod.rs b/src/meta/app/src/schema/mod.rs index 468c07e26237f..1bc97598120d0 100644 --- a/src/meta/app/src/schema/mod.rs +++ b/src/meta/app/src/schema/mod.rs @@ -47,6 +47,8 @@ pub mod sequence_storage; mod table; pub mod table_lock_ident; pub mod table_niv; +mod vacuum_watermark; +pub mod vacuum_watermark_ident; pub use auto_increment::GetAutoIncrementNextValueReply; pub use auto_increment::GetAutoIncrementNextValueReq; @@ -154,3 +156,5 @@ pub use table::UpsertTableCopiedFileReq; pub use table::UpsertTableOptionReply; pub use table::UpsertTableOptionReq; pub use table_lock_ident::TableLockIdent; +pub use vacuum_watermark::VacuumWatermark; +pub use vacuum_watermark_ident::VacuumWatermarkIdent; diff --git a/src/meta/app/src/schema/vacuum_watermark.rs b/src/meta/app/src/schema/vacuum_watermark.rs new file mode 100644 index 0000000000000..8c7ee0944a855 --- /dev/null +++ b/src/meta/app/src/schema/vacuum_watermark.rs @@ -0,0 +1,28 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::DateTime; +use chrono::Utc; + +/// Monotonic timestamp marker indicating when vacuum cleanup started for a tenant. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct VacuumWatermark { + pub time: DateTime, +} + +impl VacuumWatermark { + pub fn new(time: DateTime) -> Self { + Self { time } + } +} diff --git a/src/meta/app/src/schema/vacuum_watermark_ident.rs b/src/meta/app/src/schema/vacuum_watermark_ident.rs new file mode 100644 index 0000000000000..ab5c4402a2621 --- /dev/null +++ b/src/meta/app/src/schema/vacuum_watermark_ident.rs @@ -0,0 +1,68 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::tenant_key::ident::TIdent; + +pub type VacuumWatermarkIdent = TIdent; + +pub use kvapi_impl::VacuumWatermarkRsc; + +impl VacuumWatermarkIdent { + pub fn new_global(tenant: impl crate::tenant::ToTenant) -> Self { + TIdent::new_generic(tenant, ()) + } +} + +mod kvapi_impl { + use databend_common_meta_kvapi::kvapi; + + use crate::schema::vacuum_watermark::VacuumWatermark; + use crate::schema::vacuum_watermark_ident::VacuumWatermarkIdent; + use crate::tenant_key::resource::TenantResource; + + pub struct VacuumWatermarkRsc; + + impl TenantResource for VacuumWatermarkRsc { + const PREFIX: &'static str = "__fd_vacuum_watermark_ts"; + const HAS_TENANT: bool = true; + type ValueType = VacuumWatermark; + } + + impl kvapi::Value for VacuumWatermark { + type KeyType = VacuumWatermarkIdent; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use super::VacuumWatermarkIdent; + use crate::tenant::Tenant; + + #[test] + fn test_ident() { + let tenant = Tenant::new_literal("dummy"); + let ident = VacuumWatermarkIdent::new_global(tenant); + + let key = ident.to_string_key(); + assert_eq!(key, "__fd_vacuum_watermark_ts/dummy"); + + assert_eq!(ident, VacuumWatermarkIdent::from_str_key(&key).unwrap()); + } +} diff --git a/src/meta/proto-conv/src/lib.rs b/src/meta/proto-conv/src/lib.rs index b776192697ee5..5a3cd5eca891b 100644 --- a/src/meta/proto-conv/src/lib.rs +++ b/src/meta/proto-conv/src/lib.rs @@ -90,6 +90,7 @@ mod token_from_to_protobuf_impl; mod udf_from_to_protobuf_impl; mod user_from_to_protobuf_impl; mod util; +mod vacuum_watermark_from_to_protobuf_impl; pub use from_to_protobuf::FromToProto; pub use from_to_protobuf::FromToProtoEnum; diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index efe1068fa9b4b..1e397e8600a30 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -181,6 +181,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (149, "2025-09-24: Add: add AutoIncrement name and display on TableField"), (150, "2025-09-26: Add: RoleInfo::comment"), (151, "2025-09-28: Add: TableMeta::RowAccessPolicyColumnMap store policy name and column id"), + (152, "2025-09-30: Add: VacuumRetention proto"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/src/vacuum_watermark_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/vacuum_watermark_from_to_protobuf_impl.rs new file mode 100644 index 0000000000000..79da592a256af --- /dev/null +++ b/src/meta/proto-conv/src/vacuum_watermark_from_to_protobuf_impl.rs @@ -0,0 +1,53 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Conversion between protobuf VacuumWatermark and meta struct. + +use chrono::DateTime; +use chrono::Utc; +use databend_common_meta_app::schema as mt; +use databend_common_protos::pb; + +use crate::reader_check_msg; +use crate::FromToProto; +use crate::Incompatible; +use crate::MIN_READER_VER; +use crate::VER; + +impl FromToProto for mt::VacuumWatermark { + type PB = pb::VacuumWatermark; + + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + + fn from_pb(p: Self::PB) -> Result + where Self: Sized { + reader_check_msg(p.ver, p.min_reader_ver)?; + + let v = Self { + time: DateTime::::from_pb(p.time)?, + }; + Ok(v) + } + + fn to_pb(&self) -> Result { + let p = pb::VacuumWatermark { + ver: VER, + min_reader_ver: MIN_READER_VER, + time: self.time.to_pb()?, + }; + Ok(p) + } +} diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index e05b848a217ae..0f53476eb2224 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -143,3 +143,4 @@ mod v148_virtual_schema; mod v149_field_auto_increment; mod v150_role_comment; mod v151_row_access_column_map; +mod v152_vacuum_retention; diff --git a/src/meta/proto-conv/tests/it/v152_vacuum_retention.rs b/src/meta/proto-conv/tests/it/v152_vacuum_retention.rs new file mode 100644 index 0000000000000..274c673a103ec --- /dev/null +++ b/src/meta/proto-conv/tests/it/v152_vacuum_retention.rs @@ -0,0 +1,48 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::DateTime; +use chrono::Utc; +use databend_common_meta_app::schema as mt; +use fastrace::func_name; + +use crate::common; + +// These bytes are built when a new version in introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. * +// ************************************************************* +// + +#[test] +fn test_decode_v152_vacuum_retention() -> anyhow::Result<()> { + // Serialized VacuumWatermark with timestamp 1702603569 (2023-12-15 01:26:09 UTC) + let vacuum_retention_v152 = vec![ + 10, 23, 50, 48, 50, 51, 45, 49, 50, 45, 49, 53, 32, 48, 49, 58, 50, 54, 58, 48, 57, 32, 85, + 84, 67, 160, 6, 152, 1, 168, 6, 24, + ]; + + let want = || mt::VacuumWatermark { + time: "2023-12-15T01:26:09Z".parse::>().unwrap(), + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), vacuum_retention_v152.as_slice(), 152, want())?; + + Ok(()) +} diff --git a/src/meta/protos/proto/vacuum_watermark.proto b/src/meta/protos/proto/vacuum_watermark.proto new file mode 100644 index 0000000000000..991b0885a33ef --- /dev/null +++ b/src/meta/protos/proto/vacuum_watermark.proto @@ -0,0 +1,24 @@ +// Copyright 2024 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +package databend_proto; + +message VacuumWatermark { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + string time = 1; +} \ No newline at end of file diff --git a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs index 468c01f4dbdea..9a408bd41ff77 100644 --- a/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs +++ b/src/query/service/src/interpreters/interpreter_vacuum_drop_tables.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use chrono::Duration; use databend_common_catalog::catalog::Catalog; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::StringType; use databend_common_expression::types::UInt64Type; @@ -27,11 +28,13 @@ use databend_common_expression::DataBlock; use databend_common_expression::FromData; use databend_common_license::license::Feature::Vacuum; use databend_common_license::license_manager::LicenseManagerSwitch; +use databend_common_meta_api::GarbageCollectionApi; use databend_common_meta_app::schema::DroppedId; use databend_common_meta_app::schema::GcDroppedTableReq; use databend_common_meta_app::schema::ListDroppedTableReq; use databend_common_sql::plans::VacuumDropTablePlan; use databend_common_storages_basic::view_table::VIEW_ENGINE; +use databend_common_users::UserApiProvider; use databend_enterprise_vacuum_handler::get_vacuum_handler; use log::info; @@ -132,6 +135,25 @@ impl Interpreter for VacuumDropTablesInterpreter { let duration = Duration::days(ctx.get_settings().get_data_retention_time_in_days()? as i64); let retention_time = chrono::Utc::now() - duration; + + // Set vacuum timestamp before starting the vacuum operation (only in non-dry-run mode) + // This ensures undrop operations after this point will be blocked + if self.plan.option.dry_run.is_none() { + let tenant = ctx.get_tenant(); + let meta_api = UserApiProvider::instance().get_meta_store_client(); + + // CRITICAL: Must succeed in setting vacuum timestamp before proceeding + // If this fails, vacuum operation should not proceed to prevent data loss + meta_api + .fetch_set_vacuum_timestamp(&tenant, retention_time) + .await + .map_err(|e| { + ErrorCode::MetaStorageError(format!( + "Failed to set vacuum timestamp before vacuum operation: {}. Vacuum aborted to prevent data inconsistency.", + e + )) + })?; + } let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?; info!( "=== VACUUM DROP TABLE STARTED === db: {:?}, retention_days: {}, retention_time: {:?}",