diff --git a/Cargo.lock b/Cargo.lock index 072928abdf..c3e581ccb0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3643,7 +3643,6 @@ dependencies = [ "iceberg", "iceberg_test_utils", "itertools 0.13.0", - "serde_json", "tokio", "typed-builder 0.20.1", "uuid", @@ -3658,7 +3657,6 @@ dependencies = [ "iceberg_test_utils", "itertools 0.13.0", "regex", - "serde_json", "sqlx", "tempfile", "tokio", diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index b0e157030f..f4b4a01f9a 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -395,10 +395,7 @@ impl Catalog for GlueCatalog { .metadata; let metadata_location = create_metadata_location(&location, 0)?; - self.file_io - .new_output(&metadata_location)? - .write(serde_json::to_vec(&metadata)?.into()) - .await?; + metadata.write_to(&self.file_io, &metadata_location).await?; let glue_table = convert_to_glue_table( &table_name, @@ -463,9 +460,7 @@ impl Catalog for GlueCatalog { Some(table) => { let metadata_location = get_metadata_location(&table.parameters)?; - let input_file = self.file_io.new_input(&metadata_location)?; - let metadata_content = input_file.read().await?; - let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; + let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; Table::builder() .file_io(self.file_io()) diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index d126f2ffa5..72fb8c6b33 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -353,10 +353,7 @@ impl Catalog for HmsCatalog { let metadata_location = create_metadata_location(&location, 0)?; - self.file_io - .new_output(&metadata_location)? 
- .write(serde_json::to_vec(&metadata)?.into()) - .await?; + metadata.write_to(&self.file_io, &metadata_location).await?; let hive_table = convert_to_hive_table( db_name.clone(), @@ -406,8 +403,7 @@ impl Catalog for HmsCatalog { let metadata_location = get_metadata_location(&hive_table.parameters)?; - let metadata_content = self.file_io.new_input(&metadata_location)?.read().await?; - let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; + let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; Table::builder() .file_io(self.file_io()) diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml index 8b6b07493c..0ec1f55f62 100644 --- a/crates/catalog/s3tables/Cargo.toml +++ b/crates/catalog/s3tables/Cargo.toml @@ -34,7 +34,6 @@ async-trait = { workspace = true } aws-config = { workspace = true } aws-sdk-s3tables = "1.10.0" iceberg = { workspace = true } -serde_json = { workspace = true } typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 486b184b10..191356d711 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -334,10 +334,7 @@ impl Catalog for S3TablesCatalog { let metadata = TableMetadataBuilder::from_table_creation(creation)? .build()? .metadata; - self.file_io - .new_output(&metadata_location)? 
- .write(serde_json::to_vec(&metadata)?.into()) - .await?; + metadata.write_to(&self.file_io, &metadata_location).await?; // update metadata location self.s3tables_client @@ -389,9 +386,7 @@ impl Catalog for S3TablesCatalog { ), ) })?; - let input_file = self.file_io.new_input(metadata_location)?; - let metadata_content = input_file.read().await?; - let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; + let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?; let table = Table::builder() .identifier(table_ident.clone()) diff --git a/crates/catalog/sql/Cargo.toml b/crates/catalog/sql/Cargo.toml index 3c87924647..b767b68775 100644 --- a/crates/catalog/sql/Cargo.toml +++ b/crates/catalog/sql/Cargo.toml @@ -31,7 +31,6 @@ repository = { workspace = true } [dependencies] async-trait = { workspace = true } iceberg = { workspace = true } -serde_json = { workspace = true } sqlx = { version = "0.8.1", features = ["any"], default-features = false } typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 24c22dbcf8..56c6fadcf1 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -642,9 +642,7 @@ impl Catalog for SqlCatalog { .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP) .map_err(from_sqlx_error)?; - let file = self.fileio.new_input(&tbl_metadata_location)?; - let metadata_content = file.read().await?; - let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; + let metadata = TableMetadata::read_from(&self.fileio, &tbl_metadata_location).await?; Ok(Table::builder() .file_io(self.fileio.clone()) @@ -708,8 +706,8 @@ impl Catalog for SqlCatalog { Uuid::new_v4() ); - let file = self.fileio.new_output(&tbl_metadata_location)?; - file.write(serde_json::to_vec(&tbl_metadata)?.into()) + tbl_metadata + .write_to(&self.fileio, &tbl_metadata_location) .await?; self.execute(&format!( diff --git 
a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index c233ab5925..d1d361c7a1 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -210,10 +210,7 @@ impl Catalog for MemoryCatalog { Uuid::new_v4() ); - self.file_io - .new_output(&metadata_location)? - .write(serde_json::to_vec(&metadata)?.into()) - .await?; + metadata.write_to(&self.file_io, &metadata_location).await?; root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?; @@ -230,9 +227,7 @@ impl Catalog for MemoryCatalog { let root_namespace_state = self.root_namespace_state.lock().await; let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; - let input_file = self.file_io.new_input(metadata_location)?; - let metadata_content = input_file.read().await?; - let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; + let metadata = TableMetadata::read_from(&self.file_io, metadata_location).await?; Table::builder() .file_io(self.file_io.clone()) @@ -284,9 +279,7 @@ impl Catalog for MemoryCatalog { let mut root_namespace_state = self.root_namespace_state.lock().await; root_namespace_state.insert_new_table(&table_ident.clone(), metadata_location.clone())?; - let input_file = self.file_io.new_input(metadata_location.clone())?; - let metadata_content = input_file.read().await?; - let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; + let metadata = TableMetadata::read_from(&self.file_io, &metadata_location).await?; Table::builder() .file_io(self.file_io.clone()) diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 2604eac03d..0f0854f7fc 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -37,6 +37,7 @@ use super::{ SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType, }; use crate::error::{Result, 
timestamp_ms_to_utc}; +use crate::io::FileIO; use crate::{Error, ErrorKind}; static MAIN_BRANCH: &str = "main"; @@ -464,6 +465,29 @@ impl TableMetadata { self.encryption_keys.get(key_id) } + /// Read table metadata from the given location. + pub async fn read_from( + file_io: &FileIO, + metadata_location: impl AsRef<str>, + ) -> Result<TableMetadata> { + let input_file = file_io.new_input(metadata_location)?; + let metadata_content = input_file.read().await?; + let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; + Ok(metadata) + } + + /// Write table metadata to the given location. + pub async fn write_to( + &self, + file_io: &FileIO, + metadata_location: impl AsRef<str>, + ) -> Result<()> { + file_io + .new_output(metadata_location)? + .write(serde_json::to_vec(self)?.into()) + .await + } + + /// Normalize this partition spec. /// /// This is an internal method @@ -1355,10 +1379,12 @@ mod tests { use anyhow::Result; use pretty_assertions::assert_eq; + use tempfile::TempDir; use uuid::Uuid; use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder}; use crate::TableCreation; + use crate::io::FileIOBuilder; use crate::spec::table_metadata::TableMetadata; use crate::spec::{ BlobMetadata, NestedField, NullOrder, Operation, PartitionSpec, PartitionStatisticsFile, @@ -3050,4 +3076,49 @@ mod tests { )]) ); } + + #[tokio::test] + async fn test_table_metadata_io_read_write() { + // Create a temporary directory for our test + let temp_dir = TempDir::new().unwrap(); + let temp_path = temp_dir.path().to_str().unwrap(); + + // Create a FileIO instance + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + + // Use an existing test metadata from the test files + let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json"); + + // Define the metadata location + let metadata_location = format!("{}/metadata.json", temp_path); + + // Write the metadata + original_metadata + .write_to(&file_io, &metadata_location) + .await + .unwrap(); + + // 
Verify the file exists + assert!(fs::metadata(&metadata_location).is_ok()); + + // Read the metadata back + let read_metadata = TableMetadata::read_from(&file_io, &metadata_location) + .await + .unwrap(); + + // Verify the metadata matches + assert_eq!(read_metadata, original_metadata); + } + + #[tokio::test] + async fn test_table_metadata_io_read_nonexistent_file() { + // Create a FileIO instance + let file_io = FileIOBuilder::new_fs_io().build().unwrap(); + + // Try to read a non-existent file + let result = TableMetadata::read_from(&file_io, "/nonexistent/path/metadata.json").await; + + // Verify it returns an error + assert!(result.is_err()); + } }