diff --git a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
index ac1bef6b13d2..66d1380e09c3 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_distribution.rs
@@ -220,7 +220,7 @@ fn csv_exec_multiple_sorted(output_ordering: Vec<LexOrdering>) -> Arc<DataSource
 }
 
-pub(crate) fn projection_exec_with_alias(
+fn projection_exec_with_alias(
     input: Arc<dyn ExecutionPlan>,
     alias_pairs: Vec<(String, String)>,
 ) -> Arc<dyn ExecutionPlan> {
diff --git a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
index 3223768acb74..26a00ef0f29c 100644
--- a/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/tests/physical_optimizer/enforce_sorting.rs
@@ -17,16 +17,11 @@
 use std::sync::Arc;
 
-use crate::physical_optimizer::enforce_distribution::projection_exec_with_alias;
-use crate::physical_optimizer::sanity_checker::{
-    assert_sanity_check, assert_sanity_check_err,
-};
 use crate::physical_optimizer::test_utils::{
     aggregate_exec, bounded_window_exec, check_integrity, coalesce_batches_exec,
     coalesce_partitions_exec, create_test_schema, create_test_schema2,
     create_test_schema3, filter_exec, global_limit_exec, hash_join_exec, limit_exec,
-    local_limit_exec, memory_exec, parquet_exec, parquet_exec_with_stats,
-    repartition_exec, schema, single_partitioned_aggregate, sort_exec, sort_expr,
+    local_limit_exec, memory_exec, parquet_exec, repartition_exec, sort_exec, sort_expr,
     sort_expr_options, sort_merge_join_exec, sort_preserving_merge_exec,
     sort_preserving_merge_exec_with_fetch, spr_repartition_exec, stream_exec_ordered,
     union_exec, RequirementsTestExec,
@@ -3351,62 +3346,3 @@ async fn test_window_partial_constant_and_set_monotonicity() -> Result<()> {
 
     Ok(())
 }
-
-#[tokio::test]
-async fn test_preserve_needed_coalesce() -> Result<()> {
-    // Input to EnforceSorting, from our test case.
-    let plan = projection_exec_with_alias(
-        union_exec(vec![parquet_exec_with_stats(); 2]),
-        vec![
-            ("a".to_string(), "a".to_string()),
-            ("b".to_string(), "value".to_string()),
-        ],
-    );
-    let plan = Arc::new(CoalescePartitionsExec::new(plan));
-    let schema = schema();
-    let sort_key = LexOrdering::new(vec![PhysicalSortExpr {
-        expr: col("a", &schema).unwrap(),
-        options: SortOptions::default(),
-    }]);
-    let plan: Arc<dyn ExecutionPlan> =
-        single_partitioned_aggregate(plan, vec![("a".to_string(), "a1".to_string())]);
-    let plan = sort_exec(sort_key, plan);
-
-    // Starting plan: as in our test case.
-    assert_eq!(
-        get_plan_string(&plan),
-        vec![
-            "SortExec: expr=[a@0 ASC], preserve_partitioning=[false]",
-            "  AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
-            "    CoalescePartitionsExec",
-            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
-            "        UnionExec",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-        ],
-    );
-    // Test: plan is valid.
-    assert_sanity_check(&plan, true);
-
-    // EnforceSorting will remove the coalesce, and add an SPM further up (above the aggregate).
-    let optimizer = EnforceSorting::new();
-    let optimized = optimizer.optimize(plan, &Default::default())?;
-    assert_eq!(
-        get_plan_string(&optimized),
-        vec![
-            "SortPreservingMergeExec: [a@0 ASC]",
-            "  SortExec: expr=[a@0 ASC], preserve_partitioning=[true]",
-            "    AggregateExec: mode=SinglePartitioned, gby=[a@0 as a1], aggr=[]",
-            "      ProjectionExec: expr=[a@0 as a, b@1 as value]",
-            "        UnionExec",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-            "          DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
-        ],
-    );
-
-    // Bug: Plan is now invalid.
-    let err = "does not satisfy distribution requirements: HashPartitioned[[a@0]]). Child-0 output partitioning: UnknownPartitioning(2)";
-    assert_sanity_check_err(&optimized, err);
-
-    Ok(())
-}
diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
index ee9cb032c341..a73d084a081f 100644
--- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs
+++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs
@@ -388,7 +388,7 @@ fn create_test_schema2() -> SchemaRef {
 }
 
 /// Check if sanity checker should accept or reject plans.
-pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
+fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool) {
     let sanity_checker = SanityCheckPlan::new();
     let opts = ConfigOptions::default();
     assert_eq!(
@@ -397,14 +397,6 @@ pub(crate) fn assert_sanity_check(plan: &Arc<dyn ExecutionPlan>, is_sane: bool)
     );
 }
 
-/// Assert reason for sanity check failure.
-pub(crate) fn assert_sanity_check_err(plan: &Arc<dyn ExecutionPlan>, err: &str) {
-    let sanity_checker = SanityCheckPlan::new();
-    let opts = ConfigOptions::default();
-    let error = sanity_checker.optimize(plan.clone(), &opts).unwrap_err();
-    assert!(error.message().contains(err));
-}
-
 /// Check if the plan we created is as expected by comparing the plan
 /// formatted as a string.
 fn assert_plan(plan: &dyn ExecutionPlan, expected_lines: Vec<&str>) {
diff --git a/datafusion/core/tests/physical_optimizer/test_utils.rs b/datafusion/core/tests/physical_optimizer/test_utils.rs
index 0b9c3b80bb93..c7572eb08900 100644
--- a/datafusion/core/tests/physical_optimizer/test_utils.rs
+++ b/datafusion/core/tests/physical_optimizer/test_utils.rs
@@ -30,10 +30,9 @@ use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::physical_plan::ParquetSource;
 use datafusion::datasource::source::DataSourceExec;
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::stats::Precision;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
-use datafusion_common::{ColumnStatistics, JoinType, Result, Statistics};
+use datafusion_common::{JoinType, Result};
 use datafusion_datasource::file_scan_config::FileScanConfig;
 use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
@@ -103,44 +102,6 @@ pub fn schema() -> SchemaRef {
     ]))
 }
 
-fn int64_stats() -> ColumnStatistics {
-    ColumnStatistics {
-        null_count: Precision::Absent,
-        sum_value: Precision::Absent,
-        max_value: Precision::Exact(1_000_000.into()),
-        min_value: Precision::Exact(0.into()),
-        distinct_count: Precision::Absent,
-    }
-}
-
-fn column_stats() -> Vec<ColumnStatistics> {
-    vec![
-        int64_stats(), // a
-        int64_stats(), // b
-        int64_stats(), // c
-        ColumnStatistics::default(),
-        ColumnStatistics::default(),
-    ]
-}
-
-/// Create parquet datasource exec using schema from [`schema`].
-pub(crate) fn parquet_exec_with_stats() -> Arc<DataSourceExec> {
-    let mut statistics = Statistics::new_unknown(&schema());
-    statistics.num_rows = Precision::Inexact(10);
-    statistics.column_statistics = column_stats();
-
-    let config = FileScanConfig::new(
-        ObjectStoreUrl::parse("test:///").unwrap(),
-        schema(),
-        Arc::new(ParquetSource::new(Default::default())),
-    )
-    .with_file(PartitionedFile::new("x".to_string(), 10000))
-    .with_statistics(statistics);
-    assert_eq!(config.statistics.num_rows, Precision::Inexact(10));
-
-    config.build()
-}
-
 pub fn create_test_schema() -> Result<SchemaRef> {
     let nullable_column = Field::new("nullable_col", DataType::Int32, true);
     let non_nullable_column = Field::new("non_nullable_col", DataType::Int32, false);
@@ -561,30 +522,6 @@ pub fn build_group_by(input_schema: &SchemaRef, columns: Vec<String>) -> Physica
     PhysicalGroupBy::new_single(group_by_expr.clone())
 }
 
-pub(crate) fn single_partitioned_aggregate(
-    input: Arc<dyn ExecutionPlan>,
-    alias_pairs: Vec<(String, String)>,
-) -> Arc<dyn ExecutionPlan> {
-    let schema = schema();
-    let group_by = alias_pairs
-        .iter()
-        .map(|(column, alias)| (col(column, &input.schema()).unwrap(), alias.to_string()))
-        .collect::<Vec<_>>();
-    let group_by = PhysicalGroupBy::new_single(group_by);
-
-    Arc::new(
-        AggregateExec::try_new(
-            AggregateMode::SinglePartitioned,
-            group_by,
-            vec![],
-            vec![],
-            input,
-            schema,
-        )
-        .unwrap(),
-    )
-}
-
 pub fn assert_plan_matches_expected(
     plan: &Arc<dyn ExecutionPlan>,
     expected: &[&str],