diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs
index bc4fbea538..477104208f 100644
--- a/rust/src/operations/delete.rs
+++ b/rust/src/operations/delete.rs
@@ -19,7 +19,6 @@
 use crate::action::DeltaOperation;
 use crate::delta::DeltaResult;
-use crate::delta_datafusion::logical_expr_to_physical_expr;
 use crate::delta_datafusion::parquet_scan_from_actions;
 use crate::delta_datafusion::partitioned_file_from_action;
 use crate::delta_datafusion::register_store;
 
@@ -42,11 +41,10 @@ use arrow::record_batch::RecordBatch;
 use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::MemTable;
-use datafusion::execution::context::ExecutionProps;
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
 use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_optimizer::pruning::PruningPredicate;
-use datafusion::physical_plan::file_format::{wrap_partition_type_in_dict, FileScanConfig};
+use datafusion::physical_plan::file_format::FileScanConfig;
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::limit::LocalLimitExec;
 use datafusion::physical_plan::ExecutionPlan;
@@ -111,7 +109,7 @@ pub struct DeleteMetrics {
 async fn find_files<'a>(
     snapshot: &DeltaTableState,
     store: ObjectStoreRef,
-    schema: &ArrowSchema,
+    schema: Arc<ArrowSchema>,
     file_schema: Arc<ArrowSchema>,
     candidates: Vec<&'a Add>,
     state: &SessionState,
@@ -128,7 +126,7 @@ async fn find_files<'a>(
     let mut file_groups: HashMap<Vec<ScalarValue>, Vec<PartitionedFile>> = HashMap::new();
 
     for action in candidates {
-        let mut part = partitioned_file_from_action(action, schema);
+        let mut part = partitioned_file_from_action(action, &schema);
         part.partition_values
             .push(ScalarValue::Utf8(Some(action.path.clone())));
 
@@ -142,26 +140,21 @@ async fn find_files<'a>(
 
     let mut table_partition_cols = table_partition_cols
         .iter()
-        .map(|c| {
-            Ok((
-                c.to_owned(),
-                wrap_partition_type_in_dict(schema.field_with_name(c)?.data_type().clone()),
-            ))
-        })
+        .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone())))
         .collect::<Result<Vec<_>, ArrowError>>()?;
     // Append a column called __delta_rs_path to track the file path
     table_partition_cols.push((PATH_COLUMN.to_owned(), DataType::Utf8));
 
-    let physical_schema = file_schema.clone();
-    let logical_schema: DFSchema = file_schema.as_ref().clone().try_into()?;
-    let execution_props = ExecutionProps::new();
+    let input_schema = snapshot.input_schema()?;
+    let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
 
     let predicate_expr = create_physical_expr(
         &Expr::IsTrue(Box::new(expression.clone())),
-        &logical_schema,
-        &physical_schema,
-        &execution_props,
+        &input_dfschema,
+        &input_schema,
+        state.execution_props(),
     )?;
+
     let parquet_scan = ParquetFormat::new()
         .create_physical_plan(
             state,
@@ -387,9 +380,12 @@ async fn excute_non_empty_expr(
     // For each identified file perform a parquet scan + filter + limit (1) + count.
     // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file.
 
-    let schema = snapshot.arrow_schema()?;
     let scan_start = Instant::now();
 
+    let schema = snapshot.arrow_schema()?;
+    let input_schema = snapshot.input_schema()?;
+    let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
+
     let table_partition_cols = snapshot
         .current_metadata()
         .ok_or(DeltaTableError::NoMetadata)?
@@ -403,7 +399,12 @@ async fn excute_non_empty_expr(
             .cloned()
             .collect::<Vec<_>>(),
     ));
 
-    let expr = logical_expr_to_physical_expr(expression, &schema);
+    let expr = create_physical_expr(
+        expression,
+        &input_dfschema,
+        &input_schema,
+        state.execution_props(),
+    )?;
     let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?;
     let files_to_prune = pruning_predicate.prune(snapshot)?;
@@ -418,7 +419,7 @@ async fn excute_non_empty_expr(
     let rewrite = find_files(
         snapshot,
         object_store.clone(),
-        &schema,
+        schema.clone(),
         file_schema.clone(),
         files,
         state,
@@ -444,15 +445,12 @@ async fn excute_non_empty_expr(
     // Apply the negation of the filter and rewrite files
     let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
 
-    let physical_schema = file_schema.clone();
-    let logical_schema: DFSchema = file_schema.as_ref().clone().try_into()?;
-    let execution_props = ExecutionProps::new();
-
     let predicate_expr = create_physical_expr(
         &negated_expression,
-        &logical_schema,
-        &physical_schema,
-        &execution_props,
+        &input_dfschema,
+        &input_schema,
+        state.execution_props(),
     )?;
     let filter: Arc<dyn ExecutionPlan> =
         Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?);
 
@@ -1006,6 +1004,67 @@ mod tests {
         assert_batches_sorted_eq!(&expected, &actual);
     }
 
+    #[tokio::test]
+    async fn test_delete_on_mixed_columns() {
+        // Test predicates that contain non-partition and partition column
+        let schema = get_arrow_schema(&None);
+        let table = setup_table(Some(["modified"].to_vec())).await;
+
+        let batch = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(arrow::array::StringArray::from_slice(["A", "B", "A", "A"])),
+                Arc::new(arrow::array::Int32Array::from_slice([0, 20, 10, 100])),
+                Arc::new(arrow::array::StringArray::from_slice([
+                    "2021-02-02",
+                    "2021-02-03",
+                    "2021-02-02",
+                    "2021-02-04",
+                ])),
+            ],
+        )
+        .unwrap();
+
+        // write some data
+        let table = DeltaOps(table)
+            .write(vec![batch])
+            .with_save_mode(SaveMode::Append)
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 1);
+        assert_eq!(table.get_file_uris().count(), 3);
+
+        let (table, metrics) = DeltaOps(table)
+            .delete()
+            .with_predicate(
+                col("modified")
+                    .eq(lit("2021-02-04"))
+                    .and(col("value").eq(lit(100))),
+            )
+            .await
+            .unwrap();
+        assert_eq!(table.version(), 2);
+        assert_eq!(table.get_file_uris().count(), 2);
+
+        assert_eq!(metrics.num_added_files, 0);
+        assert_eq!(metrics.num_removed_files, 1);
+        assert_eq!(metrics.num_deleted_rows, Some(1));
+        assert_eq!(metrics.num_copied_rows, Some(0));
+        assert!(metrics.scan_time_ms > 0);
+
+        let expected = [
+            "+----+-------+------------+",
+            "| id | value | modified   |",
+            "+----+-------+------------+",
+            "| A  | 0     | 2021-02-02 |",
+            "| A  | 10    | 2021-02-02 |",
+            "| B  | 20    | 2021-02-03 |",
+            "+----+-------+------------+",
+        ];
+        let actual = get_data(table).await;
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
+
     #[tokio::test]
     async fn test_failure_nondeterministic_query() {
         // Deletion requires a deterministic predicate
diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs
index 53dcd0b7d2..bae30c748c 100644
--- a/rust/src/operations/transaction/state.rs
+++ b/rust/src/operations/transaction/state.rs
@@ -28,6 +28,10 @@ use crate::DeltaTableError;
 impl DeltaTableState {
     /// Get the table schema as an [`ArrowSchemaRef`]
     pub fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
+        self._arrow_schema(true)
+    }
+
+    fn _arrow_schema(&self, wrap_partitions: bool) -> DeltaResult<ArrowSchemaRef> {
         let meta = self.current_metadata().ok_or(DeltaTableError::NoMetadata)?;
         let fields = meta
             .schema
@@ -42,11 +46,15 @@ impl DeltaTableState {
             .filter(|f| meta.partition_columns.contains(&f.get_name().to_string()))
             .map(|f| {
                 let field = ArrowField::try_from(f)?;
-                let corrected = match field.data_type() {
-                    // Dictionary encoding boolean types does not yield benefits
-                    // https://github.com/apache/arrow-datafusion/pull/5545#issuecomment-1526917997
-                    DataType::Boolean => field.data_type().clone(),
-                    _ => wrap_partition_type_in_dict(field.data_type().clone()),
+                let corrected = if wrap_partitions {
+                    match field.data_type() {
+                        // Dictionary encoding boolean types does not yield benefits
+                        // https://github.com/apache/arrow-datafusion/pull/5545#issuecomment-1526917997
+                        DataType::Boolean => field.data_type().clone(),
+                        _ => wrap_partition_type_in_dict(field.data_type().clone()),
+                    }
+                } else {
+                    field.data_type().clone()
                 };
                 Ok(field.with_data_type(corrected))
             }),
@@ -56,6 +64,10 @@ impl DeltaTableState {
         Ok(Arc::new(ArrowSchema::new(fields)))
     }
 
+    pub(crate) fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
+        self._arrow_schema(false)
+    }
+
     /// Iterate over all files in the log matching a predicate
     pub fn files_matching_predicate(
         &self,
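
Not part of the patch, but for reviewers: a minimal sketch of the predicate-binding shape that both `find_files` and `excute_non_empty_expr` now share, assuming the four-argument `create_physical_expr` signature used in the diff; the `compile_delete_predicate` helper name is hypothetical and does not exist in this PR. The key point is that the logical filter is bound against `snapshot.input_schema()`, whose partition columns keep their declared types, rather than against the dictionary-wrapped schema that `arrow_schema()` produces for scans:

    // Sketch only: crate-internal paths assumed to match the rest of the repo.
    use std::sync::Arc;

    use datafusion::common::DFSchema;
    use datafusion::execution::context::SessionState;
    use datafusion::physical_expr::{create_physical_expr, PhysicalExpr};
    use datafusion::prelude::Expr;

    use crate::delta::DeltaResult;
    use crate::table_state::DeltaTableState;

    fn compile_delete_predicate(
        snapshot: &DeltaTableState,
        state: &SessionState,
        expression: &Expr,
    ) -> DeltaResult<Arc<dyn PhysicalExpr>> {
        // Plain Arrow schema: partition columns keep their declared types
        // instead of the Dictionary wrapping applied for file scans.
        let input_schema = snapshot.input_schema()?;
        let input_dfschema: DFSchema = input_schema.as_ref().clone().try_into()?;

        // Bind the logical expression once, reusing the session's
        // ExecutionProps instead of constructing ExecutionProps::new()
        // at every call site.
        Ok(create_physical_expr(
            expression,
            &input_dfschema,
            &input_schema,
            state.execution_props(),
        )?)
    }

Because all three call sites now bind against the same input schema with `state.execution_props()`, the per-site `ExecutionProps::new()` construction and the `logical_expr_to_physical_expr` helper could both be deleted.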