From 9ce4f0fbc3a5f8f596769bd7f809099f36daa5bc Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 10 Mar 2025 15:50:28 +0800 Subject: [PATCH 01/11] feat(iceberg): basic remove snapshot --- crates/iceberg/src/lib.rs | 1 + crates/iceberg/src/spec/snapshot.rs | 1 - crates/iceberg/src/transaction.rs | 314 ++++++++++++++++++++++++++++ 3 files changed, 315 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index d684be54c8..273031f675 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -52,6 +52,7 @@ //! ``` #![deny(missing_docs)] +#![feature(anonymous_lifetime_in_impl_trait)] #[macro_use] extern crate derive_builder; diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index e73b8abaa3..f2637d96b0 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -175,7 +175,6 @@ impl Snapshot { } /// Get parent snapshot. - #[cfg(test)] pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option { match self.parent_snapshot_id { Some(id) => table_metadata.snapshot_by_id(id).cloned(), diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 007a3745f0..d4a338c24d 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -25,6 +25,7 @@ use std::ops::RangeFrom; use arrow_array::StringArray; use futures::TryStreamExt; +use itertools::Itertools; use uuid::Uuid; use crate::error::Result; @@ -717,6 +718,319 @@ impl<'a> ReplaceSortOrderAction<'a> { } } +/// Transaction action for removing snapshot. +pub struct RemoveSnapshotAction<'a> { + tx: Transaction<'a>, + commit_uuid: Uuid, + clear_expire_files: bool, + ids_to_remove: HashSet, + default_expired_older_than: i64, + default_min_num_snapshots: i32, + default_max_ref_age_ms: i64, + clear_expired_meta_data: bool, + + now: i64, +} + +impl<'a> RemoveSnapshotAction<'a> { + // pub fn new() -> Self { + // Self + // } + + // pub fn new(tx: Transaction<'a>, commit_uuid: Uuid) -> Self { + // Self { tx, commit_uuid } + // } + + /// Finished building the action and apply it to the transaction. + pub fn clear_expire_files(mut self, clear_expire_files: bool) -> Self { + self.clear_expire_files = clear_expire_files; + self + } + + /// Finished building the action and apply it to the transaction. + pub fn expire_snapshot_id(mut self, expire_snapshot_id: i64) -> Self { + self.ids_to_remove.insert(expire_snapshot_id); + self + } + + /// Finished building the action and apply it to the transaction. + pub fn expire_older_than(mut self, timestamp_ms: i64) -> Self { + self.default_expired_older_than = timestamp_ms; + self + } + + /// Finished building the action and apply it to the transaction. + pub fn retain_last(mut self, min_num_snapshots: i32) -> Self { + self.default_min_num_snapshots = min_num_snapshots; + self + } + + /// Finished building the action and apply it to the transaction. + pub fn clear_expired_meta_data(mut self, clear_expired_meta_data: bool) -> Self { + self.clear_expired_meta_data = clear_expired_meta_data; + self + } + + /// Finished building the action and apply it to the transaction. + async fn apply_impl(mut self) -> Result> { + if self.tx.table.metadata().refs.is_empty() { + return Ok(self.tx); + } + + let table_meta = self.tx.table.metadata().clone(); + + let mut ids_to_retain = HashSet::new(); + let retained_refs = self.compute_retained_refs(&table_meta.refs); + let mut retained_id_to_refs = HashMap::new(); + for (ref_name, snapshot_ref) in &retained_refs { + let snapshot_id = snapshot_ref.snapshot_id; + retained_id_to_refs + .entry(snapshot_id) + .or_insert_with(Vec::new) + .push(ref_name.clone()); + + ids_to_retain.insert(snapshot_id); + } + + for id_to_remove in &self.ids_to_remove { + let refs_for_id = retained_id_to_refs.get(id_to_remove); + assert!( + refs_for_id.is_none(), + "Cannot remove snapshot {:?} with retained references: {:?}", + id_to_remove, + refs_for_id + ); + } + + ids_to_retain.extend(self.compute_all_branch_snapshots_to_retain(table_meta.refs.values())); + ids_to_retain.extend(self.unreferenced_snapshots_to_retain(table_meta.refs.values())); + + for ref_name in table_meta.refs.keys() { + if !retained_refs.contains_key(ref_name) { + self.tx + .append_updates(vec![TableUpdate::RemoveSnapshotRef { + ref_name: ref_name.clone(), + }])?; + } + } + + let mut snapshot_to_remove = Vec::new(); + for snapshot in table_meta.snapshots() { + if !ids_to_retain.contains(&snapshot.snapshot_id()) { + snapshot_to_remove.push(snapshot.snapshot_id()); + } + } + + self.tx.append_updates(vec![TableUpdate::RemoveSnapshots { + snapshot_ids: snapshot_to_remove, + }])?; + + if self.clear_expired_meta_data { + let mut reachable_specs = HashSet::new(); + reachable_specs.insert(table_meta.current_schema_id()); + let mut reachable_schemas = HashSet::new(); + reachable_schemas.insert(table_meta.current_schema_id()); + + //TODO: parallelize + for snapshot in table_meta.snapshots() { + if ids_to_retain.contains(&snapshot.snapshot_id()) { + let manifest_list = snapshot + .load_manifest_list(self.tx.table.file_io(), &table_meta) + .await?; + + for manifest in manifest_list.entries() { + reachable_specs.insert(manifest.partition_spec_id); + } + + if let Some(schema_id) = snapshot.schema_id() { + reachable_schemas.insert(schema_id); + } + } + } + + let spec_to_remove = self + .tx + .table + .metadata() + .partition_specs_iter() + .filter_map(|spec| { + if !reachable_specs.contains(&spec.spec_id()) { + Some(spec.spec_id()) + } else { + None + } + }) + .unique() + .collect(); + + self.tx + .append_updates(vec![TableUpdate::RemovePartitionSpecs { + spec_ids: spec_to_remove, + }])?; + + // let schema_to_remove = self + // .tx + // .table + // .metadata() + // .schemas_iter() + // .filter_map(|schema| { + // if !reachable_schemas.contains(&schema.schema_id()) { + // Some(schema.schema_id()) + // } else { + // None + // } + // }) + // .unique() + // .collect(); + + // TODO: RemoveSchemas + // self.tx.append_updates(vec![TableUpdate::RemoveSchemas { + // schema_ids: schema_to_remove, + // }])?; + } + + Ok(self.tx) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(self) -> Result> { + self.apply_impl().await + } + + fn compute_retained_refs( + &self, + snapshot_refs: &HashMap, + ) -> HashMap { + let table_meta = self.tx.table.metadata(); + let mut retained_refs = HashMap::new(); + // let mut snapshot_refs = &table_meta.refs; + + for (ref_name, snapshot_ref) in snapshot_refs { + if ref_name == MAIN_BRANCH { + retained_refs.insert(ref_name.clone(), snapshot_ref.clone()); + continue; + } + + let snapshot = table_meta.snapshot_by_id(snapshot_ref.snapshot_id); + let max_ref_age_ms = match &snapshot_ref.retention { + SnapshotRetention::Branch { + min_snapshots_to_keep: _, + max_snapshot_age_ms: _, + max_ref_age_ms, + } => max_ref_age_ms, + SnapshotRetention::Tag { max_ref_age_ms } => max_ref_age_ms, + } + .unwrap_or(self.default_max_ref_age_ms); + + if let Some(snapshot) = snapshot { + let ref_age_ms = self.now - snapshot.timestamp_ms(); + if ref_age_ms <= max_ref_age_ms { + retained_refs.insert(ref_name.clone(), snapshot_ref.clone()); + } + } else { + // warn + } + } + + retained_refs + } + + fn compute_all_branch_snapshots_to_retain( + &self, + refs: impl Iterator, + ) -> HashSet { + let mut branch_snapshots_to_retain = HashSet::new(); + for snapshot_ref in refs { + if snapshot_ref.is_branch() { + let max_snapshot_age_ms = match snapshot_ref.retention { + SnapshotRetention::Branch { + min_snapshots_to_keep: _, + max_snapshot_age_ms, + max_ref_age_ms: _, + } => max_snapshot_age_ms, + SnapshotRetention::Tag { max_ref_age_ms: _ } => None, + } + .unwrap_or(self.default_expired_older_than); + + let expire_snapshot_older_than = self.now - max_snapshot_age_ms; + + let min_snapshots_to_keep = match snapshot_ref.retention { + SnapshotRetention::Branch { + min_snapshots_to_keep, + max_snapshot_age_ms: _, + max_ref_age_ms: _, + } => min_snapshots_to_keep, + SnapshotRetention::Tag { max_ref_age_ms: _ } => None, + } + .unwrap_or(self.default_min_num_snapshots); + + branch_snapshots_to_retain.extend(self.compute_branch_snapshots_to_retain( + snapshot_ref.snapshot_id, + expire_snapshot_older_than, + min_snapshots_to_keep as usize, + )); + } + } + + branch_snapshots_to_retain + } + + fn compute_branch_snapshots_to_retain( + &self, + snapshot_id: i64, + expire_snapshots_older_than: i64, + min_snapshots_to_keep: usize, + ) -> HashSet { + let mut ids_to_retain = HashSet::new(); + let table_meta = self.tx.table.metadata(); + if let Some(snapshot) = table_meta.snapshot_by_id(snapshot_id) { + let mut snapshot = snapshot.clone(); + while let Some(ancestor) = snapshot.parent_snapshot(table_meta) { + if ids_to_retain.len() < min_snapshots_to_keep + || snapshot.timestamp_ms() >= expire_snapshots_older_than + { + ids_to_retain.insert(snapshot.snapshot_id()); + } else { + return ids_to_retain; + } + + snapshot = ancestor; + } + } + + ids_to_retain + } + + fn unreferenced_snapshots_to_retain( + &self, + refs: impl Iterator, + ) -> HashSet { + let mut ids_to_retain = HashSet::new(); + + for snapshot_ref in refs { + if snapshot_ref.is_branch() { + if let Some(snapshot) = self + .tx + .table + .metadata() + .snapshot_by_id(snapshot_ref.snapshot_id) + { + let mut snapshot = snapshot.clone(); + while let Some(ancestor) = snapshot.parent_snapshot(self.tx.table.metadata()) { + ids_to_retain.insert(snapshot.snapshot_id()); + + snapshot = ancestor; + } + } + } else { + ids_to_retain.insert(snapshot_ref.snapshot_id); + } + } + + ids_to_retain + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; From dcc0ea44f41d928cee8e386eba9afba762149af0 Mon Sep 17 00:00:00 2001 From: Li0k Date: Mon, 10 Mar 2025 16:25:04 +0800 Subject: [PATCH 02/11] feat(iceberg): introduce new properties for remove snapshots --- crates/iceberg/src/spec/table_metadata.rs | 13 ++++++ crates/iceberg/src/transaction.rs | 48 +++++++++++++++++++---- 2 files changed, 53 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 94f1191b26..f6f5a540ee 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -78,6 +78,19 @@ pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX: &str = "write.metadata.previo /// Default value for max number of previous versions to keep. pub const PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT: usize = 100; +/// Property key for max snapshot age in milliseconds. +pub const MAX_SNAPSHOT_AGE_MS: &str = "history.expire.max-snapshot-age-ms"; +/// Default value for max snapshot age in milliseconds. +pub const MAX_SNAPSHOT_AGE_MS_DEFAULT: i64 = 5 * 24 * 60 * 60 * 1000; // 5 days +/// Property key for min snapshots to keep. +pub const MIN_SNAPSHOTS_TO_KEEP: &str = "history.expire.min-snapshots-to-keep"; +/// Default value for min snapshots to keep. +pub const MIN_SNAPSHOTS_TO_KEEP_DEFAULT: i32 = 1; +/// Property key for max reference age in milliseconds. +pub const MAX_REF_AGE_MS: &str = "history.expire.max-ref-age-ms"; +/// Default value for max reference age in milliseconds. +pub const MAX_REF_AGE_MS_DEFAULT: i64 = i64::MAX; + /// Reserved Iceberg table properties list. /// /// Reserved table properties are only used to control behaviors when creating or updating a diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index d4a338c24d..9c251033b8 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -20,6 +20,7 @@ use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::future::Future; +use std::i32; use std::mem::discriminant; use std::ops::RangeFrom; @@ -34,7 +35,8 @@ use crate::spec::{ DataContentType, DataFile, DataFileFormat, FormatVersion, ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, - Summary, Transform, MAIN_BRANCH, + Summary, Transform, MAIN_BRANCH, MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS, + MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP, MIN_SNAPSHOTS_TO_KEEP_DEFAULT, }; use crate::table::Table; use crate::writer::file_writer::ParquetWriter; @@ -158,6 +160,11 @@ impl<'a> Transaction<'a> { } } + /// Creates remove snapshot action. + pub fn expire_snapshot(self) -> RemoveSnapshotAction<'a> { + RemoveSnapshotAction::new(self) + } + /// Remove properties in table. pub fn remove_properties(mut self, keys: Vec) -> Result { self.append_updates(vec![TableUpdate::RemoveProperties { removals: keys }])?; @@ -721,7 +728,6 @@ impl<'a> ReplaceSortOrderAction<'a> { /// Transaction action for removing snapshot. pub struct RemoveSnapshotAction<'a> { tx: Transaction<'a>, - commit_uuid: Uuid, clear_expire_files: bool, ids_to_remove: HashSet, default_expired_older_than: i64, @@ -733,13 +739,39 @@ pub struct RemoveSnapshotAction<'a> { } impl<'a> RemoveSnapshotAction<'a> { - // pub fn new() -> Self { - // Self - // } + /// Creates a new action. + pub fn new(tx: Transaction<'a>) -> Self { + let table = tx.table; + let properties = table.metadata().properties(); + + let now = chrono::Utc::now().timestamp_millis(); + + let default_max_snapshot_age_ms = properties + .get(MAX_SNAPSHOT_AGE_MS) + .and_then(|v| v.parse::().ok()) + .unwrap_or(MAX_SNAPSHOT_AGE_MS_DEFAULT); + + let default_min_num_snapshots = properties + .get(MIN_SNAPSHOTS_TO_KEEP) + .and_then(|v| v.parse::().ok()) + .unwrap_or(MIN_SNAPSHOTS_TO_KEEP_DEFAULT); - // pub fn new(tx: Transaction<'a>, commit_uuid: Uuid) -> Self { - // Self { tx, commit_uuid } - // } + let default_max_ref_age_ms = properties + .get(MAX_REF_AGE_MS) + .and_then(|v| v.parse::().ok()) + .unwrap_or(MAX_REF_AGE_MS_DEFAULT); + + Self { + tx, + clear_expire_files: false, + ids_to_remove: HashSet::new(), + default_expired_older_than: now - default_max_snapshot_age_ms, + default_min_num_snapshots, + default_max_ref_age_ms, + now, + clear_expired_meta_data: false, + } + } /// Finished building the action and apply it to the transaction. pub fn clear_expire_files(mut self, clear_expire_files: bool) -> Self { From 61442c2750e69ecc6c497f679ace25fa3a2c2ad9 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 11 Mar 2025 14:19:13 +0800 Subject: [PATCH 03/11] feat(iceberg): support remove schemas --- crates/iceberg/src/transaction.rs | 80 +++++++++++++++++++++++-------- 1 file changed, 61 insertions(+), 19 deletions(-) diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 9c251033b8..0abd508aa1 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -900,27 +900,36 @@ impl<'a> RemoveSnapshotAction<'a> { spec_ids: spec_to_remove, }])?; - // let schema_to_remove = self - // .tx - // .table - // .metadata() - // .schemas_iter() - // .filter_map(|schema| { - // if !reachable_schemas.contains(&schema.schema_id()) { - // Some(schema.schema_id()) - // } else { - // None - // } - // }) - // .unique() - // .collect(); - - // TODO: RemoveSchemas - // self.tx.append_updates(vec![TableUpdate::RemoveSchemas { - // schema_ids: schema_to_remove, - // }])?; + let schema_to_remove = self + .tx + .table + .metadata() + .schemas_iter() + .filter_map(|schema| { + if !reachable_schemas.contains(&schema.schema_id()) { + Some(schema.schema_id()) + } else { + None + } + }) + .unique() + .collect(); + + self.tx.append_updates(vec![TableUpdate::RemoveSchemas { + schema_ids: schema_to_remove, + }])?; } + self.tx.append_requirements(vec![ + TableRequirement::UuidMatch { + uuid: self.tx.table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: self.tx.table.metadata().current_snapshot_id(), + }, + ])?; + Ok(self.tx) } @@ -1413,4 +1422,37 @@ mod tests { assert!(manifest_paths.contains(&path)); } } + + #[tokio::test] + async fn test_remove_snapshot_action() { + let table = make_v2_table(); + let table_meta = table.metadata().clone(); + assert_eq!(2, table_meta.snapshots().count()); + { + let tx = Transaction::new(&table); + let tx = tx.expire_snapshot().apply().await.unwrap(); + // keep the last snapshot + for update in &tx.updates { + match update { + TableUpdate::RemoveSnapshots { snapshot_ids } => { + assert_eq!(1, snapshot_ids.len()); + } + _ => {} + } + } + + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: tx.table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: tx.table.metadata().current_snapshot_id + } + ], + tx.requirements + ); + } + } } From 0cee955edf57e254e6a29bf6acb20edc96eb57b7 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 11 Mar 2025 17:31:48 +0800 Subject: [PATCH 04/11] refactor(iceberg): refactor file org --- crates/iceberg/src/lib.rs | 1 + crates/iceberg/src/remove_snapshots.rs | 379 +++++++++++++++++++++++++ crates/iceberg/src/transaction.rs | 378 ++---------------------- 3 files changed, 404 insertions(+), 354 deletions(-) create mode 100644 crates/iceberg/src/remove_snapshots.rs diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 273031f675..783f4b170f 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -78,6 +78,7 @@ pub mod inspect; pub mod scan; pub mod expr; +pub mod remove_snapshots; pub mod transaction; pub mod transform; diff --git a/crates/iceberg/src/remove_snapshots.rs b/crates/iceberg/src/remove_snapshots.rs new file mode 100644 index 0000000000..4575cb4609 --- /dev/null +++ b/crates/iceberg/src/remove_snapshots.rs @@ -0,0 +1,379 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Transaction action for removing snapshot. + +use std::collections::{HashMap, HashSet}; +use std::i32; + +use itertools::Itertools; + +use crate::error::Result; +use crate::spec::{ + SnapshotReference, SnapshotRetention, MAIN_BRANCH, MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT, + MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP, + MIN_SNAPSHOTS_TO_KEEP_DEFAULT, +}; +use crate::transaction::Transaction; +use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; + +/// RemoveSnapshotAction is a transaction action for removing snapshot. +pub struct RemoveSnapshotAction<'a> { + tx: Transaction<'a>, + clear_expire_files: bool, + ids_to_remove: HashSet, + default_expired_older_than: i64, + default_min_num_snapshots: i32, + default_max_ref_age_ms: i64, + clear_expired_meta_data: bool, + + now: i64, +} + +impl<'a> RemoveSnapshotAction<'a> { + /// Creates a new action. + pub fn new(tx: Transaction<'a>) -> Self { + let table = tx.table; + let properties = table.metadata().properties(); + + let now = chrono::Utc::now().timestamp_millis(); + + let default_max_snapshot_age_ms = properties + .get(MAX_SNAPSHOT_AGE_MS) + .and_then(|v| v.parse::().ok()) + .unwrap_or(MAX_SNAPSHOT_AGE_MS_DEFAULT); + + let default_min_num_snapshots = properties + .get(MIN_SNAPSHOTS_TO_KEEP) + .and_then(|v| v.parse::().ok()) + .unwrap_or(MIN_SNAPSHOTS_TO_KEEP_DEFAULT); + + let default_max_ref_age_ms = properties + .get(MAX_REF_AGE_MS) + .and_then(|v| v.parse::().ok()) + .unwrap_or(MAX_REF_AGE_MS_DEFAULT); + + Self { + tx, + clear_expire_files: false, + ids_to_remove: HashSet::new(), + default_expired_older_than: now - default_max_snapshot_age_ms, + default_min_num_snapshots, + default_max_ref_age_ms, + now, + clear_expired_meta_data: false, + } + } + + /// Finished building the action and apply it to the transaction. + pub fn clear_expire_files(mut self, clear_expire_files: bool) -> Self { + self.clear_expire_files = clear_expire_files; + self + } + + /// Finished building the action and apply it to the transaction. + pub fn expire_snapshot_id(mut self, expire_snapshot_id: i64) -> Self { + self.ids_to_remove.insert(expire_snapshot_id); + self + } + + /// Finished building the action and apply it to the transaction. + pub fn expire_older_than(mut self, timestamp_ms: i64) -> Self { + self.default_expired_older_than = timestamp_ms; + self + } + + /// Finished building the action and apply it to the transaction. + pub fn retain_last(mut self, min_num_snapshots: i32) -> Self { + self.default_min_num_snapshots = min_num_snapshots; + self + } + + /// Finished building the action and apply it to the transaction. + pub fn clear_expired_meta_data(mut self, clear_expired_meta_data: bool) -> Self { + self.clear_expired_meta_data = clear_expired_meta_data; + self + } + + /// Finished building the action and apply it to the transaction. + async fn apply_impl(mut self) -> Result> { + if self.tx.table.metadata().refs.is_empty() { + return Ok(self.tx); + } + + let table_meta = self.tx.table.metadata().clone(); + + let mut ids_to_retain = HashSet::new(); + let retained_refs = self.compute_retained_refs(&table_meta.refs); + let mut retained_id_to_refs = HashMap::new(); + for (ref_name, snapshot_ref) in &retained_refs { + let snapshot_id = snapshot_ref.snapshot_id; + retained_id_to_refs + .entry(snapshot_id) + .or_insert_with(Vec::new) + .push(ref_name.clone()); + + ids_to_retain.insert(snapshot_id); + } + + for id_to_remove in &self.ids_to_remove { + let refs_for_id = retained_id_to_refs.get(id_to_remove); + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot remove snapshot {:?} with retained references: {:?}", + id_to_remove, refs_for_id + ), + )); + } + + ids_to_retain.extend(self.compute_all_branch_snapshots_to_retain(table_meta.refs.values())); + ids_to_retain.extend(self.unreferenced_snapshots_to_retain(table_meta.refs.values())); + + for ref_name in table_meta.refs.keys() { + if !retained_refs.contains_key(ref_name) { + self.tx + .append_updates(vec![TableUpdate::RemoveSnapshotRef { + ref_name: ref_name.clone(), + }])?; + } + } + + let mut snapshot_to_remove = Vec::from_iter(self.ids_to_remove.iter().cloned()); + for snapshot in table_meta.snapshots() { + if !ids_to_retain.contains(&snapshot.snapshot_id()) { + snapshot_to_remove.push(snapshot.snapshot_id()); + } + } + + self.tx.append_updates(vec![TableUpdate::RemoveSnapshots { + snapshot_ids: snapshot_to_remove, + }])?; + + if self.clear_expired_meta_data { + let mut reachable_specs = HashSet::new(); + reachable_specs.insert(table_meta.current_schema_id()); + let mut reachable_schemas = HashSet::new(); + reachable_schemas.insert(table_meta.current_schema_id()); + + //TODO: parallelize + for snapshot in table_meta.snapshots() { + if ids_to_retain.contains(&snapshot.snapshot_id()) { + let manifest_list = snapshot + .load_manifest_list(self.tx.table.file_io(), &table_meta) + .await?; + + for manifest in manifest_list.entries() { + reachable_specs.insert(manifest.partition_spec_id); + } + + if let Some(schema_id) = snapshot.schema_id() { + reachable_schemas.insert(schema_id); + } + } + } + + let spec_to_remove = self + .tx + .table + .metadata() + .partition_specs_iter() + .filter_map(|spec| { + if !reachable_specs.contains(&spec.spec_id()) { + Some(spec.spec_id()) + } else { + None + } + }) + .unique() + .collect(); + + self.tx + .append_updates(vec![TableUpdate::RemovePartitionSpecs { + spec_ids: spec_to_remove, + }])?; + + let schema_to_remove = self + .tx + .table + .metadata() + .schemas_iter() + .filter_map(|schema| { + if !reachable_schemas.contains(&schema.schema_id()) { + Some(schema.schema_id()) + } else { + None + } + }) + .unique() + .collect(); + + self.tx.append_updates(vec![TableUpdate::RemoveSchemas { + schema_ids: schema_to_remove, + }])?; + } + + self.tx.append_requirements(vec![ + TableRequirement::UuidMatch { + uuid: self.tx.table.metadata().uuid(), + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: self.tx.table.metadata().current_snapshot_id(), + }, + ])?; + + Ok(self.tx) + } + + /// Finished building the action and apply it to the transaction. + pub async fn apply(self) -> Result> { + self.apply_impl().await + } + + fn compute_retained_refs( + &self, + snapshot_refs: &HashMap, + ) -> HashMap { + let table_meta = self.tx.table.metadata(); + let mut retained_refs = HashMap::new(); + + for (ref_name, snapshot_ref) in snapshot_refs { + if ref_name == MAIN_BRANCH { + retained_refs.insert(ref_name.clone(), snapshot_ref.clone()); + continue; + } + + let snapshot = table_meta.snapshot_by_id(snapshot_ref.snapshot_id); + let max_ref_age_ms = match &snapshot_ref.retention { + SnapshotRetention::Branch { + min_snapshots_to_keep: _, + max_snapshot_age_ms: _, + max_ref_age_ms, + } => max_ref_age_ms, + SnapshotRetention::Tag { max_ref_age_ms } => max_ref_age_ms, + } + .unwrap_or(self.default_max_ref_age_ms); + + if let Some(snapshot) = snapshot { + let ref_age_ms = self.now - snapshot.timestamp_ms(); + if ref_age_ms <= max_ref_age_ms { + retained_refs.insert(ref_name.clone(), snapshot_ref.clone()); + } + } else { + // warn + } + } + + retained_refs + } + + fn compute_all_branch_snapshots_to_retain( + &self, + refs: impl Iterator, + ) -> HashSet { + let mut branch_snapshots_to_retain = HashSet::new(); + for snapshot_ref in refs { + if snapshot_ref.is_branch() { + let max_snapshot_age_ms = match snapshot_ref.retention { + SnapshotRetention::Branch { + min_snapshots_to_keep: _, + max_snapshot_age_ms, + max_ref_age_ms: _, + } => max_snapshot_age_ms, + SnapshotRetention::Tag { max_ref_age_ms: _ } => None, + } + .unwrap_or(self.default_expired_older_than); + + let expire_snapshot_older_than = self.now - max_snapshot_age_ms; + + let min_snapshots_to_keep = match snapshot_ref.retention { + SnapshotRetention::Branch { + min_snapshots_to_keep, + max_snapshot_age_ms: _, + max_ref_age_ms: _, + } => min_snapshots_to_keep, + SnapshotRetention::Tag { max_ref_age_ms: _ } => None, + } + .unwrap_or(self.default_min_num_snapshots); + + branch_snapshots_to_retain.extend(self.compute_branch_snapshots_to_retain( + snapshot_ref.snapshot_id, + expire_snapshot_older_than, + min_snapshots_to_keep as usize, + )); + } + } + + branch_snapshots_to_retain + } + + fn compute_branch_snapshots_to_retain( + &self, + snapshot_id: i64, + expire_snapshots_older_than: i64, + min_snapshots_to_keep: usize, + ) -> HashSet { + let mut ids_to_retain = HashSet::new(); + let table_meta = self.tx.table.metadata(); + if let Some(snapshot) = table_meta.snapshot_by_id(snapshot_id) { + let mut snapshot = snapshot.clone(); + while let Some(ancestor) = snapshot.parent_snapshot(table_meta) { + if ids_to_retain.len() < min_snapshots_to_keep + || snapshot.timestamp_ms() >= expire_snapshots_older_than + { + ids_to_retain.insert(snapshot.snapshot_id()); + } else { + return ids_to_retain; + } + + snapshot = ancestor; + } + } + + ids_to_retain + } + + fn unreferenced_snapshots_to_retain( + &self, + refs: impl Iterator, + ) -> HashSet { + let mut ids_to_retain = HashSet::new(); + + for snapshot_ref in refs { + if snapshot_ref.is_branch() { + if let Some(snapshot) = self + .tx + .table + .metadata() + .snapshot_by_id(snapshot_ref.snapshot_id) + { + let mut snapshot = snapshot.clone(); + while let Some(ancestor) = snapshot.parent_snapshot(self.tx.table.metadata()) { + ids_to_retain.insert(snapshot.snapshot_id()); + + snapshot = ancestor; + } + } + } else { + ids_to_retain.insert(snapshot_ref.snapshot_id); + } + } + + ids_to_retain + } +} diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 0abd508aa1..54eaaa2206 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -20,23 +20,21 @@ use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::future::Future; -use std::i32; use std::mem::discriminant; use std::ops::RangeFrom; use arrow_array::StringArray; use futures::TryStreamExt; -use itertools::Itertools; use uuid::Uuid; use crate::error::Result; use crate::io::OutputFile; +use crate::remove_snapshots::RemoveSnapshotAction; use crate::spec::{ DataContentType, DataFile, DataFileFormat, FormatVersion, ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriterBuilder, NullOrder, Operation, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct, StructType, - Summary, Transform, MAIN_BRANCH, MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS, - MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP, MIN_SNAPSHOTS_TO_KEEP_DEFAULT, + Summary, Transform, MAIN_BRANCH, }; use crate::table::Table; use crate::writer::file_writer::ParquetWriter; @@ -47,7 +45,7 @@ const META_ROOT_PATH: &str = "metadata"; /// Table transaction. pub struct Transaction<'a> { - table: &'a Table, + pub(crate) table: &'a Table, updates: Vec, requirements: Vec, } @@ -62,7 +60,7 @@ impl<'a> Transaction<'a> { } } - fn append_updates(&mut self, updates: Vec) -> Result<()> { + pub(crate) fn append_updates(&mut self, updates: Vec) -> Result<()> { for update in &updates { for up in &self.updates { if discriminant(up) == discriminant(update) { @@ -80,7 +78,10 @@ impl<'a> Transaction<'a> { Ok(()) } - fn append_requirements(&mut self, requirements: Vec) -> Result<()> { + pub(crate) fn append_requirements( + &mut self, + requirements: Vec, + ) -> Result<()> { self.requirements.extend(requirements); Ok(()) } @@ -725,353 +726,6 @@ impl<'a> ReplaceSortOrderAction<'a> { } } -/// Transaction action for removing snapshot. -pub struct RemoveSnapshotAction<'a> { - tx: Transaction<'a>, - clear_expire_files: bool, - ids_to_remove: HashSet, - default_expired_older_than: i64, - default_min_num_snapshots: i32, - default_max_ref_age_ms: i64, - clear_expired_meta_data: bool, - - now: i64, -} - -impl<'a> RemoveSnapshotAction<'a> { - /// Creates a new action. - pub fn new(tx: Transaction<'a>) -> Self { - let table = tx.table; - let properties = table.metadata().properties(); - - let now = chrono::Utc::now().timestamp_millis(); - - let default_max_snapshot_age_ms = properties - .get(MAX_SNAPSHOT_AGE_MS) - .and_then(|v| v.parse::().ok()) - .unwrap_or(MAX_SNAPSHOT_AGE_MS_DEFAULT); - - let default_min_num_snapshots = properties - .get(MIN_SNAPSHOTS_TO_KEEP) - .and_then(|v| v.parse::().ok()) - .unwrap_or(MIN_SNAPSHOTS_TO_KEEP_DEFAULT); - - let default_max_ref_age_ms = properties - .get(MAX_REF_AGE_MS) - .and_then(|v| v.parse::().ok()) - .unwrap_or(MAX_REF_AGE_MS_DEFAULT); - - Self { - tx, - clear_expire_files: false, - ids_to_remove: HashSet::new(), - default_expired_older_than: now - default_max_snapshot_age_ms, - default_min_num_snapshots, - default_max_ref_age_ms, - now, - clear_expired_meta_data: false, - } - } - - /// Finished building the action and apply it to the transaction. - pub fn clear_expire_files(mut self, clear_expire_files: bool) -> Self { - self.clear_expire_files = clear_expire_files; - self - } - - /// Finished building the action and apply it to the transaction. - pub fn expire_snapshot_id(mut self, expire_snapshot_id: i64) -> Self { - self.ids_to_remove.insert(expire_snapshot_id); - self - } - - /// Finished building the action and apply it to the transaction. - pub fn expire_older_than(mut self, timestamp_ms: i64) -> Self { - self.default_expired_older_than = timestamp_ms; - self - } - - /// Finished building the action and apply it to the transaction. - pub fn retain_last(mut self, min_num_snapshots: i32) -> Self { - self.default_min_num_snapshots = min_num_snapshots; - self - } - - /// Finished building the action and apply it to the transaction. - pub fn clear_expired_meta_data(mut self, clear_expired_meta_data: bool) -> Self { - self.clear_expired_meta_data = clear_expired_meta_data; - self - } - - /// Finished building the action and apply it to the transaction. - async fn apply_impl(mut self) -> Result> { - if self.tx.table.metadata().refs.is_empty() { - return Ok(self.tx); - } - - let table_meta = self.tx.table.metadata().clone(); - - let mut ids_to_retain = HashSet::new(); - let retained_refs = self.compute_retained_refs(&table_meta.refs); - let mut retained_id_to_refs = HashMap::new(); - for (ref_name, snapshot_ref) in &retained_refs { - let snapshot_id = snapshot_ref.snapshot_id; - retained_id_to_refs - .entry(snapshot_id) - .or_insert_with(Vec::new) - .push(ref_name.clone()); - - ids_to_retain.insert(snapshot_id); - } - - for id_to_remove in &self.ids_to_remove { - let refs_for_id = retained_id_to_refs.get(id_to_remove); - assert!( - refs_for_id.is_none(), - "Cannot remove snapshot {:?} with retained references: {:?}", - id_to_remove, - refs_for_id - ); - } - - ids_to_retain.extend(self.compute_all_branch_snapshots_to_retain(table_meta.refs.values())); - ids_to_retain.extend(self.unreferenced_snapshots_to_retain(table_meta.refs.values())); - - for ref_name in table_meta.refs.keys() { - if !retained_refs.contains_key(ref_name) { - self.tx - .append_updates(vec![TableUpdate::RemoveSnapshotRef { - ref_name: ref_name.clone(), - }])?; - } - } - - let mut snapshot_to_remove = Vec::new(); - for snapshot in table_meta.snapshots() { - if !ids_to_retain.contains(&snapshot.snapshot_id()) { - snapshot_to_remove.push(snapshot.snapshot_id()); - } - } - - self.tx.append_updates(vec![TableUpdate::RemoveSnapshots { - snapshot_ids: snapshot_to_remove, - }])?; - - if self.clear_expired_meta_data { - let mut reachable_specs = HashSet::new(); - reachable_specs.insert(table_meta.current_schema_id()); - let mut reachable_schemas = HashSet::new(); - reachable_schemas.insert(table_meta.current_schema_id()); - - //TODO: parallelize - for snapshot in table_meta.snapshots() { - if ids_to_retain.contains(&snapshot.snapshot_id()) { - let manifest_list = snapshot - .load_manifest_list(self.tx.table.file_io(), &table_meta) - .await?; - - for manifest in manifest_list.entries() { - reachable_specs.insert(manifest.partition_spec_id); - } - - if let Some(schema_id) = snapshot.schema_id() { - reachable_schemas.insert(schema_id); - } - } - } - - let spec_to_remove = self - .tx - .table - .metadata() - .partition_specs_iter() - .filter_map(|spec| { - if !reachable_specs.contains(&spec.spec_id()) { - Some(spec.spec_id()) - } else { - None - } - }) - .unique() - .collect(); - - self.tx - .append_updates(vec![TableUpdate::RemovePartitionSpecs { - spec_ids: spec_to_remove, - }])?; - - let schema_to_remove = self - .tx - .table - .metadata() - .schemas_iter() - .filter_map(|schema| { - if !reachable_schemas.contains(&schema.schema_id()) { - Some(schema.schema_id()) - } else { - None - } - }) - .unique() - .collect(); - - self.tx.append_updates(vec![TableUpdate::RemoveSchemas { - schema_ids: schema_to_remove, - }])?; - } - - self.tx.append_requirements(vec![ - TableRequirement::UuidMatch { - uuid: self.tx.table.metadata().uuid(), - }, - TableRequirement::RefSnapshotIdMatch { - r#ref: MAIN_BRANCH.to_string(), - snapshot_id: self.tx.table.metadata().current_snapshot_id(), - }, - ])?; - - Ok(self.tx) - } - - /// Finished building the action and apply it to the transaction. - pub async fn apply(self) -> Result> { - self.apply_impl().await - } - - fn compute_retained_refs( - &self, - snapshot_refs: &HashMap, - ) -> HashMap { - let table_meta = self.tx.table.metadata(); - let mut retained_refs = HashMap::new(); - // let mut snapshot_refs = &table_meta.refs; - - for (ref_name, snapshot_ref) in snapshot_refs { - if ref_name == MAIN_BRANCH { - retained_refs.insert(ref_name.clone(), snapshot_ref.clone()); - continue; - } - - let snapshot = table_meta.snapshot_by_id(snapshot_ref.snapshot_id); - let max_ref_age_ms = match &snapshot_ref.retention { - SnapshotRetention::Branch { - min_snapshots_to_keep: _, - max_snapshot_age_ms: _, - max_ref_age_ms, - } => max_ref_age_ms, - SnapshotRetention::Tag { max_ref_age_ms } => max_ref_age_ms, - } - .unwrap_or(self.default_max_ref_age_ms); - - if let Some(snapshot) = snapshot { - let ref_age_ms = self.now - snapshot.timestamp_ms(); - if ref_age_ms <= max_ref_age_ms { - retained_refs.insert(ref_name.clone(), snapshot_ref.clone()); - } - } else { - // warn - } - } - - retained_refs - } - - fn compute_all_branch_snapshots_to_retain( - &self, - refs: impl Iterator, - ) -> HashSet { - let mut branch_snapshots_to_retain = HashSet::new(); - for snapshot_ref in refs { - if snapshot_ref.is_branch() { - let max_snapshot_age_ms = match snapshot_ref.retention { - SnapshotRetention::Branch { - min_snapshots_to_keep: _, - max_snapshot_age_ms, - max_ref_age_ms: _, - } => max_snapshot_age_ms, - SnapshotRetention::Tag { max_ref_age_ms: _ } => None, - } - .unwrap_or(self.default_expired_older_than); - - let expire_snapshot_older_than = self.now - max_snapshot_age_ms; - - let min_snapshots_to_keep = match snapshot_ref.retention { - SnapshotRetention::Branch { - min_snapshots_to_keep, - max_snapshot_age_ms: _, - max_ref_age_ms: _, - } => min_snapshots_to_keep, - SnapshotRetention::Tag { max_ref_age_ms: _ } => None, - } - .unwrap_or(self.default_min_num_snapshots); - - branch_snapshots_to_retain.extend(self.compute_branch_snapshots_to_retain( - snapshot_ref.snapshot_id, - expire_snapshot_older_than, - min_snapshots_to_keep as usize, - )); - } - } - - branch_snapshots_to_retain - } - - fn compute_branch_snapshots_to_retain( - &self, - snapshot_id: i64, - expire_snapshots_older_than: i64, - min_snapshots_to_keep: usize, - ) -> HashSet { - let mut ids_to_retain = HashSet::new(); - let table_meta = self.tx.table.metadata(); - if let Some(snapshot) = table_meta.snapshot_by_id(snapshot_id) { - let mut snapshot = snapshot.clone(); - while let Some(ancestor) = snapshot.parent_snapshot(table_meta) { - if ids_to_retain.len() < min_snapshots_to_keep - || snapshot.timestamp_ms() >= expire_snapshots_older_than - { - ids_to_retain.insert(snapshot.snapshot_id()); - } else { - return ids_to_retain; - } - - snapshot = ancestor; - } - } - - ids_to_retain - } - - fn unreferenced_snapshots_to_retain( - &self, - refs: impl Iterator, - ) -> HashSet { - let mut ids_to_retain = HashSet::new(); - - for snapshot_ref in refs { - if snapshot_ref.is_branch() { - if let Some(snapshot) = self - .tx - .table - .metadata() - .snapshot_by_id(snapshot_ref.snapshot_id) - { - let mut snapshot = snapshot.clone(); - while let Some(ancestor) = snapshot.parent_snapshot(self.tx.table.metadata()) { - ids_to_retain.insert(snapshot.snapshot_id()); - - snapshot = ancestor; - } - } - } else { - ids_to_retain.insert(snapshot_ref.snapshot_id); - } - } - - ids_to_retain - } -} - #[cfg(test)] mod tests { use std::collections::HashMap; @@ -1454,5 +1108,21 @@ mod tests { tx.requirements ); } + + { + // test remove main current snapshot + let tx = Transaction::new(&table); + let err = tx + .expire_snapshot() + .expire_snapshot_id(table.metadata().current_snapshot_id().unwrap()) + .apply() + .await + .err() + .unwrap(); + assert_eq!( + "DataInvalid => Cannot remove snapshot 3055729675574597004 with retained references: Some([\"main\"])", + err.to_string() + ) + } } } From 554c81b58cfee259cc825cf32c0e88aa468ebae0 Mon Sep 17 00:00:00 2001 From: Li0k Date: Tue, 11 Mar 2025 17:34:19 +0800 Subject: [PATCH 05/11] address comments --- crates/iceberg/src/remove_snapshots.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/crates/iceberg/src/remove_snapshots.rs b/crates/iceberg/src/remove_snapshots.rs index 4575cb4609..5f0e2dd029 100644 --- a/crates/iceberg/src/remove_snapshots.rs +++ b/crates/iceberg/src/remove_snapshots.rs @@ -110,7 +110,7 @@ impl<'a> RemoveSnapshotAction<'a> { } /// Finished building the action and apply it to the transaction. - async fn apply_impl(mut self) -> Result> { + pub async fn apply(mut self) -> Result> { if self.tx.table.metadata().refs.is_empty() { return Ok(self.tx); } @@ -240,11 +240,6 @@ impl<'a> RemoveSnapshotAction<'a> { Ok(self.tx) } - /// Finished building the action and apply it to the transaction. - pub async fn apply(self) -> Result> { - self.apply_impl().await - } - fn compute_retained_refs( &self, snapshot_refs: &HashMap, From 0b211d66f7590d28a8bf6e2833b59b5b07df57f4 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 12 Mar 2025 14:42:00 +0800 Subject: [PATCH 06/11] refactor(iceberg): refactor and ut --- Cargo.lock | 1 + crates/iceberg/src/remove_snapshots.rs | 53 +++--- crates/iceberg/src/spec/snapshot.rs | 30 ++++ crates/iceberg/src/transaction.rs | 78 +++++++- .../TableMetadataV2ValidMultiSnapshot.json | 167 ++++++++++++++++++ crates/integration_tests/Cargo.toml | 1 + .../tests/shared_tests/mod.rs | 1 + .../shared_tests/remove_snapshots_test.rs | 139 +++++++++++++++ 8 files changed, 444 insertions(+), 26 deletions(-) create mode 100644 crates/iceberg/testdata/table_metadata/TableMetadataV2ValidMultiSnapshot.json create mode 100644 crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs diff --git a/Cargo.lock b/Cargo.lock index 13bff64bad..1aec558d7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3129,6 +3129,7 @@ version = "0.4.0" dependencies = [ "arrow-array", "arrow-schema", + "chrono", "ctor", "datafusion", "futures", diff --git a/crates/iceberg/src/remove_snapshots.rs b/crates/iceberg/src/remove_snapshots.rs index 5f0e2dd029..ed4ea88651 100644 --- a/crates/iceberg/src/remove_snapshots.rs +++ b/crates/iceberg/src/remove_snapshots.rs @@ -24,9 +24,9 @@ use itertools::Itertools; use crate::error::Result; use crate::spec::{ - SnapshotReference, SnapshotRetention, MAIN_BRANCH, MAX_REF_AGE_MS, MAX_REF_AGE_MS_DEFAULT, - MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT, MIN_SNAPSHOTS_TO_KEEP, - MIN_SNAPSHOTS_TO_KEEP_DEFAULT, + ancestors_of, SnapshotReference, SnapshotRetention, MAIN_BRANCH, MAX_REF_AGE_MS, + MAX_REF_AGE_MS_DEFAULT, MAX_SNAPSHOT_AGE_MS, MAX_SNAPSHOT_AGE_MS_DEFAULT, + MIN_SNAPSHOTS_TO_KEEP, MIN_SNAPSHOTS_TO_KEEP_DEFAULT, }; use crate::transaction::Transaction; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; @@ -160,9 +160,11 @@ impl<'a> RemoveSnapshotAction<'a> { } } - self.tx.append_updates(vec![TableUpdate::RemoveSnapshots { - snapshot_ids: snapshot_to_remove, - }])?; + if !snapshot_to_remove.is_empty() { + self.tx.append_updates(vec![TableUpdate::RemoveSnapshots { + snapshot_ids: snapshot_to_remove, + }])?; + } if self.clear_expired_meta_data { let mut reachable_specs = HashSet::new(); @@ -291,10 +293,14 @@ impl<'a> RemoveSnapshotAction<'a> { max_ref_age_ms: _, } => max_snapshot_age_ms, SnapshotRetention::Tag { max_ref_age_ms: _ } => None, - } - .unwrap_or(self.default_expired_older_than); + }; - let expire_snapshot_older_than = self.now - max_snapshot_age_ms; + let expire_snapshot_older_than = + if let Some(max_snapshot_age_ms) = max_snapshot_age_ms { + self.now - max_snapshot_age_ms + } else { + self.default_expired_older_than + }; let min_snapshots_to_keep = match snapshot_ref.retention { SnapshotRetention::Branch { @@ -326,17 +332,15 @@ impl<'a> RemoveSnapshotAction<'a> { let mut ids_to_retain = HashSet::new(); let table_meta = self.tx.table.metadata(); if let Some(snapshot) = table_meta.snapshot_by_id(snapshot_id) { - let mut snapshot = snapshot.clone(); - while let Some(ancestor) = snapshot.parent_snapshot(table_meta) { + let ancestors = ancestors_of(snapshot.clone(), table_meta); + for ancestor in ancestors { if ids_to_retain.len() < min_snapshots_to_keep - || snapshot.timestamp_ms() >= expire_snapshots_older_than + || ancestor.timestamp_ms() >= expire_snapshots_older_than { - ids_to_retain.insert(snapshot.snapshot_id()); + ids_to_retain.insert(ancestor.snapshot_id()); } else { return ids_to_retain; } - - snapshot = ancestor; } } @@ -348,6 +352,7 @@ impl<'a> RemoveSnapshotAction<'a> { refs: impl Iterator, ) -> HashSet { let mut ids_to_retain = HashSet::new(); + let mut referenced_snapshots = HashSet::new(); for snapshot_ref in refs { if snapshot_ref.is_branch() { @@ -357,15 +362,21 @@ impl<'a> RemoveSnapshotAction<'a> { .metadata() .snapshot_by_id(snapshot_ref.snapshot_id) { - let mut snapshot = snapshot.clone(); - while let Some(ancestor) = snapshot.parent_snapshot(self.tx.table.metadata()) { - ids_to_retain.insert(snapshot.snapshot_id()); - - snapshot = ancestor; + let ancestors = ancestors_of(snapshot.clone(), self.tx.table.metadata()); + for ancestor in ancestors { + referenced_snapshots.insert(ancestor.snapshot_id()); } } } else { - ids_to_retain.insert(snapshot_ref.snapshot_id); + referenced_snapshots.insert(snapshot_ref.snapshot_id); + } + } + + for snapshot in self.tx.table.metadata().snapshots() { + if !referenced_snapshots.contains(&snapshot.snapshot_id()) + && snapshot.timestamp_ms() >= self.default_expired_older_than + { + ids_to_retain.insert(snapshot.snapshot_id()); } } diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index f2637d96b0..bbc3b0b5b1 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -400,6 +400,36 @@ impl SnapshotRetention { } } +/// An iterator over the ancestors of a snapshot. +pub struct AncestorIterator<'a> { + current: Option, + table_metadata: &'a TableMetadata, +} + +impl<'a> Iterator for AncestorIterator<'a> { + type Item = SnapshotRef; + + fn next(&mut self) -> Option { + let current = self.current.take()?; + + let next = current.parent_snapshot(self.table_metadata); + self.current = next; + + Some(current) + } +} + +/// Returns an iterator over the ancestors of a snapshot. +pub fn ancestors_of<'a>( + snapshot: SnapshotRef, + table_metadata: &'a TableMetadata, +) -> AncestorIterator<'a> { + AncestorIterator { + current: Some(snapshot), + table_metadata, + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 54eaaa2206..b6b1e4c554 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -799,6 +799,25 @@ mod tests { .unwrap() } + fn make_v2_table_with_mutli_snapshot() -> Table { + let file = File::open(format!( + "{}/testdata/table_metadata/{}", + env!("CARGO_MANIFEST_DIR"), + "TableMetadataV2ValidMultiSnapshot.json" + )) + .unwrap(); + let reader = BufReader::new(file); + let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap(); + + Table::builder() + .metadata(resp) + .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string()) + .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .file_io(FileIOBuilder::new("memory").build().unwrap()) + .build() + .unwrap() + } + #[test] fn test_upgrade_table_version_v1_to_v2() { let table = make_v1_table(); @@ -1079,17 +1098,42 @@ mod tests { #[tokio::test] async fn test_remove_snapshot_action() { - let table = make_v2_table(); + let table = make_v2_table_with_mutli_snapshot(); let table_meta = table.metadata().clone(); - assert_eq!(2, table_meta.snapshots().count()); + assert_eq!(5, table_meta.snapshots().count()); { let tx = Transaction::new(&table); let tx = tx.expire_snapshot().apply().await.unwrap(); - // keep the last snapshot for update in &tx.updates { match update { TableUpdate::RemoveSnapshots { snapshot_ids } => { - assert_eq!(1, snapshot_ids.len()); + assert_eq!(4, snapshot_ids.len()); + } + _ => {} + } + } + + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: tx.table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: tx.table.metadata().current_snapshot_id + } + ], + tx.requirements + ); + } + + { + let tx = Transaction::new(&table); + let tx = tx.expire_snapshot().retain_last(2).apply().await.unwrap(); + for update in &tx.updates { + match update { + TableUpdate::RemoveSnapshots { snapshot_ids } => { + assert_eq!(3, snapshot_ids.len()); } _ => {} } @@ -1109,6 +1153,30 @@ mod tests { ); } + { + let tx = Transaction::new(&table); + let tx = tx + .expire_snapshot() + .retain_last(100) + .expire_older_than(100) + .apply() + .await + .unwrap(); + assert_eq!(0, tx.updates.len()); + assert_eq!( + vec![ + TableRequirement::UuidMatch { + uuid: tx.table.metadata().uuid() + }, + TableRequirement::RefSnapshotIdMatch { + r#ref: MAIN_BRANCH.to_string(), + snapshot_id: tx.table.metadata().current_snapshot_id + } + ], + tx.requirements + ); + } + { // test remove main current snapshot let tx = Transaction::new(&table); @@ -1120,7 +1188,7 @@ mod tests { .err() .unwrap(); assert_eq!( - "DataInvalid => Cannot remove snapshot 3055729675574597004 with retained references: Some([\"main\"])", + "DataInvalid => Cannot remove snapshot 3067729675574597004 with retained references: Some([\"main\"])", err.to_string() ) } diff --git a/crates/iceberg/testdata/table_metadata/TableMetadataV2ValidMultiSnapshot.json b/crates/iceberg/testdata/table_metadata/TableMetadataV2ValidMultiSnapshot.json new file mode 100644 index 0000000000..a52fa2a906 --- /dev/null +++ b/crates/iceberg/testdata/table_metadata/TableMetadataV2ValidMultiSnapshot.json @@ -0,0 +1,167 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 34, + "last-updated-ms": 1675100955770, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + } + ] + }, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [ + 1, + 2 + ], + "fields": [ + { + "id": 1, + "name": "x", + "required": true, + "type": "long" + }, + { + "id": 2, + "name": "y", + "required": true, + "type": "long", + "doc": "comment" + }, + { + "id": 3, + "name": "z", + "required": true, + "type": "long" + } + ] + } + ], + "default-spec-id": 0, + "partition-specs": [ + { + "spec-id": 0, + "fields": [ + { + "name": "x", + "transform": "identity", + "source-id": 1, + "field-id": 1000 + } + ] + } + ], + "last-partition-id": 1000, + "default-sort-order-id": 3, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + { + "transform": "identity", + "source-id": 2, + "direction": "asc", + "null-order": "nulls-first" + }, + { + "transform": "bucket[4]", + "source-id": 3, + "direction": "desc", + "null-order": "nulls-last" + } + ] + } + ], + "properties": {}, + "current-snapshot-id": 3067729675574597004, + "snapshots": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770, + "sequence-number": 0, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/1.avro" + }, + { + "snapshot-id": 3055729675574597004, + "parent-snapshot-id": 3051729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/2.avro", + "schema-id": 1 + }, + { + "snapshot-id": 3059729675574597004, + "parent-snapshot-id": 3055729675574597004, + "timestamp-ms": 1595100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/3.avro", + "schema-id": 1 + }, + { + "snapshot-id": 3063729675574597004, + "parent-snapshot-id": 3059729675574597004, + "timestamp-ms": 1635100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/4.avro", + "schema-id": 1 + }, + { + "snapshot-id": 3067729675574597004, + "parent-snapshot-id": 3063729675574597004, + "timestamp-ms": 1675100955770, + "sequence-number": 1, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/5.avro", + "schema-id": 1 + } + ], + "snapshot-log": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770 + }, + { + "snapshot-id": 3055729675574597004, + "timestamp-ms": 1555100955770 + }, + { + "snapshot-id": 3059729675574597004, + "timestamp-ms": 1595100955770 + }, + { + "snapshot-id": 3063729675574597004, + "timestamp-ms": 1635100955770 + }, + { + "snapshot-id": 3067729675574597004, + "timestamp-ms": 1675100955770 + } + ], + "metadata-log": [] +} \ No newline at end of file diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index 26ad2d8b3e..57c34b04ed 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -27,6 +27,7 @@ rust-version = { workspace = true } [dependencies] arrow-array = { workspace = true } arrow-schema = { workspace = true } +chrono = { workspace = true } ctor = { workspace = true } datafusion = { workspace = true } futures = { workspace = true } diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs index feb1c4e585..3100009200 100644 --- a/crates/integration_tests/tests/shared_tests/mod.rs +++ b/crates/integration_tests/tests/shared_tests/mod.rs @@ -30,6 +30,7 @@ mod datafusion; mod read_evolved_schema; mod read_positional_deletes; mod scan_all_type; +mod remove_snapshots_test; pub async fn random_ns() -> Namespace { let fixture = get_shared_containers(); diff --git a/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs new file mode 100644 index 0000000000..2157543b6b --- /dev/null +++ b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for rest catalog. + +use std::sync::Arc; + +use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; +use futures::TryStreamExt; +use iceberg::transaction::Transaction; +use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; +use iceberg::writer::file_writer::location_generator::{ + DefaultFileNameGenerator, DefaultLocationGenerator, +}; +use iceberg::writer::file_writer::ParquetWriterBuilder; +use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; +use iceberg::{Catalog, TableCreation}; +use iceberg_catalog_rest::RestCatalog; +use parquet::arrow::arrow_reader::ArrowReaderOptions; +use parquet::file::properties::WriterProperties; + +use crate::get_shared_containers; +use crate::shared_tests::{random_ns, test_schema}; + +#[tokio::test] +async fn test_expire_snapshots_by_count() { + let fixture = get_shared_containers(); + let rest_catalog = RestCatalog::new(fixture.catalog_config.clone()); + let ns = random_ns().await; + let schema = test_schema(); + + let table_creation = TableCreation::builder() + .name("t1".to_string()) + .schema(schema.clone()) + .build(); + + let mut table = rest_catalog + .create_table(ns.name(), table_creation) + .await + .unwrap(); + + // Create the writer and write the data + let schema: Arc = Arc::new( + table + .metadata() + .current_schema() + .as_ref() + .try_into() + .unwrap(), + ); + let location_generator = DefaultLocationGenerator::new(table.metadata().clone()).unwrap(); + let file_name_generator = DefaultFileNameGenerator::new( + "test".to_string(), + None, + iceberg::spec::DataFileFormat::Parquet, + ); + let parquet_writer_builder = ParquetWriterBuilder::new( + WriterProperties::default(), + table.metadata().current_schema().clone(), + table.file_io().clone(), + location_generator.clone(), + file_name_generator.clone(), + ); + let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None); + let mut data_file_writer = data_file_writer_builder.build().await.unwrap(); + let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]); + let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]); + let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]); + let batch = RecordBatch::try_new(schema.clone(), vec![ + Arc::new(col1) as ArrayRef, + Arc::new(col2) as ArrayRef, + Arc::new(col3) as ArrayRef, + ]) + .unwrap(); + data_file_writer.write(batch.clone()).await.unwrap(); + let data_file = data_file_writer.close().await.unwrap(); + + // check parquet file schema + let content = table + .file_io() + .new_input(data_file[0].file_path()) + .unwrap() + .read() + .await + .unwrap(); + let parquet_reader = parquet::arrow::arrow_reader::ArrowReaderMetadata::load( + &content, + ArrowReaderOptions::default(), + ) + .unwrap(); + let field_ids: Vec = parquet_reader + .parquet_schema() + .columns() + .iter() + .map(|col| col.self_type().get_basic_info().id()) + .collect(); + assert_eq!(field_ids, vec![1, 2, 3]); + + // commit result + + for _ in 0..10 { + let tx = Transaction::new(&table); + let mut append_action = tx.fast_append(None, vec![]).unwrap(); + append_action.add_data_files(data_file.clone()).unwrap(); + let tx = append_action.apply().await.unwrap(); + table = tx.commit(&rest_catalog).await.unwrap(); + } + + // check snapshot count + let snapshot_counts = table.metadata().snapshots().count(); + assert_eq!(10, snapshot_counts); + + let tx = Transaction::new(&table); + let now = chrono::Utc::now().timestamp_millis(); + let remove_action = tx + .expire_snapshot() + .retain_last(5) + .expire_older_than(now) + .apply() + .await + .unwrap(); + // assert_eq!(5, remove_action.updates().len()); + let t = remove_action.commit(&rest_catalog).await.unwrap(); + assert_eq!(5, t.metadata().snapshots().count()); +} From beaa0bcc0ce8eadeed17484705c8b0cb71dbc000 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 12 Mar 2025 21:30:56 +0800 Subject: [PATCH 07/11] fix(iceberg): fix integration-test --- crates/iceberg/src/remove_snapshots.rs | 15 +++++++++------ .../tests/shared_tests/remove_snapshots_test.rs | 1 - 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/remove_snapshots.rs b/crates/iceberg/src/remove_snapshots.rs index ffea95943e..6d7b0b86f7 100644 --- a/crates/iceberg/src/remove_snapshots.rs +++ b/crates/iceberg/src/remove_snapshots.rs @@ -163,12 +163,15 @@ impl<'a> RemoveSnapshotAction<'a> { } if !snapshot_to_remove.is_empty() { - self.tx.apply( - vec![TableUpdate::RemoveSnapshots { - snapshot_ids: snapshot_to_remove, - }], - vec![], - )?; + // TODO: batch remove when server supports it + for snapshot_id in snapshot_to_remove { + self.tx.apply( + vec![TableUpdate::RemoveSnapshots { + snapshot_ids: vec![snapshot_id], + }], + vec![], + )?; + } } if self.clear_expired_meta_data { diff --git a/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs index 56439813d3..f9ffbff8a8 100644 --- a/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs +++ b/crates/integration_tests/tests/shared_tests/remove_snapshots_test.rs @@ -132,7 +132,6 @@ async fn test_expire_snapshots_by_count() { .apply() .await .unwrap(); - // assert_eq!(5, remove_action.updates().len()); let t = remove_action.commit(&rest_catalog).await.unwrap(); assert_eq!(5, t.metadata().snapshots().count()); } From 8df508bb459794cfa5f3735378c965022404e420 Mon Sep 17 00:00:00 2001 From: Li0k Date: Wed, 12 Mar 2025 21:35:28 +0800 Subject: [PATCH 08/11] typo --- crates/integration_tests/tests/shared_tests/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/integration_tests/tests/shared_tests/mod.rs b/crates/integration_tests/tests/shared_tests/mod.rs index 3100009200..4bab534ee6 100644 --- a/crates/integration_tests/tests/shared_tests/mod.rs +++ b/crates/integration_tests/tests/shared_tests/mod.rs @@ -29,8 +29,8 @@ mod conflict_commit_test; mod datafusion; mod read_evolved_schema; mod read_positional_deletes; -mod scan_all_type; mod remove_snapshots_test; +mod scan_all_type; pub async fn random_ns() -> Namespace { let fixture = get_shared_containers(); From 38eb391d162003cbbb419a4452bf9ab612cc15c0 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 13 Mar 2025 13:47:23 +0800 Subject: [PATCH 09/11] typo --- crates/iceberg/src/remove_snapshots.rs | 18 +++++++++--------- crates/iceberg/src/spec/snapshot.rs | 5 +---- crates/iceberg/src/transaction.rs | 14 ++++---------- 3 files changed, 14 insertions(+), 23 deletions(-) diff --git a/crates/iceberg/src/remove_snapshots.rs b/crates/iceberg/src/remove_snapshots.rs index 6d7b0b86f7..d9581c7beb 100644 --- a/crates/iceberg/src/remove_snapshots.rs +++ b/crates/iceberg/src/remove_snapshots.rs @@ -18,7 +18,6 @@ //! Transaction action for removing snapshot. use std::collections::{HashMap, HashSet}; -use std::i32; use itertools::Itertools; @@ -131,14 +130,15 @@ impl<'a> RemoveSnapshotAction<'a> { } for id_to_remove in &self.ids_to_remove { - let refs_for_id = retained_id_to_refs.get(id_to_remove); - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot remove snapshot {:?} with retained references: {:?}", - id_to_remove, refs_for_id - ), - )); + if let Some(refs_for_id) = retained_id_to_refs.get(id_to_remove) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot remove snapshot {:?} with retained references: {:?}", + id_to_remove, refs_for_id + ), + )); + } } ids_to_retain.extend(self.compute_all_branch_snapshots_to_retain(table_meta.refs.values())); diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index bbc3b0b5b1..530a0c1db6 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -420,10 +420,7 @@ impl<'a> Iterator for AncestorIterator<'a> { } /// Returns an iterator over the ancestors of a snapshot. -pub fn ancestors_of<'a>( - snapshot: SnapshotRef, - table_metadata: &'a TableMetadata, -) -> AncestorIterator<'a> { +pub fn ancestors_of(snapshot: SnapshotRef, table_metadata: &TableMetadata) -> AncestorIterator<'_> { AncestorIterator { current: Some(snapshot), table_metadata, diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index 70c462b8a2..b588525e24 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -1151,11 +1151,8 @@ mod tests { let tx = Transaction::new(&table); let tx = tx.expire_snapshot().apply().await.unwrap(); for update in &tx.updates { - match update { - TableUpdate::RemoveSnapshots { snapshot_ids } => { - assert_eq!(4, snapshot_ids.len()); - } - _ => {} + if let TableUpdate::RemoveSnapshots { snapshot_ids } = update { + assert_eq!(4, snapshot_ids.len()); } } @@ -1177,11 +1174,8 @@ mod tests { let tx = Transaction::new(&table); let tx = tx.expire_snapshot().retain_last(2).apply().await.unwrap(); for update in &tx.updates { - match update { - TableUpdate::RemoveSnapshots { snapshot_ids } => { - assert_eq!(3, snapshot_ids.len()); - } - _ => {} + if let TableUpdate::RemoveSnapshots { snapshot_ids } = update { + assert_eq!(3, snapshot_ids.len()); } } From 2d7215d21944ee80b25bfd7bf08e98185f095dc2 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 13 Mar 2025 14:03:12 +0800 Subject: [PATCH 10/11] typo --- crates/iceberg/src/lib.rs | 2 +- crates/iceberg/src/remove_snapshots.rs | 10 ++++++---- crates/iceberg/src/transaction.rs | 14 +++----------- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 783f4b170f..50d31304fa 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -52,7 +52,7 @@ //! ``` #![deny(missing_docs)] -#![feature(anonymous_lifetime_in_impl_trait)] +// #![feature(anonymous_lifetime_in_impl_trait)] #[macro_use] extern crate derive_builder; diff --git a/crates/iceberg/src/remove_snapshots.rs b/crates/iceberg/src/remove_snapshots.rs index d9581c7beb..6de6d1b42d 100644 --- a/crates/iceberg/src/remove_snapshots.rs +++ b/crates/iceberg/src/remove_snapshots.rs @@ -141,8 +141,10 @@ impl<'a> RemoveSnapshotAction<'a> { } } - ids_to_retain.extend(self.compute_all_branch_snapshots_to_retain(table_meta.refs.values())); - ids_to_retain.extend(self.unreferenced_snapshots_to_retain(table_meta.refs.values())); + ids_to_retain + .extend(self.compute_all_branch_snapshots_to_retain(table_meta.refs.values().cloned())); + ids_to_retain + .extend(self.unreferenced_snapshots_to_retain(table_meta.refs.values().cloned())); for ref_name in table_meta.refs.keys() { if !retained_refs.contains_key(ref_name) { @@ -294,7 +296,7 @@ impl<'a> RemoveSnapshotAction<'a> { fn compute_all_branch_snapshots_to_retain( &self, - refs: impl Iterator, + refs: impl Iterator, ) -> HashSet { let mut branch_snapshots_to_retain = HashSet::new(); for snapshot_ref in refs { @@ -362,7 +364,7 @@ impl<'a> RemoveSnapshotAction<'a> { fn unreferenced_snapshots_to_retain( &self, - refs: impl Iterator, + refs: impl Iterator, ) -> HashSet { let mut ids_to_retain = HashSet::new(); let mut referenced_snapshots = HashSet::new(); diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs index b588525e24..9d63cf9bcc 100644 --- a/crates/iceberg/src/transaction.rs +++ b/crates/iceberg/src/transaction.rs @@ -1150,11 +1150,7 @@ mod tests { { let tx = Transaction::new(&table); let tx = tx.expire_snapshot().apply().await.unwrap(); - for update in &tx.updates { - if let TableUpdate::RemoveSnapshots { snapshot_ids } = update { - assert_eq!(4, snapshot_ids.len()); - } - } + assert_eq!(4, tx.updates.len()); assert_eq!( vec![ @@ -1173,11 +1169,7 @@ mod tests { { let tx = Transaction::new(&table); let tx = tx.expire_snapshot().retain_last(2).apply().await.unwrap(); - for update in &tx.updates { - if let TableUpdate::RemoveSnapshots { snapshot_ids } = update { - assert_eq!(3, snapshot_ids.len()); - } - } + assert_eq!(3, tx.updates.len()); assert_eq!( vec![ @@ -1228,7 +1220,7 @@ mod tests { .err() .unwrap(); assert_eq!( - "DataInvalid => Cannot remove snapshot 3067729675574597004 with retained references: Some([\"main\"])", + "DataInvalid => Cannot remove snapshot 3067729675574597004 with retained references: [\"main\"]", err.to_string() ) } From c4517bea3d6339fa5507ac73657f4156837ece68 Mon Sep 17 00:00:00 2001 From: Li0k Date: Thu, 13 Mar 2025 14:07:41 +0800 Subject: [PATCH 11/11] typo --- crates/iceberg/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 50d31304fa..d24f3d4cfe 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -52,7 +52,6 @@ //! ``` #![deny(missing_docs)] -// #![feature(anonymous_lifetime_in_impl_trait)] #[macro_use] extern crate derive_builder;