Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ class Analyzer(

lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new SubstituteHints.SubstituteBroadcastHints(conf),
SubstituteHints.RemoveAllHints),
new ResolveHints.ResolveBroadcastHints(conf),
ResolveHints.RemoveAllHints),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,9 @@ import org.apache.spark.sql.catalyst.trees.CurrentOrigin
* Note that this is separated into two rules because in the future we might introduce new hint
* rules that have different ordering requirements from broadcast.
*/
object SubstituteHints {
object ResolveHints {

/**
* Substitute Hints.
*
* The only hint currently available is broadcast join hint.
*
* For broadcast hint, we accept "BROADCAST", "BROADCASTJOIN", and "MAPJOIN", and a sequence of
* relation aliases can be specified in the hint. A broadcast hint plan node will be inserted
* on top of any relation (that is not aliased differently), subquery, or common table expression
Expand All @@ -47,7 +43,7 @@ object SubstituteHints {
*
* This rule must happen before common table expressions.
*/
class SubstituteBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] {
class ResolveBroadcastHints(conf: CatalystConf) extends Rule[LogicalPlan] {
private val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

def resolver: Resolver = conf.resolver
Expand All @@ -61,18 +57,21 @@ object SubstituteHints {
case r: UnresolvedRelation =>
val alias = r.alias.getOrElse(r.tableIdentifier.table)
if (toBroadcast.exists(resolver(_, alias))) BroadcastHint(plan) else plan
case r: SubqueryAlias =>
if (toBroadcast.exists(resolver(_, r.alias))) {
BroadcastHint(plan)
} else {
// Don't recurse down subquery aliases if there are no match.
recurse = false
plan
}
case _: BroadcastHint =>
// Found a broadcast hint; don't change the plan but also don't recurse down.

case r: SubqueryAlias if toBroadcast.exists(resolver(_, r.alias)) =>
BroadcastHint(plan)

case _: BroadcastHint | _: View | _: With | _: SubqueryAlias =>
// Don't traverse down these nodes.
// For an existing broadcast hint, there is no point going down (if we do, we either
// won't change the structure, or will introduce another broadcast hint that is useless).
// The rest (view, with, subquery) indicate different scopes that we shouldn't traverse
// down. Note that technically when this rule is executed, we haven't completed view
// resolution yet and as a result the view part should be dead code. I'm leaving it here
// to be more future proof in case we change the way we do view resolution.
recurse = false
plan

case _ =>
plan
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._

class SubstituteHintsSuite extends AnalysisTest {
class ResolveHintsSuite extends AnalysisTest {
import org.apache.spark.sql.catalyst.analysis.TestRelations._

test("invalid hints should be ignored") {
Expand Down