diff --git a/Cargo.lock b/Cargo.lock index 0bf3efa66f..73ebf15e01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -419,6 +419,12 @@ dependencies = [ "regex-syntax 0.8.5", ] +[[package]] +name = "as-any" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f477b951e452a0b6b4a10b53ccd569042d1d01729b519e02074a9c0958a063" + [[package]] name = "assert-json-diff" version = "2.0.2" @@ -3488,6 +3494,7 @@ dependencies = [ "arrow-schema", "arrow-select", "arrow-string", + "as-any", "async-std", "async-trait", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index cae0c85cb1..43d0352003 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,6 +50,7 @@ arrow-ord = { version = "55" } arrow-schema = { version = "55" } arrow-select = { version = "55" } arrow-string = { version = "55" } +as-any = "0.3.2" async-std = "1.12" async-trait = "0.1.88" aws-config = "1.6.1" diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 8518190699..3d1f7cf83c 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -802,7 +802,7 @@ mod tests { SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type, UnboundPartitionField, UnboundPartitionSpec, }; - use iceberg::transaction::Transaction; + use iceberg::transaction::{ApplyTransactionAction, Transaction}; use mockito::{Mock, Server, ServerGuard}; use serde_json::json; use uuid::uuid; @@ -2093,6 +2093,17 @@ mod tests { let config_mock = create_config_mock(&mut server).await; + let load_table_mock = server + .mock("GET", "/v1/namespaces/ns1/tables/test1") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "load_table_response.json" + )) + .create_async() + .await; + let update_table_mock = server .mock("POST", "/v1/namespaces/ns1/tables/test1") .with_status(200) @@ -2125,8 +2136,11 @@ mod tests { .unwrap() }; - let table = Transaction::new(&table1) - .upgrade_table_version(FormatVersion::V2) + let tx = Transaction::new(&table1); + let table = tx + .upgrade_table_version() + .set_format_version(FormatVersion::V2) + .apply(tx) .unwrap() .commit(&catalog) .await @@ -2204,6 +2218,7 @@ mod tests { config_mock.assert_async().await; update_table_mock.assert_async().await; + load_table_mock.assert_async().await } #[tokio::test] @@ -2212,6 +2227,17 @@ mod tests { let config_mock = create_config_mock(&mut server).await; + let load_table_mock = server + .mock("GET", "/v1/namespaces/ns1/tables/test1") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "load_table_response.json" + )) + .create_async() + .await; + let update_table_mock = server .mock("POST", "/v1/namespaces/ns1/tables/test1") .with_status(404) @@ -2250,8 +2276,11 @@ mod tests { .unwrap() }; - let table_result = Transaction::new(&table1) - .upgrade_table_version(FormatVersion::V2) + let tx = Transaction::new(&table1); + let table_result = tx + .upgrade_table_version() + .set_format_version(FormatVersion::V2) + .apply(tx) .unwrap() .commit(&catalog) .await; @@ -2267,5 +2296,6 @@ mod tests { config_mock.assert_async().await; update_table_mock.assert_async().await; + load_table_mock.assert_async().await; } } diff --git a/crates/catalog/rest/tests/rest_catalog_test.rs b/crates/catalog/rest/tests/rest_catalog_test.rs index ab7ea3d62c..94d2e3927c 100644 --- a/crates/catalog/rest/tests/rest_catalog_test.rs +++ b/crates/catalog/rest/tests/rest_catalog_test.rs @@ -23,7 +23,7 @@ use std::sync::RwLock; use ctor::{ctor, dtor}; use iceberg::spec::{FormatVersion, NestedField, PrimitiveType, Schema, Type}; -use iceberg::transaction::Transaction; +use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::{Catalog, Namespace, NamespaceIdent, TableCreation, TableIdent}; use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig}; use iceberg_test_utils::docker::DockerCompose; @@ -346,9 +346,12 @@ async fn test_update_table() { &TableIdent::new(ns.name().clone(), "t1".to_string()) ); + let tx = Transaction::new(&table); // Update table by committing transaction - let table2 = Transaction::new(&table) - .set_properties(HashMap::from([("prop1".to_string(), "v1".to_string())])) + let table2 = tx + .update_table_properties() + .set("prop1".to_string(), "v1".to_string()) + .apply(tx) .unwrap() .commit(&catalog) .await diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index d1ddc82463..9f5d490128 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -54,6 +54,7 @@ arrow-ord = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } arrow-string = { workspace = true } +as-any = { workspace = true } async-std = { workspace = true, optional = true, features = ["attributes"] } async-trait = { workspace = true } base64 = { workspace = true } diff --git a/crates/iceberg/src/transaction/action.rs b/crates/iceberg/src/transaction/action.rs index 51f54c4ce9..aa0a05d0d9 100644 --- a/crates/iceberg/src/transaction/action.rs +++ b/crates/iceberg/src/transaction/action.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -#![allow(dead_code)] use std::mem::take; use std::sync::Arc; +use as_any::AsAny; use async_trait::async_trait; use crate::table::Table; @@ -26,7 +26,7 @@ use crate::transaction::Transaction; use crate::{Result, TableRequirement, TableUpdate}; /// A boxed, thread-safe reference to a `TransactionAction`. -pub type BoxedTransactionAction = Arc; +pub(crate) type BoxedTransactionAction = Arc; /// A trait representing an atomic action that can be part of a transaction. /// @@ -34,7 +34,7 @@ pub type BoxedTransactionAction = Arc; /// Each action is responsible for generating the updates and requirements needed /// to modify the table metadata. #[async_trait] -pub(crate) trait TransactionAction: Sync + Send { +pub(crate) trait TransactionAction: AsAny + Sync + Send { /// Commits this action against the provided table and returns the resulting updates. /// NOTE: This function is intended for internal use only and should not be called directly by users. /// @@ -108,6 +108,7 @@ mod tests { use std::str::FromStr; use std::sync::Arc; + use as_any::Downcast; use async_trait::async_trait; use uuid::Uuid; @@ -158,9 +159,12 @@ mod tests { let tx = Transaction::new(&table); let updated_tx = action.apply(tx).unwrap(); - // There should be one action in the transaction now assert_eq!(updated_tx.actions.len(), 1); + + (*updated_tx.actions[0]) + .downcast_ref::() + .expect("TestAction was not applied to Transaction!"); } #[test] diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index a592258a0d..fc2edcf996 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -17,26 +17,32 @@ //! This module contains transaction api. +/// The `ApplyTransactionAction` trait provides an `apply` method +/// that allows users to apply a transaction action to a `Transaction`. mod action; +pub use action::*; mod append; mod snapshot; mod sort_order; +mod update_location; +mod update_properties; +mod upgrade_format_version; -use std::cmp::Ordering; use std::collections::HashMap; use std::mem::discriminant; use std::sync::Arc; use uuid::Uuid; -use crate::TableUpdate::UpgradeFormatVersion; use crate::error::Result; -use crate::spec::FormatVersion; use crate::table::Table; use crate::transaction::action::BoxedTransactionAction; use crate::transaction::append::FastAppendAction; use crate::transaction::sort_order::ReplaceSortOrderAction; -use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate}; +use crate::transaction::update_location::UpdateLocationAction; +use crate::transaction::update_properties::UpdatePropertiesAction; +use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction; +use crate::{Catalog, TableCommit, TableRequirement, TableUpdate}; /// Table transaction. pub struct Transaction { @@ -104,32 +110,13 @@ impl Transaction { } /// Sets table to a new version. - pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result { - let current_version = self.current_table.metadata().format_version(); - match current_version.cmp(&format_version) { - Ordering::Greater => { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot downgrade table version from {} to {}", - current_version, format_version - ), - )); - } - Ordering::Less => { - self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?; - } - Ordering::Equal => { - // Do nothing. - } - } - Ok(self) + pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction { + UpgradeFormatVersionAction::new() } /// Update table's property. - pub fn set_properties(mut self, props: HashMap) -> Result { - self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?; - Ok(self) + pub fn update_table_properties(&self) -> UpdatePropertiesAction { + UpdatePropertiesAction::new() } fn generate_unique_snapshot_id(&self) -> i64 { @@ -178,27 +165,48 @@ impl Transaction { } } - /// Remove properties in table. - pub fn remove_properties(mut self, keys: Vec) -> Result { - self.apply( - vec![TableUpdate::RemoveProperties { removals: keys }], - vec![], - )?; - Ok(self) - } - /// Set the location of table - pub fn set_location(mut self, location: String) -> Result { - self.apply(vec![TableUpdate::SetLocation { location }], vec![])?; - Ok(self) + pub fn update_location(&self) -> UpdateLocationAction { + UpdateLocationAction::new() } /// Commit transaction. - pub async fn commit(self, catalog: &dyn Catalog) -> Result { + pub async fn commit(mut self, catalog: &dyn Catalog) -> Result
{ + if self.actions.is_empty() && self.updates.is_empty() { + // nothing to commit + return Ok(self.base_table.clone()); + } + + self.do_commit(catalog).await + } + + async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result
{ + let base_table_identifier = self.base_table.identifier().to_owned(); + + let refreshed = catalog.load_table(&base_table_identifier.clone()).await?; + + if self.base_table.metadata() != refreshed.metadata() + || self.base_table.metadata_location() != refreshed.metadata_location() + { + // current base is stale, use refreshed as base and re-apply transaction actions + self.base_table = refreshed.clone(); + } + + let current_table = self.base_table.clone(); + + for action in self.actions.clone() { + let mut action_commit = action.commit(¤t_table).await?; + // apply changes to current_table + self.apply( + action_commit.take_updates(), + action_commit.take_requirements(), + )?; + } + let table_commit = TableCommit::builder() - .ident(self.base_table.identifier().clone()) - .updates(self.updates) - .requirements(self.requirements) + .ident(base_table_identifier) + .updates(self.updates.clone()) + .requirements(self.requirements.clone()) .build(); catalog.update_table(table_commit).await @@ -207,17 +215,15 @@ impl Transaction { #[cfg(test)] mod tests { - use std::collections::HashMap; use std::fs::File; use std::io::BufReader; + use crate::TableIdent; use crate::io::FileIOBuilder; - use crate::spec::{FormatVersion, TableMetadata}; + use crate::spec::TableMetadata; use crate::table::Table; - use crate::transaction::Transaction; - use crate::{TableIdent, TableUpdate}; - fn make_v1_table() -> Table { + pub fn make_v1_table() -> Table { let file = File::open(format!( "{}/testdata/table_metadata/{}", env!("CARGO_MANIFEST_DIR"), @@ -273,109 +279,4 @@ mod tests { .build() .unwrap() } - - #[test] - fn test_upgrade_table_version_v1_to_v2() { - let table = make_v1_table(); - let tx = Transaction::new(&table); - let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); - - assert_eq!( - vec![TableUpdate::UpgradeFormatVersion { - format_version: FormatVersion::V2 - }], - tx.updates - ); - } - - #[test] - fn test_upgrade_table_version_v2_to_v2() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); - - assert!( - tx.updates.is_empty(), - "Upgrade table to same version should not generate any updates" - ); - assert!( - tx.requirements.is_empty(), - "Upgrade table to same version should not generate any requirements" - ); - } - - #[test] - fn test_downgrade_table_version() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx.upgrade_table_version(FormatVersion::V1); - - assert!(tx.is_err(), "Downgrade table version should fail!"); - } - - #[test] - fn test_set_table_property() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .set_properties(HashMap::from([("a".to_string(), "b".to_string())])) - .unwrap(); - - assert_eq!( - vec![TableUpdate::SetProperties { - updates: HashMap::from([("a".to_string(), "b".to_string())]) - }], - tx.updates - ); - } - - #[test] - fn test_remove_property() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .remove_properties(vec!["a".to_string(), "b".to_string()]) - .unwrap(); - - assert_eq!( - vec![TableUpdate::RemoveProperties { - removals: vec!["a".to_string(), "b".to_string()] - }], - tx.updates - ); - } - - #[test] - fn test_set_location() { - let table = make_v2_table(); - let tx = Transaction::new(&table); - let tx = tx - .set_location(String::from("s3://bucket/prefix/new_table")) - .unwrap(); - - assert_eq!( - vec![TableUpdate::SetLocation { - location: String::from("s3://bucket/prefix/new_table") - }], - tx.updates - ) - } - - #[tokio::test] - async fn test_transaction_apply_upgrade() { - let table = make_v1_table(); - let tx = Transaction::new(&table); - // Upgrade v1 to v1, do nothing. - let tx = tx.upgrade_table_version(FormatVersion::V1).unwrap(); - // Upgrade v1 to v2, success. - let tx = tx.upgrade_table_version(FormatVersion::V2).unwrap(); - assert_eq!( - vec![TableUpdate::UpgradeFormatVersion { - format_version: FormatVersion::V2 - }], - tx.updates - ); - // Upgrade v2 to v1, return error. - assert!(tx.upgrade_table_version(FormatVersion::V1).is_err()); - } } diff --git a/crates/iceberg/src/transaction/update_location.rs b/crates/iceberg/src/transaction/update_location.rs new file mode 100644 index 0000000000..0c32c75355 --- /dev/null +++ b/crates/iceberg/src/transaction/update_location.rs @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result, TableUpdate}; + +/// A transaction action that sets or updates the location of a table. +/// +/// This action is used to explicitly set a new metadata location during a transaction, +/// typically as part of advanced commit or recovery flows. The location is optional until +/// explicitly set via [`set_location`]. +pub struct UpdateLocationAction { + location: Option, +} + +impl UpdateLocationAction { + /// Creates a new [`UpdateLocationAction`] with no location set. + pub fn new() -> Self { + UpdateLocationAction { location: None } + } + + /// Sets the target location for this action and returns the updated instance. + /// + /// # Arguments + /// + /// * `location` - A string representing the table's location. + /// + /// # Returns + /// + /// The [`UpdateLocationAction`] with the new location set. + pub fn set_location(mut self, location: String) -> Self { + self.location = Some(location); + self + } +} + +impl Default for UpdateLocationAction { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl TransactionAction for UpdateLocationAction { + async fn commit(self: Arc, _table: &Table) -> Result { + let updates: Vec; + if let Some(location) = self.location.clone() { + updates = vec![TableUpdate::SetLocation { location }]; + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Location is not set for UpdateLocationAction!", + )); + } + + Ok(ActionCommit::new(updates, vec![])) + } +} + +#[cfg(test)] +mod tests { + use as_any::Downcast; + + use crate::transaction::Transaction; + use crate::transaction::action::ApplyTransactionAction; + use crate::transaction::tests::make_v2_table; + use crate::transaction::update_location::UpdateLocationAction; + + #[test] + fn test_set_location() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx + .update_location() + .set_location(String::from("s3://bucket/prefix/new_table")) + .apply(tx) + .unwrap(); + + assert_eq!(tx.actions.len(), 1); + + let action = (*tx.actions[0]) + .downcast_ref::() + .unwrap(); + + assert_eq!( + action.location, + Some(String::from("s3://bucket/prefix/new_table")) + ) + } +} diff --git a/crates/iceberg/src/transaction/update_properties.rs b/crates/iceberg/src/transaction/update_properties.rs new file mode 100644 index 0000000000..825d6bec8d --- /dev/null +++ b/crates/iceberg/src/transaction/update_properties.rs @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result, TableUpdate}; + +/// A transactional action that updates or removes table properties +/// +/// This action is used to modify key-value pairs in a table's metadata +/// properties during a transaction. It supports setting new values for existing keys +/// or adding new keys, as well as removing existing keys. Each key can only be updated +/// or removed in a single action, not both. +pub struct UpdatePropertiesAction { + updates: HashMap, + removals: HashSet, +} + +impl UpdatePropertiesAction { + /// Creates a new [`UpdatePropertiesAction`] with no updates or removals. + pub fn new() -> Self { + UpdatePropertiesAction { + updates: HashMap::default(), + removals: HashSet::default(), + } + } + + /// Adds a key-value pair to the update set of this action. + /// + /// # Arguments + /// + /// * `key` - The property key to update. + /// * `value` - The new value to associate with the key. + /// + /// # Returns + /// + /// The updated [`UpdatePropertiesAction`] with the key-value pair added to the update set. + pub fn set(mut self, key: String, value: String) -> Self { + self.updates.insert(key, value); + self + } + + /// Adds a key to the removal set of this action. + /// + /// # Arguments + /// + /// * `key` - The property key to remove. + /// + /// # Returns + /// + /// The updated [`UpdatePropertiesAction`] with the key added to the removal set. + pub fn remove(mut self, key: String) -> Self { + self.removals.insert(key); + self + } +} + +impl Default for UpdatePropertiesAction { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl TransactionAction for UpdatePropertiesAction { + async fn commit(self: Arc, _table: &Table) -> Result { + if let Some(overlapping_key) = self.removals.iter().find(|k| self.updates.contains_key(*k)) + { + return Err(Error::new( + ErrorKind::PreconditionFailed, + format!( + "Key {} is present in both removal set and update set", + overlapping_key + ), + )); + } + + let updates: Vec = vec![ + TableUpdate::SetProperties { + updates: self.updates.clone(), + }, + TableUpdate::RemoveProperties { + removals: self.removals.clone().into_iter().collect::>(), + }, + ]; + + Ok(ActionCommit::new(updates, vec![])) + } +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + + use as_any::Downcast; + + use crate::transaction::Transaction; + use crate::transaction::action::ApplyTransactionAction; + use crate::transaction::tests::make_v2_table; + use crate::transaction::update_properties::UpdatePropertiesAction; + + #[test] + fn test_update_table_property() { + let table = make_v2_table(); + let tx = Transaction::new(&table); + let tx = tx + .update_table_properties() + .set("a".to_string(), "b".to_string()) + .remove("b".to_string()) + .apply(tx) + .unwrap(); + + assert_eq!(tx.actions.len(), 1); + + let action = (*tx.actions[0]) + .downcast_ref::() + .unwrap(); + assert_eq!( + action.updates, + HashMap::from([("a".to_string(), "b".to_string())]) + ); + + assert_eq!(action.removals, HashSet::from(["b".to_string()])); + } +} diff --git a/crates/iceberg/src/transaction/upgrade_format_version.rs b/crates/iceberg/src/transaction/upgrade_format_version.rs new file mode 100644 index 0000000000..ff15926d00 --- /dev/null +++ b/crates/iceberg/src/transaction/upgrade_format_version.rs @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::TableUpdate::UpgradeFormatVersion; +use crate::spec::FormatVersion; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result}; + +/// A transaction action to upgrade a table's format version. +/// +/// This action is used within a transaction to indicate that the +/// table's format version should be upgraded to a specified version. +/// The location remains optional until explicitly set via [`set_format_version`]. +pub struct UpgradeFormatVersionAction { + format_version: Option, +} + +impl UpgradeFormatVersionAction { + /// Creates a new `UpgradeFormatVersionAction` with no version set. + pub fn new() -> Self { + UpgradeFormatVersionAction { + format_version: None, + } + } + + /// Sets the target format version for the upgrade. + /// + /// # Arguments + /// + /// * `format_version` - The version to upgrade the table format to. + /// + /// # Returns + /// + /// Returns the updated `UpgradeFormatVersionAction` with the format version set. + pub fn set_format_version(mut self, format_version: FormatVersion) -> Self { + self.format_version = Some(format_version); + self + } +} + +impl Default for UpgradeFormatVersionAction { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl TransactionAction for UpgradeFormatVersionAction { + async fn commit(self: Arc, _table: &Table) -> Result { + let format_version = self.format_version.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "FormatVersion is not set for UpgradeFormatVersionAction!", + ) + })?; + + Ok(ActionCommit::new( + vec![UpgradeFormatVersion { format_version }], + vec![], + )) + } +} + +#[cfg(test)] +mod tests { + use as_any::Downcast; + + use crate::spec::FormatVersion; + use crate::transaction::Transaction; + use crate::transaction::action::ApplyTransactionAction; + use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction; + + #[test] + fn test_upgrade_format_version() { + let table = crate::transaction::tests::make_v1_table(); + let tx = Transaction::new(&table); + let tx = tx + .upgrade_table_version() + .set_format_version(FormatVersion::V2) + .apply(tx) + .unwrap(); + + assert_eq!(tx.actions.len(), 1); + + let action = (*tx.actions[0]) + .downcast_ref::() + .unwrap(); + + assert_eq!(action.format_version, Some(FormatVersion::V2)); + } +}