Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,25 @@ impl DFSchema {
Ok(dfschema)
}

/// Return the same schema, where all fields have a given qualifier.
pub fn with_field_specific_qualified_schema(
&self,
qualifiers: Vec<Option<TableReference>>,
) -> Result<Self> {
if qualifiers.len() != self.fields().len() {
return _plan_err!(
"Number of qualifiers must match number of fields. Expected {}, got {}",
self.fields().len(),
qualifiers.len()
);
}
Ok(DFSchema {
inner: Arc::clone(&self.inner),
field_qualifiers: qualifiers,
functional_dependencies: self.functional_dependencies.clone(),
})
}

/// Check if the schema have some fields with the same name
pub fn check_names(&self) -> Result<()> {
let mut qualified_names = BTreeSet::new();
Expand Down
95 changes: 86 additions & 9 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::TableReference;
use sqlparser::ast::NullTreatment;

use crate::schema_equivalence::schema_satisfied_by;
use async_trait::async_trait;
use datafusion_datasource::file_groups::FileGroup;
use futures::{StreamExt, TryStreamExt};
use itertools::{multiunzip, Itertools};
use log::{debug, trace};
use sqlparser::ast::NullTreatment;
use tokio::sync::Mutex;

/// Physical query planner that converts a `LogicalPlan` to an
Expand Down Expand Up @@ -890,8 +891,8 @@ impl DefaultPhysicalPlanner {

// 2 Children
LogicalPlan::Join(Join {
left,
right,
left: original_left,
right: original_right,
on: keys,
filter,
join_type,
Expand All @@ -916,23 +917,25 @@ impl DefaultPhysicalPlanner {
let (left, left_col_keys, left_projected) =
wrap_projection_for_join_if_necessary(
&left_keys,
left.as_ref().clone(),
original_left.as_ref().clone(),
)?;
let (right, right_col_keys, right_projected) =
wrap_projection_for_join_if_necessary(
&right_keys,
right.as_ref().clone(),
original_right.as_ref().clone(),
)?;
let column_on = (left_col_keys, right_col_keys);

let left = Arc::new(left);
let right = Arc::new(right);
let new_join = LogicalPlan::Join(Join::try_new_with_project_input(
let (new_join, requalified) = Join::try_new_with_project_input(
node,
Arc::clone(&left),
Arc::clone(&right),
column_on,
)?);
)?;

let new_join = LogicalPlan::Join(new_join);

// If inputs were projected then create ExecutionPlan for these new
// LogicalPlan nodes.
Expand Down Expand Up @@ -965,8 +968,24 @@ impl DefaultPhysicalPlanner {

// Remove temporary projected columns
if left_projected || right_projected {
let final_join_result =
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
// Re-qualify the join schema only if the inputs were previously requalified in
// `try_new_with_project_input`. This ensures that when building the Projection
// it can correctly resolve field nullability and data types
// by disambiguating fields from the left and right sides of the join.
let qualified_join_schema = if requalified {
Arc::new(qualify_join_schema_sides(
join_schema,
original_left,
original_right,
)?)
} else {
Arc::clone(join_schema)
};

let final_join_result = qualified_join_schema
.iter()
.map(Expr::from)
.collect::<Vec<_>>();
let projection = LogicalPlan::Projection(Projection::try_new(
final_join_result,
Arc::new(new_join),
Expand Down Expand Up @@ -1463,6 +1482,64 @@ fn get_null_physical_expr_pair(
Ok((Arc::new(null_value), physical_name))
}

/// Qualifies the fields in a join schema with "left" and "right" qualifiers
/// without mutating the original schema. This function should only be used when
/// the join inputs have already been requalified earlier in `try_new_with_project_input`.
///
/// The purpose is to avoid ambiguity errors later in planning (e.g., in nullability or data type resolution)
/// when converting expressions to fields.
fn qualify_join_schema_sides(
join_schema: &DFSchema,
left: &LogicalPlan,
right: &LogicalPlan,
) -> Result<DFSchema> {
let left_fields = left.schema().fields();
let right_fields = right.schema().fields();
let join_fields = join_schema.fields();

// Validate lengths
if join_fields.len() != left_fields.len() + right_fields.len() {
return internal_err!(
"Join schema field count must match left and right field count."
);
}

// Validate field names match
for (i, (field, expected)) in join_fields
.iter()
.zip(left_fields.iter().chain(right_fields.iter()))
.enumerate()
{
if field.name() != expected.name() {
return internal_err!(
"Field name mismatch at index {}: expected '{}', found '{}'",
i,
expected.name(),
field.name()
);
}
}

// qualify sides
let qualifiers = join_fields
.iter()
.enumerate()
.map(|(i, _)| {
if i < left_fields.len() {
Some(TableReference::Bare {
table: Arc::from("left"),
})
} else {
Some(TableReference::Bare {
table: Arc::from("right"),
})
}
})
.collect();

join_schema.with_field_specific_qualified_schema(qualifiers)
}

fn get_physical_expr_pair(
expr: &Expr,
input_dfschema: &DFSchema,
Expand Down
32 changes: 32 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,38 @@ pub fn build_join_schema(
dfschema.with_functional_dependencies(func_dependencies)
}

/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
/// conflict with the columns from the other.
/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
/// aliases, neither for columns nor for tables. DataFusion requires columns to be uniquely identifiable, in some
/// places (see e.g. DFSchema::check_names).
/// The function returns:
/// - The requalified or original left logical plan
/// - The requalified or original right logical plan
/// - If a requalification was needed or not
pub fn requalify_sides_if_needed(
left: LogicalPlanBuilder,
right: LogicalPlanBuilder,
) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
let left_cols = left.schema().columns();
let right_cols = right.schema().columns();
if left_cols.iter().any(|l| {
right_cols.iter().any(|r| {
l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
})
}) {
// These names have no connection to the original plan, but they'll make the columns
// (mostly) unique.
Ok((
left.alias(TableReference::bare("left"))?,
right.alias(TableReference::bare("right"))?,
true,
))
} else {
Ok((left, right, false))
}
}

/// Add additional "synthetic" group by expressions based on functional
/// dependencies.
///
Expand Down
5 changes: 3 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ mod statement;
pub mod tree_node;

pub use builder::{
build_join_schema, table_scan, union, wrap_projection_for_join_if_necessary,
LogicalPlanBuilder, LogicalPlanBuilderOptions, LogicalTableSource, UNNAMED_TABLE,
build_join_schema, requalify_sides_if_needed, table_scan, union,
wrap_projection_for_join_if_necessary, LogicalPlanBuilder, LogicalPlanBuilderOptions,
LogicalTableSource, UNNAMED_TABLE,
};
pub use ddl::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
Expand Down
59 changes: 42 additions & 17 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ use crate::utils::{
grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
};
use crate::{
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
Operator, Prepare, TableProviderFilterPushDown, TableSource,
WindowFunctionDefinition,
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
Expand Down Expand Up @@ -3750,37 +3751,61 @@ impl Join {
})
}

/// Create Join with input which wrapped with projection, this method is used to help create physical join.
/// Create Join with input which wrapped with projection, this method is used in physcial planning only to help
/// create the physical join.
pub fn try_new_with_project_input(
original: &LogicalPlan,
left: Arc<LogicalPlan>,
right: Arc<LogicalPlan>,
column_on: (Vec<Column>, Vec<Column>),
) -> Result<Self> {
) -> Result<(Self, bool)> {
let original_join = match original {
LogicalPlan::Join(join) => join,
_ => return plan_err!("Could not create join with project input"),
};

let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));

let mut requalified = false;

// By definition, the resulting schema of an inner/left/right & full join will have first the left side fields and then the right,
// potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
if original_join.join_type == JoinType::Inner
|| original_join.join_type == JoinType::Left
|| original_join.join_type == JoinType::Right
|| original_join.join_type == JoinType::Full
{
(left_sch, right_sch, requalified) =
requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
}

let on: Vec<(Expr, Expr)> = column_on
.0
.into_iter()
.zip(column_on.1)
.map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
.collect();
let join_schema =
build_join_schema(left.schema(), right.schema(), &original_join.join_type)?;

Ok(Join {
left,
right,
on,
filter: original_join.filter.clone(),
join_type: original_join.join_type,
join_constraint: original_join.join_constraint,
schema: Arc::new(join_schema),
null_equals_null: original_join.null_equals_null,
})
let join_schema = build_join_schema(
left_sch.schema(),
right_sch.schema(),
&original_join.join_type,
)?;

Ok((
Join {
left,
right,
on,
filter: original_join.filter.clone(),
join_type: original_join.join_type,
join_constraint: original_join.join_constraint,
schema: Arc::new(join_schema),
null_equals_null: original_join.null_equals_null,
},
requalified,
))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::logical_plan::consumer::utils::requalify_sides_if_needed;
use crate::logical_plan::consumer::SubstraitConsumer;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};

use datafusion::logical_expr::requalify_sides_if_needed;

use substrait::proto::CrossRel;

pub async fn from_cross_rel(
Expand All @@ -30,6 +32,6 @@ pub async fn from_cross_rel(
let right = LogicalPlanBuilder::from(
consumer.consume_rel(cross.right.as_ref().unwrap()).await?,
);
let (left, right) = requalify_sides_if_needed(left, right)?;
let (left, right, _requalified) = requalify_sides_if_needed(left, right)?;
left.cross_join(right.build()?)?.build()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::logical_plan::consumer::utils::requalify_sides_if_needed;
use crate::logical_plan::consumer::SubstraitConsumer;
use datafusion::common::{not_impl_err, plan_err, Column, JoinType};
use datafusion::logical_expr::requalify_sides_if_needed;
use datafusion::logical_expr::utils::split_conjunction;
use datafusion::logical_expr::{
BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator,
};

use substrait::proto::{join_rel, JoinRel};

pub async fn from_join_rel(
Expand All @@ -38,7 +39,7 @@ pub async fn from_join_rel(
let right = LogicalPlanBuilder::from(
consumer.consume_rel(join.right.as_ref().unwrap()).await?,
);
let (left, right) = requalify_sides_if_needed(left, right)?;
let (left, right, _requalified) = requalify_sides_if_needed(left, right)?;

let join_type = from_substrait_jointype(join.r#type)?;
// The join condition expression needs full input schema and not the output schema from join since we lose columns from
Expand Down
Loading
Loading