Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,21 @@ impl LogicalPlanBuilder {
union(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
}

/// Apply a union by name, preserving duplicate rows
pub fn union_by_name(self, plan: LogicalPlan) -> Result<Self> {
union_by_name(Arc::unwrap_or_clone(self.plan), plan).map(Self::new)
}

/// Apply a union by name, removing duplicate rows
pub fn union_by_name_distinct(self, plan: LogicalPlan) -> Result<Self> {
let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
let right_plan: LogicalPlan = plan;

Ok(Self::new(LogicalPlan::Distinct(Distinct::All(Arc::new(
union_by_name(left_plan, right_plan)?,
)))))
}

/// Apply a union, removing duplicate rows
pub fn union_distinct(self, plan: LogicalPlan) -> Result<Self> {
let left_plan: LogicalPlan = Arc::unwrap_or_clone(self.plan);
Expand Down Expand Up @@ -1538,6 +1553,18 @@ pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalP
])?))
}

/// Like [`union`], but combine rows from different tables by name, rather than
/// by position.
pub fn union_by_name(
left_plan: LogicalPlan,
right_plan: LogicalPlan,
) -> Result<LogicalPlan> {
Ok(LogicalPlan::Union(Union::try_new_by_name(vec![
Arc::new(left_plan),
Arc::new(right_plan),
])?))
}

/// Create Projection
/// # Errors
/// This function errors under any of the following conditions:
Expand Down
141 changes: 135 additions & 6 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Logical plan types

use std::cmp::Ordering;
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::{Arc, LazyLock};
Expand Down Expand Up @@ -705,6 +705,13 @@ impl LogicalPlan {
// If inputs are not pruned do not change schema
Ok(LogicalPlan::Union(Union { inputs, schema }))
} else {
// A note on `Union`s constructed via `try_new_by_name`:
//
// At this point, the schema for each input should have
// the same width. Thus, we do not need to save whether a
// `Union` was created `BY NAME`, and can safely rely on the
// `try_new` initializer to derive the new schema based on
// column positions.
Ok(LogicalPlan::Union(Union::try_new(inputs)?))
}
}
Expand Down Expand Up @@ -2648,7 +2655,7 @@ pub struct Union {
impl Union {
/// Constructs new Union instance deriving schema from inputs.
fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
let schema = Self::derive_schema_from_inputs(&inputs, false)?;
let schema = Self::derive_schema_from_inputs(&inputs, false, false)?;
Ok(Union { inputs, schema })
}

Expand All @@ -2657,21 +2664,143 @@ impl Union {
/// take type from the first input.
// TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all.
pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
let schema = Self::derive_schema_from_inputs(&inputs, true)?;
let schema = Self::derive_schema_from_inputs(&inputs, true, false)?;
Ok(Union { inputs, schema })
}

/// Constructs a new Union instance that combines rows from different tables by name,
/// instead of by position. This means that the specified inputs need not have schemas
/// that are all the same width.
pub fn try_new_by_name(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
let schema = Self::derive_schema_from_inputs(&inputs, true, true)?;
let inputs = Self::rewrite_inputs_from_schema(&schema, inputs)?;

Ok(Union { inputs, schema })
}

/// When constructing a `UNION BY NAME`, we may need to wrap inputs
/// in an additional `Projection` to account for absence of columns
/// in input schemas.
fn rewrite_inputs_from_schema(
schema: &DFSchema,
inputs: Vec<Arc<LogicalPlan>>,
) -> Result<Vec<Arc<LogicalPlan>>> {
let schema_width = schema.iter().count();
let mut wrapped_inputs = Vec::with_capacity(inputs.len());
for input in inputs {
// If the input plan's schema contains the same number of fields
// as the derived schema, then it does not to be wrapped in an
// additional `Projection`.
if input.schema().iter().count() == schema_width {
wrapped_inputs.push(input);
continue;
}

// Any columns that exist within the derived schema but do not exist
// within an input's schema should be replaced with `NULL` aliased
// to the appropriate column in the derived schema.
let mut expr = Vec::with_capacity(schema_width);
for column in schema.columns() {
if input
.schema()
.has_column_with_unqualified_name(column.name())
{
expr.push(Expr::Column(column));
} else {
expr.push(Expr::Literal(ScalarValue::Null).alias(column.name()));
}
}
wrapped_inputs.push(Arc::new(LogicalPlan::Projection(Projection::try_new(
expr, input,
)?)));
}

Ok(wrapped_inputs)
}

/// Constructs new Union instance deriving schema from inputs.
///
/// `loose_types` if true, inputs do not have to have matching types and produced schema will
/// take type from the first input. TODO (<https://github.com/apache/datafusion/issues/14380>) this is not necessarily reasonable behavior.
/// If `loose_types` is true, inputs do not need to have matching types and
/// the produced schema will use the type from the first input.
/// TODO (<https://github.com/apache/datafusion/issues/14380>): This is not necessarily reasonable behavior.
///
/// If `by_name` is `true`, input schemas need not be the same width. That is,
/// the constructed schema follows `UNION BY NAME` semantics.
fn derive_schema_from_inputs(
inputs: &[Arc<LogicalPlan>],
loose_types: bool,
by_name: bool,
) -> Result<DFSchemaRef> {
if inputs.len() < 2 {
return plan_err!("UNION requires at least two inputs");
}

if by_name {
Self::derive_schema_from_inputs_by_name(inputs, loose_types)
} else {
Self::derive_schema_from_inputs_by_position(inputs, loose_types)
}
}

fn derive_schema_from_inputs_by_name(
inputs: &[Arc<LogicalPlan>],
loose_types: bool,
) -> Result<DFSchemaRef> {
type FieldData<'a> = (&'a DataType, bool, Vec<&'a HashMap<String, String>>);
// Prefer `BTreeMap` as it produces items in order by key when iterated over
let mut cols: BTreeMap<&str, FieldData> = BTreeMap::new();
for input in inputs.iter() {
for field in input.schema().fields() {
match cols.entry(field.name()) {
std::collections::btree_map::Entry::Occupied(mut occupied) => {
let (data_type, is_nullable, metadata) = occupied.get_mut();
if !loose_types && *data_type != field.data_type() {
return plan_err!(
"Found different types for field {}",
field.name()
);
}

metadata.push(field.metadata());
// If the field is nullable in any one of the inputs,
// then the field in the final schema is also nullable.
*is_nullable |= field.is_nullable();
}
std::collections::btree_map::Entry::Vacant(vacant) => {
vacant.insert((
field.data_type(),
field.is_nullable(),
vec![field.metadata()],
));
}
}
}
}

let union_fields = cols
.into_iter()
.map(|(name, (data_type, is_nullable, unmerged_metadata))| {
let mut field = Field::new(name, data_type.clone(), is_nullable);
field.set_metadata(intersect_maps(unmerged_metadata));

(None, Arc::new(field))
})
.collect::<Vec<(Option<TableReference>, _)>>();

let union_schema_metadata =
intersect_maps(inputs.iter().map(|input| input.schema().metadata()));

// Functional Dependencies are not preserved after UNION operation
let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
let schema = Arc::new(schema);

Ok(schema)
}

fn derive_schema_from_inputs_by_position(
inputs: &[Arc<LogicalPlan>],
loose_types: bool,
) -> Result<DFSchemaRef> {
let first_schema = inputs[0].schema();
let fields_count = first_schema.fields().len();
for input in inputs.iter().skip(1) {
Expand Down Expand Up @@ -2727,7 +2856,7 @@ impl Union {
let union_schema_metadata =
intersect_maps(inputs.iter().map(|input| input.schema().metadata()));

// Functional Dependencies doesn't preserve after UNION operation
// Functional Dependencies are not preserved after UNION operation
let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
let schema = Arc::new(schema);

Expand Down
77 changes: 44 additions & 33 deletions datafusion/sql/src/set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
return Err(err);
}
};
self.validate_set_expr_num_of_columns(
op,
left_span,
right_span,
&left_plan,
&right_plan,
set_expr_span,
)?;

if !(set_quantifier == SetQuantifier::ByName
|| set_quantifier == SetQuantifier::AllByName)
{
self.validate_set_expr_num_of_columns(
op,
left_span,
right_span,
&left_plan,
&right_plan,
set_expr_span,
)?;
}
self.set_operation_to_plan(op, left_plan, right_plan, set_quantifier)
}
SetExpr::Query(q) => self.query_to_plan(*q, planner_context),
Expand All @@ -72,17 +75,11 @@ impl<S: ContextProvider> SqlToRel<'_, S> {

pub(super) fn is_union_all(set_quantifier: SetQuantifier) -> Result<bool> {
match set_quantifier {
SetQuantifier::All => Ok(true),
SetQuantifier::Distinct | SetQuantifier::None => Ok(false),
SetQuantifier::ByName => {
not_impl_err!("UNION BY NAME not implemented")
}
SetQuantifier::AllByName => {
not_impl_err!("UNION ALL BY NAME not implemented")
}
SetQuantifier::DistinctByName => {
not_impl_err!("UNION DISTINCT BY NAME not implemented")
}
SetQuantifier::All | SetQuantifier::AllByName => Ok(true),
SetQuantifier::Distinct
| SetQuantifier::ByName
| SetQuantifier::DistinctByName
| SetQuantifier::None => Ok(false),
}
}

Expand Down Expand Up @@ -127,28 +124,42 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
right_plan: LogicalPlan,
set_quantifier: SetQuantifier,
) -> Result<LogicalPlan> {
let all = Self::is_union_all(set_quantifier)?;
match (op, all) {
(SetOperator::Union, true) => LogicalPlanBuilder::from(left_plan)
.union(right_plan)?
.build(),
(SetOperator::Union, false) => LogicalPlanBuilder::from(left_plan)
.union_distinct(right_plan)?
match (op, set_quantifier) {
(SetOperator::Union, SetQuantifier::All) => {
LogicalPlanBuilder::from(left_plan)
.union(right_plan)?
.build()
}
(SetOperator::Union, SetQuantifier::AllByName) => {
LogicalPlanBuilder::from(left_plan)
.union_by_name(right_plan)?
.build()
}
(SetOperator::Union, SetQuantifier::Distinct | SetQuantifier::None) => {
LogicalPlanBuilder::from(left_plan)
.union_distinct(right_plan)?
.build()
}
(
SetOperator::Union,
SetQuantifier::ByName | SetQuantifier::DistinctByName,
) => LogicalPlanBuilder::from(left_plan)
.union_by_name_distinct(right_plan)?
.build(),
(SetOperator::Intersect, true) => {
(SetOperator::Intersect, SetQuantifier::All) => {
LogicalPlanBuilder::intersect(left_plan, right_plan, true)
}
(SetOperator::Intersect, false) => {
(SetOperator::Intersect, SetQuantifier::Distinct | SetQuantifier::None) => {
LogicalPlanBuilder::intersect(left_plan, right_plan, false)
}
(SetOperator::Except, true) => {
(SetOperator::Except, SetQuantifier::All) => {
LogicalPlanBuilder::except(left_plan, right_plan, true)
}
(SetOperator::Except, false) => {
(SetOperator::Except, SetQuantifier::Distinct | SetQuantifier::None) => {
LogicalPlanBuilder::except(left_plan, right_plan, false)
}
(SetOperator::Minus, _) => {
not_impl_err!("MINUS Set Operator not implemented")
(op, quantifier) => {
not_impl_err!("{op} {quantifier} not implemented")
}
}
}
Expand Down
Loading
Loading