diff --git a/crates/datasources/src/lake/iceberg/spec/manifest.rs b/crates/datasources/src/lake/iceberg/spec/manifest.rs index 64f9554068..9b2e50ffc7 100644 --- a/crates/datasources/src/lake/iceberg/spec/manifest.rs +++ b/crates/datasources/src/lake/iceberg/spec/manifest.rs @@ -226,6 +226,37 @@ impl Manifest { } } +#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] +pub enum ManifestEntryStatus { + #[default] + Existing, + Added, + Deleted, +} + +impl ManifestEntryStatus { + pub fn is_deleted(&self) -> bool { + matches!(self, Self::Deleted) + } +} + +impl TryFrom for ManifestEntryStatus { + type Error = IcebergError; + + fn try_from(value: i32) -> std::prelude::v1::Result { + Ok(match value { + 0 => Self::Existing, + 1 => Self::Added, + 2 => Self::Deleted, + i => { + return Err(IcebergError::DataInvalid(format!( + "unknown manifest entry status: {i}" + ))) + } + }) + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ManifestEntry { pub status: i32, diff --git a/crates/datasources/src/lake/iceberg/table.rs b/crates/datasources/src/lake/iceberg/table.rs index 38d3792e25..960cb6ee14 100644 --- a/crates/datasources/src/lake/iceberg/table.rs +++ b/crates/datasources/src/lake/iceberg/table.rs @@ -27,7 +27,14 @@ use futures::StreamExt; use object_store::path::Path as ObjectPath; use object_store::{ObjectMeta, ObjectStore}; -use super::spec::{Manifest, ManifestContent, ManifestList, Snapshot, TableMetadata}; +use super::spec::{ + Manifest, + ManifestContent, + ManifestEntryStatus, + ManifestList, + Snapshot, + TableMetadata, +}; use crate::common::url::DatasourceUrl; use crate::lake::iceberg::errors::{IcebergError, Result}; @@ -359,7 +366,17 @@ impl TableProvider for IcebergTableReader { let data_files: Vec<_> = manifests .into_iter() .filter(|m| matches!(m.metadata.content, ManifestContent::Data)) - .flat_map(|m| m.entries.into_iter().map(|ent| ent.data_file)) + .flat_map(|m| { + m.entries.into_iter().filter_map(|ent| { + let ent_status: ManifestEntryStatus = ent.status.try_into().unwrap_or_default(); + if ent_status.is_deleted() { + // Ignore deleted entries during table scans. + None + } else { + Some(ent.data_file) + } + }) + }) .collect(); let partitioned_files = data_files