Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unused LogicalPlan::CrossJoin as it is unused #13076

Merged
merged 2 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion-examples/examples/sql_analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ fn total_join_count(plan: &LogicalPlan) -> usize {
// We can use the TreeNode API to walk over a LogicalPlan.
plan.apply(|node| {
// if we encounter a join we update the running count
if matches!(node, LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_)) {
if matches!(node, LogicalPlan::Join(_)) {
total += 1;
}
Ok(TreeNodeRecursion::Continue)
Expand Down Expand Up @@ -89,7 +89,7 @@ fn count_trees(plan: &LogicalPlan) -> (usize, Vec<usize>) {
while let Some(node) = to_visit.pop() {
// if we encounter a join, we know were at the root of the tree
// count this tree and recurse on it's inputs
if matches!(node, LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_)) {
if matches!(node, LogicalPlan::Join(_)) {
let (group_count, inputs) = count_tree(node);
total += group_count;
groups.push(group_count);
Expand Down Expand Up @@ -151,7 +151,7 @@ fn count_tree(join: &LogicalPlan) -> (usize, Vec<&LogicalPlan>) {
}

// any join we count
if matches!(node, LogicalPlan::Join(_) | LogicalPlan::CrossJoin(_)) {
if matches!(node, LogicalPlan::Join(_)) {
total += 1;
Ok(TreeNodeRecursion::Continue)
} else {
Expand Down
4 changes: 0 additions & 4 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,10 +1127,6 @@ impl DefaultPhysicalPlanner {
join
}
}
LogicalPlan::CrossJoin(_) => {
let [left, right] = children.two()?;
Arc::new(CrossJoinExec::new(left, right))
}
LogicalPlan::RecursiveQuery(RecursiveQuery {
name, is_distinct, ..
}) => {
Expand Down
5 changes: 0 additions & 5 deletions datafusion/expr/src/logical_plan/display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,6 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Filter": format!("{}", filter_expr)
})
}
LogicalPlan::CrossJoin(_) => {
json!({
"Node Type": "Cross Join"
})
}
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ pub use ddl::{
};
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, CrossJoin, DescribeTable,
Distinct, DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
Expand Down
67 changes: 2 additions & 65 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,6 @@ pub enum LogicalPlan {
/// Join two logical plans on one or more join columns.
/// This is used to implement SQL `JOIN`
Join(Join),
/// Apply Cross Join to two logical plans.
/// This is used to implement SQL `CROSS JOIN`
/// Deprecated: use [LogicalPlan::Join] instead with empty `on` / no filter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice for cleaning up a deprecated feature

CrossJoin(CrossJoin),
/// Repartitions the input based on a partitioning scheme. This is
/// used to add parallelism and is sometimes referred to as an
/// "exchange" operator in other systems
Expand Down Expand Up @@ -312,7 +308,6 @@ impl LogicalPlan {
LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
LogicalPlan::Join(Join { schema, .. }) => schema,
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
LogicalPlan::Statement(statement) => statement.schema(),
Expand Down Expand Up @@ -345,8 +340,7 @@ impl LogicalPlan {
| LogicalPlan::Projection(_)
| LogicalPlan::Aggregate(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::Join(_)
| LogicalPlan::CrossJoin(_) => self
| LogicalPlan::Join(_) => self
.inputs()
.iter()
.map(|input| input.schema().as_ref())
Expand Down Expand Up @@ -436,7 +430,6 @@ impl LogicalPlan {
LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
LogicalPlan::Sort(Sort { input, .. }) => vec![input],
LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
Expand Down Expand Up @@ -542,13 +535,6 @@ impl LogicalPlan {
JoinType::LeftSemi | JoinType::LeftAnti => left.head_output_expr(),
JoinType::RightSemi | JoinType::RightAnti => right.head_output_expr(),
},
LogicalPlan::CrossJoin(cross) => {
if cross.left.schema().fields().is_empty() {
cross.right.head_output_expr()
} else {
cross.left.head_output_expr()
}
}
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
static_term.head_output_expr()
}
Expand Down Expand Up @@ -674,20 +660,6 @@ impl LogicalPlan {
null_equals_null,
}))
}
LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: _,
}) => {
let join_schema =
build_join_schema(left.schema(), right.schema(), &JoinType::Inner)?;

Ok(LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema: join_schema.into(),
}))
}
LogicalPlan::Subquery(_) => Ok(self),
LogicalPlan::SubqueryAlias(SubqueryAlias {
input,
Expand Down Expand Up @@ -938,11 +910,6 @@ impl LogicalPlan {
null_equals_null: *null_equals_null,
}))
}
LogicalPlan::CrossJoin(_) => {
self.assert_no_expressions(expr)?;
let (left, right) = self.only_two_inputs(inputs)?;
LogicalPlanBuilder::from(left).cross_join(right)?.build()
}
LogicalPlan::Subquery(Subquery {
outer_ref_columns, ..
}) => {
Expand Down Expand Up @@ -1327,12 +1294,6 @@ impl LogicalPlan {
JoinType::LeftSemi | JoinType::LeftAnti => left.max_rows(),
JoinType::RightSemi | JoinType::RightAnti => right.max_rows(),
},
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
match (left.max_rows(), right.max_rows()) {
(Some(left_max), Some(right_max)) => Some(left_max * right_max),
_ => None,
}
}
LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(),
LogicalPlan::Union(Union { inputs, .. }) => inputs
.iter()
Expand Down Expand Up @@ -1893,9 +1854,6 @@ impl LogicalPlan {
}
}
}
LogicalPlan::CrossJoin(_) => {
write!(f, "CrossJoin:")
}
LogicalPlan::Repartition(Repartition {
partitioning_scheme,
..
Expand Down Expand Up @@ -2601,28 +2559,7 @@ impl TableScan {
}
}

/// Apply Cross Join to two logical plans
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CrossJoin {
/// Left input
pub left: Arc<LogicalPlan>,
/// Right input
pub right: Arc<LogicalPlan>,
/// The output schema, containing fields from the left and right inputs
pub schema: DFSchemaRef,
}

// Manual implementation needed because of `schema` field. Comparison excludes this field.
impl PartialOrd for CrossJoin {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match self.left.partial_cmp(&other.left) {
Some(Ordering::Equal) => self.right.partial_cmp(&other.right),
cmp => cmp,
}
}
}

/// Repartition the plan based on a partitioning scheme.
// Repartition the plan based on a partitioning scheme.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct Repartition {
/// The incoming logical plan
Expand Down
28 changes: 5 additions & 23 deletions datafusion/expr/src/logical_plan/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
//! * [`LogicalPlan::with_new_exprs`]: Create a new plan with different expressions
//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
use crate::{
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, CrossJoin,
DdlStatement, Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter,
Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery,
Repartition, Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest,
UserDefinedLogicalNode, Values, Window,
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit,
LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort,
Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values,
Window,
};
use std::ops::Deref;
use std::sync::Arc;
Expand Down Expand Up @@ -160,22 +160,6 @@ impl TreeNode for LogicalPlan {
null_equals_null,
})
}),
LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema,
}) => map_until_stop_and_collect!(
rewrite_arc(left, &mut f),
right,
rewrite_arc(right, &mut f)
)?
.update_data(|(left, right)| {
LogicalPlan::CrossJoin(CrossJoin {
left,
right,
schema,
})
}),
LogicalPlan::Limit(Limit { skip, fetch, input }) => rewrite_arc(input, f)?
.update_data(|input| LogicalPlan::Limit(Limit { skip, fetch, input })),
LogicalPlan::Subquery(Subquery {
Expand Down Expand Up @@ -527,7 +511,6 @@ impl LogicalPlan {
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Statement(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
Expand Down Expand Up @@ -758,7 +741,6 @@ impl LogicalPlan {
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Statement(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Union(_)
Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/analyzer/subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ fn check_inner_plan(
LogicalPlan::Projection(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Sort(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Union(_)
| LogicalPlan::TableScan(_)
| LogicalPlan::EmptyRelation(_)
Expand Down
1 change: 0 additions & 1 deletion datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,6 @@ impl OptimizerRule for CommonSubexprEliminate {
LogicalPlan::Window(window) => self.try_optimize_window(window, config)?,
LogicalPlan::Aggregate(agg) => self.try_optimize_aggregate(agg, config)?,
LogicalPlan::Join(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Repartition(_)
| LogicalPlan::Union(_)
| LogicalPlan::TableScan(_)
Expand Down
35 changes: 8 additions & 27 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl OptimizerRule for EliminateCrossJoin {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
}) | LogicalPlan::CrossJoin(_)
})
);

if !rewriteable {
Expand Down Expand Up @@ -241,20 +241,6 @@ fn flatten_join_inputs(
all_filters,
)?;
}
LogicalPlan::CrossJoin(join) => {
flatten_join_inputs(
Arc::unwrap_or_clone(join.left),
possible_join_keys,
all_inputs,
all_filters,
)?;
flatten_join_inputs(
Arc::unwrap_or_clone(join.right),
possible_join_keys,
all_inputs,
all_filters,
)?;
}
_ => {
all_inputs.push(plan);
}
Expand All @@ -270,23 +256,18 @@ fn can_flatten_join_inputs(plan: &LogicalPlan) -> bool {
// can only flatten inner / cross joins
match plan {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {}
LogicalPlan::CrossJoin(_) => {}
_ => return false,
};

for child in plan.inputs() {
match child {
LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
})
| LogicalPlan::CrossJoin(_) => {
if !can_flatten_join_inputs(child) {
return false;
}
if let LogicalPlan::Join(Join {
join_type: JoinType::Inner,
..
}) = child
{
if !can_flatten_join_inputs(child) {
return false;
}
// the child is not a join/cross join
_ => (),
}
}
true
Expand Down
11 changes: 0 additions & 11 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,17 +367,6 @@ fn optimize_projections(
right_indices.with_projection_beneficial(),
]
}
LogicalPlan::CrossJoin(cross_join) => {
let left_len = cross_join.left.schema().fields().len();
let (left_indices, right_indices) =
split_join_requirements(left_len, indices, &JoinType::Inner);
// Joins benefit from "small" input tables (lower memory usage).
// Therefore, each child benefits from projection:
vec![
left_indices.with_projection_beneficial(),
right_indices.with_projection_beneficial(),
]
}
// these nodes are explicitly rewritten in the match statement above
LogicalPlan::Projection(_)
| LogicalPlan::Aggregate(_)
Expand Down
13 changes: 0 additions & 13 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,19 +72,6 @@ impl OptimizerRule for PropagateEmptyRelation {
}
Ok(Transformed::no(plan))
}
LogicalPlan::CrossJoin(ref join) => {
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
if left_empty || right_empty {
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
EmptyRelation {
produce_one_row: false,
schema: Arc::clone(plan.schema()),
},
)));
}
Ok(Transformed::no(LogicalPlan::CrossJoin(join.clone())))
}

LogicalPlan::Join(ref join) => {
// TODO: For Join, more join type need to be careful:
// For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
Expand Down
Loading