Skip to content

Commit

Permalink
handle deleted entries
Browse files Browse the repository at this point in the history
Signed-off-by: Vaibhav <vrongmeal@gmail.com>
  • Loading branch information
vrongmeal committed Mar 29, 2024
1 parent b4f7e5e commit 5e411ec
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
31 changes: 31 additions & 0 deletions crates/datasources/src/lake/iceberg/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,37 @@ impl Manifest {
}
}

#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
pub enum ManifestEntryStatus {
#[default]
Existing,
Added,
Deleted,
}

impl ManifestEntryStatus {
pub fn is_deleted(&self) -> bool {
matches!(self, Self::Deleted)
}
}

impl TryFrom<i32> for ManifestEntryStatus {
type Error = IcebergError;

fn try_from(value: i32) -> std::prelude::v1::Result<Self, Self::Error> {
Ok(match value {
0 => Self::Existing,
1 => Self::Added,
2 => Self::Deleted,
i => {
return Err(IcebergError::DataInvalid(format!(
"unknown manifest entry status: {i}"
)))
}
})
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ManifestEntry {
pub status: i32,
Expand Down
21 changes: 19 additions & 2 deletions crates/datasources/src/lake/iceberg/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ use futures::StreamExt;
use object_store::path::Path as ObjectPath;
use object_store::{ObjectMeta, ObjectStore};

use super::spec::{Manifest, ManifestContent, ManifestList, Snapshot, TableMetadata};
use super::spec::{
Manifest,
ManifestContent,
ManifestEntryStatus,
ManifestList,
Snapshot,
TableMetadata,
};
use crate::common::url::DatasourceUrl;
use crate::lake::iceberg::errors::{IcebergError, Result};

Expand Down Expand Up @@ -359,7 +366,17 @@ impl TableProvider for IcebergTableReader {
let data_files: Vec<_> = manifests
.into_iter()
.filter(|m| matches!(m.metadata.content, ManifestContent::Data))
.flat_map(|m| m.entries.into_iter().map(|ent| ent.data_file))
.flat_map(|m| {
m.entries.into_iter().filter_map(|ent| {
let ent_status: ManifestEntryStatus = ent.status.try_into().unwrap_or_default();
if ent_status.is_deleted() {
// Ignore deleted entries during table scans.
None
} else {
Some(ent.data_file)
}
})
})
.collect();

let partitioned_files = data_files
Expand Down

0 comments on commit 5e411ec

Please sign in to comment.