Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ case class SortMergeJoinExec(

override def outputOrdering: Seq[SortOrder] = joinType match {
// For inner join, orders of both sides keys should be kept.
case Inner =>
case _: InnerLike =>
val leftKeyOrdering = getKeyOrdering(leftKeys, left.outputOrdering)
val rightKeyOrdering = getKeyOrdering(rightKeys, right.outputOrdering)
leftKeyOrdering.zip(rightKeyOrdering).map { case (lKey, rKey) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{execution, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftOuter, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Repartition}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.columnar.InMemoryRelation
Expand Down Expand Up @@ -513,26 +513,30 @@ class PlannerSuite extends SharedSQLContext {
}

test("EnsureRequirements skips sort when either side of join keys is required after inner SMJ") {
val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, Inner, None, planA, planB)
// Both left and right keys should be sorted after the SMJ.
Seq(orderingA, orderingB).foreach { ordering =>
assertSortRequirementsAreSatisfied(
childPlan = innerSmj,
requiredOrdering = Seq(ordering),
shouldHaveSort = false)
Seq(Inner, Cross).foreach { joinType =>
val innerSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, joinType, None, planA, planB)
// Both left and right keys should be sorted after the SMJ.
Seq(orderingA, orderingB).foreach { ordering =>
assertSortRequirementsAreSatisfied(
childPlan = innerSmj,
requiredOrdering = Seq(ordering),
shouldHaveSort = false)
}
}
}

test("EnsureRequirements skips sort when key order of a parent SMJ is propagated from its " +
"child SMJ") {
val childSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, Inner, None, planA, planB)
val parentSmj = SortMergeJoinExec(exprB :: Nil, exprC :: Nil, Inner, None, childSmj, planC)
// After the second SMJ, exprA, exprB and exprC should all be sorted.
Seq(orderingA, orderingB, orderingC).foreach { ordering =>
assertSortRequirementsAreSatisfied(
childPlan = parentSmj,
requiredOrdering = Seq(ordering),
shouldHaveSort = false)
Seq(Inner, Cross).foreach { joinType =>
val childSmj = SortMergeJoinExec(exprA :: Nil, exprB :: Nil, joinType, None, planA, planB)
val parentSmj = SortMergeJoinExec(exprB :: Nil, exprC :: Nil, joinType, None, childSmj, planC)
// After the second SMJ, exprA, exprB and exprC should all be sorted.
Seq(orderingA, orderingB, orderingC).foreach { ordering =>
assertSortRequirementsAreSatisfied(
childPlan = parentSmj,
requiredOrdering = Seq(ordering),
shouldHaveSort = false)
}
}
}

Expand Down