fix: Iceberg fixes for reading table metadata
Signed-off-by: Vaibhav <vrongmeal@gmail.com>
vrongmeal committed Mar 25, 2024
1 parent c1afec7 commit 56a21c0
Showing 3 changed files with 108 additions and 26 deletions.
18 changes: 15 additions & 3 deletions crates/datasources/src/lake/iceberg/spec/manifest.rs
@@ -8,6 +8,7 @@ use serde_with::{serde_as, Bytes};

use super::{PartitionField, Schema};
use crate::lake::iceberg::errors::{IcebergError, Result};
use crate::lake::iceberg::spec::PartitionSpec;

/// Manifest lists include summary metadata for the table alongside the path to
/// the actual manifest.
@@ -176,12 +177,23 @@ impl Manifest {
// Spec says schema id is required, but seems like it's actually
// optional. Missing from the spark outputs.
let schema_id = get_metadata_as_i32(m, "schema-id").unwrap_or_default();
let partition_spec = serde_json::from_slice(m.get("partition-spec").ok_or_else(|| {

let partition_spec_id = get_metadata_as_i32(m, "partition-spec-id")?;

let raw_partition_spec = m.get("partition-spec").ok_or_else(|| {
IcebergError::DataInvalid(
"Missing field 'partition-spec' in manifest metadata".to_string(),
)
})?)?;
let partition_spec_id = get_metadata_as_i32(m, "partition-spec-id")?;
})?;

let partition_spec = match serde_json::from_slice::<PartitionSpec>(raw_partition_spec) {
Ok(spec) => spec.fields,
Err(_e) => {
// Try to get it as a slice of PartitionField.
serde_json::from_slice(raw_partition_spec)?
}
};

let format_version = get_metadata_as_i32(m, "format-version")?;
let content = match get_metadata_field(m, "content") {
Ok(c) => String::from_utf8_lossy(c).parse()?,
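The hunk above makes the manifest reader tolerant of both encodings of the partition spec found in manifest metadata: a full partition-spec object and a bare list of partition fields. A minimal sketch of the fallback parse, using simplified stand-in types rather than the crate's actual PartitionSpec/PartitionField definitions (field names follow the Iceberg spec's JSON serialization; the sample values are made up):

use serde::Deserialize;

// Simplified stand-ins for the crate's spec types, for illustration only.
#[derive(Debug, Deserialize)]
struct PartitionField {
    #[serde(rename = "source-id")]
    source_id: i32,
    #[serde(rename = "field-id")]
    field_id: i32,
    name: String,
    transform: String,
}

#[derive(Debug, Deserialize)]
struct PartitionSpec {
    #[serde(rename = "spec-id")]
    spec_id: i32,
    fields: Vec<PartitionField>,
}

// Try the full object form first, then fall back to a bare array of fields.
fn parse_partition_fields(raw: &[u8]) -> serde_json::Result<Vec<PartitionField>> {
    match serde_json::from_slice::<PartitionSpec>(raw) {
        Ok(spec) => Ok(spec.fields),
        Err(_) => serde_json::from_slice(raw),
    }
}

fn main() -> serde_json::Result<()> {
    let object_form = br#"{"spec-id":0,"fields":[{"source-id":1,"field-id":1000,"name":"l_shipdate","transform":"day"}]}"#;
    let array_form = br#"[{"source-id":1,"field-id":1000,"name":"l_shipdate","transform":"day"}]"#;
    assert_eq!(parse_partition_fields(object_form)?.len(), 1);
    assert_eq!(parse_partition_fields(array_form)?.len(), 1);
    Ok(())
}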
114 changes: 92 additions & 22 deletions crates/datasources/src/lake/iceberg/table.rs
Expand Up @@ -23,6 +23,7 @@ use datafusion::physical_plan::{
SendableRecordBatchStream,
Statistics,
};
use futures::StreamExt;
use object_store::path::Path as ObjectPath;
use object_store::{ObjectMeta, ObjectStore};
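The new futures import supports the metadata-listing fallback added further down: ObjectStore::list returns a stream of object metadata, and driving it with .next().await requires the StreamExt trait. A standalone sketch of that pattern (LocalFileSystem, the "." prefix, and the "metadata" directory are example choices; in recent object_store versions, list returns the stream directly, as this commit uses it):

use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = LocalFileSystem::new_with_prefix(".")?;
    let prefix = ObjectPath::from("metadata");

    // `list` returns a stream; `next` comes from `futures::StreamExt`.
    let mut objects = store.list(Some(&prefix));
    while let Some(meta) = objects.next().await {
        let meta = meta?;
        println!("{} ({} bytes)", meta.location, meta.size);
    }
    Ok(())
}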

@@ -92,28 +93,8 @@ struct TableState {

impl TableState {
async fn open(location: DatasourceUrl, store: Arc<dyn ObjectStore>) -> Result<TableState> {
// Get table version.
// TODO: Handle not finding a version hint.
let path = format_object_path(&location, "metadata/version-hint.text")?;
let path = ObjectPath::parse(path)?;
let bs = store.get(&path).await?.bytes().await?;
let version_contents = String::from_utf8(bs.to_vec()).map_err(|e| {
IcebergError::DataInvalid(format!("Expected utf-8 in version hint: {}", e))
})?;
// Read the first line of the `version-hint.text` file.
let first_line = if let Some((first_line, _)) = version_contents.split_once('\n') {
first_line
} else {
version_contents.as_str()
};
let version = first_line.trim();

// Read metadata.
let path = format_object_path(&location, format!("metadata/v{version}.metadata.json"))?;
let bs = store.get(&path).await?.bytes().await?;
let metadata: TableMetadata = serde_json::from_slice(&bs).map_err(|e| {
IcebergError::DataInvalid(format!("Failed to read table metadata: {}", e))
})?;
let metadata = Self::get_table_metadata(&location, &store).await?;

let resolver = PathResolver::from_metadata(&metadata);

@@ -125,6 +106,81 @@ impl TableState {
})
}

async fn get_table_metadata(
location: &DatasourceUrl,
store: &dyn ObjectStore,
) -> Result<TableMetadata> {
let path = format_object_path(location, "metadata/version-hint.text")?;

let version_obj = match store.get(&path).await {
Ok(get_res) => {
let bs = get_res.bytes().await?;

let version_contents = String::from_utf8(bs.to_vec()).map_err(|e| {
IcebergError::DataInvalid(format!("Expected utf-8 in version hint: {}", e))
})?;

// Read the first line of the `version-hint.text` file.
let first_line = if let Some((first_line, _)) = version_contents.split_once('\n') {
first_line
} else {
version_contents.as_str()
};

format_object_path(
location,
format!("metadata/v{}.metadata.json", first_line.trim()),
)?
}
Err(_e) => {
// List all the metadata files and try to get the one with the
// latest version.

let metadata_prefix = format_object_path(location, "metadata/")?;
let mut metadata_objects = store.list(Some(&metadata_prefix));

let (mut latest_v, mut latest_v_obj) = (0_u32, Option::<ObjectPath>::None);

while let Some(obj_meta) = metadata_objects.next().await {
let obj_meta = obj_meta?;

let file_name = obj_meta.location.filename().unwrap_or_default();

if let Some(version_str) = file_name.strip_suffix(".metadata.json") {
let version_num = if let Some(version_str) = version_str.strip_prefix('v') {
version_str
} else if let Some((version_str, _uuid)) = version_str.split_once('-') {
// TODO: Maybe validate the "uuid". If invalid, continue.
version_str
} else {
continue;
};

if let Ok(version_num) = version_num.parse::<u32>() {
if version_num >= latest_v {
latest_v = version_num;
latest_v_obj = Some(obj_meta.location);
}
}
}
}

latest_v_obj.ok_or_else(|| {
IcebergError::DataInvalid(
"no valid iceberg table exists at the given path".to_string(),
)
})?
}
};

let bs = store.get(&version_obj).await?.bytes().await?;
let metadata: TableMetadata = serde_json::from_slice(&bs).map_err(|e| {
IcebergError::DataInvalid(format!("Failed to read table metadata: {}", e))
})?;

Ok(metadata)
}

/// Get the current snapshot from the table metadata
fn current_snapshot(&self) -> Result<&Snapshot> {
let current_snapshot_id = self
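When the version hint is missing, the get_table_metadata fallback added in the hunk above lists metadata/ and keeps the object with the highest version it can parse out of a file name ending in .metadata.json. A small sketch of that file-name parsing, covering both naming styles the loop accepts (the helper name and the sample file names are made up):

// Hypothetical helper, not part of the diff: extract a version number from an
// Iceberg metadata file name, accepting "v<N>.metadata.json" as well as
// "<N>-<uuid>.metadata.json".
fn metadata_file_version(file_name: &str) -> Option<u32> {
    let stem = file_name.strip_suffix(".metadata.json")?;
    let digits = if let Some(v) = stem.strip_prefix('v') {
        v
    } else if let Some((v, _uuid)) = stem.split_once('-') {
        v
    } else {
        return None;
    };
    digits.parse().ok()
}

fn main() {
    assert_eq!(metadata_file_version("v3.metadata.json"), Some(3));
    assert_eq!(
        metadata_file_version("00003-9c512e2e-aa5e-4f0e-9b2c-1a2b3c4d5e6f.metadata.json"),
        Some(3)
    );
    assert_eq!(metadata_file_version("snap-123.avro"), None);
}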
@@ -434,7 +490,21 @@ fn format_object_path(
}
DatasourceUrl::File(root_path) => {
let path = root_path.join(path);
ObjectPath::from_filesystem_path(path)

// Get absolute path without checking if the file exists or not.
let abs_path = if path.is_absolute() {
path
} else {
let cwd = std::env::current_dir().map_err(|source| {
object_store::path::Error::Canonicalize {
path: path.clone(),
source,
}
})?;
cwd.join(path)
};

ObjectPath::from_absolute_path(abs_path)
}
}
}
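The change to the local-file branch avoids ObjectPath::from_filesystem_path, which canonicalizes the path and therefore fails when the target does not exist yet; the path is instead made absolute by hand and passed to from_absolute_path. A standalone sketch of the same conversion (the function name and example path are hypothetical):

use object_store::path::Path as ObjectPath;

// Build an object path without touching the filesystem, so the table location
// does not have to exist yet.
fn object_path_for(path: std::path::PathBuf) -> Result<ObjectPath, object_store::path::Error> {
    let abs_path = if path.is_absolute() {
        path
    } else {
        std::env::current_dir()
            .map_err(|source| object_store::path::Error::Canonicalize {
                path: path.clone(),
                source,
            })?
            .join(path)
    };
    ObjectPath::from_absolute_path(abs_path)
}

fn main() -> Result<(), object_store::path::Error> {
    let p = object_path_for("testdata/iceberg/table_that_may_not_exist".into())?;
    println!("{p}");
    Ok(())
}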
2 changes: 1 addition & 1 deletion testdata/sqllogictests_object_store/local/iceberg.slt
@@ -1,5 +1,5 @@
# Tests external iceberg table in the local file system.
statement error No such file or directory
statement error no valid iceberg table exists at the given path
create external table iceberg_local
from iceberg
options (
