Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1528,28 +1528,42 @@ object EliminateSorts extends Rule[LogicalPlan] {
}
case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
applyLocally.lift(child).getOrElse(child)
case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
case s @ Sort(_, global, child) => s.copy(child = recursiveRemoveSort(child, global))
case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) =>
j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
j.copy(left = recursiveRemoveSort(originLeft, true),
right = recursiveRemoveSort(originRight, true))
case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
g.copy(child = recursiveRemoveSort(originChild))
g.copy(child = recursiveRemoveSort(originChild, true))
}

private def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = {
/**
* If the upper sort is global then we can remove the global or local sort recursively.
* If the upper sort is local then we can only remove the local sort recursively.
Copy link
Contributor

@beliefer beliefer Jul 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the upper Sort is local, why can't we remove the global Sort recursively?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can eliminate it too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think about it: if we removed all the Repartition nodes, would users complain? They would, even though Repartition does not change the data, only the partitioning. Data partitioning is also a user expectation that we shouldn't break.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semantics of global + local sort should be range partition + local sort, so we cannot remove a global sort that sits under a local sort, because we cannot remove the range partitioning directly. BTW, I will add a new rule to optimize this pattern after fixing this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I also agree we should not remove the global Sort, as users might expect overall range partitioning.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got it.

*/
// Strips Sort nodes from `plan`, recursing through nodes accepted by canEliminateSort or
// canEliminateGlobalSort. `canRemoveGlobalSort` gates whether a global Sort may be dropped;
// a local Sort (global == false) always passes the guard below.
private def recursiveRemoveSort(
plan: LogicalPlan,
canRemoveGlobalSort: Boolean): LogicalPlan = {
// Fast path: a subtree that does not contain the SORT pattern is returned untouched.
if (!plan.containsPattern(SORT)) {
return plan
}
plan match {
// NOTE(review): this unguarded one-argument case, and the one-argument `map` call on the
// canEliminateSort branch, look like the removed (pre-change) side of the diff; the forms
// directly below each of them appear to be their replacements — confirm against the PR.
case Sort(_, _, child) => recursiveRemoveSort(child)
// Drop this Sort when it is local, or when global sorts are allowed to be removed.
case Sort(_, global, child) if canRemoveGlobalSort || !global =>
recursiveRemoveSort(child, canRemoveGlobalSort)
// Nodes accepted by canEliminateSort pass the current flag on to their children.
case other if canEliminateSort(other) =>
other.withNewChildren(other.children.map(recursiveRemoveSort))
other.withNewChildren(other.children.map(c => recursiveRemoveSort(c, canRemoveGlobalSort)))
// Nodes accepted by canEliminateGlobalSort recurse with the flag forced to true.
case other if canEliminateGlobalSort(other) =>
other.withNewChildren(other.children.map(c => recursiveRemoveSort(c, true)))
// Any other node stops the traversal; the plan is returned as-is.
case _ => plan
}
}

/**
 * Tells `recursiveRemoveSort` whether it may keep descending through `plan` with its
 * current settings.
 *
 * @param plan the node being inspected
 * @return true for a [[Project]] whose project list is entirely deterministic or a
 *         [[Filter]] whose condition is deterministic; false for every other node
 */
private def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
  case project: Project => project.projectList.forall(_.deterministic)
  case filter: Filter => filter.condition.deterministic
  case _ => false
}

private def canEliminateGlobalSort(plan: LogicalPlan): Boolean = plan match {
case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic)
case r: RebalancePartitions => r.partitionExpressions.forall(_.deterministic)
case _: Repartition => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,4 +424,20 @@ class EliminateSortsSuite extends AnalysisTest {
comparePlans(optimized, correctAnswer)
}
}

test("SPARK-39835: Fix EliminateSorts remove global sort below the local sort") {
  // Case 1 (global -> local): the optimizer must hand the plan back unchanged.
  val globalThenLocal = testRelation.orderBy($"a".asc).sortBy($"c".asc).analyze
  comparePlans(Optimize.execute(globalThenLocal), globalThenLocal)

  // Case 2 (global -> global -> local): only the innermost global sort on `a` is dropped.
  val twoGlobalsThenLocal = testRelation
    .orderBy($"a".asc)
    .orderBy($"b".asc)
    .sortBy($"c".asc)
    .analyze
  val afterTwoGlobals = testRelation.orderBy($"b".asc).sortBy($"c".asc).analyze
  comparePlans(Optimize.execute(twoGlobalsThenLocal), afterTwoGlobals)

  // Case 3 (local -> global -> local): the innermost local sort on `a` is dropped.
  val localGlobalLocal = testRelation
    .sortBy($"a".asc)
    .orderBy($"b".asc)
    .sortBy($"c".asc)
    .analyze
  val afterLocalGlobalLocal = testRelation.orderBy($"b".asc).sortBy($"c".asc).analyze
  comparePlans(Optimize.execute(localGlobalLocal), afterLocalGlobalLocal)
}
}