From 56a21c01741f5ce7d537bcae58d26ea96a0d2ad0 Mon Sep 17 00:00:00 2001 From: Vaibhav Date: Thu, 21 Mar 2024 19:33:54 +0530 Subject: [PATCH] fix: Iceberg fixes for reading table metadata Signed-off-by: Vaibhav --- .../src/lake/iceberg/spec/manifest.rs | 18 ++- crates/datasources/src/lake/iceberg/table.rs | 114 ++++++++++++++---- .../local/iceberg.slt | 2 +- 3 files changed, 108 insertions(+), 26 deletions(-) diff --git a/crates/datasources/src/lake/iceberg/spec/manifest.rs b/crates/datasources/src/lake/iceberg/spec/manifest.rs index b112439460..64f9554068 100644 --- a/crates/datasources/src/lake/iceberg/spec/manifest.rs +++ b/crates/datasources/src/lake/iceberg/spec/manifest.rs @@ -8,6 +8,7 @@ use serde_with::{serde_as, Bytes}; use super::{PartitionField, Schema}; use crate::lake::iceberg::errors::{IcebergError, Result}; +use crate::lake::iceberg::spec::PartitionSpec; /// Manifest lists include summary medata for the table alongside the path the /// actual manifest. @@ -176,12 +177,23 @@ impl Manifest { // Spec says schema id is required, but seems like it's actually // optional. Missing from the spark outputs. let schema_id = get_metadata_as_i32(m, "schema-id").unwrap_or_default(); - let partition_spec = serde_json::from_slice(m.get("partition-spec").ok_or_else(|| { + + let partition_spec_id = get_metadata_as_i32(m, "partition-spec-id")?; + + let raw_partition_spec = m.get("partition-spec").ok_or_else(|| { IcebergError::DataInvalid( "Missing field 'partition-spec' in manifest metadata".to_string(), ) - })?)?; - let partition_spec_id = get_metadata_as_i32(m, "partition-spec-id")?; + })?; + + let partition_spec = match serde_json::from_slice::(raw_partition_spec) { + Ok(spec) => spec.fields, + Err(_e) => { + // Try to get it as a slice of PartitionField. + serde_json::from_slice(raw_partition_spec)? + } + }; + let format_version = get_metadata_as_i32(m, "format-version")?; let content = match get_metadata_field(m, "content") { Ok(c) => String::from_utf8_lossy(c).parse()?, diff --git a/crates/datasources/src/lake/iceberg/table.rs b/crates/datasources/src/lake/iceberg/table.rs index 1e3a3a8547..38d3792e25 100644 --- a/crates/datasources/src/lake/iceberg/table.rs +++ b/crates/datasources/src/lake/iceberg/table.rs @@ -23,6 +23,7 @@ use datafusion::physical_plan::{ SendableRecordBatchStream, Statistics, }; +use futures::StreamExt; use object_store::path::Path as ObjectPath; use object_store::{ObjectMeta, ObjectStore}; @@ -92,28 +93,8 @@ struct TableState { impl TableState { async fn open(location: DatasourceUrl, store: Arc) -> Result { - // Get table version. - // TODO: Handle not finding a version hint. - let path = format_object_path(&location, "metadata/version-hint.text")?; - let path = ObjectPath::parse(path)?; - let bs = store.get(&path).await?.bytes().await?; - let version_contents = String::from_utf8(bs.to_vec()).map_err(|e| { - IcebergError::DataInvalid(format!("Expected utf-8 in version hint: {}", e)) - })?; - // Read the first line of the `version-hint.text` file. - let first_line = if let Some((first_line, _)) = version_contents.split_once('\n') { - first_line - } else { - version_contents.as_str() - }; - let version = first_line.trim(); - // Read metadata. - let path = format_object_path(&location, format!("metadata/v{version}.metadata.json"))?; - let bs = store.get(&path).await?.bytes().await?; - let metadata: TableMetadata = serde_json::from_slice(&bs).map_err(|e| { - IcebergError::DataInvalid(format!("Failed to read table metadata: {}", e)) - })?; + let metadata = Self::get_table_metadata(&location, &store).await?; let resolver = PathResolver::from_metadata(&metadata); @@ -125,6 +106,81 @@ impl TableState { }) } + async fn get_table_metadata( + location: &DatasourceUrl, + store: &dyn ObjectStore, + ) -> Result { + let path = format_object_path(location, "metadata/version-hint.text")?; + + let version_obj = match store.get(&path).await { + Ok(get_res) => { + let bs = get_res.bytes().await?; + + let version_contents = String::from_utf8(bs.to_vec()).map_err(|e| { + IcebergError::DataInvalid(format!("Expected utf-8 in version hint: {}", e)) + })?; + + // Read the first line of the `version-hint.text` file. + let first_line = if let Some((first_line, _)) = version_contents.split_once('\n') { + first_line + } else { + version_contents.as_str() + }; + + format_object_path( + location, + format!("metadata/v{}.metadata.json", first_line.trim()), + )? + } + Err(_e) => { + // List all the metadata files and try to get the one with the + // latest version. + + let metadata_prefix = format_object_path(location, "metadata/")?; + let mut metadata_objects = store.list(Some(&metadata_prefix)); + + let (mut latest_v, mut latest_v_obj) = (0_u32, Option::::None); + + while let Some(obj_meta) = metadata_objects.next().await { + let obj_meta = obj_meta?; + + let file_name = obj_meta.location.filename().unwrap_or_default(); + + if let Some(version_str) = file_name.strip_suffix(".metadata.json") { + let version_num = if let Some(version_str) = version_str.strip_prefix('v') { + version_str + } else if let Some((version_str, _uuid)) = version_str.split_once('-') { + // TODO: Maybe validate the "uuid". If invalid, continue. + version_str + } else { + continue; + }; + + if let Ok(version_num) = version_num.parse::() { + if version_num >= latest_v { + latest_v = version_num; + latest_v_obj = Some(obj_meta.location); + } + } + } + } + + latest_v_obj.ok_or_else(|| { + IcebergError::DataInvalid( + "no valid iceberg table exists at the given path".to_string(), + ) + })? + } + }; + + let bs = store.get(&version_obj).await?.bytes().await?; + let metadata: TableMetadata = serde_json::from_slice(&bs).map_err(|e| { + IcebergError::DataInvalid(format!("Failed to read table metadata: {}", e)) + })?; + + Ok(metadata) + } + /// Get the current snapshot from the table metadata fn current_snapshot(&self) -> Result<&Snapshot> { let current_snapshot_id = self @@ -434,7 +490,21 @@ fn format_object_path( } DatasourceUrl::File(root_path) => { let path = root_path.join(path); - ObjectPath::from_filesystem_path(path) + + // Get absolute path without checking if the file exists or not. + let abs_path = if path.is_absolute() { + path + } else { + let cwd = std::env::current_dir().map_err(|source| { + object_store::path::Error::Canonicalize { + path: path.clone(), + source, + } + })?; + cwd.join(path) + }; + + ObjectPath::from_absolute_path(abs_path) } } } diff --git a/testdata/sqllogictests_object_store/local/iceberg.slt b/testdata/sqllogictests_object_store/local/iceberg.slt index b2bd0b91f4..4c1bd928d4 100644 --- a/testdata/sqllogictests_object_store/local/iceberg.slt +++ b/testdata/sqllogictests_object_store/local/iceberg.slt @@ -1,5 +1,5 @@ # Tests external iceberg table in the local file system. -statement error No such file or directory +statement error no valid iceberg table exists at the given path create external table iceberg_local from iceberg options (