Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
9bd25da
[SPARK-16475][SQL] Broadcast Hint for SQL Queries
dongjoon-hyun Jul 11, 2016
eae3931
Address comments.
dongjoon-hyun Jul 12, 2016
60a304e
Support BROADCAST JOIN.
dongjoon-hyun Jul 12, 2016
5d81510
Use parameters.
dongjoon-hyun Jul 12, 2016
92de211
Support only comment style hint.
dongjoon-hyun Jul 14, 2016
63b629c
Address comments.
dongjoon-hyun Jul 14, 2016
3b25505
Add nested broadcast hint testcase.
dongjoon-hyun Jul 14, 2016
f029dac
Add description and one more testcase.
dongjoon-hyun Jul 14, 2016
4fce2ca
Add more tests in AnalysisSuite and address various comments.
dongjoon-hyun Jul 15, 2016
16ad3a1
Address comments.
dongjoon-hyun Jul 15, 2016
8a407aa
Updat SQLBuilder and LogicalPlanToSQLSuite.
dongjoon-hyun Jul 16, 2016
6b20a44
Use DSL
dongjoon-hyun Jul 16, 2016
0f3f53e
Update testcases.
dongjoon-hyun Jul 17, 2016
11e5f76
Add window/rollup/groupingset testcases.
dongjoon-hyun Jul 18, 2016
7ab0c19
Fix scalastyle.
dongjoon-hyun Jul 18, 2016
9cf4d65
Use checkSQL.
dongjoon-hyun Jul 19, 2016
47c3733
Abstract rules.
dongjoon-hyun Jul 19, 2016
61f7776
Apply SubstituteHints to unresolved stage back.
dongjoon-hyun Jul 19, 2016
fccf9da
Remove unused imports.
dongjoon-hyun Jul 19, 2016
b0b9f61
Address comments.
dongjoon-hyun Jul 19, 2016
39a3277
Fix indentation.
dongjoon-hyun Jul 20, 2016
1b38e55
Add more description on `SubstituteHint`.
dongjoon-hyun Jul 20, 2016
9bf6fa7
Simplify the logic by adding Hint(SubqueryAlias(SQLTable)) pattern an…
dongjoon-hyun Jul 21, 2016
c8436fd
Move check cases into CheckAnalysis.
dongjoon-hyun Jul 22, 2016
302b9d4
Hint should wrap the whole plan.
dongjoon-hyun Jul 25, 2016
e7c7943
Make BROADCAST_HINT_NAMES set and fix error message.
dongjoon-hyun Jul 25, 2016
e6b2409
Update with stable queries.
dongjoon-hyun Jul 27, 2016
9f0b104
Don't propagate Hint into subqueries.
dongjoon-hyun Jul 28, 2016
4023d97
Update golden file with newly one having regenerated hint.
dongjoon-hyun Jul 28, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ querySpecification
(RECORDREADER recordReader=STRING)?
fromClause?
(WHERE where=booleanExpression)?)
| ((kind=SELECT setQuantifier? namedExpressionSeq fromClause?
| ((kind=SELECT hint? setQuantifier? namedExpressionSeq fromClause?
| fromClause (kind=SELECT setQuantifier? namedExpressionSeq)?)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this SELECT? Should we also allow users use Hint here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want the following? It's already handled.

scala> sql("FROM t JOIN u ON t.id = u.id SELECT /*+ MAPJOIN(u) */ *").explain
== Physical Plan ==
*BroadcastHashJoin [id#0L], [id#4L], Inner, BuildRight
:- *Range (0, 1000000000, splits=8)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
   +- *Range (0, 1000000000, splits=8)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uh... This grammar file kills my eye... You are right. So far, to support Broadcast Hint, it is good enough.

lateralView*
(WHERE where=booleanExpression)?
Expand All @@ -347,6 +347,16 @@ querySpecification
windows?)
;

hint
: '/*+' hintStatement '*/'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might have missed something, but do we only support one hint?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Currently, it does. Should we expand this, too?

;

hintStatement
: hintName=identifier
| hintName=identifier '(' parameters+=identifier parameters+=identifier ')'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this rule for broadcast hint?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for review, @cloud-fan .
The first goal of this PR provides a general syntax for hints, not only broadcast hints. The ( and ) syntax is for INDEX(t idx_emp) style. You can see the testcase for this in the testcase, too.

| hintName=identifier '(' parameters+=identifier (',' parameters+=identifier)* ')'
;

fromClause
: FROM relation (',' relation)* lateralView*
;
Expand Down Expand Up @@ -945,8 +955,12 @@ SIMPLE_COMMENT
: '--' ~[\r\n]* '\r'? '\n'? -> channel(HIDDEN)
;

BRACKETED_EMPTY_COMMENT
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this because the BRACKETED_COMMENT rule is now expecting at least one character?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Could you give me some workaround advice?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My advice would be to add the HINT_PREFIX rule ('/*+')

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you try to add this as a case (| '/**/' -> channel(HIDDEN)) to the BRACKETED_COMMENT rule?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. It seems we can not do that due to channel(HIDDEN).

->command in lexer rule BRACKETED_COMMENT must be last element of single outermost alt

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok nvm....

: '/**/' -> channel(HIDDEN)
;

BRACKETED_COMMENT
: '/*' .*? '*/' -> channel(HIDDEN)
: '/*' ~[+] .*? '*/' -> channel(HIDDEN)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be easier to introduce a HINT_PREFIX rule ('/*+') and place this before the BRACKET_COMMENT rule.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see.

;

WS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ class Analyzer(
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
EliminateUnions),
EliminateUnions,
SubstituteHints),
Batch("Resolution", fixedPoint,
ResolveRelations ::
ResolveReferences ::
Expand Down Expand Up @@ -1786,6 +1787,63 @@ class Analyzer(
}
}

/**
* Substitute Hints.
* - BROADCAST/BROADCASTJOIN/MAPJOIN match the closest table with the given name parameters.
*
* This rule substitutes `UnresolvedRelation`s in `Substitute` batch before `ResolveRelations`
* rule is applied. Here are two reasons.
* - To support `MetastoreRelation` in Hive module.
* - To reduce the effect of `Hint` on the other rules.
*
* After this rule, it is guaranteed that there exists no unknown `Hint` in the plan.
* All new `Hint`s should be transformed into concrete Hint classes `BroadcastHint` here.
*/
object SubstituteHints extends Rule[LogicalPlan] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does hive handle sql hints? This rule looks a little complex to me, according to the fact that hints can only be applied to table relation. It will be great if we can wrap UnresolvedRelation with Hint in the parser, and then it will be very easy to resolve hints at analyzer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @cloud-fan . Thank you again.

Yes. That's true. So, at my first commit of this PR, I actually implemented to attach just the concrete BroadcastHint to the relations in AstBuilder.scala. But, there were some advices to

  • Move out the logic from the parser module (for preventing future updates on parser module)
  • Generalize the hint syntax like the current code.

Since a general Hint is attached on a logical plan not a table relation, we can substitute any other plans (filter/project/aggregator), too. The following is my previous comment about this situation. The child of hint is JOIN.

Yep. I grasp your point. First, the given relation is just a logical plan, not a table. In the following case, the relation is Join.
SELECT /*+ MAPJOIN(parquet_t0) */ * FROM parquet_t0, parquet_t1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then should we wrap the whole query plan with Hint instead of just the relation? #14132 (diff)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Since this is SELECT hint, which one do you mean?

  • Hint(Project)
  • Project(Hint)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the second one, I will revise again.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First of all, I'll take a look again. I need some boot-up time. :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I will wrap the whole query plan. Project is not always there.

SELECT /*+ MAPJOIN(t) */ * from t, u where 1=1 group by 1 order by 1

'Sort [1 ASC], true
+- 'Aggregate [1], [*]
   +- 'Filter (1 = 1)
      +- 'Hint MAPJOIN, [t]              
         +- 'Join Inner                  
            :- 'UnresolvedRelation `t`   
            +- 'UnresolvedRelation `u`   

val BROADCAST_HINT_NAMES = Set("BROADCAST", "BROADCASTJOIN", "MAPJOIN")

import scala.collection.mutable.Set
private def appendAllDescendant(set: Set[LogicalPlan], plan: LogicalPlan): Unit = {
set += plan
plan.children.foreach { child => appendAllDescendant(set, child) }
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case logical: LogicalPlan => logical transformDown {
case h @ Hint(name, parameters, child) if BROADCAST_HINT_NAMES.contains(name.toUpperCase) =>
var resolvedChild = child
for (table <- parameters) {
var stop = false
val skipNodeSet = scala.collection.mutable.Set.empty[LogicalPlan]
resolvedChild = resolvedChild.transformDown {
case n if skipNodeSet.contains(n) =>
skipNodeSet -= n
n
case p @ Project(_, _) if p != resolvedChild =>
appendAllDescendant(skipNodeSet, p)
skipNodeSet -= p
p
case r @ BroadcastHint(UnresolvedRelation(t, _))
if !stop && resolver(t.table, table) =>
stop = true
r
case r @ UnresolvedRelation(t, alias) if !stop && resolver(t.table, table) =>
stop = true
if (alias.isDefined) {
SubqueryAlias(alias.get, BroadcastHint(r.copy(alias = None)))
} else {
BroadcastHint(r)
}
}
}
resolvedChild

// Remove unrecognized hints
case Hint(name, _, child) => child
}
}
}

/**
* Check and add proper window frames for all window functions.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,10 @@ trait CheckAnalysis extends PredicateHelper {
|in operator ${operator.simpleString}
""".stripMargin)

case Hint(_, _, _) =>
throw new IllegalStateException(
"logical hint operator should have been removed by analyzer")

case _ => // Analysis successful!
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}

// Window
withDistinct.optionalMap(windows)(withWindows)
val withWindow = withDistinct.optionalMap(windows)(withWindows)

// Hint
withWindow.optionalMap(ctx.hint)(withHints)
}
}

Expand Down Expand Up @@ -508,6 +511,16 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}
}

/**
* Add a Hint to a logical plan.
*/
private def withHints(
ctx: HintContext,
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
val stmt = ctx.hintStatement
Hint(stmt.hintName.getText, stmt.parameters.asScala.map(_.getText), query)
}

/**
* Add a [[Generate]] (Lateral View) to a logical plan.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,15 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
}

/**
* A general hint for the child.
* A pair of (name, parameters).
*/
case class Hint(name: String, parameters: Seq[String], child: LogicalPlan) extends UnaryNode {
override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = child.output
}

case class InsertIntoTable(
table: LogicalPlan,
partition: Map[String, Option[String]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ trait AnalysisTest extends PlanTest {
val conf = new SimpleCatalystConf(caseSensitive)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
catalog.createTempView("TaBlE", TestRelations.testRelation, overrideIfExists = true)
catalog.createTempView("TaBlE2", TestRelations.testRelation2, overrideIfExists = true)
new Analyzer(catalog, conf) {
override val extendedResolutionRules = EliminateSubqueryAliases :: Nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.logical._

class SubstituteHintsSuite extends AnalysisTest {
import org.apache.spark.sql.catalyst.analysis.TestRelations._

val a = testRelation.output(0)
val b = testRelation2.output(0)

test("case-sensitive or insensitive parameters") {
checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
BroadcastHint(testRelation),
caseSensitive = false)

checkAnalysis(
Hint("MAPJOIN", Seq("table"), table("TaBlE")),
BroadcastHint(testRelation),
caseSensitive = false)

checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE")),
BroadcastHint(testRelation))

checkAnalysis(
Hint("MAPJOIN", Seq("table"), table("TaBlE")),
testRelation)
}

test("single hint") {
checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE").select(a)),
BroadcastHint(testRelation).select(a))

checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE").as("t").join(table("TaBlE2").as("u")).select(a)),
BroadcastHint(testRelation).join(testRelation2).select(a))

checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE2"),
table("TaBlE").as("t").join(table("TaBlE2").as("u")).select(a)),
testRelation.join(BroadcastHint(testRelation2)).select(a))
}

test("single hint with multiple parameters") {
checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE", "TaBlE"),
table("TaBlE").as("t").join(table("TaBlE2").as("u")).select(a)),
BroadcastHint(testRelation).join(testRelation2).select(a))

checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE", "TaBlE2"),
table("TaBlE").as("t").join(table("TaBlE2").as("u")).select(a)),
BroadcastHint(testRelation).join(BroadcastHint(testRelation2)).select(a))
}

test("duplicated nested hints are transformed into one") {
checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"),
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE").as("t").select('a))
.join(table("TaBlE2").as("u")).select(a)),
BroadcastHint(testRelation).select(a).join(testRelation2).select(a))

checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE2"),
table("TaBlE").as("t").select(a)
.join(Hint("MAPJOIN", Seq("TaBlE2"), table("TaBlE2").as("u").select(b))).select(a)),
testRelation.select(a).join(BroadcastHint(testRelation2).select(b)).select(a))
}

test("distinct nested two hints are handled separately") {
checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE2"),
Hint("MAPJOIN", Seq("TaBlE"), table("TaBlE").as("t").select(a))
.join(table("TaBlE2").as("u")).select(a)),
BroadcastHint(testRelation).select(a).join(BroadcastHint(testRelation2)).select(a))

checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"),
table("TaBlE").as("t")
.join(Hint("MAPJOIN", Seq("TaBlE2"), table("TaBlE2").as("u").select(b))).select(a)),
BroadcastHint(testRelation).join(BroadcastHint(testRelation2).select(b)).select(a))
}

test("deep self join") {
checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"),
table("TaBlE").join(table("TaBlE")).join(table("TaBlE")).join(table("TaBlE")).select(a)),
BroadcastHint(testRelation).join(testRelation).join(testRelation).join(testRelation)
.select(a))
}

test("subquery should be ignored") {
checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"),
table("TaBlE").select(a).as("x").join(table("TaBlE")).select(a)),
testRelation.select(a).join(BroadcastHint(testRelation)).select(a))

checkAnalysis(
Hint("MAPJOIN", Seq("TaBlE"),
table("TaBlE").as("t").select(a).as("x")
.join(table("TaBlE2").as("t2")).select(a)),
testRelation.select(a).join(testRelation2).select(a))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,46 @@ class PlanParserSuite extends PlanTest {
assertEqual("select a, b from db.c where x !> 1",
table("db", "c").where('x <= 1).select('a, 'b))
}

test("select hint syntax") {
// Hive compatibility: Missing parameter raises ParseException.
val m = intercept[ParseException] {
parsePlan("SELECT /*+ HINT() */ * FROM t")
}.getMessage
assert(m.contains("no viable alternative at input"))

// Hive compatibility: No database.
val m2 = intercept[ParseException] {
parsePlan("SELECT /*+ MAPJOIN(default.t) */ * from default.t")
}.getMessage
assert(m2.contains("no viable alternative at input"))

comparePlans(
parsePlan("SELECT /*+ HINT */ * FROM t"),
Hint("HINT", Seq.empty, table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ BROADCASTJOIN(u) */ * FROM t"),
Hint("BROADCASTJOIN", Seq("u"), table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ MAPJOIN(u) */ * FROM t"),
Hint("MAPJOIN", Seq("u"), table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ STREAMTABLE(a,b,c) */ * FROM t"),
Hint("STREAMTABLE", Seq("a", "b", "c"), table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ INDEX(t emp_job_ix) */ * FROM t"),
Hint("INDEX", Seq("t", "emp_job_ix"), table("t").select(star())))

comparePlans(
parsePlan("SELECT /*+ MAPJOIN(`default.t`) */ * from `default.t`"),
Hint("MAPJOIN", Seq("default.t"), table("default.t").select(star())))

comparePlans(
parsePlan("SELECT /*+ MAPJOIN(t) */ a from t where true group by a order by a"),
Hint("MAPJOIN", Seq("t"), table("t").where(Literal(true)).groupBy('a)('a)).orderBy('a.asc))
}
}
Loading