Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 91 additions & 1 deletion crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt::{Debug, Display};
use std::future::Future;
use std::mem::take;
use std::ops::Deref;
use std::sync::Arc;

use _serde::deserialize_snapshot;
use async_trait::async_trait;
Expand Down Expand Up @@ -313,6 +314,27 @@ impl TableCommit {
pub fn take_updates(&mut self) -> Vec<TableUpdate> {
take(&mut self.updates)
}

/// Applies this [`TableCommit`] to the given [`Table`] as part of a catalog update.
/// Typically used by [`Catalog::update_table`] to validate requirements and apply metadata updates.
///
/// Returns a new [`Table`] with updated metadata,
/// or an error if validation or application fails.
pub fn apply(self, table: Table) -> Result<Table> {
    // Every requirement must hold against the table's current metadata
    // before any update is applied; the first failure aborts the commit.
    self.requirements
        .into_iter()
        .try_for_each(|requirement| requirement.check(Some(table.metadata())))?;

    // Seed a builder from the current metadata and fold each update into
    // it in order; any update that fails to apply aborts the commit.
    let builder = self
        .updates
        .into_iter()
        .try_fold(table.metadata().clone().into_builder(None), |builder, update| {
            update.apply(builder)
        })?;

    Ok(table.with_metadata(Arc::new(builder.build()?.metadata)))
}
}

/// TableRequirement represents a requirement for a table in the catalog.
Expand Down Expand Up @@ -884,12 +906,15 @@ mod _serde_set_statistics {
mod tests {
use std::collections::HashMap;
use std::fmt::Debug;
use std::fs::File;
use std::io::BufReader;

use serde::Serialize;
use serde::de::DeserializeOwned;
use uuid::uuid;

use super::ViewUpdate;
use crate::io::FileIOBuilder;
use crate::spec::{
BlobMetadata, FormatVersion, MAIN_BRANCH, NestedField, NullOrder, Operation,
PartitionStatisticsFile, PrimitiveType, Schema, Snapshot, SnapshotReference,
Expand All @@ -898,7 +923,10 @@ mod tests {
UnboundPartitionSpec, ViewFormatVersion, ViewRepresentation, ViewRepresentations,
ViewVersion,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};
use crate::table::Table;
use crate::{
NamespaceIdent, TableCommit, TableCreation, TableIdent, TableRequirement, TableUpdate,
};

#[test]
fn test_parent_namespace() {
Expand Down Expand Up @@ -2111,4 +2139,66 @@ mod tests {
},
);
}

#[test]
fn test_table_commit() {
    // Build a Table from the checked-in V2 table-metadata fixture.
    let file = File::open(format!(
        "{}/testdata/table_metadata/{}",
        env!("CARGO_MANIFEST_DIR"),
        "TableMetadataV2Valid.json"
    ))
    .unwrap();
    let metadata: TableMetadata = serde_json::from_reader(BufReader::new(file)).unwrap();

    let table = Table::builder()
        .metadata(metadata)
        .metadata_location("s3://bucket/test/location/metadata/v2.json".to_string())
        .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
        .file_io(FileIOBuilder::new("memory").build().unwrap())
        .build()
        .unwrap();

    // A commit that relocates the table and sets two properties, guarded
    // by a UUID-match requirement that the fixture table satisfies.
    let table_commit = TableCommit::builder()
        .ident(table.identifier().to_owned())
        .updates(vec![
            TableUpdate::SetLocation {
                location: "s3://bucket/test/new_location/metadata/v2.json".to_string(),
            },
            TableUpdate::SetProperties {
                updates: [
                    ("prop1".to_string(), "v1".to_string()),
                    ("prop2".to_string(), "v2".to_string()),
                ]
                .into_iter()
                .collect(),
            },
        ])
        .requirements(vec![TableRequirement::UuidMatch {
            uuid: table.metadata().table_uuid,
        }])
        .build();

    let updated_table = table_commit.apply(table).unwrap();

    // Both properties must be present with the committed values.
    let properties = &updated_table.metadata().properties;
    assert_eq!(properties.get("prop1").unwrap(), "v1");
    assert_eq!(properties.get("prop2").unwrap(), "v2");

    // The location update must also have taken effect.
    assert_eq!(
        updated_table.metadata().location,
        "s3://bucket/test/new_location/metadata/v2.json".to_string()
    )
}
}
2 changes: 1 addition & 1 deletion crates/iceberg/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub enum ErrorKind {
/// Iceberg data is invalid.
///
/// This error is returned when we try to read a table from iceberg but
/// failed to parse it's metadata or data file correctly.
/// failed to parse its metadata or data file correctly.
///
/// The table could be invalid or corrupted.
DataInvalid,
Expand Down
Loading