Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(python, rust): cdc in writer not creating inserts #2751

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 50 additions & 11 deletions crates/core/src/delta_datafusion/cdf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use std::collections::HashMap;
pub(crate) use scan::*;
pub(crate) use scan_utils::*;

use crate::kernel::{Add, AddCDCFile};
use crate::{
kernel::{Add, AddCDCFile, Remove},
DeltaResult,
};

mod scan;
mod scan_utils;
Expand Down Expand Up @@ -59,37 +62,73 @@ impl<F: FileAction> CdcDataSpec<F> {
/// This trait defines a generic set of operations used by CDF Reader
pub trait FileAction {
/// Adds partition values
fn partition_values(&self) -> &HashMap<String, Option<String>>;
fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>>;
/// Physical Path to the data
fn path(&self) -> String;
/// Byte size of the physical file
fn size(&self) -> usize;
fn size(&self) -> DeltaResult<usize>;
}

impl FileAction for Add {
fn partition_values(&self) -> &HashMap<String, Option<String>> {
&self.partition_values
fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
Ok(&self.partition_values)
}

fn path(&self) -> String {
self.path.clone()
}

fn size(&self) -> usize {
self.size as usize
fn size(&self) -> DeltaResult<usize> {
Ok(self.size as usize)
}
}

impl FileAction for AddCDCFile {
fn partition_values(&self) -> &HashMap<String, Option<String>> {
&self.partition_values
fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
Ok(&self.partition_values)
}

fn path(&self) -> String {
self.path.clone()
}

fn size(&self) -> usize {
self.size as usize
fn size(&self) -> DeltaResult<usize> {
Ok(self.size as usize)
}
}

impl FileAction for Remove {
fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
// If extended_file_metadata is true, it should be required to have this filled in
if self.extended_file_metadata.unwrap_or_default() {
Ok(self.partition_values.as_ref().unwrap())
} else {
match self.partition_values {
Some(ref part_map) => Ok(part_map),
_ => Err(crate::DeltaTableError::Protocol {
source: crate::protocol::ProtocolError::InvalidField(
"partition_values".to_string(),
),
}),
}
}
}

fn path(&self) -> String {
self.path.clone()
}

fn size(&self) -> DeltaResult<usize> {
// If extended_file_metadata is true, it should be required to have this filled in
if self.extended_file_metadata.unwrap_or_default() {
Ok(self.size.unwrap() as usize)
} else {
match self.size {
Some(size) => Ok(size as usize),
_ => Err(crate::DeltaTableError::Protocol {
source: crate::protocol::ProtocolError::InvalidField("size".to_string()),
}),
}
}
}
}
12 changes: 6 additions & 6 deletions crates/core/src/delta_datafusion/cdf/scan_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ pub fn map_action_to_scalar<F: FileAction>(
action: &F,
part: &str,
schema: SchemaRef,
) -> ScalarValue {
action
.partition_values()
) -> DeltaResult<ScalarValue> {
Ok(action
.partition_values()?
.get(part)
.map(|val| {
schema
Expand All @@ -36,7 +36,7 @@ pub fn map_action_to_scalar<F: FileAction>(
})
.unwrap_or(ScalarValue::Null)
})
.unwrap_or(ScalarValue::Null)
.unwrap_or(ScalarValue::Null))
}

pub fn create_spec_partition_values<F: FileAction>(
Expand Down Expand Up @@ -67,15 +67,15 @@ pub fn create_partition_values<F: FileAction>(
let partition_values = table_partition_cols
.iter()
.map(|part| map_action_to_scalar(&action, part, schema.clone()))
.collect::<Vec<ScalarValue>>();
.collect::<DeltaResult<Vec<ScalarValue>>>()?;

let mut new_part_values = spec_partition_values.clone();
new_part_values.extend(partition_values);

let part = PartitionedFile {
object_meta: ObjectMeta {
location: Path::parse(action.path().as_str())?,
size: action.size(),
size: action.size()?,
e_tag: None,
last_modified: chrono::Utc.timestamp_nanos(0),
version: None,
Expand Down
Loading
Loading