From fb596fc8d8746e5e12113bb489fd6fe0da365e6d Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Wed, 5 Feb 2025 23:34:58 -0800 Subject: [PATCH 1/8] feat: Union by name --- datafusion/expr/src/logical_plan/builder.rs | 27 ++++ datafusion/expr/src/logical_plan/plan.rs | 143 +++++++++++++++++++- datafusion/sql/src/set_expr.rs | 77 ++++++----- 3 files changed, 208 insertions(+), 39 deletions(-) diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index ab89f752343d..b1107f2add91 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -720,6 +720,21 @@ impl LogicalPlanBuilder { union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new) } + /// Apply a union by name, preserving duplicate rows + pub fn union_by_name(self, plan: LogicalPlan) -> Result { + union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new) + } + + /// Apply a union by name, removing duplicate rows + pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result { + let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan); + let right_plan: LogicalPlan = plan; + + Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new( + union_by_name(left_plan, right_plan)?, + ))))) + } + /// Apply a union, removing duplicate rows pub fn union_distinct(self, plan: LogicalPlan) -> Result { let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan); @@ -1538,6 +1553,18 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result Result { + Ok(LogicalPlan::Union(Union::try_new_by_name(vec![ + Arc::new(left_plan), + Arc::new(right_plan), + ])?)) +} + /// Create Projection /// # Errors /// This function errors under any of the following conditions: diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 100a0b7d43dd..f2b3da9faaa4 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -18,7 +18,7 @@ //! Logical plan types use std::cmp::Ordering; -use std::collections::{HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{self, Debug, Display, Formatter}; use std::hash::{Hash, Hasher}; use std::sync::{Arc, LazyLock}; @@ -705,6 +705,13 @@ impl LogicalPlan { // If inputs are not pruned do not change schema Ok(LogicalPlan::Union(Union { inputs, schema })) } else { + // A note on `Union`s constructed via `try_new_by_name`: + // + // At this point, the schema for each input should have + // the same width. Thus, we do not need to save whether a + // `Union` was created `BY NAME`, and can safely rely on the + // `try_new` initializer to derive the new schema based on + // column positions. Ok(LogicalPlan::Union(Union::try_new(inputs)?)) } } @@ -2648,7 +2655,7 @@ pub struct Union { impl Union { /// Constructs new Union instance deriving schema from inputs. fn try_new(inputs: Vec>) -> Result { - let schema = Self::derive_schema_from_inputs(&inputs, false)?; + let schema = Self::derive_schema_from_inputs(&inputs, false, false)?; Ok(Union { inputs, schema }) } @@ -2657,21 +2664,145 @@ impl Union { /// take type from the first input. // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all. pub fn try_new_with_loose_types(inputs: Vec>) -> Result { - let schema = Self::derive_schema_from_inputs(&inputs, true)?; + let schema = Self::derive_schema_from_inputs(&inputs, true, false)?; Ok(Union { inputs, schema }) } + /// Constructs a new Union instance that combines rows from different tables by name, + /// instead of by position. This means that the specified inputs need not have schemas + /// that are all the same width. + pub fn try_new_by_name(inputs: Vec>) -> Result { + let schema = Self::derive_schema_from_inputs(&inputs, true, true)?; + let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?; + + Ok(Union { inputs, schema }) + } + + /// When constructing a `UNION BY NAME`, we may need to wrap inputs + /// in an additional `Projection` to account for absence of columns + /// in input schemas. + fn rewrite_inputs_from_schema( + schema: &DFSchema, + inputs: Vec>, + ) -> Result>> { + let schema_width = schema.iter().count(); + let mut wrapped_inputs = Vec::with_capacity(inputs.len()); + for input in inputs { + // If the input plan's schema contains the same number of fields + // as the derived schema, then it does not to be wrapped in an + // additional `Projection`. + if input.schema().iter().count() == schema_width { + wrapped_inputs.push(input); + continue; + } + + // Any columns that exist within the derived schema but do not exist + // within an input's schema should be replaced with `NULL` aliased + // to the appropriate column in the derived schema. + let mut expr = Vec::with_capacity(schema_width); + for column in schema.columns() { + if input + .schema() + .has_column_with_unqualified_name(column.name()) + { + expr.push(Expr::Column(column)); + } else { + expr.push(Expr::Literal(ScalarValue::Null).alias(column.name())); + } + } + wrapped_inputs.push(Arc::new(LogicalPlan::Projection(Projection::try_new( + expr, input, + )?))); + } + + Ok(wrapped_inputs) + } + /// Constructs new Union instance deriving schema from inputs. /// - /// `loose_types` if true, inputs do not have to have matching types and produced schema will - /// take type from the first input. TODO () this is not necessarily reasonable behavior. + /// If `loose_types` is true, inputs do not need to have matching types and + /// the produced schema will use the type from the first input. + /// TODO (): This is not necessarily reasonable behavior. + /// + /// If `by_name` is `true`, input schemas need not be the same width. That is, + /// the constructed schema follows `UNION BY NAME` semantics. fn derive_schema_from_inputs( inputs: &[Arc], loose_types: bool, + by_name: bool, ) -> Result { if inputs.len() < 2 { return plan_err!("UNION requires at least two inputs"); } + + if by_name { + Self::derive_schema_from_inputs_by_name(inputs, loose_types) + } else { + Self::derive_schema_from_inputs_by_position(inputs, loose_types) + } + } + + fn derive_schema_from_inputs_by_name( + inputs: &[Arc], + loose_types: bool, + ) -> Result { + // Prefer `BTreeMap` as it produces items in order by key when iterated over + let mut cols: BTreeMap<&str, (&DataType, bool, Vec<&HashMap>)> = + BTreeMap::new(); + for input in inputs.iter() { + for field in input.schema().fields() { + match cols.entry(&field.name()) { + std::collections::btree_map::Entry::Occupied(mut occupied) => { + let (data_type, is_nullable, metadata) = occupied.get_mut(); + if !loose_types { + if *data_type != field.data_type() { + return plan_err!( + "Found different types for field {}", + field.name() + ); + } + } + + metadata.push(field.metadata()); + // If the field is nullable in any one of the inputs, + // then the field in the final schema is also nullable. + *is_nullable |= field.is_nullable(); + } + std::collections::btree_map::Entry::Vacant(vacant) => { + vacant.insert(( + field.data_type(), + field.is_nullable(), + vec![field.metadata()], + )); + } + } + } + } + + let union_fields = cols + .into_iter() + .map(|(name, (data_type, is_nullable, unmerged_metadata))| { + let mut field = Field::new(name, data_type.clone(), is_nullable); + field.set_metadata(intersect_maps(unmerged_metadata)); + + (None, Arc::new(field)) + }) + .collect::, _)>>(); + + let union_schema_metadata = + intersect_maps(inputs.iter().map(|input| input.schema().metadata())); + + // Functional Dependencies are not preserved after UNION operation + let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?; + let schema = Arc::new(schema); + + Ok(schema) + } + + fn derive_schema_from_inputs_by_position( + inputs: &[Arc], + loose_types: bool, + ) -> Result { let first_schema = inputs[0].schema(); let fields_count = first_schema.fields().len(); for input in inputs.iter().skip(1) { @@ -2727,7 +2858,7 @@ impl Union { let union_schema_metadata = intersect_maps(inputs.iter().map(|input| input.schema().metadata())); - // Functional Dependencies doesn't preserve after UNION operation + // Functional Dependencies are not preserved after UNION operation let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?; let schema = Arc::new(schema); diff --git a/datafusion/sql/src/set_expr.rs b/datafusion/sql/src/set_expr.rs index 2579f2397228..a55b3b039087 100644 --- a/datafusion/sql/src/set_expr.rs +++ b/datafusion/sql/src/set_expr.rs @@ -54,15 +54,18 @@ impl SqlToRel<'_, S> { return Err(err); } }; - self.validate_set_expr_num_of_columns( - op, - left_span, - right_span, - &left_plan, - &right_plan, - set_expr_span, - )?; - + if !(set_quantifier == SetQuantifier::ByName + || set_quantifier == SetQuantifier::AllByName) + { + self.validate_set_expr_num_of_columns( + op, + left_span, + right_span, + &left_plan, + &right_plan, + set_expr_span, + )?; + } self.set_operation_to_plan(op, left_plan, right_plan, set_quantifier) } SetExpr::Query(q) => self.query_to_plan(*q, planner_context), @@ -72,17 +75,11 @@ impl SqlToRel<'_, S> { pub(super) fn is_union_all(set_quantifier: SetQuantifier) -> Result { match set_quantifier { - SetQuantifier::All => Ok(true), - SetQuantifier::Distinct | SetQuantifier::None => Ok(false), - SetQuantifier::ByName => { - not_impl_err!("UNION BY NAME not implemented") - } - SetQuantifier::AllByName => { - not_impl_err!("UNION ALL BY NAME not implemented") - } - SetQuantifier::DistinctByName => { - not_impl_err!("UNION DISTINCT BY NAME not implemented") - } + SetQuantifier::All | SetQuantifier::AllByName => Ok(true), + SetQuantifier::Distinct + | SetQuantifier::ByName + | SetQuantifier::DistinctByName + | SetQuantifier::None => Ok(false), } } @@ -127,28 +124,42 @@ impl SqlToRel<'_, S> { right_plan: LogicalPlan, set_quantifier: SetQuantifier, ) -> Result { - let all = Self::is_union_all(set_quantifier)?; - match (op, all) { - (SetOperator::Union, true) => LogicalPlanBuilder::from(left_plan) - .union(right_plan)? - .build(), - (SetOperator::Union, false) => LogicalPlanBuilder::from(left_plan) - .union_distinct(right_plan)? + match (op, set_quantifier) { + (SetOperator::Union, SetQuantifier::All) => { + LogicalPlanBuilder::from(left_plan) + .union(right_plan)? + .build() + } + (SetOperator::Union, SetQuantifier::AllByName) => { + LogicalPlanBuilder::from(left_plan) + .union_by_name(right_plan)? + .build() + } + (SetOperator::Union, SetQuantifier::Distinct | SetQuantifier::None) => { + LogicalPlanBuilder::from(left_plan) + .union_distinct(right_plan)? + .build() + } + ( + SetOperator::Union, + SetQuantifier::ByName | SetQuantifier::DistinctByName, + ) => LogicalPlanBuilder::from(left_plan) + .union_by_name_distinct(right_plan)? .build(), - (SetOperator::Intersect, true) => { + (SetOperator::Intersect, SetQuantifier::All) => { LogicalPlanBuilder::intersect(left_plan, right_plan, true) } - (SetOperator::Intersect, false) => { + (SetOperator::Intersect, SetQuantifier::Distinct | SetQuantifier::None) => { LogicalPlanBuilder::intersect(left_plan, right_plan, false) } - (SetOperator::Except, true) => { + (SetOperator::Except, SetQuantifier::All) => { LogicalPlanBuilder::except(left_plan, right_plan, true) } - (SetOperator::Except, false) => { + (SetOperator::Except, SetQuantifier::Distinct | SetQuantifier::None) => { LogicalPlanBuilder::except(left_plan, right_plan, false) } - (SetOperator::Minus, _) => { - not_impl_err!("MINUS Set Operator not implemented") + (op, quantifier) => { + not_impl_err!("{op} {quantifier} not implemented") } } } From cd024c522253768cda994ffebc191a39c485e370 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Thu, 6 Feb 2025 23:57:24 -0800 Subject: [PATCH 2/8] add sqllogictest --- .../sqllogictest/test_files/union_by_name.slt | 150 ++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 datafusion/sqllogictest/test_files/union_by_name.slt diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt new file mode 100644 index 000000000000..8850681f4291 --- /dev/null +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +statement ok +CREATE TABLE t1 (x INT, y INT); + +statement ok +INSERT INTO t1 VALUES (3, 3), (3, 3), (1, 1); + +statement ok +CREATE TABLE t2 (y INT, z INT); + +statement ok +INSERT INTO t2 VALUES (2, 2), (4, 4); + + +# Test binding +query I +SELECT t1.x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x; +---- +1 +3 + +query I +SELECT x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x; +---- +1 +3 + +query II +(SELECT x FROM t1 UNION ALL SELECT x FROM t1) UNION BY NAME SELECT 5 ORDER BY x; +---- +NULL 1 +NULL 3 +5 NULL + +query II +(SELECT x FROM t1 UNION ALL SELECT y FROM t1) UNION BY NAME SELECT 5 ORDER BY x; +---- +NULL 1 +NULL 3 +5 NULL + + +# Ambiguous name + +statement error DataFusion error: Schema error: No field named t1.x. Valid fields are a, b. +SELECT x AS a FROM t1 UNION BY NAME SELECT x AS b FROM t1 ORDER BY t1.x; + +query II +(SELECT y FROM t1 UNION ALL SELECT x FROM t1) UNION BY NAME (SELECT z FROM t2 UNION ALL SELECT y FROM t2) ORDER BY y, z; +---- +1 NULL +3 NULL +NULL 2 +NULL 4 + +# Limit + +query III rowsort +SELECT 1 UNION BY NAME SELECT * FROM unnest(range(2, 100)) UNION BY NAME SELECT 999 ORDER BY "Int64(999)", "Int64(1)" LIMIT 5; +---- +1 NULL NULL +NULL 999 NULL +NULL NULL 13 +NULL NULL 20 +NULL NULL 4 + +# Order by + +query III +SELECT x, y FROM t1 UNION BY NAME SELECT y, z FROM t2 ORDER BY y; +---- +1 1 NULL +NULL 2 2 +3 3 NULL +NULL 4 4 + +query III +SELECT x, y FROM t1 UNION BY NAME SELECT y, z FROM t2 ORDER BY 3, 1; +---- +NULL 2 2 +NULL 4 4 +1 1 NULL +3 3 NULL + +statement error +SELECT x, y FROM t1 UNION BY NAME SELECT y, z FROM t2 ORDER BY 4; +---- +DataFusion error: Error during planning: Order by column out of bounds, specified: 4, max: 3 + + +# Multi set operations + +query IIII rowsort +(SELECT 1 UNION BY NAME SELECT x, y FROM t1) UNION BY NAME SELECT y, z FROM t2; +---- +1 NULL NULL NULL +NULL 1 1 NULL +NULL 3 3 NULL +NULL NULL 2 2 +NULL NULL 4 4 + +query III +SELECT x, y FROM t1 UNION BY NAME (SELECT y, z FROM t2 INTERSECT SELECT 2, 2 as two FROM t1 ORDER BY 1) ORDER BY 1; +---- +1 1 NULL +3 3 NULL +NULL 2 2 + + +query III +(SELECT x, y FROM t1 UNION BY NAME SELECT y, z FROM t2 ORDER BY 1) EXCEPT SELECT NULL, 2, 2 as two FROM t1 ORDER BY 1; +---- +1 1 NULL +3 3 NULL +NULL 4 4 + + +# Alias in select list + +query II +SELECT x as a FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY 1, 2; +---- +1 NULL +3 NULL +NULL 1 +NULL 3 + +# Different types + +query T rowsort +select '0' as c union all by name select 0 as c; +---- +0 +0 From 7d5f640639a7f85ca2665c9b05ecaf536251ff63 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Fri, 7 Feb 2025 00:44:07 -0800 Subject: [PATCH 3/8] fix clippy warnings --- datafusion/expr/src/logical_plan/plan.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index f2b3da9faaa4..a07da8adde78 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2746,21 +2746,19 @@ impl Union { inputs: &[Arc], loose_types: bool, ) -> Result { + type FieldData<'a> = (&'a DataType, bool, Vec<&'a HashMap>); // Prefer `BTreeMap` as it produces items in order by key when iterated over - let mut cols: BTreeMap<&str, (&DataType, bool, Vec<&HashMap>)> = - BTreeMap::new(); + let mut cols: BTreeMap<&str, FieldData> = BTreeMap::new(); for input in inputs.iter() { for field in input.schema().fields() { - match cols.entry(&field.name()) { + match cols.entry(field.name()) { std::collections::btree_map::Entry::Occupied(mut occupied) => { let (data_type, is_nullable, metadata) = occupied.get_mut(); - if !loose_types { - if *data_type != field.data_type() { - return plan_err!( - "Found different types for field {}", - field.name() - ); - } + if !loose_types && *data_type != field.data_type() { + return plan_err!( + "Found different types for field {}", + field.name() + ); } metadata.push(field.metadata()); From 80864d53e403d282e501a74daacf903570b947a8 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Sun, 9 Feb 2025 14:41:51 -0800 Subject: [PATCH 4/8] more tests --- .../sqllogictest/test_files/union_by_name.slt | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index 8850681f4291..ae66e3b40440 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -35,12 +35,32 @@ SELECT t1.x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x; 1 3 +query I +SELECT t1.x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x; +---- +1 +1 +3 +3 +3 +3 + query I SELECT x FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY t1.x; ---- 1 3 +query I +SELECT x FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY t1.x; +---- +1 +1 +3 +3 +3 +3 + query II (SELECT x FROM t1 UNION ALL SELECT x FROM t1) UNION BY NAME SELECT 5 ORDER BY x; ---- @@ -48,6 +68,16 @@ NULL 1 NULL 3 5 NULL +# TODO: This should pass, but the sanity checker isn't allowing it. +# Commenting out the ordering check in the sanity checker produces the correct result. +query error +(SELECT x FROM t1 UNION ALL SELECT x FROM t1) UNION ALL BY NAME SELECT 5 ORDER BY x; +---- +DataFusion error: SanityCheckPlan +caused by +Error during planning: Plan: ["SortPreservingMergeExec: [x@1 ASC NULLS LAST]", " UnionExec", " SortExec: expr=[x@1 ASC NULLS LAST], preserve_partitioning=[true]", " ProjectionExec: expr=[NULL as Int64(5), x@0 as x]", " UnionExec", " MemoryExec: partitions=1, partition_sizes=[1]", " MemoryExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[5 as Int64(5), NULL as x]", " PlaceholderRowExec"] does not satisfy order requirements: [x@1 ASC NULLS LAST]. Child-0 order: [] + + query II (SELECT x FROM t1 UNION ALL SELECT y FROM t1) UNION BY NAME SELECT 5 ORDER BY x; ---- @@ -55,6 +85,16 @@ NULL 1 NULL 3 5 NULL +# TODO: This should pass, but the sanity checker isn't allowing it. +# Commenting out the ordering check in the sanity checker produces the correct result. +query error +(SELECT x FROM t1 UNION ALL SELECT y FROM t1) UNION ALL BY NAME SELECT 5 ORDER BY x; +---- +DataFusion error: SanityCheckPlan +caused by +Error during planning: Plan: ["SortPreservingMergeExec: [x@1 ASC NULLS LAST]", " UnionExec", " SortExec: expr=[x@1 ASC NULLS LAST], preserve_partitioning=[true]", " ProjectionExec: expr=[NULL as Int64(5), x@0 as x]", " UnionExec", " MemoryExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[y@0 as x]", " MemoryExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[5 as Int64(5), NULL as x]", " PlaceholderRowExec"] does not satisfy order requirements: [x@1 ASC NULLS LAST]. Child-0 order: [] + + # Ambiguous name From 18d2b448caadf7e9b0c77cdd924e7dc6725cf31b Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Sun, 9 Feb 2025 21:03:20 -0800 Subject: [PATCH 5/8] update slt --- datafusion/sqllogictest/test_files/union_by_name.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index ae66e3b40440..b39b60ec48f2 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -75,7 +75,7 @@ query error ---- DataFusion error: SanityCheckPlan caused by -Error during planning: Plan: ["SortPreservingMergeExec: [x@1 ASC NULLS LAST]", " UnionExec", " SortExec: expr=[x@1 ASC NULLS LAST], preserve_partitioning=[true]", " ProjectionExec: expr=[NULL as Int64(5), x@0 as x]", " UnionExec", " MemoryExec: partitions=1, partition_sizes=[1]", " MemoryExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[5 as Int64(5), NULL as x]", " PlaceholderRowExec"] does not satisfy order requirements: [x@1 ASC NULLS LAST]. Child-0 order: [] +Error during planning: Plan: ["SortPreservingMergeExec: [x@1 ASC NULLS LAST]", " UnionExec", " SortExec: expr=[x@1 ASC NULLS LAST], preserve_partitioning=[true]", " ProjectionExec: expr=[NULL as Int64(5), x@0 as x]", " UnionExec", " DataSourceExec: partitions=1, partition_sizes=[1]", " DataSourceExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[5 as Int64(5), NULL as x]", " PlaceholderRowExec"] does not satisfy order requirements: [x@1 ASC NULLS LAST]. Child-0 order: [] query II @@ -92,7 +92,7 @@ query error ---- DataFusion error: SanityCheckPlan caused by -Error during planning: Plan: ["SortPreservingMergeExec: [x@1 ASC NULLS LAST]", " UnionExec", " SortExec: expr=[x@1 ASC NULLS LAST], preserve_partitioning=[true]", " ProjectionExec: expr=[NULL as Int64(5), x@0 as x]", " UnionExec", " MemoryExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[y@0 as x]", " MemoryExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[5 as Int64(5), NULL as x]", " PlaceholderRowExec"] does not satisfy order requirements: [x@1 ASC NULLS LAST]. Child-0 order: [] +Error during planning: Plan: ["SortPreservingMergeExec: [x@1 ASC NULLS LAST]", " UnionExec", " SortExec: expr=[x@1 ASC NULLS LAST], preserve_partitioning=[true]", " ProjectionExec: expr=[NULL as Int64(5), x@0 as x]", " UnionExec", " DataSourceExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[y@0 as x]", " DataSourceExec: partitions=1, partition_sizes=[1]", " ProjectionExec: expr=[5 as Int64(5), NULL as x]", " PlaceholderRowExec"] does not satisfy order requirements: [x@1 ASC NULLS LAST]. Child-0 order: [] From 1b8439de235c17edc784a1b7dfad91458a83e64c Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Sun, 9 Feb 2025 21:03:20 -0800 Subject: [PATCH 6/8] update slt --- .../sqllogictest/test_files/union_by_name.slt | 90 +++++++++++++++++-- 1 file changed, 82 insertions(+), 8 deletions(-) diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index b39b60ec48f2..a1f1406fd108 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -109,16 +109,40 @@ query II NULL 2 NULL 4 +query II +(SELECT y FROM t1 UNION ALL SELECT x FROM t1) UNION ALL BY NAME (SELECT z FROM t2 UNION ALL SELECT y FROM t2) ORDER BY y, z; +---- +1 NULL +1 NULL +3 NULL +3 NULL +3 NULL +3 NULL +NULL 2 +NULL 2 +NULL 4 +NULL 4 + # Limit -query III rowsort -SELECT 1 UNION BY NAME SELECT * FROM unnest(range(2, 100)) UNION BY NAME SELECT 999 ORDER BY "Int64(999)", "Int64(1)" LIMIT 5; +query III +SELECT 1 UNION BY NAME SELECT * FROM unnest(range(2, 100)) UNION BY NAME SELECT 999 ORDER BY 3, 1 LIMIT 5; ---- -1 NULL NULL -NULL 999 NULL -NULL NULL 13 -NULL NULL 20 +NULL NULL 2 +NULL NULL 3 NULL NULL 4 +NULL NULL 5 +NULL NULL 6 + +# TODO: This should pass, but the sanity checker isn't allowing it. +# Commenting out the ordering check in the sanity checker produces the correct result. +query error +SELECT 1 UNION ALL BY NAME SELECT * FROM unnest(range(2, 100)) UNION ALL BY NAME SELECT 999 ORDER BY 3, 1 LIMIT 5; +---- +DataFusion error: SanityCheckPlan +caused by +Error during planning: Plan: ["SortPreservingMergeExec: [UNNEST(range(Int64(2),Int64(100)))@2 ASC NULLS LAST, Int64(1)@0 ASC NULLS LAST], fetch=5", " UnionExec", " SortExec: TopK(fetch=5), expr=[UNNEST(range(Int64(2),Int64(100)))@2 ASC NULLS LAST], preserve_partitioning=[true]", " ProjectionExec: expr=[Int64(1)@0 as Int64(1), NULL as Int64(999), UNNEST(range(Int64(2),Int64(100)))@1 as UNNEST(range(Int64(2),Int64(100)))]", " UnionExec", " ProjectionExec: expr=[1 as Int64(1), NULL as UNNEST(range(Int64(2),Int64(100)))]", " PlaceholderRowExec", " ProjectionExec: expr=[NULL as Int64(1), __unnest_placeholder(range(Int64(2),Int64(100)),depth=1)@0 as UNNEST(range(Int64(2),Int64(100)))]", " UnnestExec", " ProjectionExec: expr=[[2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99] as __unnest_placeholder(range(Int64(2),Int64(100)))]", " PlaceholderRowExec", " ProjectionExec: expr=[NULL as Int64(1), 999 as Int64(999), NULL as UNNEST(range(Int64(2),Int64(100)))]", " PlaceholderRowExec"] does not satisfy order requirements: [UNNEST(range(Int64(2),Int64(100)))@2 ASC NULLS LAST, Int64(1)@0 ASC NULLS LAST]. Child-0 order: [] + # Order by @@ -130,6 +154,15 @@ NULL 2 2 3 3 NULL NULL 4 4 +query III +SELECT x, y FROM t1 UNION ALL BY NAME SELECT y, z FROM t2 ORDER BY y; +---- +1 1 NULL +NULL 2 2 +3 3 NULL +3 3 NULL +NULL 4 4 + query III SELECT x, y FROM t1 UNION BY NAME SELECT y, z FROM t2 ORDER BY 3, 1; ---- @@ -138,12 +171,27 @@ NULL 4 4 1 1 NULL 3 3 NULL +query III +SELECT x, y FROM t1 UNION ALL BY NAME SELECT y, z FROM t2 ORDER BY 3, 1; +---- +NULL 2 2 +NULL 4 4 +1 1 NULL +3 3 NULL +3 3 NULL + statement error SELECT x, y FROM t1 UNION BY NAME SELECT y, z FROM t2 ORDER BY 4; ---- DataFusion error: Error during planning: Order by column out of bounds, specified: 4, max: 3 +statement error +SELECT x, y FROM t1 UNION ALL BY NAME SELECT y, z FROM t2 ORDER BY 4; +---- +DataFusion error: Error during planning: Order by column out of bounds, specified: 4, max: 3 + + # Multi set operations query IIII rowsort @@ -155,6 +203,16 @@ NULL 3 3 NULL NULL NULL 2 2 NULL NULL 4 4 +query IIII rowsort +(SELECT 1 UNION ALL BY NAME SELECT x, y FROM t1) UNION ALL BY NAME SELECT y, z FROM t2; +---- +1 NULL NULL NULL +NULL 1 1 NULL +NULL 3 3 NULL +NULL 3 3 NULL +NULL NULL 2 2 +NULL NULL 4 4 + query III SELECT x, y FROM t1 UNION BY NAME (SELECT y, z FROM t2 INTERSECT SELECT 2, 2 as two FROM t1 ORDER BY 1) ORDER BY 1; ---- @@ -162,6 +220,13 @@ SELECT x, y FROM t1 UNION BY NAME (SELECT y, z FROM t2 INTERSECT SELECT 2, 2 as 3 3 NULL NULL 2 2 +query III +SELECT x, y FROM t1 UNION ALL BY NAME (SELECT y, z FROM t2 INTERSECT SELECT 2, 2 as two FROM t1 ORDER BY 1) ORDER BY 1; +---- +1 1 NULL +3 3 NULL +3 3 NULL +NULL 2 2 query III (SELECT x, y FROM t1 UNION BY NAME SELECT y, z FROM t2 ORDER BY 1) EXCEPT SELECT NULL, 2, 2 as two FROM t1 ORDER BY 1; @@ -170,7 +235,6 @@ query III 3 3 NULL NULL 4 4 - # Alias in select list query II @@ -181,10 +245,20 @@ SELECT x as a FROM t1 UNION BY NAME SELECT x FROM t1 ORDER BY 1, 2; NULL 1 NULL 3 +query II +SELECT x as a FROM t1 UNION ALL BY NAME SELECT x FROM t1 ORDER BY 1, 2; +---- +1 NULL +3 NULL +3 NULL +NULL 1 +NULL 3 +NULL 3 + # Different types query T rowsort -select '0' as c union all by name select 0 as c; +SELECT '0' as c UNION ALL BY NAME SELECT 0 as c; ---- 0 0 From 7c739ff1514b782f58e80dc6b9e254d9c79199b1 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Sun, 9 Feb 2025 21:32:50 -0800 Subject: [PATCH 7/8] add integration tests --- datafusion/sql/tests/sql_integration.rs | 52 +++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index f92a844c0442..1df18302687e 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -2113,6 +2113,33 @@ fn union() { quick_test(sql, expected); } +#[test] +fn union_by_name_different_columns() { + let sql = "SELECT order_id from orders UNION BY NAME SELECT order_id, 1 FROM orders"; + let expected = "\ + Distinct:\ + \n Union\ + \n Projection: NULL AS Int64(1), order_id\ + \n Projection: orders.order_id\ + \n TableScan: orders\ + \n Projection: orders.order_id, Int64(1)\ + \n TableScan: orders"; + quick_test(sql, expected); +} + +#[test] +fn union_by_name_same_column_names() { + let sql = "SELECT order_id from orders UNION SELECT order_id FROM orders"; + let expected = "\ + Distinct:\ + \n Union\ + \n Projection: orders.order_id\ + \n TableScan: orders\ + \n Projection: orders.order_id\ + \n TableScan: orders"; + quick_test(sql, expected); +} + #[test] fn union_all() { let sql = "SELECT order_id from orders UNION ALL SELECT order_id FROM orders"; @@ -2124,6 +2151,31 @@ fn union_all() { quick_test(sql, expected); } +#[test] +fn union_all_by_name_different_columns() { + let sql = + "SELECT order_id from orders UNION ALL BY NAME SELECT order_id, 1 FROM orders"; + let expected = "\ + Union\ + \n Projection: NULL AS Int64(1), order_id\ + \n Projection: orders.order_id\ + \n TableScan: orders\ + \n Projection: orders.order_id, Int64(1)\ + \n TableScan: orders"; + quick_test(sql, expected); +} + +#[test] +fn union_all_by_name_same_column_names() { + let sql = "SELECT order_id from orders UNION ALL BY NAME SELECT order_id FROM orders"; + let expected = "Union\ + \n Projection: orders.order_id\ + \n TableScan: orders\ + \n Projection: orders.order_id\ + \n TableScan: orders"; + quick_test(sql, expected); +} + #[test] fn empty_over() { let sql = "SELECT order_id, MAX(order_id) OVER () from orders"; From f7132260e91e7dadf4a6c17205b878a04f4c0e76 Mon Sep 17 00:00:00 2001 From: Rohan Krishnaswamy Date: Sun, 16 Feb 2025 18:52:24 -0800 Subject: [PATCH 8/8] include duckdb license in union [all] by name slt tests --- .../sqllogictest/test_files/union_by_name.slt | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/datafusion/sqllogictest/test_files/union_by_name.slt b/datafusion/sqllogictest/test_files/union_by_name.slt index a1f1406fd108..0ba4c32ee5be 100644 --- a/datafusion/sqllogictest/test_files/union_by_name.slt +++ b/datafusion/sqllogictest/test_files/union_by_name.slt @@ -15,6 +15,30 @@ # specific language governing permissions and limitations # under the License. +# Portions of this file are derived from DuckDB and are licensed +# under the MIT License (see below). + +# Copyright 2018-2025 Stichting DuckDB Foundation + +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without restriction, +# including without limitation the rights to use, copy, modify, merge, +# publish, distribute, sublicense, and/or sell copies of the Software, +# and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: + +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. + +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. + statement ok CREATE TABLE t1 (x INT, y INT);