From 188ab3e0d741613f8e6aeba8e041250cf0bdd09e Mon Sep 17 00:00:00 2001 From: Matt Friede <7852262+Friede80@users.noreply.github.com> Date: Mon, 10 Mar 2025 15:54:23 -0400 Subject: [PATCH 1/8] Add test for coerce_union_schema --- .../optimizer/src/analyzer/type_coercion.rs | 54 +++++++++++++++++-- 1 file changed, 50 insertions(+), 4 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 538ef98ac7be..2add9636963a 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -1043,9 +1043,11 @@ mod test { use arrow::datatypes::DataType::Utf8; use arrow::datatypes::{DataType, Field, TimeUnit}; + use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule; use crate::analyzer::type_coercion::{ coerce_case_expression, TypeCoercion, TypeCoercionRewriter, }; + use crate::analyzer::Analyzer; use crate::test::{assert_analyzed_plan_eq, assert_analyzed_plan_with_config_eq}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{TransformedResult, TreeNode}; @@ -1054,12 +1056,13 @@ mod test { use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort}; use datafusion_expr::test::function_stub::avg_udaf; use datafusion_expr::{ - cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, AggregateUDF, - BinaryExpr, Case, ColumnarValue, Expr, ExprSchemable, Filter, LogicalPlan, - Operator, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, - SimpleAggregateUDF, Subquery, Volatility, + cast, col, create_udaf, is_true, lit, wildcard, AccumulatorFactoryFunction, + AggregateUDF, BinaryExpr, Case, ColumnarValue, Expr, ExprSchemable, Filter, + LogicalPlan, Operator, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, + SimpleAggregateUDF, Subquery, Union, Volatility, }; use datafusion_functions_aggregate::average::AvgAccumulator; + use datafusion_sql::TableReference; fn empty() -> Arc { Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { @@ -1090,6 +1093,49 @@ mod test { assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected) } + #[test] + fn test_coerce_union() -> Result<()> { + let left_plan = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new( + DFSchema::try_from_qualified_schema( + TableReference::full("datafusion", "test", "foo"), + &Schema::new(vec![Field::new("a", DataType::Int32, false)]), + ) + .unwrap(), + ), + })); + let right_plan = Arc::new(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new( + DFSchema::try_from_qualified_schema( + TableReference::full("datafusion", "test", "foo"), + &Schema::new(vec![Field::new("a", DataType::Int64, false)]), + ) + .unwrap(), + ), + })); + let union = LogicalPlan::Union(Union::try_new_with_loose_types(vec![ + left_plan, right_plan, + ])?); + let analyzed_union = Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]) + .execute_and_check(union, &ConfigOptions::default(), |_, _| {})?; + let top_level_plan = LogicalPlan::Projection(Projection::try_new( + vec![wildcard()], + Arc::new(analyzed_union), + )?); + let expanded_plan = + Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]) + .execute_and_check( + top_level_plan, + &ConfigOptions::default(), + |_, _| {}, + )?; + + let expected = "Projection: datafusion.test.foo.a\n Union\n Projection: CAST(datafusion.test.foo.a AS Int64) AS a\n EmptyRelation\n EmptyRelation"; + assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), expanded_plan, expected) + } + fn coerce_on_output_if_viewtype(plan: LogicalPlan, expected: &str) -> Result<()> { let mut options = ConfigOptions::default(); options.optimizer.expand_views_at_output = true; From f88e4324401620bb91289df859295a8aed5245b7 Mon Sep 17 00:00:00 2001 From: Matt Friede <7852262+Friede80@users.noreply.github.com> Date: Mon, 10 Mar 2025 15:54:39 -0400 Subject: [PATCH 2/8] coerce_union to use its own schema instead of that of the first plan --- datafusion/optimizer/src/analyzer/type_coercion.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 2add9636963a..55941e06b35a 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -214,7 +214,7 @@ impl<'a> TypeCoercionRewriter<'a> { /// Coerce the union’s inputs to a common schema compatible with all inputs. /// This occurs after wildcard expansion and the coercion of the input expressions. pub fn coerce_union(union_plan: Union) -> Result { - let union_schema = Arc::new(coerce_union_schema(&union_plan.inputs)?); + let union_schema = Arc::new(coerce_union_schema(&union_plan)?); let new_inputs = union_plan .inputs .into_iter() @@ -929,8 +929,8 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result { /// /// This method presumes that the wildcard expansion is unneeded, or has already /// been applied. -pub fn coerce_union_schema(inputs: &[Arc]) -> Result { - let base_schema = inputs[0].schema(); +pub fn coerce_union_schema(union_plan: &Union) -> Result { + let base_schema = &union_plan.schema; let mut union_datatypes = base_schema .fields() .iter() @@ -949,7 +949,7 @@ pub fn coerce_union_schema(inputs: &[Arc]) -> Result { let mut metadata = base_schema.metadata().clone(); - for (i, plan) in inputs.iter().enumerate().skip(1) { + for (i, plan) in union_plan.inputs.iter().enumerate().skip(1) { let plan_schema = plan.schema(); metadata.extend(plan_schema.metadata().clone()); @@ -1041,7 +1041,7 @@ mod test { use std::sync::Arc; use arrow::datatypes::DataType::Utf8; - use arrow::datatypes::{DataType, Field, TimeUnit}; + use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule; use crate::analyzer::type_coercion::{ From 7165b1c1a9b547670a0f781544e2b5c9a0816e2e Mon Sep 17 00:00:00 2001 From: Matt Friede <7852262+Friede80@users.noreply.github.com> Date: Wed, 12 Mar 2025 15:50:41 -0400 Subject: [PATCH 3/8] Generate unique field names for union schema instead of using table qualifiers --- datafusion/expr/src/logical_plan/builder.rs | 27 ++++++++++- datafusion/expr/src/logical_plan/plan.rs | 20 ++++++-- .../optimizer/src/analyzer/type_coercion.rs | 10 ++-- .../optimizer/src/propagate_empty_relation.rs | 4 +- datafusion/sqllogictest/test_files/limit.slt | 2 +- datafusion/sqllogictest/test_files/order.slt | 6 +-- .../sqllogictest/test_files/type_coercion.slt | 2 +- datafusion/sqllogictest/test_files/union.slt | 46 +++++++++++++++++-- .../sqllogictest/test_files/union_by_name.slt | 2 +- 9 files changed, 95 insertions(+), 24 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index f60bb2f00771..c981d64c18e0 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -51,6 +51,7 @@ use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::file_options::file_type::FileType; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ exec_err, get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef, @@ -776,8 +777,32 @@ impl LogicalPlanBuilder { &missing_cols, is_distinct, )?; + + let mut sort_exprs = normalize_sorts(sorts, &plan)?; + if matches!(&plan, LogicalPlan::Union(_)) + || matches!(&plan, LogicalPlan::Distinct(distinct) if matches!(**distinct.input(), LogicalPlan::Union(_))) + { + sort_exprs = sort_exprs + .into_iter() + .map(|sort_expr| { + let inner_expr = sort_expr.expr.clone(); + let qualifiers_removed = inner_expr + .transform_up(|expr| { + if let Expr::Column(mut col) = expr { + col.relation = None; + Ok(Transformed::yes(Expr::Column(col))) + } else { + Ok(Transformed::no(expr)) + } + }) + .data()?; + Ok(sort_expr.with_expr(qualifiers_removed)) + }) + .collect::>()? + }; + let sort_plan = LogicalPlan::Sort(Sort { - expr: normalize_sorts(sorts, &plan)?, + expr: sort_exprs, input: Arc::new(plan), fetch, }); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 682342d27b29..d37145641f7a 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2813,6 +2813,7 @@ impl Union { } } + let mut name_counts: HashMap = HashMap::new(); let union_fields = (0..fields_count) .map(|i| { let fields = inputs @@ -2820,7 +2821,8 @@ impl Union { .map(|input| input.schema().field(i)) .collect::>(); let first_field = fields[0]; - let name = first_field.name(); + let base_name = first_field.name().to_string(); + let data_type = if loose_types { // TODO apply type coercion here, or document why it's better to defer // temporarily use the data type from the left input and later rely on the analyzer to @@ -2843,13 +2845,21 @@ impl Union { )? }; let nullable = fields.iter().any(|field| field.is_nullable()); - let mut field = Field::new(name, data_type.clone(), nullable); + + // Generate unique field name + let name = if let Some(count) = name_counts.get_mut(&base_name) { + *count += 1; + format!("{}_{}", base_name, count) + } else { + name_counts.insert(base_name.clone(), 0); + base_name + }; + + let mut field = Field::new(&name, data_type.clone(), nullable); let field_metadata = intersect_maps(fields.iter().map(|field| field.metadata())); field.set_metadata(field_metadata); - // TODO reusing table reference from the first schema is probably wrong - let table_reference = first_schema.qualified_field(i).0.cloned(); - Ok((table_reference, Arc::new(field))) + Ok((None, Arc::new(field))) }) .collect::>()?; let union_schema_metadata = diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 55941e06b35a..19614b18005d 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -214,7 +214,7 @@ impl<'a> TypeCoercionRewriter<'a> { /// Coerce the union’s inputs to a common schema compatible with all inputs. /// This occurs after wildcard expansion and the coercion of the input expressions. pub fn coerce_union(union_plan: Union) -> Result { - let union_schema = Arc::new(coerce_union_schema(&union_plan)?); + let union_schema = Arc::new(coerce_union_schema(&union_plan.inputs)?); let new_inputs = union_plan .inputs .into_iter() @@ -929,8 +929,8 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result { /// /// This method presumes that the wildcard expansion is unneeded, or has already /// been applied. -pub fn coerce_union_schema(union_plan: &Union) -> Result { - let base_schema = &union_plan.schema; +pub fn coerce_union_schema(inputs: &[Arc]) -> Result { + let base_schema = inputs[0].schema().as_ref().clone().strip_qualifiers(); let mut union_datatypes = base_schema .fields() .iter() @@ -949,7 +949,7 @@ pub fn coerce_union_schema(union_plan: &Union) -> Result { let mut metadata = base_schema.metadata().clone(); - for (i, plan) in union_plan.inputs.iter().enumerate().skip(1) { + for (i, plan) in inputs.iter().enumerate().skip(1) { let plan_schema = plan.schema(); metadata.extend(plan_schema.metadata().clone()); @@ -1132,7 +1132,7 @@ mod test { |_, _| {}, )?; - let expected = "Projection: datafusion.test.foo.a\n Union\n Projection: CAST(datafusion.test.foo.a AS Int64) AS a\n EmptyRelation\n EmptyRelation"; + let expected = "Projection: a\n Union\n Projection: CAST(datafusion.test.foo.a AS Int64) AS a\n EmptyRelation\n EmptyRelation"; assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), expanded_plan, expected) } diff --git a/datafusion/optimizer/src/propagate_empty_relation.rs b/datafusion/optimizer/src/propagate_empty_relation.rs index d26df073dc6f..344707ae8dbe 100644 --- a/datafusion/optimizer/src/propagate_empty_relation.rs +++ b/datafusion/optimizer/src/propagate_empty_relation.rs @@ -316,7 +316,7 @@ mod tests { let plan = LogicalPlanBuilder::from(left).union(right)?.build()?; - let expected = "TableScan: test"; + let expected = "Projection: a, b, c\n TableScan: test"; assert_together_optimized_plan(plan, expected, true) } @@ -406,7 +406,7 @@ mod tests { let plan = LogicalPlanBuilder::from(left).union(right)?.build()?; - let expected = "TableScan: test"; + let expected = "Projection: a, b, c\n TableScan: test"; assert_together_optimized_plan(plan, expected, true) } diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index 067b23ac2fb0..93ffa313b8f7 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -654,7 +654,7 @@ explain select * FROM ( ---- logical_plan 01)Limit: skip=4, fetch=10 -02)--Sort: ordered_table.c DESC NULLS FIRST, fetch=14 +02)--Sort: c DESC NULLS FIRST, fetch=14 03)----Union 04)------Projection: CAST(ordered_table.c AS Int64) AS c 05)--------TableScan: ordered_table projection=[c] diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index d7da21c58ec6..951685e8d1f3 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -774,7 +774,7 @@ SELECT * FROM v ORDER BY 1, 2; ---- logical_plan -01)Sort: u.m ASC NULLS LAST, u.t ASC NULLS LAST +01)Sort: m ASC NULLS LAST, t ASC NULLS LAST 02)--Union 03)----SubqueryAlias: u 04)------Projection: Int64(0) AS m, m0.t @@ -1239,8 +1239,8 @@ order by d, c, a, a0, b limit 2; ---- logical_plan -01)Projection: t1.b, t1.c, t1.a, t1.a0 -02)--Sort: t1.d ASC NULLS LAST, t1.c ASC NULLS LAST, t1.a ASC NULLS LAST, t1.a0 ASC NULLS LAST, t1.b ASC NULLS LAST, fetch=2 +01)Projection: b, c, a, a0 +02)--Sort: d ASC NULLS LAST, c ASC NULLS LAST, a ASC NULLS LAST, a0 ASC NULLS LAST, b ASC NULLS LAST, fetch=2 03)----Union 04)------SubqueryAlias: t1 05)--------Projection: ordered_table.b, ordered_table.c, ordered_table.a, Int32(NULL) AS a0, ordered_table.d diff --git a/datafusion/sqllogictest/test_files/type_coercion.slt b/datafusion/sqllogictest/test_files/type_coercion.slt index 0900c88c15c0..2c6079bc7039 100644 --- a/datafusion/sqllogictest/test_files/type_coercion.slt +++ b/datafusion/sqllogictest/test_files/type_coercion.slt @@ -187,7 +187,7 @@ EXPLAIN SELECT a FROM (select 1 a) x GROUP BY 1 (SELECT a FROM (select 1.1 a) x GROUP BY 1) ORDER BY 1 ---- logical_plan -01)Sort: x.a ASC NULLS LAST +01)Sort: a ASC NULLS LAST 02)--Union 03)----Projection: CAST(x.a AS Float64) AS a 04)------Aggregate: groupBy=[[x.a]], aggr=[[]] diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 918c6e281173..9d2c5c9bc14c 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -226,7 +226,7 @@ query TT EXPLAIN SELECT name FROM t1 UNION (SELECT name from t2 UNION SELECT name || '_new' from t2) ---- logical_plan -01)Aggregate: groupBy=[[t1.name]], aggr=[[]] +01)Aggregate: groupBy=[[name]], aggr=[[]] 02)--Union 03)----TableScan: t1 projection=[name] 04)----TableScan: t2 projection=[name] @@ -411,7 +411,7 @@ query TT explain SELECT c1, c9 FROM aggregate_test_100 UNION ALL SELECT c1, c3 FROM aggregate_test_100 ORDER BY c9 DESC LIMIT 5 ---- logical_plan -01)Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=5 +01)Sort: c9 DESC NULLS FIRST, fetch=5 02)--Union 03)----Projection: aggregate_test_100.c1, CAST(aggregate_test_100.c9 AS Decimal128(20, 0)) AS c9 04)------TableScan: aggregate_test_100 projection=[c1, c9] @@ -449,7 +449,7 @@ SELECT count(*) FROM ( ---- logical_plan 01)Projection: count(Int64(1)) AS count(*) -02)--Aggregate: groupBy=[[t1.name]], aggr=[[count(Int64(1))]] +02)--Aggregate: groupBy=[[name]], aggr=[[count(Int64(1))]] 03)----Union 04)------Aggregate: groupBy=[[t1.name]], aggr=[[]] 05)--------TableScan: t1 projection=[name] @@ -601,7 +601,7 @@ UNION ALL ORDER BY c1 ---- logical_plan -01)Sort: t1.c1 ASC NULLS LAST +01)Sort: c1 ASC NULLS LAST 02)--Union 03)----TableScan: t1 projection=[c1] 04)----Projection: t2.c1a AS c1 @@ -814,7 +814,7 @@ UNION ALL ORDER BY c1 ---- logical_plan -01)Sort: aggregate_test_100.c1 ASC NULLS LAST +01)Sort: c1 ASC NULLS LAST 02)--Union 03)----Filter: aggregate_test_100.c1 = Utf8("a") 04)------TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], partial_filters=[aggregate_test_100.c1 = Utf8("a")] @@ -860,3 +860,39 @@ FROM ( GROUP BY combined ---- AB + + +# Test union in view +statement ok +CREATE TABLE u1 (x INT, y INT); + +statement ok +INSERT INTO u1 VALUES (3, 3), (3, 3), (1, 1); + +statement ok +CREATE TABLE u2 (y BIGINT, z BIGINT); + +statement ok +INSERT INTO u2 VALUES (20, 20), (40, 40); + +statement ok +CREATE VIEW v1 AS +SELECT y FROM u1 UNION ALL SELECT y FROM u2 ORDER BY y; + +query I +SELECT * FROM (SELECT y FROM u1 UNION ALL SELECT y FROM u2) ORDER BY y; +---- +1 +3 +3 +20 +40 + +query I +SELECT * FROM v1; +---- +1 +3 +3 +20 +40 diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 63a43a36ff16..2897d17f2aa7 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -124,7 +124,7 @@ NULL 3 # Ambiguous name -statement error DataFusion error: Schema error: No field named t1.x. Valid fields are a, b. +statement error DataFusion error: Schema error: No field named x. Valid fields are a, b. SELECT x AS a FROM t1 UNION BY NAME SELECT x AS b FROM t1 ORDER BY t1.x; query II From cf9449824117252275fc90b0fb311b2f8ed08adc Mon Sep 17 00:00:00 2001 From: Matt Friede <7852262+Friede80@users.noreply.github.com> Date: Thu, 13 Mar 2025 14:04:40 -0400 Subject: [PATCH 4/8] Review feedback: avoid cloning schema --- datafusion/optimizer/src/analyzer/type_coercion.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 19614b18005d..62480d9f1f57 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -930,7 +930,7 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result { /// This method presumes that the wildcard expansion is unneeded, or has already /// been applied. pub fn coerce_union_schema(inputs: &[Arc]) -> Result { - let base_schema = inputs[0].schema().as_ref().clone().strip_qualifiers(); + let base_schema = inputs[0].schema(); let mut union_datatypes = base_schema .fields() .iter() @@ -989,15 +989,15 @@ pub fn coerce_union_schema(inputs: &[Arc]) -> Result { } } let union_qualified_fields = izip!( - base_schema.iter(), + base_schema.fields(), union_datatypes.into_iter(), union_nullabilities, union_field_meta.into_iter() ) - .map(|((qualifier, field), datatype, nullable, metadata)| { + .map(|(field, datatype, nullable, metadata)| { let mut field = Field::new(field.name().clone(), datatype, nullable); field.set_metadata(metadata); - (qualifier.cloned(), field.into()) + (None, field.into()) }) .collect::>(); From 612c217d13b86647b7438b6fd1054c0d8c27f766 Mon Sep 17 00:00:00 2001 From: Matt Friede <7852262+Friede80@users.noreply.github.com> Date: Thu, 13 Mar 2025 16:11:21 -0400 Subject: [PATCH 5/8] start from union schema when coercing --- .../optimizer/src/analyzer/type_coercion.rs | 8 +++++--- datafusion/sqllogictest/test_files/union.slt | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 62480d9f1f57..7f4cc34dda5d 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -214,7 +214,7 @@ impl<'a> TypeCoercionRewriter<'a> { /// Coerce the union’s inputs to a common schema compatible with all inputs. /// This occurs after wildcard expansion and the coercion of the input expressions. pub fn coerce_union(union_plan: Union) -> Result { - let union_schema = Arc::new(coerce_union_schema(&union_plan.inputs)?); + let union_schema = Arc::new(coerce_union_schema_with_schema(&union_plan.inputs, &union_plan.schema)?); let new_inputs = union_plan .inputs .into_iter() @@ -930,7 +930,9 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result { /// This method presumes that the wildcard expansion is unneeded, or has already /// been applied. pub fn coerce_union_schema(inputs: &[Arc]) -> Result { - let base_schema = inputs[0].schema(); + coerce_union_schema_with_schema(&inputs[1..], inputs[0].schema()) +} +fn coerce_union_schema_with_schema(inputs: &[Arc], base_schema: &DFSchemaRef) -> Result { let mut union_datatypes = base_schema .fields() .iter() @@ -949,7 +951,7 @@ pub fn coerce_union_schema(inputs: &[Arc]) -> Result { let mut metadata = base_schema.metadata().clone(); - for (i, plan) in inputs.iter().enumerate().skip(1) { + for (i, plan) in inputs.iter().enumerate() { let plan_schema = plan.schema(); metadata.extend(plan_schema.metadata().clone()); diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 9d2c5c9bc14c..654bccfab5a6 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -709,6 +709,25 @@ SELECT t1.v2, t1.v0 FROM t2 NATURAL JOIN t1 SELECT t1.v2, t1.v0 FROM t2 NATURAL JOIN t1 WHERE (t1.v2 IS NULL); ---- +query IR +SELECT t1.v0, t2.v0 FROM t1,t2 + UNION ALL +SELECT t1.v0, t2.v0 FROM t1,t2 +ORDER BY v0; +---- +-1493773377 0.280145772929 +-1493773377 0.280145772929 +-1229445667 0.280145772929 +-1229445667 0.280145772929 +1541512604 0.280145772929 +1541512604 0.280145772929 +NULL 0.280145772929 +NULL 0.280145772929 +NULL 0.280145772929 +NULL 0.280145772929 +NULL 0.280145772929 +NULL 0.280145772929 + statement ok CREATE TABLE t3 ( id INT From 5e7965929731b445b6b480bb0dbe5ac434939636 Mon Sep 17 00:00:00 2001 From: Matt Friede <7852262+Friede80@users.noreply.github.com> Date: Thu, 13 Mar 2025 19:31:00 -0400 Subject: [PATCH 6/8] cargo fmt --- datafusion/optimizer/src/analyzer/type_coercion.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 7f4cc34dda5d..1be0a1198165 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -214,7 +214,10 @@ impl<'a> TypeCoercionRewriter<'a> { /// Coerce the union’s inputs to a common schema compatible with all inputs. /// This occurs after wildcard expansion and the coercion of the input expressions. pub fn coerce_union(union_plan: Union) -> Result { - let union_schema = Arc::new(coerce_union_schema_with_schema(&union_plan.inputs, &union_plan.schema)?); + let union_schema = Arc::new(coerce_union_schema_with_schema( + &union_plan.inputs, + &union_plan.schema, + )?); let new_inputs = union_plan .inputs .into_iter() @@ -932,7 +935,10 @@ fn coerce_case_expression(case: Case, schema: &DFSchema) -> Result { pub fn coerce_union_schema(inputs: &[Arc]) -> Result { coerce_union_schema_with_schema(&inputs[1..], inputs[0].schema()) } -fn coerce_union_schema_with_schema(inputs: &[Arc], base_schema: &DFSchemaRef) -> Result { +fn coerce_union_schema_with_schema( + inputs: &[Arc], + base_schema: &DFSchemaRef, +) -> Result { let mut union_datatypes = base_schema .fields() .iter() From 090f0b6dfd77cc13e56622bd492340efd58e0fd8 Mon Sep 17 00:00:00 2001 From: Matt Friede <7852262+Friede80@users.noreply.github.com> Date: Thu, 13 Mar 2025 19:53:44 -0400 Subject: [PATCH 7/8] dont use wildcard in test --- .../optimizer/src/analyzer/type_coercion.rs | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs index 1be0a1198165..8248a0cc793a 100644 --- a/datafusion/optimizer/src/analyzer/type_coercion.rs +++ b/datafusion/optimizer/src/analyzer/type_coercion.rs @@ -1051,7 +1051,6 @@ mod test { use arrow::datatypes::DataType::Utf8; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; - use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule; use crate::analyzer::type_coercion::{ coerce_case_expression, TypeCoercion, TypeCoercionRewriter, }; @@ -1064,9 +1063,9 @@ mod test { use datafusion_expr::logical_plan::{EmptyRelation, Projection, Sort}; use datafusion_expr::test::function_stub::avg_udaf; use datafusion_expr::{ - cast, col, create_udaf, is_true, lit, wildcard, AccumulatorFactoryFunction, - AggregateUDF, BinaryExpr, Case, ColumnarValue, Expr, ExprSchemable, Filter, - LogicalPlan, Operator, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, + cast, col, create_udaf, is_true, lit, AccumulatorFactoryFunction, AggregateUDF, + BinaryExpr, Case, ColumnarValue, Expr, ExprSchemable, Filter, LogicalPlan, + Operator, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, SimpleAggregateUDF, Subquery, Union, Volatility, }; use datafusion_functions_aggregate::average::AvgAccumulator; @@ -1129,19 +1128,12 @@ mod test { let analyzed_union = Analyzer::with_rules(vec![Arc::new(TypeCoercion::new())]) .execute_and_check(union, &ConfigOptions::default(), |_, _| {})?; let top_level_plan = LogicalPlan::Projection(Projection::try_new( - vec![wildcard()], + vec![col("a")], Arc::new(analyzed_union), )?); - let expanded_plan = - Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]) - .execute_and_check( - top_level_plan, - &ConfigOptions::default(), - |_, _| {}, - )?; let expected = "Projection: a\n Union\n Projection: CAST(datafusion.test.foo.a AS Int64) AS a\n EmptyRelation\n EmptyRelation"; - assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), expanded_plan, expected) + assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), top_level_plan, expected) } fn coerce_on_output_if_viewtype(plan: LogicalPlan, expected: &str) -> Result<()> { From cb4194d67d3b811341e83db9c7c168e35eaa5ab3 Mon Sep 17 00:00:00 2001 From: Matt Friede <7852262+Friede80@users.noreply.github.com> Date: Fri, 14 Mar 2025 14:01:07 -0400 Subject: [PATCH 8/8] Dont strip qualifiers for sorts over unions --- datafusion/expr/src/logical_plan/builder.rs | 26 +------------------ .../sqllogictest/test_files/union_by_name.slt | 10 +++---- 2 files changed, 6 insertions(+), 30 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index c981d64c18e0..2ade93a25f17 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -51,7 +51,6 @@ use arrow::compute::can_cast_types; use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef}; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::file_options::file_type::FileType; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{ exec_err, get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, Constraints, DFSchema, DFSchemaRef, @@ -778,31 +777,8 @@ impl LogicalPlanBuilder { is_distinct, )?; - let mut sort_exprs = normalize_sorts(sorts, &plan)?; - if matches!(&plan, LogicalPlan::Union(_)) - || matches!(&plan, LogicalPlan::Distinct(distinct) if matches!(**distinct.input(), LogicalPlan::Union(_))) - { - sort_exprs = sort_exprs - .into_iter() - .map(|sort_expr| { - let inner_expr = sort_expr.expr.clone(); - let qualifiers_removed = inner_expr - .transform_up(|expr| { - if let Expr::Column(mut col) = expr { - col.relation = None; - Ok(Transformed::yes(Expr::Column(col))) - } else { - Ok(Transformed::no(expr)) - } - }) - .data()?; - Ok(sort_expr.with_expr(qualifiers_removed)) - }) - .collect::>()? - }; - let sort_plan = LogicalPlan::Sort(Sort { - expr: sort_exprs, + expr: normalize_sorts(sorts, &plan)?, input: Arc::new(plan), fetch, }); diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 2897d17f2aa7..3844dba68079 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -54,13 +54,13 @@ INSERT INTO t2 VALUES (2, 2), (4, 4); # Test binding query I -SELECT t1.x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x; +SELECT t1.x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY x; ---- 1 3 query I -SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x; +SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY x; ---- 1 1 @@ -70,13 +70,13 @@ SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x; 3 query I -SELECT x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x; +SELECT x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY x; ---- 1 3 query I -SELECT x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x; +SELECT x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY x; ---- 1 1 @@ -125,7 +125,7 @@ NULL 3 # Ambiguous name statement error DataFusion error: Schema error: No field named x. Valid fields are a, b. -SELECT x AS a FROM t1 UNION BY NAME SELECT x AS b FROM t1 ORDER BY t1.x; +SELECT x AS a FROM t1 UNION BY NAME SELECT x AS b FROM t1 ORDER BY x; query II (SELECT y FROM t1 UNION ALL SELECT x FROM t1) UNION BY NAME (SELECT z FROM t2 UNION ALL SELECT y FROM t2) ORDER BY y, z;