Skip to content

Commit

Permalink
refactor: use remove actions in CDF reads
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored and rtyler committed Aug 12, 2024
1 parent 747603f commit 189431b
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 35 deletions.
61 changes: 50 additions & 11 deletions crates/core/src/delta_datafusion/cdf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::collections::HashMap;
pub(crate) use scan::*;
pub(crate) use scan_utils::*;

use crate::kernel::{Add, AddCDCFile};
use crate::{
kernel::{Add, AddCDCFile, Remove},
DeltaResult,
};

mod scan;
mod scan_utils;
Expand Down Expand Up @@ -59,37 +62,73 @@ impl<F: FileAction> CdcDataSpec<F> {
/// This trait defines a generic set of operations used by CDF Reader
pub trait FileAction {
/// Adds partition values
fn partition_values(&self) -> &HashMap<String, Option<String>>;
fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>>;
/// Physical Path to the data
fn path(&self) -> String;
/// Byte size of the physical file
fn size(&self) -> usize;
fn size(&self) -> DeltaResult<usize>;
}

impl FileAction for Add {
fn partition_values(&self) -> &HashMap<String, Option<String>> {
&self.partition_values
fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
Ok(&self.partition_values)
}

fn path(&self) -> String {
self.path.clone()
}

fn size(&self) -> usize {
self.size as usize
fn size(&self) -> DeltaResult<usize> {
Ok(self.size as usize)
}
}

impl FileAction for AddCDCFile {
fn partition_values(&self) -> &HashMap<String, Option<String>> {
&self.partition_values
fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
Ok(&self.partition_values)
}

fn path(&self) -> String {
self.path.clone()
}

fn size(&self) -> usize {
self.size as usize
fn size(&self) -> DeltaResult<usize> {
Ok(self.size as usize)
}
}

impl FileAction for Remove {
fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
// If extended_file_metadata is true, it should be required to have this filled in
if self.extended_file_metadata.unwrap_or_default() {
Ok(self.partition_values.as_ref().unwrap())
} else {
match self.partition_values {
Some(ref part_map) => Ok(part_map),
_ => Err(crate::DeltaTableError::Protocol {
source: crate::protocol::ProtocolError::InvalidField(
"partition_values".to_string(),
),
}),
}
}
}

fn path(&self) -> String {
self.path.clone()
}

fn size(&self) -> DeltaResult<usize> {
// If extended_file_metadata is true, it should be required to have this filled in
if self.extended_file_metadata.unwrap_or_default() {
Ok(self.size.unwrap() as usize)
} else {
match self.size {
Some(size) => Ok(size as usize),
_ => Err(crate::DeltaTableError::Protocol {
source: crate::protocol::ProtocolError::InvalidField("size".to_string()),
}),
}
}
}
}
12 changes: 6 additions & 6 deletions crates/core/src/delta_datafusion/cdf/scan_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ pub fn map_action_to_scalar<F: FileAction>(
action: &F,
part: &str,
schema: SchemaRef,
) -> ScalarValue {
action
.partition_values()
) -> DeltaResult<ScalarValue> {
Ok(action
.partition_values()?
.get(part)
.map(|val| {
schema
Expand All @@ -36,7 +36,7 @@ pub fn map_action_to_scalar<F: FileAction>(
})
.unwrap_or(ScalarValue::Null)
})
.unwrap_or(ScalarValue::Null)
.unwrap_or(ScalarValue::Null))
}

pub fn create_spec_partition_values<F: FileAction>(
Expand Down Expand Up @@ -67,15 +67,15 @@ pub fn create_partition_values<F: FileAction>(
let partition_values = table_partition_cols
.iter()
.map(|part| map_action_to_scalar(&action, part, schema.clone()))
.collect::<Vec<ScalarValue>>();
.collect::<DeltaResult<Vec<ScalarValue>>>()?;

let mut new_part_values = spec_partition_values.clone();
new_part_values.extend(partition_values);

let part = PartitionedFile {
object_meta: ObjectMeta {
location: Path::parse(action.path().as_str())?,
size: action.size(),
size: action.size()?,
e_tag: None,
last_modified: chrono::Utc.timestamp_nanos(0),
version: None,
Expand Down
Loading

0 comments on commit 189431b

Please sign in to comment.