From a1c78f57907b8385a4699fe36eb64a1022169f33 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Tue, 2 Sep 2025 10:42:47 +0200 Subject: [PATCH 1/3] test: regression test for #17372 --- .../physical_optimizer/sanity_checker.rs | 81 ++++++++++++++++++- 1 file changed, 78 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/physical_optimizer/sanity_checker.rs b/datafusion/core/tests/physical_optimizer/sanity_checker.rs index 6233f5d09c56..ce6eb13c86c4 100644 --- a/datafusion/core/tests/physical_optimizer/sanity_checker.rs +++ b/datafusion/core/tests/physical_optimizer/sanity_checker.rs @@ -20,7 +20,8 @@ use std::sync::Arc; use crate::physical_optimizer::test_utils::{ bounded_window_exec, global_limit_exec, local_limit_exec, memory_exec, - repartition_exec, sort_exec, sort_expr_options, sort_merge_join_exec, + projection_exec, repartition_exec, sort_exec, sort_expr, sort_expr_options, + sort_merge_join_exec, sort_preserving_merge_exec, union_exec, }; use arrow::compute::SortOptions; @@ -28,8 +29,8 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion::datasource::stream::{FileStreamProvider, StreamConfig, StreamTable}; use datafusion::prelude::{CsvReadOptions, SessionContext}; use datafusion_common::config::ConfigOptions; -use datafusion_common::{JoinType, Result}; -use datafusion_physical_expr::expressions::col; +use datafusion_common::{JoinType, Result, ScalarValue}; +use datafusion_physical_expr::expressions::{col, Literal}; use datafusion_physical_expr::Partitioning; use datafusion_physical_expr_common::sort_expr::LexOrdering; use datafusion_physical_optimizer::sanity_checker::SanityCheckPlan; @@ -665,3 +666,77 @@ async fn test_sort_merge_join_dist_missing() -> Result<()> { assert_sanity_check(&smj, false); Ok(()) } + +/// A particular edge case. +/// +/// See <https://github.com/apache/datafusion/issues/17372>. 
+#[tokio::test] +async fn test_union_with_sorts_and_constants() -> Result<()> { + let schema_in = create_test_schema2(); + + let proj_exprs_1 = vec![ + ( + Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_owned())))) as _, + "const_1".to_owned(), + ), + ( + Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_owned())))) as _, + "const_2".to_owned(), + ), + (col("a", &schema_in).unwrap(), "a".to_owned()), + ]; + let proj_exprs_2 = vec![ + ( + Arc::new(Literal::new(ScalarValue::Utf8(Some("foo".to_owned())))) as _, + "const_1".to_owned(), + ), + ( + Arc::new(Literal::new(ScalarValue::Utf8(Some("bar".to_owned())))) as _, + "const_2".to_owned(), + ), + (col("a", &schema_in).unwrap(), "a".to_owned()), + ]; + + let source_1 = memory_exec(&schema_in); + let source_1 = projection_exec(proj_exprs_1.clone(), source_1).unwrap(); + let schema_sources = source_1.schema(); + let ordering_sources: LexOrdering = + [sort_expr("a", &schema_sources).nulls_last()].into(); + let source_1 = sort_exec(ordering_sources.clone(), source_1); + + let source_2 = memory_exec(&schema_in); + let source_2 = projection_exec(proj_exprs_2, source_2).unwrap(); + let source_2 = sort_exec(ordering_sources.clone(), source_2); + + let plan = union_exec(vec![source_1, source_2]); + + let schema_out = plan.schema(); + let ordering_out: LexOrdering = [ + sort_expr("const_1", &schema_out).nulls_last(), + sort_expr("const_2", &schema_out).nulls_last(), + sort_expr("a", &schema_out).nulls_last(), + ] + .into(); + + let plan = sort_preserving_merge_exec(ordering_out, plan); + + let plan_str = displayable(plan.as_ref()).indent(true).to_string(); + let plan_str = plan_str.trim(); + assert_snapshot!( + plan_str, + @r" + SortPreservingMergeExec: [const_1@0 ASC NULLS LAST, const_2@1 ASC NULLS LAST, a@2 ASC NULLS LAST] + UnionExec + SortExec: expr=[a@2 ASC NULLS LAST], preserve_partitioning=[false] + ProjectionExec: expr=[foo as const_1, foo as const_2, a@0 as a] + DataSourceExec: partitions=1, 
partition_sizes=[0] + SortExec: expr=[a@2 ASC NULLS LAST], preserve_partitioning=[false] + ProjectionExec: expr=[foo as const_1, bar as const_2, a@0 as a] + DataSourceExec: partitions=1, partition_sizes=[0] + " + ); + + assert_sanity_check(&plan, true); + + Ok(()) +} From 917833733c25850ab9011c8c03df4a607c911f5b Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 4 Sep 2025 12:46:52 +0200 Subject: [PATCH 2/3] test: add more direct regression for #17372 --- .../src/equivalence/properties/union.rs | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/datafusion/physical-expr/src/equivalence/properties/union.rs b/datafusion/physical-expr/src/equivalence/properties/union.rs index 4f44b9b0c9d4..efbefd0d39bf 100644 --- a/datafusion/physical-expr/src/equivalence/properties/union.rs +++ b/datafusion/physical-expr/src/equivalence/properties/union.rs @@ -921,4 +921,63 @@ mod tests { .collect::<Vec<_>>(), )) } + + #[test] + fn test_constants_share_values() -> Result<()> { + let schema = Arc::new(Schema::new(vec![ + Field::new("const_1", DataType::Utf8, false), + Field::new("const_2", DataType::Utf8, false), + ])); + + let col_const_1 = col("const_1", &schema)?; + let col_const_2 = col("const_2", &schema)?; + + let literal_foo = ScalarValue::Utf8(Some("foo".to_owned())); + let literal_bar = ScalarValue::Utf8(Some("bar".to_owned())); + + let const_expr_1_foo = ConstExpr::new( + Arc::clone(&col_const_1), + AcrossPartitions::Uniform(Some(literal_foo.clone())), + ); + let const_expr_2_foo = ConstExpr::new( + Arc::clone(&col_const_2), + AcrossPartitions::Uniform(Some(literal_foo.clone())), + ); + let const_expr_2_bar = ConstExpr::new( + Arc::clone(&col_const_2), + AcrossPartitions::Uniform(Some(literal_bar.clone())), + ); + + let mut input1 = EquivalenceProperties::new(Arc::clone(&schema)); + let mut input2 = EquivalenceProperties::new(Arc::clone(&schema)); + + // | Input | Const_1 | Const_2 | + // | ----- | ------- | ------- | + // | 1 | foo | foo | + // | 2 | foo | 
bar | + input1.add_constants(vec![const_expr_1_foo.clone(), const_expr_2_foo.clone()])?; + input2.add_constants(vec![const_expr_1_foo.clone(), const_expr_2_bar.clone()])?; + + // Calculate union properties + let union_props = calculate_union(vec![input1, input2], schema)?; + + // This should result in: + // const_1 = Uniform("foo") + // const_2 = Heterogeneous + assert_eq!(union_props.constants().len(), 2); + let union_const_1 = &union_props.constants()[0]; + assert!(union_const_1.expr.eq(&col_const_1)); + assert_eq!( + union_const_1.across_partitions, + AcrossPartitions::Uniform(Some(literal_foo)), + ); + let union_const_2 = &union_props.constants()[1]; + assert!(union_const_2.expr.eq(&col_const_2)); + assert_eq!( + union_const_2.across_partitions, + AcrossPartitions::Heterogeneous, + ); + + Ok(()) + } } From e8a0c6ef7bc79ab76d5c867b4ec2782125595b4a Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 4 Sep 2025 12:49:20 +0200 Subject: [PATCH 3/3] fix: return ALL constants in `EquivalenceProperties::constants` --- .../physical-expr/src/equivalence/properties/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index f4175758778c..2404b8f0dd3e 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -255,10 +255,11 @@ impl EquivalenceProperties { pub fn constants(&self) -> Vec<ConstExpr> { self.eq_group .iter() - .filter_map(|c| { - c.constant.as_ref().and_then(|across| { - c.canonical_expr() - .map(|expr| ConstExpr::new(Arc::clone(expr), across.clone())) + .flat_map(|c| { + c.iter().filter_map(|expr| { + c.constant + .as_ref() + .map(|across| ConstExpr::new(Arc::clone(expr), across.clone())) }) }) .collect()