fix: delete operation when partition and non-partition columns are used (#1375)

# Description
The delete operation fails when a predicate contains both partition and
non-partition columns.

DataFusion currently raises an error about comparing a dictionary to a
scalar. E.g., the expression `col("partition_col").eq(lit("2023-05-16"))`
fails since the literal is a plain `Utf8` value, not a dictionary.

This is fixed by creating the physical expression from the table schema,
in which partition columns are not dictionary-wrapped; the wrapped schema
is still used everywhere else.
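
For illustration, a minimal standalone sketch of the mismatch and of the fix (not part of the commit). It assumes the four-argument `create_physical_expr` signature from the DataFusion version in use here, and that `wrap_partition_type_in_dict` wraps a string partition column as `Dictionary(UInt16, Utf8)`:

```rust
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::prelude::{col, lit};

fn main() -> Result<()> {
    let predicate = col("partition_col").eq(lit("2023-05-16"));
    let props = ExecutionProps::new();

    // File-scan schema: the partition column is dictionary-wrapped.
    let wrapped = Schema::new(vec![Field::new(
        "partition_col",
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
        true,
    )]);
    let wrapped_df: DFSchema = wrapped.clone().try_into()?;
    // Failing case: planning rejects comparing the dictionary column to
    // the plain Utf8 literal.
    let failing = create_physical_expr(&predicate, &wrapped_df, &wrapped, &props);
    println!("wrapped schema -> {:?}", failing.err());

    // Table schema: the partition column keeps its logical Utf8 type,
    // which is what the new `input_schema()` helper returns. This plans fine.
    let table = Schema::new(vec![Field::new("partition_col", DataType::Utf8, true)]);
    let table_df: DFSchema = table.clone().try_into()?;
    let ok = create_physical_expr(&predicate, &table_df, &table, &props)?;
    println!("table schema -> {ok}");
    Ok(())
}
```

The commit applies the same idea in `find_files` and `excute_non_empty_expr` below: physical predicates are now planned against `snapshot.input_schema()` rather than the dictionary-wrapped scan schema.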

---------

Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
Blajda and rtyler authored May 21, 2023
1 parent dde2f7e commit 8fd0af8
Showing 2 changed files with 102 additions and 31 deletions.
111 changes: 85 additions & 26 deletions rust/src/operations/delete.rs
@@ -19,7 +19,6 @@

use crate::action::DeltaOperation;
use crate::delta::DeltaResult;
use crate::delta_datafusion::logical_expr_to_physical_expr;
use crate::delta_datafusion::parquet_scan_from_actions;
use crate::delta_datafusion::partitioned_file_from_action;
use crate::delta_datafusion::register_store;
@@ -42,11 +41,10 @@ use arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::MemTable;
use datafusion::execution::context::ExecutionProps;
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::file_format::{wrap_partition_type_in_dict, FileScanConfig};
use datafusion::physical_plan::file_format::FileScanConfig;
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::limit::LocalLimitExec;
use datafusion::physical_plan::ExecutionPlan;
@@ -111,7 +109,7 @@ pub struct DeleteMetrics {
async fn find_files<'a>(
snapshot: &DeltaTableState,
store: ObjectStoreRef,
schema: &ArrowSchema,
schema: Arc<ArrowSchema>,
file_schema: Arc<ArrowSchema>,
candidates: Vec<&'a Add>,
state: &SessionState,
@@ -128,7 +126,7 @@ async fn find_files<'a>(

let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
for action in candidates {
let mut part = partitioned_file_from_action(action, schema);
let mut part = partitioned_file_from_action(action, &schema);
part.partition_values
.push(ScalarValue::Utf8(Some(action.path.clone())));

@@ -142,26 +140,21 @@

let mut table_partition_cols = table_partition_cols
.iter()
.map(|c| {
Ok((
c.to_owned(),
wrap_partition_type_in_dict(schema.field_with_name(c)?.data_type().clone()),
))
})
.map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone())))
.collect::<Result<Vec<_>, ArrowError>>()?;
// Append a column called __delta_rs_path to track the file path
table_partition_cols.push((PATH_COLUMN.to_owned(), DataType::Utf8));

let physical_schema = file_schema.clone();
let logical_schema: DFSchema = file_schema.as_ref().clone().try_into()?;
let execution_props = ExecutionProps::new();
let input_schema = snapshot.input_schema()?;
let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;

let predicate_expr = create_physical_expr(
&Expr::IsTrue(Box::new(expression.clone())),
&logical_schema,
&physical_schema,
&execution_props,
&input_dfschema,
&input_schema,
state.execution_props(),
)?;

let parquet_scan = ParquetFormat::new()
.create_physical_plan(
state,
@@ -387,9 +380,12 @@ async fn excute_non_empty_expr(
// For each identified file perform a parquet scan + filter + limit (1) + count.
// If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.

let schema = snapshot.arrow_schema()?;
let scan_start = Instant::now();

let schema = snapshot.arrow_schema()?;
let input_schema = snapshot.input_schema()?;
let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;

let table_partition_cols = snapshot
.current_metadata()
.ok_or(DeltaTableError::NoMetadata)?
@@ -403,7 +399,12 @@
.cloned()
.collect::<Vec<_>>(),
));
let expr = logical_expr_to_physical_expr(expression, &schema);
let expr = create_physical_expr(
expression,
&input_dfschema,
&input_schema,
state.execution_props(),
)?;

let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?;
let files_to_prune = pruning_predicate.prune(snapshot)?;
@@ -418,7 +419,7 @@
let rewrite = find_files(
snapshot,
object_store.clone(),
&schema,
schema.clone(),
file_schema.clone(),
files,
state,
@@ -444,15 +445,12 @@

// Apply the negation of the filter and rewrite files
let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
let physical_schema = file_schema.clone();
let logical_schema: DFSchema = file_schema.as_ref().clone().try_into()?;
let execution_props = ExecutionProps::new();

let predicate_expr = create_physical_expr(
&negated_expression,
&logical_schema,
&physical_schema,
&execution_props,
&input_dfschema,
&input_schema,
state.execution_props(),
)?;
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?);
@@ -1006,6 +1004,67 @@ mod tests {
assert_batches_sorted_eq!(&expected, &actual);
}

#[tokio::test]
async fn test_delete_on_mixed_columns() {
// Test predicates that contain non-partition and partition column
let schema = get_arrow_schema(&None);
let table = setup_table(Some(["modified"].to_vec())).await;

let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![
Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])),
Arc::new(arrow::array::StringArray::from_slice([
"2021-02-02",
"2021-02-03",
"2021-02-02",
"2021-02-04",
])),
],
)
.unwrap();

// write some data
let table = DeltaOps(table)
.write(vec![batch])
.with_save_mode(SaveMode::Append)
.await
.unwrap();
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 3);

let (table, metrics) = DeltaOps(table)
.delete()
.with_predicate(
col("modified")
.eq(lit("2021-02-04"))
.and(col("value").eq(lit(100))),
)
.await
.unwrap();
assert_eq!(table.version(), 2);
assert_eq!(table.get_file_uris().count(), 2);

assert_eq!(metrics.num_added_files, 0);
assert_eq!(metrics.num_removed_files, 1);
assert_eq!(metrics.num_deleted_rows, Some(1));
assert_eq!(metrics.num_copied_rows, Some(0));
assert!(metrics.scan_time_ms > 0);

let expected = [
"+----+-------+------------+",
"| id | value | modified |",
"+----+-------+------------+",
"| A | 0 | 2021-02-02 |",
"| A | 10 | 2021-02-02 |",
"| B | 20 | 2021-02-03 |",
"+----+-------+------------+",
];
let actual = get_data(table).await;
assert_batches_sorted_eq!(&expected, &actual);
}

#[tokio::test]
async fn test_failure_nondeterministic_query() {
// Deletion requires a deterministic predicate
22 changes: 17 additions & 5 deletions rust/src/operations/transaction/state.rs
@@ -28,6 +28,10 @@ use crate::DeltaTableError;
impl DeltaTableState {
/// Get the table schema as an [`ArrowSchemaRef`]
pub fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
self._arrow_schema(true)
}

fn _arrow_schema(&self, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
let meta = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;
let fields = meta
.schema
@@ -42,11 +46,15 @@ impl DeltaTableState {
.filter(|f| meta.partition_columns.contains(&f.get_name().to_string()))
.map(|f| {
let field = ArrowField::try_from(f)?;
let corrected = match field.data_type() {
// Dictionary encoding boolean types does not yield benefits
// https://github.com/apache/arrow-datafusion/pull/5545#issuecomment-1526917997
DataType::Boolean => field.data_type().clone(),
_ => wrap_partition_type_in_dict(field.data_type().clone()),
let corrected = if wrap_partitions {
match field.data_type() {
// Dictionary encoding boolean types does not yield benefits
// https://github.com/apache/arrow-datafusion/pull/5545#issuecomment-1526917997
DataType::Boolean => field.data_type().clone(),
_ => wrap_partition_type_in_dict(field.data_type().clone()),
}
} else {
field.data_type().clone()
};
Ok(field.with_data_type(corrected))
}),
Expand All @@ -56,6 +64,10 @@ impl DeltaTableState {
Ok(Arc::new(ArrowSchema::new(fields)))
}

pub(crate) fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
self._arrow_schema(false)
}

/// Iterate over all files in the log matching a predicate
pub fn files_matching_predicate(
&self,
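
To make the new `wrap_partitions` flag concrete, here is a hedged sketch of the per-field correction it controls (illustrative only, not code from this commit; it assumes `wrap_partition_type_in_dict` uses `UInt16` dictionary keys):

```rust
use arrow::datatypes::DataType;

// Sketch of the correction `_arrow_schema` applies to each partition field:
// dictionary-wrap every type except Boolean when `wrap_partitions` is true,
// and leave the logical type untouched for `input_schema()`.
fn corrected_type(field_type: &DataType, wrap_partitions: bool) -> DataType {
    if wrap_partitions {
        match field_type {
            // Dictionary-encoding booleans yields no benefit.
            DataType::Boolean => field_type.clone(),
            other => DataType::Dictionary(
                Box::new(DataType::UInt16),
                Box::new(other.clone()),
            ),
        }
    } else {
        field_type.clone()
    }
}

fn main() {
    assert_eq!(corrected_type(&DataType::Boolean, true), DataType::Boolean);
    assert_eq!(corrected_type(&DataType::Utf8, false), DataType::Utf8);
    assert_eq!(
        corrected_type(&DataType::Utf8, true),
        DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
    );
}
```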
