From 39a421c1d254193422aa4e2a9326a06b01f1fd31 Mon Sep 17 00:00:00 2001 From: Shreyaskr1409 Date: Tue, 25 Mar 2025 01:23:54 +0530 Subject: [PATCH 1/5] Migrated tests to insta in sorts/partial_sort.rs --- .../physical-plan/src/sorts/partial_sort.rs | 116 +++++++++--------- 1 file changed, 59 insertions(+), 57 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 5277a50b85ca..320fa21c8665 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -467,11 +467,12 @@ mod tests { use arrow::array::*; use arrow::compute::SortOptions; use arrow::datatypes::*; + use datafusion_common::test_util::batches_to_string; use futures::FutureExt; + use insta::allow_duplicates; + use insta::assert_snapshot; use itertools::Itertools; - use datafusion_common::assert_batches_eq; - use crate::collect; use crate::expressions::col; use crate::expressions::PhysicalSortExpr; @@ -522,20 +523,21 @@ mod tests { let result = collect(partial_sort_exec, Arc::clone(&task_ctx)).await?; - let expected_after_sort = [ - "+---+---+---+", - "| a | b | c |", - "+---+---+---+", - "| 0 | 1 | 0 |", - "| 0 | 1 | 1 |", - "| 0 | 2 | 5 |", - "| 1 | 2 | 4 |", - "| 1 | 3 | 2 |", - "| 1 | 3 | 3 |", - "+---+---+---+", - ]; assert_eq!(2, result.len()); - assert_batches_eq!(expected_after_sort, &result); + allow_duplicates! { + assert_snapshot!(batches_to_string(&result), @r#" + +---+---+---+ + | a | b | c | + +---+---+---+ + | 0 | 1 | 0 | + | 0 | 1 | 1 | + | 0 | 2 | 5 | + | 1 | 2 | 4 | + | 1 | 3 | 2 | + | 1 | 3 | 3 | + +---+---+---+ + "#); + } assert_eq!( task_ctx.runtime_env().memory_pool.reserved(), 0, @@ -588,18 +590,19 @@ mod tests { let result = collect(partial_sort_exec, Arc::clone(&task_ctx)).await?; - let expected_after_sort = [ - "+---+---+---+", - "| a | b | c |", - "+---+---+---+", - "| 0 | 1 | 4 |", - "| 0 | 2 | 3 |", - "| 1 | 2 | 2 |", - "| 1 | 3 | 0 |", - "+---+---+---+", - ]; assert_eq!(2, result.len()); - assert_batches_eq!(expected_after_sort, &result); + allow_duplicates! { + assert_snapshot!(batches_to_string(&result), @r#" + +---+---+---+ + | a | b | c | + +---+---+---+ + | 0 | 1 | 4 | + | 0 | 2 | 3 | + | 1 | 2 | 2 | + | 1 | 3 | 0 | + +---+---+---+ + "#); + } assert_eq!( task_ctx.runtime_env().memory_pool.reserved(), 0, @@ -663,21 +666,22 @@ mod tests { 0, "The sort should have returned all memory used back to the memory manager" ); - let expected = [ - "+---+---+---+", - "| a | b | c |", - "+---+---+---+", - "| 0 | 1 | 6 |", - "| 0 | 1 | 7 |", - "| 0 | 3 | 4 |", - "| 0 | 3 | 5 |", - "| 1 | 2 | 0 |", - "| 1 | 2 | 1 |", - "| 1 | 4 | 2 |", - "| 1 | 4 | 3 |", - "+---+---+---+", - ]; - assert_batches_eq!(expected, &result); + allow_duplicates! { + assert_snapshot!(batches_to_string(&result), @r#" + +---+---+---+ + | a | b | c | + +---+---+---+ + | 0 | 1 | 6 | + | 0 | 1 | 7 | + | 0 | 3 | 4 | + | 0 | 3 | 5 | + | 1 | 2 | 0 | + | 1 | 2 | 1 | + | 1 | 4 | 2 | + | 1 | 4 | 3 | + +---+---+---+ + "#); + } } Ok(()) } @@ -1000,21 +1004,6 @@ mod tests { 2, )); - let expected = [ - "+-----+------+-------+", - "| a | b | c |", - "+-----+------+-------+", - "| 1.0 | 20.0 | 20.0 |", - "| 1.0 | 20.0 | 10.0 |", - "| 1.0 | 40.0 | 10.0 |", - "| 2.0 | 40.0 | 100.0 |", - "| 2.0 | NaN | NaN |", - "| 3.0 | | |", - "| 3.0 | | 100.0 |", - "| 3.0 | NaN | NaN |", - "+-----+------+-------+", - ]; - assert_eq!( DataType::Float32, *partial_sort_exec.schema().field(0).data_type() @@ -1033,7 +1022,20 @@ mod tests { task_ctx, ) .await?; - assert_batches_eq!(expected, &result); + assert_snapshot!(batches_to_string(&result), @r#" + +-----+------+-------+ + | a | b | c | + +-----+------+-------+ + | 1.0 | 20.0 | 20.0 | + | 1.0 | 20.0 | 10.0 | + | 1.0 | 40.0 | 10.0 | + | 2.0 | 40.0 | 100.0 | + | 2.0 | NaN | NaN | + | 3.0 | | | + | 3.0 | | 100.0 | + | 3.0 | NaN | NaN | + +-----+------+-------+ + "#); assert_eq!(result.len(), 2); let metrics = partial_sort_exec.metrics().unwrap(); assert!(metrics.elapsed_compute().unwrap() > 0); From f0442b8290c796e7b611fe19d6cb666daf4ae0e2 Mon Sep 17 00:00:00 2001 From: Shreyaskr1409 Date: Tue, 25 Mar 2025 01:34:04 +0530 Subject: [PATCH 2/5] Migrated tests to insta in sorts/sort.rs and sorts/sort_preserving_merge.rs --- datafusion/physical-plan/src/sorts/sort.rs | 35 ++-- .../src/sorts/sort_preserving_merge.rs | 157 ++++++++---------- 2 files changed, 91 insertions(+), 101 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index e2d665e1d814..731e24b53c60 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1244,7 +1244,8 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::*; use datafusion_common::cast::as_primitive_array; - use datafusion_common::{assert_batches_eq, Result, ScalarValue}; + use datafusion_common::test_util::batches_to_string; + use datafusion_common::{Result, ScalarValue}; use datafusion_execution::config::SessionConfig; use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_execution::RecordBatchStream; @@ -1252,6 +1253,7 @@ mod tests { use datafusion_physical_expr::EquivalenceProperties; use futures::{FutureExt, Stream}; + use insta::assert_snapshot; #[derive(Debug, Clone)] pub struct SortedUnboundedExec { @@ -1913,22 +1915,21 @@ mod tests { plan = plan.with_fetch(Some(9)); let batches = collect(Arc::new(plan), task_ctx).await?; - #[rustfmt::skip] - let expected = [ - "+----+", - "| c1 |", - "+----+", - "| 0 |", - "| 1 |", - "| 2 |", - "| 3 |", - "| 4 |", - "| 5 |", - "| 6 |", - "| 7 |", - "| 8 |", - "+----+",]; - assert_batches_eq!(expected, &batches); + assert_snapshot!(batches_to_string(&batches), @r#" + +----+ + | c1 | + +----+ + | 0 | + | 1 | + | 2 | + | 3 | + | 4 | + | 5 | + | 6 | + | 7 | + | 8 | + +----+ + "#); Ok(()) } } diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index ca06a029e8db..f2fcc37d9a02 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -412,6 +412,7 @@ mod tests { }; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion_common::test_util::batches_to_string; use datafusion_common::{assert_batches_eq, assert_contains, DataFusionError}; use datafusion_common_runtime::SpawnedTask; use datafusion_execution::config::SessionConfig; @@ -423,6 +424,7 @@ mod tests { use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use futures::{FutureExt, Stream, StreamExt}; + use insta::assert_snapshot; use tokio::time::timeout; // The number in the function is highly related to the memory limit we are testing @@ -992,25 +994,22 @@ mod tests { let collected = collect(merge, task_ctx).await.unwrap(); assert_eq!(collected.len(), 1); - assert_batches_eq!( - &[ - "+---+---+-------------------------------+", - "| a | b | c |", - "+---+---+-------------------------------+", - "| 1 | | 1970-01-01T00:00:00.000000008 |", - "| 1 | | 1970-01-01T00:00:00.000000008 |", - "| 2 | a | |", - "| 7 | b | 1970-01-01T00:00:00.000000006 |", - "| 2 | b | |", - "| 9 | d | |", - "| 3 | e | 1970-01-01T00:00:00.000000004 |", - "| 3 | g | 1970-01-01T00:00:00.000000005 |", - "| 4 | h | |", - "| 5 | i | 1970-01-01T00:00:00.000000004 |", - "+---+---+-------------------------------+", - ], - collected.as_slice() - ); + assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + +---+---+-------------------------------+ + | a | b | c | + +---+---+-------------------------------+ + | 1 | | 1970-01-01T00:00:00.000000008 | + | 1 | | 1970-01-01T00:00:00.000000008 | + | 2 | a | | + | 7 | b | 1970-01-01T00:00:00.000000006 | + | 2 | b | | + | 9 | d | | + | 3 | e | 1970-01-01T00:00:00.000000004 | + | 3 | g | 1970-01-01T00:00:00.000000005 | + | 4 | h | | + | 5 | i | 1970-01-01T00:00:00.000000004 | + +---+---+-------------------------------+ + "#); } #[tokio::test] @@ -1035,17 +1034,14 @@ mod tests { let collected = collect(merge, task_ctx).await.unwrap(); assert_eq!(collected.len(), 1); - assert_batches_eq!( - &[ - "+---+---+", - "| a | b |", - "+---+---+", - "| 1 | a |", - "| 2 | b |", - "+---+---+", - ], - collected.as_slice() - ); + assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + +---+---+ + | a | b | + +---+---+ + | 1 | a | + | 2 | b | + +---+---+ + "#); } #[tokio::test] @@ -1069,20 +1065,17 @@ mod tests { let collected = collect(merge, task_ctx).await.unwrap(); assert_eq!(collected.len(), 1); - assert_batches_eq!( - &[ - "+---+---+", - "| a | b |", - "+---+---+", - "| 1 | a |", - "| 2 | b |", - "| 7 | c |", - "| 9 | d |", - "| 3 | e |", - "+---+---+", - ], - collected.as_slice() - ); + assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + +---+---+ + | a | b | + +---+---+ + | 1 | a | + | 2 | b | + | 7 | c | + | 9 | d | + | 3 | e | + +---+---+ + "#); } #[tokio::test] @@ -1179,17 +1172,16 @@ mod tests { let collected = collect(Arc::clone(&merge) as Arc, task_ctx) .await .unwrap(); - let expected = [ - "+----+---+", - "| a | b |", - "+----+---+", - "| 1 | a |", - "| 10 | b |", - "| 2 | c |", - "| 20 | d |", - "+----+---+", - ]; - assert_batches_eq!(expected, collected.as_slice()); + assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + +----+---+ + | a | b | + +----+---+ + | 1 | a | + | 10 | b | + | 2 | c | + | 20 | d | + +----+---+ + "#); // Now, validate metrics let metrics = merge.metrics().unwrap(); @@ -1293,35 +1285,32 @@ mod tests { // Expect the data to be sorted first by "batch_number" (because // that was the order it was fed in, even though only "value" // is in the sort key) - assert_batches_eq!( - &[ - "+--------------+-------+", - "| batch_number | value |", - "+--------------+-------+", - "| 0 | A |", - "| 1 | A |", - "| 2 | A |", - "| 3 | A |", - "| 4 | A |", - "| 5 | A |", - "| 6 | A |", - "| 7 | A |", - "| 8 | A |", - "| 9 | A |", - "| 0 | B |", - "| 1 | B |", - "| 2 | B |", - "| 3 | B |", - "| 4 | B |", - "| 5 | B |", - "| 6 | B |", - "| 7 | B |", - "| 8 | B |", - "| 9 | B |", - "+--------------+-------+", - ], - collected.as_slice() - ); + assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + +--------------+-------+ + | batch_number | value | + +--------------+-------+ + | 0 | A | + | 1 | A | + | 2 | A | + | 3 | A | + | 4 | A | + | 5 | A | + | 6 | A | + | 7 | A | + | 8 | A | + | 9 | A | + | 0 | B | + | 1 | B | + | 2 | B | + | 3 | B | + | 4 | B | + | 5 | B | + | 6 | B | + | 7 | B | + | 8 | B | + | 9 | B | + +--------------+-------+ + "#); } /// It returns pending for the 2nd partition until the 3rd partition is polled. The 1st From 5bb219e5f608062a2d4c4596438fe9288bbbd08e Mon Sep 17 00:00:00 2001 From: Shreyaskr1409 Date: Tue, 25 Mar 2025 01:39:08 +0530 Subject: [PATCH 3/5] Migrated tests to insta in windows/bounded_window_agg_exec.rs --- .../src/windows/bounded_window_agg_exec.rs | 62 +++++++++---------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index f9f4b78686db..ad4a3ddad8fe 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1222,6 +1222,7 @@ mod tests { }; use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; + use datafusion_common::test_util::batches_to_string; use datafusion_common::{ assert_batches_eq, exec_datafusion_err, Result, ScalarValue, }; @@ -1241,6 +1242,7 @@ mod tests { use futures::future::Shared; use futures::{pin_mut, ready, FutureExt, Stream, StreamExt}; + use insta::assert_snapshot; use itertools::Itertools; use tokio::time::timeout; @@ -1664,22 +1666,21 @@ mod tests { "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n" ); - let expected = [ - "+---+------+---------------+---------------+", - "| a | last | nth_value(-1) | nth_value(-2) |", - "+---+------+---------------+---------------+", - "| 1 | 1 | 1 | |", - "| 2 | 2 | 2 | 1 |", - "| 3 | 3 | 3 | 2 |", - "| 1 | 1 | 1 | 3 |", - "| 2 | 2 | 2 | 1 |", - "| 3 | 3 | 3 | 2 |", - "| 1 | 1 | 1 | 3 |", - "| 2 | 2 | 2 | 1 |", - "| 3 | 3 | 3 | 2 |", - "+---+------+---------------+---------------+", - ]; - assert_batches_eq!(expected, &batches); + assert_snapshot!(batches_to_string(&batches), @r#" + +---+------+---------------+---------------+ + | a | last | nth_value(-1) | nth_value(-2) | + +---+------+---------------+---------------+ + | 1 | 1 | 1 | | + | 2 | 2 | 2 | 1 | + | 3 | 3 | 3 | 2 | + | 1 | 1 | 1 | 3 | + | 2 | 2 | 2 | 1 | + | 3 | 3 | 3 | 2 | + | 1 | 1 | 1 | 3 | + | 2 | 2 | 2 | 1 | + | 3 | 3 | 3 | 2 | + +---+------+---------------+---------------+ + "#); Ok(()) } @@ -1792,21 +1793,20 @@ mod tests { let task_ctx = task_context(); let batches = collect_with_timeout(plan, task_ctx, timeout_duration).await?; - let expected = [ - "+----+------+-------+", - "| sn | hash | col_2 |", - "+----+------+-------+", - "| 0 | 2 | 2 |", - "| 1 | 2 | 2 |", - "| 2 | 2 | 2 |", - "| 3 | 2 | 1 |", - "| 4 | 1 | 2 |", - "| 5 | 1 | 2 |", - "| 6 | 1 | 2 |", - "| 7 | 1 | 1 |", - "+----+------+-------+", - ]; - assert_batches_eq!(expected, &batches); + assert_snapshot!(batches_to_string(&batches), @r#" + +----+------+-------+ + | sn | hash | col_2 | + +----+------+-------+ + | 0 | 2 | 2 | + | 1 | 2 | 2 | + | 2 | 2 | 2 | + | 3 | 2 | 1 | + | 4 | 1 | 2 | + | 5 | 1 | 2 | + | 6 | 1 | 2 | + | 7 | 1 | 1 | + +----+------+-------+ + "#); Ok(()) } From d1c2ff9207bdac2d15927772e47fe9f25a7e7b5e Mon Sep 17 00:00:00 2001 From: Shreyaskr1409 Date: Tue, 25 Mar 2025 01:41:51 +0530 Subject: [PATCH 4/5] Removed unused imports --- datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index ad4a3ddad8fe..527ba62a73c5 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1224,7 +1224,7 @@ mod tests { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::test_util::batches_to_string; use datafusion_common::{ - assert_batches_eq, exec_datafusion_err, Result, ScalarValue, + exec_datafusion_err, Result, ScalarValue, }; use datafusion_execution::config::SessionConfig; use datafusion_execution::{ From 184d119d26111890a692180112df33ef34e7a729 Mon Sep 17 00:00:00 2001 From: Shreyaskr1409 Date: Tue, 25 Mar 2025 02:15:51 +0530 Subject: [PATCH 5/5] Fixes for failing tests --- .../physical-plan/src/sorts/sort_preserving_merge.rs | 10 +++++----- .../src/windows/bounded_window_agg_exec.rs | 4 +--- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs index f2fcc37d9a02..b987dff36441 100644 --- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs +++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs @@ -994,7 +994,7 @@ mod tests { let collected = collect(merge, task_ctx).await.unwrap(); assert_eq!(collected.len(), 1); - assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + assert_snapshot!(batches_to_string(collected.as_slice()), @r#" +---+---+-------------------------------+ | a | b | c | +---+---+-------------------------------+ @@ -1034,7 +1034,7 @@ mod tests { let collected = collect(merge, task_ctx).await.unwrap(); assert_eq!(collected.len(), 1); - assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + assert_snapshot!(batches_to_string(collected.as_slice()), @r#" +---+---+ | a | b | +---+---+ @@ -1065,7 +1065,7 @@ mod tests { let collected = collect(merge, task_ctx).await.unwrap(); assert_eq!(collected.len(), 1); - assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + assert_snapshot!(batches_to_string(collected.as_slice()), @r#" +---+---+ | a | b | +---+---+ @@ -1172,7 +1172,7 @@ mod tests { let collected = collect(Arc::clone(&merge) as Arc, task_ctx) .await .unwrap(); - assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + assert_snapshot!(batches_to_string(collected.as_slice()), @r#" +----+---+ | a | b | +----+---+ @@ -1285,7 +1285,7 @@ mod tests { // Expect the data to be sorted first by "batch_number" (because // that was the order it was fed in, even though only "value" // is in the sort key) - assert_snapshot!(batches_to_string(&collected.as_slice()), @r#" + assert_snapshot!(batches_to_string(collected.as_slice()), @r#" +--------------+-------+ | batch_number | value | +--------------+-------+ diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 527ba62a73c5..92138bf6a7a1 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -1223,9 +1223,7 @@ mod tests { use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::test_util::batches_to_string; - use datafusion_common::{ - exec_datafusion_err, Result, ScalarValue, - }; + use datafusion_common::{exec_datafusion_err, Result, ScalarValue}; use datafusion_execution::config::SessionConfig; use datafusion_execution::{ RecordBatchStream, SendableRecordBatchStream, TaskContext,