diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs index 85d826109f89..f22a896c1894 100644 --- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs @@ -372,46 +372,91 @@ macro_rules! plans_matches_expected { } } +fn test_suite_default_config_options() -> ConfigOptions { + let mut config = ConfigOptions::new(); + + // By default, will not repartition / resort data if it is already sorted. + config.optimizer.prefer_existing_sort = false; + + // By default, will attempt to convert Union to Interleave. + config.optimizer.prefer_existing_union = false; + + // By default, will not repartition file scans. + config.optimizer.repartition_file_scans = false; + config.optimizer.repartition_file_min_size = 1024; + + // By default, set query execution concurrency to 10. + config.execution.target_partitions = 10; + + // Use a small batch size, to trigger RoundRobin in tests + config.execution.batch_size = 1; + + config +} + +/// How the optimizers are run. +#[derive(PartialEq, Clone)] +enum DoFirst { + /// Runs: (EnforceDistribution, EnforceDistribution, EnforceSorting) + Distribution, + /// Runs: (EnforceSorting, EnforceDistribution, EnforceDistribution) + Sorting, +} + +#[derive(Clone)] +struct TestConfig { + config: ConfigOptions, + optimizers_to_run: DoFirst, +} + +impl TestConfig { + fn new(optimizers_to_run: DoFirst) -> Self { + Self { + config: test_suite_default_config_options(), + optimizers_to_run, + } + } + + /// If preferred, will not repartition / resort data if it is already sorted. + fn with_prefer_existing_sort(mut self) -> Self { + self.config.optimizer.prefer_existing_sort = true; + self + } + + /// If preferred, will not attempt to convert Union to Interleave. + fn with_prefer_existing_union(mut self) -> Self { + self.config.optimizer.prefer_existing_union = true; + self + } + + /// If preferred, will repartition file scans. + /// Accepts a minimum file size to repartition. + fn with_prefer_repartition_file_scans(mut self, file_min_size: usize) -> Self { + self.config.optimizer.repartition_file_scans = true; + self.config.optimizer.repartition_file_min_size = file_min_size; + self + } + + /// Set the preferred target partitions for query execution concurrency. + fn with_query_execution_partitions(mut self, target_partitions: usize) -> Self { + self.config.execution.target_partitions = target_partitions; + self + } +} + /// Runs the repartition optimizer and asserts the plan against the expected /// Arguments /// * `EXPECTED_LINES` - Expected output plan /// * `PLAN` - Input plan -/// * `FIRST_ENFORCE_DIST` - -/// true: (EnforceDistribution, EnforceDistribution, EnforceSorting) -/// false: else runs (EnforceSorting, EnforceDistribution, EnforceDistribution) -/// * `PREFER_EXISTING_SORT` (optional) - if true, will not repartition / resort data if it is already sorted -/// * `TARGET_PARTITIONS` (optional) - number of partitions to repartition to -/// * `REPARTITION_FILE_SCANS` (optional) - if true, will repartition file scans -/// * `REPARTITION_FILE_MIN_SIZE` (optional) - minimum file size to repartition -/// * `PREFER_EXISTING_UNION` (optional) - if true, will not attempt to convert Union to Interleave +/// * `CONFIG` - [`TestConfig`] macro_rules! assert_optimized { - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, false, 10, false, 1024, false); - }; - - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, false); - }; - - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $PREFER_EXISTING_UNION: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, 10, false, 1024, $PREFER_EXISTING_UNION); - }; - - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr) => { - assert_optimized!($EXPECTED_LINES, $PLAN, $FIRST_ENFORCE_DIST, $PREFER_EXISTING_SORT, $TARGET_PARTITIONS, $REPARTITION_FILE_SCANS, $REPARTITION_FILE_MIN_SIZE, false); - }; - - ($EXPECTED_LINES: expr, $PLAN: expr, $FIRST_ENFORCE_DIST: expr, $PREFER_EXISTING_SORT: expr, $TARGET_PARTITIONS: expr, $REPARTITION_FILE_SCANS: expr, $REPARTITION_FILE_MIN_SIZE: expr, $PREFER_EXISTING_UNION: expr) => { + ($EXPECTED_LINES: expr, $PLAN: expr, $CONFIG: expr) => { let expected_lines: Vec<&str> = $EXPECTED_LINES.iter().map(|s| *s).collect(); - let mut config = ConfigOptions::new(); - config.execution.target_partitions = $TARGET_PARTITIONS; - config.optimizer.repartition_file_scans = $REPARTITION_FILE_SCANS; - config.optimizer.repartition_file_min_size = $REPARTITION_FILE_MIN_SIZE; - config.optimizer.prefer_existing_sort = $PREFER_EXISTING_SORT; - config.optimizer.prefer_existing_union = $PREFER_EXISTING_UNION; - // Use a small batch size, to trigger RoundRobin in tests - config.execution.batch_size = 1; + let TestConfig { + config, + optimizers_to_run, + } = $CONFIG; // NOTE: These tests verify the joint `EnforceDistribution` + `EnforceSorting` cascade // because they were written prior to the separation of `BasicEnforcement` into @@ -455,7 +500,7 @@ macro_rules! assert_optimized { // TODO: End state payloads will be checked here. } - let optimized = if $FIRST_ENFORCE_DIST { + let optimized = if *optimizers_to_run == DoFirst::Distribution { // Run enforce distribution rule first: let optimizer = EnforceDistribution::new(); let optimized = optimizer.optimize(optimized, &config)?; @@ -602,8 +647,12 @@ fn multi_hash_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); } JoinType::RightSemi | JoinType::RightAnti => {} } @@ -666,8 +715,12 @@ fn multi_hash_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => {} } @@ -723,8 +776,12 @@ fn multi_joins_after_alias() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); // Join on (a2 == c) let top_join_on = vec![( @@ -749,8 +806,12 @@ fn multi_joins_after_alias() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -803,8 +864,12 @@ fn multi_joins_after_multi_alias() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, top_join.clone(), true); - assert_optimized!(expected, top_join, false); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, top_join, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -843,8 +908,12 @@ fn join_after_agg_alias() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, join.clone(), true); - assert_optimized!(expected, join, false); + assert_optimized!( + expected, + join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -896,8 +965,12 @@ fn hash_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, join.clone(), true); - assert_optimized!(expected, join, false); + assert_optimized!( + expected, + join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, join, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1022,8 +1095,16 @@ fn multi_hash_join_key_ordering() -> Result<()> { " ProjectionExec: expr=[a@0 as a1, b@1 as b1, c@2 as c1]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, filter_top_join.clone(), true); - assert_optimized!(expected, filter_top_join, false); + assert_optimized!( + expected, + filter_top_join.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected, + filter_top_join, + &TestConfig::new(DoFirst::Sorting) + ); Ok(()) } @@ -1301,6 +1382,7 @@ fn reorder_join_keys_to_right_input() -> Result<()> { Ok(()) } +/// These test cases use [`TestConfig::with_prefer_existing_sort`]. #[test] fn multi_smj_joins() -> Result<()> { let left = parquet_exec(); @@ -1402,7 +1484,11 @@ fn multi_smj_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!(expected, top_join.clone(), true, true); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 hash, 3 round-robin), 3 SortExecs @@ -1456,7 +1542,11 @@ fn multi_smj_joins() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ], }; - assert_optimized!(expected_first_sort_enforcement, top_join, false, true); + assert_optimized!( + expected_first_sort_enforcement, + top_join, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); match join_type { JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { @@ -1513,7 +1603,11 @@ fn multi_smj_joins() -> Result<()> { // this match arm cannot be reached _ => unreachable!() }; - assert_optimized!(expected, top_join.clone(), true, true); + assert_optimized!( + expected, + top_join.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); let expected_first_sort_enforcement = match join_type { // Should include 6 RepartitionExecs (3 of them preserves order) and 3 SortExecs @@ -1559,7 +1653,12 @@ fn multi_smj_joins() -> Result<()> { // this match arm cannot be reached _ => unreachable!() }; - assert_optimized!(expected_first_sort_enforcement, top_join, false, true); + + assert_optimized!( + expected_first_sort_enforcement, + top_join, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); } _ => {} } @@ -1568,6 +1667,7 @@ fn multi_smj_joins() -> Result<()> { Ok(()) } +/// These test cases use [`TestConfig::with_prefer_existing_sort`]. #[test] fn smj_join_key_ordering() -> Result<()> { // group by (a as a1, b as b1) @@ -1633,7 +1733,11 @@ fn smj_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, join.clone(), true, true); + assert_optimized!( + expected, + join.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); let expected_first_sort_enforcement = &[ "SortMergeJoin: join_type=Inner, on=[(b3@1, b2@1), (a3@0, a2@0)]", @@ -1659,7 +1763,11 @@ fn smj_join_key_ordering() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected_first_sort_enforcement, join, false, true); + assert_optimized!( + expected_first_sort_enforcement, + join, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); Ok(()) } @@ -1690,7 +1798,7 @@ fn merge_does_not_need_sort() -> Result<()> { " CoalesceBatchesExec: target_batch_size=4096", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, exec, true); + assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Distribution)); // In this case preserving ordering through order preserving operators is not desirable // (according to flag: PREFER_EXISTING_SORT) @@ -1702,7 +1810,7 @@ fn merge_does_not_need_sort() -> Result<()> { " CoalesceBatchesExec: target_batch_size=4096", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, exec, false); + assert_optimized!(expected, exec, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1743,8 +1851,12 @@ fn union_to_interleave() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan.clone(), false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan.clone(), &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1786,24 +1898,16 @@ fn union_not_to_interleave() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - // no sort in the plan but since we need it as a parameter, make it default false - let prefer_existing_sort = false; - let first_enforce_distribution = true; - let prefer_existing_union = true; assert_optimized!( expected, plan.clone(), - first_enforce_distribution, - prefer_existing_sort, - prefer_existing_union + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_union() ); assert_optimized!( expected, plan, - !first_enforce_distribution, - prefer_existing_sort, - prefer_existing_union + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_union() ); Ok(()) @@ -1821,8 +1925,12 @@ fn added_repartition_to_single_partition() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1840,14 +1948,17 @@ fn repartition_deepest_node() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } #[test] - fn repartition_unsorted_limit() -> Result<()> { let plan = limit_exec(filter_exec(parquet_exec())); @@ -1861,8 +1972,12 @@ fn repartition_unsorted_limit() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1883,8 +1998,12 @@ fn repartition_sorted_limit() -> Result<()> { " SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1911,8 +2030,12 @@ fn repartition_sorted_limit_with_filter() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1941,8 +2064,12 @@ fn repartition_ignores_limit() -> Result<()> { // Expect no repartition to happen for local limit " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1961,8 +2088,12 @@ fn repartition_ignores_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -1982,8 +2113,12 @@ fn repartition_through_sort_preserving_merge() -> Result<()> { "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2008,14 +2143,18 @@ fn repartition_ignores_sort_preserving_merge() -> Result<()> { " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", " CoalescePartitionsExec", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan, false); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2039,7 +2178,11 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected = &[ "SortExec: expr=[c@2 ASC], preserve_partitioning=[false]", @@ -2048,11 +2191,12 @@ fn repartition_ignores_sort_preserving_merge_with_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan, false); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } +/// These test cases use [`TestConfig::with_prefer_existing_sort`]. #[test] fn repartition_does_not_destroy_sort() -> Result<()> { // SortRequired @@ -2075,8 +2219,16 @@ fn repartition_does_not_destroy_sort() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true, true); - assert_optimized!(expected, plan, false, true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); + assert_optimized!( + expected, + plan, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); Ok(()) } @@ -2116,8 +2268,12 @@ fn repartition_does_not_destroy_sort_more_complex() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2150,7 +2306,11 @@ fn repartition_transitively_with_projection() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected_first_sort_enforcement = &[ "SortExec: expr=[sum@0 ASC], preserve_partitioning=[false]", @@ -2160,7 +2320,11 @@ fn repartition_transitively_with_projection() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected_first_sort_enforcement, plan, false); + assert_optimized!( + expected_first_sort_enforcement, + plan, + &TestConfig::new(DoFirst::Sorting) + ); Ok(()) } @@ -2192,8 +2356,12 @@ fn repartition_ignores_transitively_with_projection() -> Result<()> { " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2225,8 +2393,12 @@ fn repartition_transitively_past_sort_with_projection() -> Result<()> { " ProjectionExec: expr=[a@0 as a, b@1 as b, c@2 as c]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); - assert_optimized!(expected, plan, false); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2249,7 +2421,11 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", @@ -2259,7 +2435,11 @@ fn repartition_transitively_past_sort_with_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected_first_sort_enforcement, plan, false); + assert_optimized!( + expected_first_sort_enforcement, + plan, + &TestConfig::new(DoFirst::Sorting) + ); Ok(()) } @@ -2296,7 +2476,11 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, plan.clone(), true); + assert_optimized!( + expected, + plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected_first_sort_enforcement = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", @@ -2306,7 +2490,11 @@ fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected_first_sort_enforcement, plan, false); + assert_optimized!( + expected_first_sort_enforcement, + plan, + &TestConfig::new(DoFirst::Sorting) + ); Ok(()) } @@ -2329,8 +2517,12 @@ fn parallelization_single_partition() -> Result<()> { " AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]", " DataSourceExec: file_groups={2 groups: [[x:0..50], [x:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10); - assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10); + + let test_config = TestConfig::new(DoFirst::Distribution) + .with_prefer_repartition_file_scans(10) + .with_query_execution_partitions(2); + assert_optimized!(expected_parquet, plan_parquet, &test_config); + assert_optimized!(expected_csv, plan_csv, &test_config); Ok(()) } @@ -2346,24 +2538,22 @@ fn parallelization_multiple_files() -> Result<()> { let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()])); let plan = sort_required_exec_with_req(plan, sort_key); + let test_config = TestConfig::new(DoFirst::Distribution) + .with_prefer_existing_sort() + .with_prefer_repartition_file_scans(1); + // The groups must have only contiguous ranges of rows from the same file // if any group has rows from multiple files, the data is no longer sorted destroyed // https://github.com/apache/datafusion/issues/8451 let expected = [ "SortRequiredExec: [a@0 ASC]", " FilterExec: c@2 = 0", - " DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], [x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - let target_partitions = 3; - let repartition_size = 1; + " DataSourceExec: file_groups={3 groups: [[x:0..50], [y:0..100], [x:50..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", + ]; assert_optimized!( expected, plan, - true, - true, - target_partitions, - true, - repartition_size, - false + &test_config.clone().with_query_execution_partitions(3) ); let expected = [ @@ -2371,17 +2561,10 @@ fn parallelization_multiple_files() -> Result<()> { " FilterExec: c@2 = 0", " DataSourceExec: file_groups={8 groups: [[x:0..25], [y:0..25], [x:25..50], [y:25..50], [x:50..75], [y:50..75], [x:75..100], [y:75..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - let target_partitions = 8; - let repartition_size = 1; assert_optimized!( expected, plan, - true, - true, - target_partitions, - true, - repartition_size, - false + &test_config.with_query_execution_partitions(8) ); Ok(()) @@ -2432,7 +2615,13 @@ fn parallelization_compressed_csv() -> Result<()> { .build(), vec![("a".to_string(), "a".to_string())], ); - assert_optimized!(expected, plan, true, false, 2, true, 10, false); + assert_optimized!( + expected, + plan, + &TestConfig::new(DoFirst::Distribution) + .with_query_execution_partitions(2) + .with_prefer_repartition_file_scans(10) + ); } Ok(()) } @@ -2457,8 +2646,11 @@ fn parallelization_two_partitions() -> Result<()> { // Plan already has two partitions " DataSourceExec: file_groups={2 groups: [[x:0..100], [y:0..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true, false, 2, true, 10); - assert_optimized!(expected_csv, plan_csv, true, false, 2, true, 10); + let test_config = TestConfig::new(DoFirst::Distribution) + .with_query_execution_partitions(2) + .with_prefer_repartition_file_scans(10); + assert_optimized!(expected_parquet, plan_parquet, &test_config); + assert_optimized!(expected_csv, plan_csv, &test_config); Ok(()) } @@ -2482,8 +2674,11 @@ fn parallelization_two_partitions_into_four() -> Result<()> { // Multiple source files splitted across partitions " DataSourceExec: file_groups={4 groups: [[x:0..50], [x:50..100], [y:0..50], [y:50..100]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true, false, 4, true, 10); - assert_optimized!(expected_csv, plan_csv, true, false, 4, true, 10); + let test_config = TestConfig::new(DoFirst::Distribution) + .with_query_execution_partitions(4) + .with_prefer_repartition_file_scans(10); + assert_optimized!(expected_parquet, plan_parquet, &test_config); + assert_optimized!(expected_csv, plan_csv, &test_config); Ok(()) } @@ -2514,8 +2709,16 @@ fn parallelization_sorted_limit() -> Result<()> { // Doesn't parallelize for SortExec without preserve_partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2558,8 +2761,16 @@ fn parallelization_limit_with_filter() -> Result<()> { // SortExec doesn't benefit from input partitioning " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2606,8 +2817,16 @@ fn parallelization_ignores_limit() -> Result<()> { " LocalLimitExec: fetch=100", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2635,8 +2854,16 @@ fn parallelization_union_inputs() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2663,8 +2890,16 @@ fn parallelization_prior_to_sort_preserving_merge() -> Result<()> { let expected_csv = &[ "DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2697,8 +2932,16 @@ fn parallelization_sort_preserving_merge_with_union() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2728,8 +2971,16 @@ fn parallelization_does_not_benefit() -> Result<()> { "SortRequiredExec: [c@2 ASC]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!(expected_parquet, plan_parquet, true); - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2768,7 +3019,11 @@ fn parallelization_ignores_transitively_with_projection_parquet() -> Result<()> "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected_parquet, plan_parquet, true); + assert_optimized!( + expected_parquet, + plan_parquet, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2807,7 +3062,11 @@ fn parallelization_ignores_transitively_with_projection_csv() -> Result<()> { "ProjectionExec: expr=[a@0 as a2, c@2 as c2]", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=csv, has_header=false", ]; - assert_optimized!(expected_csv, plan_csv, true); + assert_optimized!( + expected_csv, + plan_csv, + &TestConfig::new(DoFirst::Distribution) + ); Ok(()) } @@ -2831,12 +3090,17 @@ fn remove_redundant_roundrobins() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } +/// This test case uses [`TestConfig::with_prefer_existing_sort`]. #[test] fn remove_unnecessary_spm_after_filter() -> Result<()> { let schema = schema(); @@ -2855,13 +3119,21 @@ fn remove_unnecessary_spm_after_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - // last flag sets config.optimizer.PREFER_EXISTING_SORT - assert_optimized!(expected, physical_plan.clone(), true, true); - assert_optimized!(expected, physical_plan, false, true); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); + assert_optimized!( + expected, + physical_plan, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); Ok(()) } +/// This test case uses [`TestConfig::with_prefer_existing_sort`]. #[test] fn preserve_ordering_through_repartition() -> Result<()> { let schema = schema(); @@ -2878,9 +3150,16 @@ fn preserve_ordering_through_repartition() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC], file_type=parquet", ]; - // last flag sets config.optimizer.PREFER_EXISTING_SORT - assert_optimized!(expected, physical_plan.clone(), true, true); - assert_optimized!(expected, physical_plan, false, true); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_prefer_existing_sort() + ); + assert_optimized!( + expected, + physical_plan, + &TestConfig::new(DoFirst::Sorting).with_prefer_existing_sort() + ); Ok(()) } @@ -2903,7 +3182,11 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", @@ -2912,7 +3195,7 @@ fn do_not_preserve_ordering_through_repartition() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan, false); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2935,8 +3218,12 @@ fn no_need_for_sort_after_filter() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2964,7 +3251,11 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); let expected = &[ "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]", @@ -2974,7 +3265,7 @@ fn do_not_preserve_ordering_through_repartition2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan, false); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -2994,8 +3285,12 @@ fn do_not_preserve_ordering_through_repartition3() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -3091,8 +3386,16 @@ fn do_not_add_unnecessary_hash() -> Result<()> { " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; // Make sure target partition number is 1. In this case hash repartition is unnecessary - assert_optimized!(expected, physical_plan.clone(), true, false, 1, false, 1024); - assert_optimized!(expected, physical_plan, false, false, 1, false, 1024); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(1) + ); + assert_optimized!( + expected, + physical_plan, + &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(1) + ); Ok(()) } @@ -3121,8 +3424,16 @@ fn do_not_add_unnecessary_hash2() -> Result<()> { " DataSourceExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC], file_type=parquet", ]; // Make sure target partition number is larger than 2 (e.g partition number at the source). - assert_optimized!(expected, physical_plan.clone(), true, false, 4, false, 1024); - assert_optimized!(expected, physical_plan, false, false, 4, false, 1024); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution).with_query_execution_partitions(4) + ); + assert_optimized!( + expected, + physical_plan, + &TestConfig::new(DoFirst::Sorting).with_query_execution_partitions(4) + ); Ok(()) } @@ -3139,8 +3450,12 @@ fn optimize_away_unnecessary_repartition() -> Result<()> { let expected = &["DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet"]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) } @@ -3166,8 +3481,12 @@ fn optimize_away_unnecessary_repartition2() -> Result<()> { " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1", " DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet", ]; - assert_optimized!(expected, physical_plan.clone(), true); - assert_optimized!(expected, physical_plan, false); + assert_optimized!( + expected, + physical_plan.clone(), + &TestConfig::new(DoFirst::Distribution) + ); + assert_optimized!(expected, physical_plan, &TestConfig::new(DoFirst::Sorting)); Ok(()) }