Skip to content

Commit

Permalink
Add TC for delta-io#2158
Browse files Browse the repository at this point in the history
  • Loading branch information
Blajda committed Mar 23, 2024
1 parent dbae5c5 commit 9e15b1a
Showing 1 changed file with 98 additions and 0 deletions.
98 changes: 98 additions & 0 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2738,4 +2738,102 @@ mod tests {

assert_eq!(split_pred, expected_pred_parts);
}

#[tokio::test]
async fn test_merge_pushdowns() {
//See #2158
let schema = vec![
StructField::new(
"id".to_string(),
DataType::Primitive(PrimitiveType::String),
true,
),
StructField::new(
"cost".to_string(),
DataType::Primitive(PrimitiveType::Float),
true,
),
StructField::new(
"month".to_string(),
DataType::Primitive(PrimitiveType::String),
true,
),
];

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", ArrowDataType::Utf8, true),
Field::new("cost", ArrowDataType::Float32, true),
Field::new("month", ArrowDataType::Utf8, true),
]));

let table = DeltaOps::new_in_memory()
.create()
.with_columns(schema)
.await
.unwrap();

let ctx = SessionContext::new();
let batch = RecordBatch::try_new(
Arc::clone(&arrow_schema.clone()),
vec![
Arc::new(arrow::array::StringArray::from(vec!["A", "B"])),
Arc::new(arrow::array::Float32Array::from(vec![Some(10.15), None])),
Arc::new(arrow::array::StringArray::from(vec![
"2023-07-04",
"2023-07-04",
])),
],
)
.unwrap();

let table = DeltaOps(table)
.write(vec![batch.clone()])
.with_save_mode(SaveMode::Append)
.await
.unwrap();
assert_eq!(table.version(), 1);
assert_eq!(table.get_files_count(), 1);

let batch = RecordBatch::try_new(
Arc::clone(&arrow_schema.clone()),
vec![
Arc::new(arrow::array::StringArray::from(vec!["A", "B"])),
Arc::new(arrow::array::Float32Array::from(vec![
Some(12.15),
Some(11.15),
])),
Arc::new(arrow::array::StringArray::from(vec![
"2023-07-04",
"2023-07-04",
])),
],
)
.unwrap();
let source = ctx.read_batch(batch).unwrap();

let (table, _metrics) = DeltaOps(table)
.merge(source, "target.id = source.id and target.cost is null")
.with_source_alias("source")
.with_target_alias("target")
.when_matched_update(|insert| {
insert
.update("id", "target.id")
.update("cost", "source.cost")
.update("month", "target.month")
})
.unwrap()
.await
.unwrap();

let expected = vec![
"+----+-------+------------+",
"| id | cost | month |",
"+----+-------+------------+",
"| A | 10.15 | 2023-07-04 |",
"| B | 11.15 | 2023-07-04 |",
"+----+-------+------------+",
];
let actual = get_data(&table).await;
assert_batches_sorted_eq!(&expected, &actual);
}
}

0 comments on commit 9e15b1a

Please sign in to comment.