Skip to content

Commit 945d8bc

Browse files
viiryaJoshRosen
authored andcommitted
[SPARK-9306] [SQL] Don't use SortMergeJoin when joining on unsortable columns
JIRA: https://issues.apache.org/jira/browse/SPARK-9306 Author: Liang-Chi Hsieh <viirya@appier.com> Closes #7645 from viirya/smj_unsortable and squashes the following commits: a240707 [Liang-Chi Hsieh] Use forall instead of exists for readability. 55221fa [Liang-Chi Hsieh] Shouldn't use SortMergeJoin when joining on unsortable columns.
1 parent 1efe97d commit 945d8bc

File tree

3 files changed

+28
-5
lines changed

3 files changed

+28
-5
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ object PartialAggregation {
184184
* A pattern that finds joins with equality conditions that can be evaluated using equi-join.
185185
*/
186186
object ExtractEquiJoinKeys extends Logging with PredicateHelper {
187-
/** (joinType, rightKeys, leftKeys, condition, leftChild, rightChild) */
187+
/** (joinType, leftKeys, rightKeys, condition, leftChild, rightChild) */
188188
type ReturnType =
189189
(JoinType, Seq[Expression], Seq[Expression], Option[Expression], LogicalPlan, LogicalPlan)
190190

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
3535

3636
object LeftSemiJoin extends Strategy with PredicateHelper {
3737
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
38-
case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right)
39-
if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
40-
right.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold =>
38+
case ExtractEquiJoinKeys(
39+
LeftSemi, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
4140
joins.BroadcastLeftSemiJoinHash(
4241
leftKeys, rightKeys, planLater(left), planLater(right), condition) :: Nil
4342
// Find left semi joins where at least some predicates can be evaluated by matching join keys
@@ -90,6 +89,18 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
9089
condition.map(Filter(_, broadcastHashJoin)).getOrElse(broadcastHashJoin) :: Nil
9190
}
9291

92+
private[this] def isValidSort(
93+
leftKeys: Seq[Expression],
94+
rightKeys: Seq[Expression]): Boolean = {
95+
leftKeys.zip(rightKeys).forall { keys =>
96+
(keys._1.dataType, keys._2.dataType) match {
97+
case (l: AtomicType, r: AtomicType) => true
98+
case (NullType, NullType) => true
99+
case _ => false
100+
}
101+
}
102+
}
103+
93104
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
94105
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
95106
makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight)
@@ -100,7 +111,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
100111
// If the sort merge join option is set, we want to use sort merge join prior to hashjoin
101112
// for now let's support inner join first, then add outer join
102113
case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, right)
103-
if sqlContext.conf.sortMergeJoinEnabled =>
114+
if sqlContext.conf.sortMergeJoinEnabled && isValidSort(leftKeys, rightKeys) =>
104115
val mergeJoin =
105116
joins.SortMergeJoin(leftKeys, rightKeys, planLater(left), planLater(right))
106117
condition.map(Filter(_, mergeJoin)).getOrElse(mergeJoin) :: Nil

sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
108108
}
109109
}
110110

111+
test("SortMergeJoin shouldn't work on unsortable columns") {
112+
val SORTMERGEJOIN_ENABLED: Boolean = ctx.conf.sortMergeJoinEnabled
113+
try {
114+
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, true)
115+
Seq(
116+
("SELECT * FROM arrayData JOIN complexData ON data = a", classOf[ShuffledHashJoin])
117+
).foreach { case (query, joinClass) => assertJoin(query, joinClass) }
118+
} finally {
119+
ctx.conf.setConf(SQLConf.SORTMERGE_JOIN, SORTMERGEJOIN_ENABLED)
120+
}
121+
}
122+
111123
test("broadcasted hash join operator selection") {
112124
ctx.cacheManager.clearCache()
113125
ctx.sql("CACHE TABLE testData")

0 commit comments

Comments
 (0)