From 080526a77f3a4f40cb8d608b07c8a37aec0877bb Mon Sep 17 00:00:00 2001 From: Jonas Schmitz Date: Fri, 15 Mar 2024 19:10:36 +0100 Subject: [PATCH 1/6] Update merge predicate to fix collision detection --- crates/core/src/operations/merge/mod.rs | 148 +++++++++++++++--- crates/core/src/operations/transaction/mod.rs | 3 +- crates/core/src/protocol/mod.rs | 10 +- 3 files changed, 130 insertions(+), 31 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index e5da42bf8f..d8389ed573 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -29,6 +29,7 @@ //! ```` use std::collections::HashMap; +use std::ops::Deref; use std::sync::Arc; use std::time::Instant; @@ -855,6 +856,7 @@ fn replace_placeholders(expr: Expr, placeholders: &HashMap) .unwrap() } + async fn try_construct_early_filter( join_predicate: Expr, table_snapshot: &DeltaTableState, @@ -1015,29 +1017,26 @@ async fn execute( let state = state.with_query_planner(Arc::new(MergePlanner {})); - let target = { - // Attempt to construct an early filter that we can apply to the Add action list and the delta scan. - // In the case where there are partition columns in the join predicate, we can scan the source table - // to get the distinct list of partitions affected and constrain the search to those. - - if !not_match_source_operations.is_empty() { - // It's only worth trying to create an early filter where there are no `when_not_matched_source` operators, since - // that implies a full scan - target - } else if let Some(filter) = try_construct_early_filter( + // Attempt to construct an early filter that we can apply to the Add action list and the delta scan. + // In the case where there are partition columns in the join predicate, we can scan the source table + // to get the distinct list of partitions affected and constrain the search to those. + let target_subset_filter = if !not_match_source_operations.is_empty() { + // It's only worth trying to create an early filter where there are no `when_not_matched_source` operators, since + // that implies a full scan + None + } else { + try_construct_early_filter( predicate.clone(), snapshot, &state, &source, &source_name, &target_name, - ) - .await? - { - LogicalPlan::Filter(Filter::try_new(filter, target.into())?) - } else { - target - } + ).await? + }; + let target = match target_subset_filter.as_ref() { + None => { target } + Some(subset_filter) => { LogicalPlan::Filter(Filter::try_new(subset_filter.clone(), target.into())?)} }; let source = DataFrame::new(state.clone(), source); @@ -1424,9 +1423,20 @@ async fn execute( app_metadata.insert("operationMetrics".to_owned(), map); } + // Predicate will be used for conflict detection + let commit_predicate = match target_subset_filter { + None => {None} // No predicate means it's a full table merge + Some(some_filter) => { + let predict_expr = match target_alias { + None => {some_filter} + Some(alias) => {remove_table_alias(some_filter, alias)} + }; + Some(fmt_expr_to_sql(&predict_expr)?)} + }; // Do not make a commit when there are zero updates to the state let operation = DeltaOperation::Merge { - predicate: Some(fmt_expr_to_sql(&predicate)?), + predicate: commit_predicate, + merge_predicate: Some(fmt_expr_to_sql(&predicate)?), matched_predicates: match_operations, not_matched_predicates: not_match_target_operations, not_matched_by_source_predicates: not_match_source_operations, @@ -1450,6 +1460,23 @@ async fn execute( )) } +fn remove_table_alias(expr: Expr, table_alias: String) -> Expr { + expr.transform(&|expr| match expr { + Expr::Column(c) => { + match c.relation { + Some(rel) if rel.table() == table_alias => { + Ok(Transformed::Yes(Expr::Column(Column::new_unqualified(c.name)))) + }, + _ => { + Ok(Transformed::No(Expr::Column(Column::new(c.relation, c.name)))) + } + } + } + _ => Ok(Transformed::No(expr)), + }) + .unwrap() +} + // TODO: Abstract MergePlanner into DeltaPlanner to support other delta operations in the future. struct MergePlanner {} @@ -1516,6 +1543,7 @@ impl std::future::IntoFuture for MergeBuilder { } } + #[cfg(test)] mod tests { use crate::kernel::DataType; @@ -1699,7 +1727,8 @@ mod tests { let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[0]; let parameters = last_commit.operation_parameters.clone().unwrap(); - assert_eq!(parameters["predicate"], json!("target.id = source.id")); + assert!(!parameters.contains_key("predicate")); + assert_eq!(parameters["mergePredicate"], json!("target.id = source.id")); assert_eq!( parameters["matchedPredicates"], json!(r#"[{"actionType":"update"}]"#) @@ -1751,7 +1780,8 @@ mod tests { let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[0]; let parameters = last_commit.operation_parameters.clone().unwrap(); - assert_eq!(parameters["predicate"], json!("target.id = source.id")); + assert!(!parameters.contains_key("predicate")); + assert_eq!(parameters["mergePredicate"], json!("target.id = source.id")); assert_eq!( parameters["matchedPredicates"], json!(r#"[{"actionType":"update"}]"#) @@ -1955,6 +1985,11 @@ mod tests { assert_eq!(metrics.num_output_rows, 6); assert_eq!(metrics.num_source_rows, 3); + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[0]; + let parameters = last_commit.operation_parameters.clone().unwrap(); + assert!(!parameters.contains_key("predicate")); + let expected = vec![ "+----+-------+------------+", "| id | value | modified |", @@ -1971,6 +2006,62 @@ mod tests { assert_batches_sorted_eq!(&expected, &actual); } + + #[tokio::test] + async fn test_merge_partition_filtered() { + let schema = get_arrow_schema(&None); + let table = setup_table(Some(vec!["modified"])).await; + let table = write_data(table, &schema).await; + assert_eq!(table.version(), 1); + + let ctx = SessionContext::new(); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["B", "C"])), + Arc::new(arrow::array::Int32Array::from(vec![10, 20])), + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-02", + "2021-02-02" + ])), + ], + ) + .unwrap(); + let source = ctx.read_batch(batch).unwrap(); + + let (table, _metrics) = DeltaOps(table) + .merge( + source, + col("target.id") + .eq(col("source.id")) + .and(col("target.modified").eq(lit("2021-02-02"))), + ) + .with_source_alias("source") + .with_target_alias("target") + .when_matched_update(|update| { + update + .update("value", col("source.value")) + .update("modified", col("source.modified")) + }) + .unwrap() + .when_not_matched_insert(|insert| { + insert + .set("id", col("source.id")) + .set("value", col("source.value")) + .set("modified", col("source.modified")) + }) + .unwrap() + .await + .unwrap(); + + assert_eq!(table.version(), 2); + + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[0]; + let parameters = last_commit.operation_parameters.clone().unwrap(); + assert_eq!(parameters["predicate"], "modified = '2021-02-02'"); + } + #[tokio::test] async fn test_merge_partitions_skipping() { /* Validate the join predicate can be used for skipping partitions */ @@ -2028,6 +2119,11 @@ mod tests { assert_eq!(metrics.num_output_rows, 3); assert_eq!(metrics.num_source_rows, 3); + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[0]; + let parameters = last_commit.operation_parameters.clone().unwrap(); + assert_eq!(parameters["predicate"], json!("id = 'C' OR id = 'X' OR id = 'B'")); + let expected = vec![ "+-------+------------+----+", "| value | modified | id |", @@ -2098,7 +2194,8 @@ mod tests { extra_info["operationMetrics"], serde_json::to_value(&metrics).unwrap() ); - assert_eq!(parameters["predicate"], json!("target.id = source.id")); + assert!(!parameters.contains_key("predicate")); + assert_eq!(parameters["mergePredicate"], json!("target.id = source.id")); assert_eq!( parameters["matchedPredicates"], json!(r#"[{"actionType":"delete"}]"#) @@ -2162,7 +2259,7 @@ mod tests { let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[0]; let parameters = last_commit.operation_parameters.clone().unwrap(); - assert_eq!(parameters["predicate"], json!("target.id = source.id")); + assert_eq!(parameters["mergePredicate"], json!("target.id = source.id")); assert_eq!( parameters["matchedPredicates"], json!(r#"[{"actionType":"delete","predicate":"source.value <= 10"}]"#) @@ -2231,7 +2328,8 @@ mod tests { let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[0]; let parameters = last_commit.operation_parameters.clone().unwrap(); - assert_eq!(parameters["predicate"], json!("target.id = source.id")); + assert!(!parameters.contains_key("predicate")); + assert_eq!(parameters["mergePredicate"], json!("target.id = source.id")); assert_eq!( parameters["notMatchedBySourcePredicates"], json!(r#"[{"actionType":"delete"}]"#) @@ -2295,7 +2393,7 @@ mod tests { let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[0]; let parameters = last_commit.operation_parameters.clone().unwrap(); - assert_eq!(parameters["predicate"], json!("target.id = source.id")); + assert_eq!(parameters["mergePredicate"], json!("target.id = source.id")); assert_eq!( parameters["notMatchedBySourcePredicates"], json!(r#"[{"actionType":"delete","predicate":"target.modified > '2021-02-01'"}]"#) @@ -2390,7 +2488,7 @@ mod tests { #[tokio::test] async fn test_merge_case_sensitive() { let schema = vec![ - StructField::new( + StructField::new( "Id".to_string(), DataType::Primitive(PrimitiveType::String), true, diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 64cb25c0be..7085571b41 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -493,8 +493,7 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> { read_snapshot, this.data.operation.read_predicate(), &this.data.actions, - // TODO allow tainting whole table - false, + this.data.operation.read_whole_table(), )?; let conflict_checker = ConflictChecker::new( transaction_info, diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index 7267529b74..ce57d8ed16 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -380,9 +380,12 @@ pub enum DeltaOperation { /// Merge data with a source data with the following predicate #[serde(rename_all = "camelCase")] Merge { - /// The merge predicate + /// Cleaned merge predicate for conflict checks predicate: Option, + /// The original merge predicate + merge_predicate: Option, + /// Match operations performed matched_predicates: Vec, @@ -541,9 +544,8 @@ impl DeltaOperation { /// Denotes if the operation reads the entire table pub fn read_whole_table(&self) -> bool { match self { - // TODO just adding one operation example, as currently none of the - // implemented operations scan the entire table. - Self::Write { predicate, .. } if predicate.is_none() => false, + // Predicate is none -> Merge operation had to join full source and target + Self::Merge { predicate, .. } if predicate.is_none() => true, _ => false, } } From d9c858ca51a72a5304505b5309f71f3ebf991c03 Mon Sep 17 00:00:00 2001 From: Jonas Schmitz Date: Fri, 15 Mar 2024 19:27:16 +0100 Subject: [PATCH 2/6] Format --- crates/core/src/operations/merge/mod.rs | 49 +++++++++++++------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index d8389ed573..916d227ec3 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -29,7 +29,6 @@ //! ```` use std::collections::HashMap; -use std::ops::Deref; use std::sync::Arc; use std::time::Instant; @@ -856,7 +855,6 @@ fn replace_placeholders(expr: Expr, placeholders: &HashMap) .unwrap() } - async fn try_construct_early_filter( join_predicate: Expr, table_snapshot: &DeltaTableState, @@ -1032,11 +1030,14 @@ async fn execute( &source, &source_name, &target_name, - ).await? + ) + .await? }; let target = match target_subset_filter.as_ref() { - None => { target } - Some(subset_filter) => { LogicalPlan::Filter(Filter::try_new(subset_filter.clone(), target.into())?)} + None => target, + Some(subset_filter) => { + LogicalPlan::Filter(Filter::try_new(subset_filter.clone(), target.into())?) + } }; let source = DataFrame::new(state.clone(), source); @@ -1425,13 +1426,14 @@ async fn execute( // Predicate will be used for conflict detection let commit_predicate = match target_subset_filter { - None => {None} // No predicate means it's a full table merge + None => None, // No predicate means it's a full table merge Some(some_filter) => { let predict_expr = match target_alias { - None => {some_filter} - Some(alias) => {remove_table_alias(some_filter, alias)} + None => some_filter, + Some(alias) => remove_table_alias(some_filter, alias), }; - Some(fmt_expr_to_sql(&predict_expr)?)} + Some(fmt_expr_to_sql(&predict_expr)?) + } }; // Do not make a commit when there are zero updates to the state let operation = DeltaOperation::Merge { @@ -1462,16 +1464,14 @@ async fn execute( fn remove_table_alias(expr: Expr, table_alias: String) -> Expr { expr.transform(&|expr| match expr { - Expr::Column(c) => { - match c.relation { - Some(rel) if rel.table() == table_alias => { - Ok(Transformed::Yes(Expr::Column(Column::new_unqualified(c.name)))) - }, - _ => { - Ok(Transformed::No(Expr::Column(Column::new(c.relation, c.name)))) - } - } - } + Expr::Column(c) => match c.relation { + Some(rel) if rel.table() == table_alias => Ok(Transformed::Yes(Expr::Column( + Column::new_unqualified(c.name), + ))), + _ => Ok(Transformed::No(Expr::Column(Column::new( + c.relation, c.name, + )))), + }, _ => Ok(Transformed::No(expr)), }) .unwrap() @@ -1543,7 +1543,6 @@ impl std::future::IntoFuture for MergeBuilder { } } - #[cfg(test)] mod tests { use crate::kernel::DataType; @@ -2006,7 +2005,6 @@ mod tests { assert_batches_sorted_eq!(&expected, &actual); } - #[tokio::test] async fn test_merge_partition_filtered() { let schema = get_arrow_schema(&None); @@ -2022,7 +2020,7 @@ mod tests { Arc::new(arrow::array::Int32Array::from(vec![10, 20])), Arc::new(arrow::array::StringArray::from(vec![ "2021-02-02", - "2021-02-02" + "2021-02-02", ])), ], ) @@ -2122,7 +2120,10 @@ mod tests { let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[0]; let parameters = last_commit.operation_parameters.clone().unwrap(); - assert_eq!(parameters["predicate"], json!("id = 'C' OR id = 'X' OR id = 'B'")); + assert_eq!( + parameters["predicate"], + json!("id = 'C' OR id = 'X' OR id = 'B'") + ); let expected = vec![ "+-------+------------+----+", @@ -2488,7 +2489,7 @@ mod tests { #[tokio::test] async fn test_merge_case_sensitive() { let schema = vec![ - StructField::new( + StructField::new( "Id".to_string(), DataType::Primitive(PrimitiveType::String), true, From 85f198ec9844d4319bc346ab26cfe6cb8d492d15 Mon Sep 17 00:00:00 2001 From: Jonas Schmitz Date: Fri, 15 Mar 2024 20:35:46 +0100 Subject: [PATCH 3/6] Hardening tests --- crates/core/src/operations/merge/mod.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 916d227ec3..b06cdc4b18 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1988,6 +1988,10 @@ mod tests { let last_commit = &commit_info[0]; let parameters = last_commit.operation_parameters.clone().unwrap(); assert!(!parameters.contains_key("predicate")); + assert_eq!( + parameters["mergePredicate"], + "target.id = source.id AND target.modified = '2021-02-02'" + ); let expected = vec![ "+----+-------+------------+", @@ -2058,6 +2062,10 @@ mod tests { let last_commit = &commit_info[0]; let parameters = last_commit.operation_parameters.clone().unwrap(); assert_eq!(parameters["predicate"], "modified = '2021-02-02'"); + assert_eq!( + parameters["mergePredicate"], + "target.id = source.id AND target.modified = '2021-02-02'" + ); } #[tokio::test] @@ -2473,6 +2481,11 @@ mod tests { assert_eq!(metrics.num_output_rows, 3); assert_eq!(metrics.num_source_rows, 3); + let commit_info = table.history(None).await.unwrap(); + let last_commit = &commit_info[0]; + let parameters = last_commit.operation_parameters.clone().unwrap(); + assert_eq!(parameters["predicate"], json!("modified = '2021-02-02'")); + let expected = vec![ "+----+-------+------------+", "| id | value | modified |", From 60ee37b4ed932aaad9f6edc2d04e74673ea2ff84 Mon Sep 17 00:00:00 2001 From: Jonas Schmitz Date: Sat, 16 Mar 2024 12:19:25 +0100 Subject: [PATCH 4/6] Fix test --- crates/core/src/operations/merge/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index b06cdc4b18..f0adbeb48a 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1576,6 +1576,7 @@ mod tests { use datafusion_expr::LogicalPlanBuilder; use datafusion_expr::Operator; use itertools::Itertools; + use regex::Regex; use serde_json::json; use std::collections::HashMap; use std::ops::Neg; @@ -2128,10 +2129,9 @@ mod tests { let commit_info = table.history(None).await.unwrap(); let last_commit = &commit_info[0]; let parameters = last_commit.operation_parameters.clone().unwrap(); - assert_eq!( - parameters["predicate"], - json!("id = 'C' OR id = 'X' OR id = 'B'") - ); + let predicate = parameters["predicate"].as_str().unwrap(); + let re = Regex::new(r"^id = '(C|X|B)' OR id = '(C|X|B)' OR id = '(C|X|B)'$").unwrap(); + assert!(re.is_match(predicate)); let expected = vec![ "+-------+------------+----+", From c20e50500817fe57396a742779c8c7e8cd1710dc Mon Sep 17 00:00:00 2001 From: Jonas Schmitz Date: Sat, 16 Mar 2024 21:42:24 +0100 Subject: [PATCH 5/6] Add concurrent merge tests --- crates/core/src/operations/merge/mod.rs | 30 ++- crates/core/tests/command_merge.rs | 248 ++++++++++++++++++++++++ 2 files changed, 274 insertions(+), 4 deletions(-) create mode 100644 crates/core/tests/command_merge.rs diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index f0adbeb48a..c690f1970e 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -866,10 +866,6 @@ async fn try_construct_early_filter( let table_metadata = table_snapshot.metadata(); let partition_columns = &table_metadata.partition_columns; - if partition_columns.is_empty() { - return Ok(None); - } - let mut placeholders = HashMap::default(); match generalize_filter( @@ -2724,6 +2720,32 @@ mod tests { assert_eq!(generalized, expected_filter); } + + #[tokio::test] + async fn test_generalize_filter_keeps_only_static_target_references() { + let source = TableReference::parse_str("source"); + let target = TableReference::parse_str("target"); + + let parsed_filter = col(Column::new(source.clone().into(), "id")) + .eq(col(Column::new(target.clone().into(), "id"))) + .and(col(Column::new(target.clone().into(), "id")).eq(lit("C"))); + + let mut placeholders = HashMap::default(); + + let generalized = generalize_filter( + parsed_filter, + &vec!["other".to_owned()], + &source, + &target, + &mut placeholders, + ) + .unwrap(); + + let expected_filter = col(Column::new(target.clone().into(), "id")).eq(lit("C")); + + assert_eq!(generalized, expected_filter); + } + #[tokio::test] async fn test_generalize_filter_removes_source_references() { let source = TableReference::parse_str("source"); diff --git a/crates/core/tests/command_merge.rs b/crates/core/tests/command_merge.rs new file mode 100644 index 0000000000..2a4907d982 --- /dev/null +++ b/crates/core/tests/command_merge.rs @@ -0,0 +1,248 @@ +#![allow(dead_code)] +mod fs_common; + +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use datafusion::dataframe::DataFrame; +use datafusion::prelude::SessionContext; +use datafusion_expr::{col, Expr, lit}; +use deltalake_core::kernel::{DataType as DeltaDataType, PrimitiveType, StructField, StructType}; +use deltalake_core::operations::merge::MergeMetrics; +use deltalake_core::operations::transaction::TransactionError; +use deltalake_core::protocol::SaveMode; +use deltalake_core::{open_table, DeltaOps, DeltaResult, DeltaTable, DeltaTableError}; +use std::sync::Arc; +use datafusion_common::Column; + +async fn create_table(table_uri: &String, partition: Option>) -> DeltaTable { + let table_schema = get_delta_schema(); + let ops = DeltaOps::try_from_uri(table_uri.clone()).await.unwrap(); + let table = ops + .create() + .with_columns(table_schema.fields().clone()) + .with_partition_columns(partition.unwrap_or_default()) + .await + .expect("Failed to create table"); + + let schema = get_arrow_schema(); + return write_data(table, &schema).await; +} + +fn get_delta_schema() -> StructType { + StructType::new(vec![ + StructField::new( + "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::String), + true, + ), + StructField::new( + "value".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "event_date".to_string(), + DeltaDataType::Primitive(PrimitiveType::String), + true, + ), + ]) +} + +fn get_arrow_schema() -> Arc { + return Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Utf8, true), + Field::new("value", DataType::Int32, true), + Field::new("event_date", DataType::Utf8, true), + ])); +} + +async fn write_data(table: DeltaTable, schema: &Arc) -> DeltaTable { + let batch = RecordBatch::try_new( + Arc::clone(schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])), + Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])), + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-01", + "2021-02-01", + "2021-02-02", + "2021-02-02", + ])), + ], + ) + .unwrap(); + // write some data + DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Append) + .await + .unwrap() +} + +fn create_test_data() -> (DataFrame, DataFrame) { + let schema = get_arrow_schema(); + let ctx = SessionContext::new(); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["C", "D"])), + Arc::new(arrow::array::Int32Array::from(vec![10, 20])), + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-02", + "2021-02-02", + ])), + ], + ) + .unwrap(); + let df1 = ctx.read_batch(batch).unwrap(); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(arrow::array::StringArray::from(vec!["E", "F"])), + Arc::new(arrow::array::Int32Array::from(vec![10, 20])), + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-03", + "2021-02-03", + ])), + ], + ) + .unwrap(); + let df2 = ctx.read_batch(batch).unwrap(); + return (df1, df2); +} + +async fn merge( + table: DeltaTable, + df: DataFrame, + predicate: Expr, +) -> DeltaResult<(DeltaTable, MergeMetrics)> { + return DeltaOps(table) + .merge(df, predicate) + .with_source_alias("source") + .with_target_alias("target") + .when_matched_update(|update| { + update + .update("value", col("source.value")) + .update("event_date", col("source.event_date")) + }) + .unwrap() + .when_not_matched_insert(|insert| { + insert + .set("id", col("source.id")) + .set("value", col("source.value")) + .set("event_date", col("source.event_date")) + }) + .unwrap() + .await; +} + +#[tokio::test] +async fn test_merge_concurrent_conflict() { + // No partition key or filter predicate -> Commit conflict + let tmp_dir = tempfile::tempdir().unwrap(); + let table_uri = tmp_dir.path().to_str().to_owned().unwrap(); + + let table_ref1 = create_table(&table_uri.to_string(), Some(vec!["event_date"])).await; + let table_ref2 = open_table(table_uri).await.unwrap(); + let (df1, df2) = create_test_data(); + + let expr = col("target.id").eq(col("source.id")); + let (_table_ref1, _metrics) = merge(table_ref1, df1, expr.clone()).await.unwrap(); + let result = merge(table_ref2, df2, expr).await; + + assert!(matches!( + result.as_ref().unwrap_err(), + DeltaTableError::Transaction { .. } + )); + if let DeltaTableError::Transaction { source } = result.unwrap_err() { + assert!(matches!(source, TransactionError::CommitConflict(_))); + } +} + +#[tokio::test] +async fn test_merge_concurrent_different_partition() { + // partition key in predicate -> Successful merge + let tmp_dir = tempfile::tempdir().unwrap(); + let table_uri = tmp_dir.path().to_str().to_owned().unwrap(); + + let table_ref1 = create_table(&table_uri.to_string(), Some(vec!["event_date"])).await; + let table_ref2 = open_table(table_uri).await.unwrap(); + let (df1, df2) = create_test_data(); + + let expr = col("target.id") + .eq(col("source.id")) + .and(col("target.event_date").eq(col("source.event_date"))); + let (_table_ref1, _metrics) = merge(table_ref1, df1, expr.clone()).await.unwrap(); + let result = merge(table_ref2, df2, expr).await; + + // TODO: Currently it throws a Version mismatch error, but the merge commit was successfully + // This bug needs to be fixed, see pull request #2280 + assert!(!matches!( + result.as_ref().unwrap_err(), + DeltaTableError::Transaction { .. } + )); + assert!(matches!( + result.as_ref().unwrap_err(), + DeltaTableError::Generic(_) + )); + if let DeltaTableError::Generic(msg) = result.unwrap_err() { + assert_eq!(msg, "Version mismatch"); + } +} + + +#[tokio::test] +async fn test_merge_concurrent_no_overlapping_files() { + // predicate contains filter and files are not overlapping -> No conflict + let tmp_dir = tempfile::tempdir().unwrap(); + let table_uri = tmp_dir.path().to_str().to_owned().unwrap(); + + let table_ref1 = create_table(&table_uri.to_string(), None).await; + let table_ref2 = open_table(table_uri).await.unwrap(); + let (df1, df2) = create_test_data(); + + let expr = col("target.id") + .eq(col("source.id")); + let (_table_ref1, _metrics) = merge(table_ref1, df2, expr.clone().and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-03")))).await.unwrap(); + let result = merge(table_ref2, df1, expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02")))).await; + + // TODO: Currently it throws a Version mismatch error, but the merge commit was successfully + // This bug needs to be fixed, see pull request #2280 + assert!(!matches!( + result.as_ref().unwrap_err(), + DeltaTableError::Transaction { .. } + )); + assert!(matches!( + result.as_ref().unwrap_err(), + DeltaTableError::Generic(_) + )); + if let DeltaTableError::Generic(msg) = result.unwrap_err() { + assert_eq!(msg, "Version mismatch"); + } +} + + +#[tokio::test] +async fn test_merge_concurrent_with_overlapping_files() { + // predicate contains filter and files are overlapping -> Commit conflict + let tmp_dir = tempfile::tempdir().unwrap(); + let table_uri = tmp_dir.path().to_str().to_owned().unwrap(); + + let table_ref1 = create_table(&table_uri.to_string(), None).await; + let table_ref2 = open_table(table_uri).await.unwrap(); + let (df1, _df2) = create_test_data(); + + let expr = col("target.id") + .eq(col("source.id")); + let (_table_ref1, _metrics) = merge(table_ref1, df1.clone(), expr.clone().and(col(Column::from_qualified_name("target.event_date")).lt_eq(lit("2021-02-02")))).await.unwrap(); + let result = merge(table_ref2, df1, expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02")))).await; + + + assert!(matches!( + result.as_ref().unwrap_err(), + DeltaTableError::Transaction { .. } + )); + if let DeltaTableError::Transaction { source } = result.unwrap_err() { + assert!(matches!(source, TransactionError::CommitConflict(_))); + } +} \ No newline at end of file From 227dcd329c7efddc1a53204dd209d38ac483c494 Mon Sep 17 00:00:00 2001 From: Jonas Schmitz Date: Sat, 16 Mar 2024 21:55:40 +0100 Subject: [PATCH 6/6] Format code --- crates/core/src/operations/merge/mod.rs | 1 - crates/core/tests/command_merge.rs | 47 +++++++++++++++++-------- 2 files changed, 33 insertions(+), 15 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index c690f1970e..70a7217c68 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -2720,7 +2720,6 @@ mod tests { assert_eq!(generalized, expected_filter); } - #[tokio::test] async fn test_generalize_filter_keeps_only_static_target_references() { let source = TableReference::parse_str("source"); diff --git a/crates/core/tests/command_merge.rs b/crates/core/tests/command_merge.rs index 2a4907d982..9ec257316d 100644 --- a/crates/core/tests/command_merge.rs +++ b/crates/core/tests/command_merge.rs @@ -5,14 +5,14 @@ use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use datafusion::dataframe::DataFrame; use datafusion::prelude::SessionContext; -use datafusion_expr::{col, Expr, lit}; +use datafusion_common::Column; +use datafusion_expr::{col, lit, Expr}; use deltalake_core::kernel::{DataType as DeltaDataType, PrimitiveType, StructField, StructType}; use deltalake_core::operations::merge::MergeMetrics; use deltalake_core::operations::transaction::TransactionError; use deltalake_core::protocol::SaveMode; use deltalake_core::{open_table, DeltaOps, DeltaResult, DeltaTable, DeltaTableError}; use std::sync::Arc; -use datafusion_common::Column; async fn create_table(table_uri: &String, partition: Option>) -> DeltaTable { let table_schema = get_delta_schema(); @@ -190,7 +190,6 @@ async fn test_merge_concurrent_different_partition() { } } - #[tokio::test] async fn test_merge_concurrent_no_overlapping_files() { // predicate contains filter and files are not overlapping -> No conflict @@ -201,10 +200,21 @@ async fn test_merge_concurrent_no_overlapping_files() { let table_ref2 = open_table(table_uri).await.unwrap(); let (df1, df2) = create_test_data(); - let expr = col("target.id") - .eq(col("source.id")); - let (_table_ref1, _metrics) = merge(table_ref1, df2, expr.clone().and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-03")))).await.unwrap(); - let result = merge(table_ref2, df1, expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02")))).await; + let expr = col("target.id").eq(col("source.id")); + let (_table_ref1, _metrics) = merge( + table_ref1, + df2, + expr.clone() + .and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-03"))), + ) + .await + .unwrap(); + let result = merge( + table_ref2, + df1, + expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02"))), + ) + .await; // TODO: Currently it throws a Version mismatch error, but the merge commit was successfully // This bug needs to be fixed, see pull request #2280 @@ -221,7 +231,6 @@ async fn test_merge_concurrent_no_overlapping_files() { } } - #[tokio::test] async fn test_merge_concurrent_with_overlapping_files() { // predicate contains filter and files are overlapping -> Commit conflict @@ -232,11 +241,21 @@ async fn test_merge_concurrent_with_overlapping_files() { let table_ref2 = open_table(table_uri).await.unwrap(); let (df1, _df2) = create_test_data(); - let expr = col("target.id") - .eq(col("source.id")); - let (_table_ref1, _metrics) = merge(table_ref1, df1.clone(), expr.clone().and(col(Column::from_qualified_name("target.event_date")).lt_eq(lit("2021-02-02")))).await.unwrap(); - let result = merge(table_ref2, df1, expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02")))).await; - + let expr = col("target.id").eq(col("source.id")); + let (_table_ref1, _metrics) = merge( + table_ref1, + df1.clone(), + expr.clone() + .and(col(Column::from_qualified_name("target.event_date")).lt_eq(lit("2021-02-02"))), + ) + .await + .unwrap(); + let result = merge( + table_ref2, + df1, + expr.and(col(Column::from_qualified_name("target.event_date")).eq(lit("2021-02-02"))), + ) + .await; assert!(matches!( result.as_ref().unwrap_err(), @@ -245,4 +264,4 @@ async fn test_merge_concurrent_with_overlapping_files() { if let DeltaTableError::Transaction { source } = result.unwrap_err() { assert!(matches!(source, TransactionError::CommitConflict(_))); } -} \ No newline at end of file +}