@@ -63,19 +63,23 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
6363 }
6464
6565 /**
66- * Uses the ExtractEquiJoinKeys pattern to find joins where at least some of the predicates can be
67- * evaluated by matching hash keys.
66+ * Uses the [[ ExtractEquiJoinKeys ]] pattern to find joins where at least some of the predicates
67+ * can be evaluated by matching join keys.
6868 *
69- * This strategy applies a simple optimization based on the estimates of the physical sizes of
70- * the two join sides. When planning a [[joins.BroadcastHashJoin ]], if one side has an
71- * estimated physical size smaller than the user-settable threshold
72- * [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ]], the planner would mark it as the
73- * ''build'' relation and mark the other relation as the ''stream'' side. The build table will be
74- * ''broadcasted'' to all of the executors involved in the join, as a
75- * [[org.apache.spark.broadcast.Broadcast ]] object. If both estimates exceed the threshold, they
76- * will instead be used to decide the build side in a [[joins.ShuffledHashJoin ]].
69+ * Join implementations are chosen with the following precedence:
70+ *
71+ * - Broadcast: if one side of the join has an estimated physical size that is smaller than the
72+ * user-configurable [[org.apache.spark.sql.SQLConf.AUTO_BROADCASTJOIN_THRESHOLD ]] threshold
73+ * or if that side has an explicit broadcast hint (e.g. the user applied the
74+ * [[org.apache.spark.sql.functions.broadcast() ]] function to a DataFrame), then that side
75+ * of the join will be broadcasted and the other side will be streamed, with no shuffling
76+ * performed. If both sides of the join are eligible to be broadcasted then the
77+ * - Sort merge: if the matching join keys are sortable and
78+ * [[org.apache.spark.sql.SQLConf.SORTMERGE_JOIN ]] is enabled (default), then sort merge join
79+ * will be used.
80+ * - Hash: will be chosen if neither of the above optimizations apply to this join.
7781 */
78- object HashJoin extends Strategy with PredicateHelper {
82+ object EquiJoinSelection extends Strategy with PredicateHelper {
7983
8084 private [this ] def makeBroadcastHashJoin (
8185 leftKeys : Seq [Expression ],
@@ -90,14 +94,15 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
9094 }
9195
9296 def apply (plan : LogicalPlan ): Seq [SparkPlan ] = plan match {
97+
98+ // --- Inner joins --------------------------------------------------------------------------
99+
93100 case ExtractEquiJoinKeys (Inner , leftKeys, rightKeys, condition, left, CanBroadcast (right)) =>
94101 makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildRight )
95102
96103 case ExtractEquiJoinKeys (Inner , leftKeys, rightKeys, condition, CanBroadcast (left), right) =>
97104 makeBroadcastHashJoin(leftKeys, rightKeys, left, right, condition, joins.BuildLeft )
98105
99- // If the sort merge join option is set, we want to use sort merge join prior to hashjoin
100- // for now let's support inner join first, then add outer join
101106 case ExtractEquiJoinKeys (Inner , leftKeys, rightKeys, condition, left, right)
102107 if sqlContext.conf.sortMergeJoinEnabled && RowOrdering .isOrderable(leftKeys) =>
103108 val mergeJoin =
@@ -115,6 +120,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
115120 leftKeys, rightKeys, buildSide, planLater(left), planLater(right))
116121 condition.map(Filter (_, hashJoin)).getOrElse(hashJoin) :: Nil
117122
123+ // --- Outer joins --------------------------------------------------------------------------
124+
118125 case ExtractEquiJoinKeys (
119126 LeftOuter , leftKeys, rightKeys, condition, left, CanBroadcast (right)) =>
120127 joins.BroadcastHashOuterJoin (
@@ -125,10 +132,22 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
125132 joins.BroadcastHashOuterJoin (
126133 leftKeys, rightKeys, RightOuter , condition, planLater(left), planLater(right)) :: Nil
127134
135+ case ExtractEquiJoinKeys (LeftOuter , leftKeys, rightKeys, condition, left, right)
136+ if sqlContext.conf.sortMergeJoinEnabled && RowOrdering .isOrderable(leftKeys) =>
137+ joins.SortMergeOuterJoin (
138+ leftKeys, rightKeys, LeftOuter , condition, planLater(left), planLater(right)) :: Nil
139+
140+ case ExtractEquiJoinKeys (RightOuter , leftKeys, rightKeys, condition, left, right)
141+ if sqlContext.conf.sortMergeJoinEnabled && RowOrdering .isOrderable(leftKeys) =>
142+ joins.SortMergeOuterJoin (
143+ leftKeys, rightKeys, RightOuter , condition, planLater(left), planLater(right)) :: Nil
144+
128145 case ExtractEquiJoinKeys (joinType, leftKeys, rightKeys, condition, left, right) =>
129146 joins.ShuffledHashOuterJoin (
130147 leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
131148
149+ // --- Cases where this strategy does not apply ---------------------------------------------
150+
132151 case _ => Nil
133152 }
134153 }
0 commit comments