Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// 4. Pick cartesian product if join type is inner like.
// 5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
// other choice.
case p @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right, hint) =>
case j @ ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, nonEquiCond, left, right, hint) =>
def createBroadcastHashJoin(buildLeft: Boolean, buildRight: Boolean) = {
val wantToBuildLeft = canBuildLeft(joinType) && buildLeft
val wantToBuildRight = canBuildRight(joinType) && buildRight
Expand All @@ -254,7 +254,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
rightKeys,
joinType,
buildSide,
condition,
nonEquiCond,
planLater(left),
planLater(right)))
}
Expand All @@ -269,7 +269,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
rightKeys,
joinType,
buildSide,
condition,
nonEquiCond,
planLater(left),
planLater(right)))
}
Expand All @@ -278,15 +278,17 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
def createSortMergeJoin() = {
if (RowOrdering.isOrderable(leftKeys)) {
Some(Seq(joins.SortMergeJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right))))
leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
} else {
None
}
}

def createCartesianProduct() = {
if (joinType.isInstanceOf[InnerLike]) {
Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), p.condition)))
// `CartesianProductExec` can't implicitly evaluate equal join condition, here we should
// pass the original condition which includes both equal and non-equal conditions.
Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), j.condition)))
} else {
None
}
Expand All @@ -311,7 +313,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
// This join could be very slow or OOM
val buildSide = getSmallerSide(left, right)
Seq(joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition))
planLater(left), planLater(right), buildSide, joinType, nonEquiCond))
}
}

Expand Down