Skip to content

Commit d024fff

Browse files
committed
Merge remote-tracking branch 'upstream/master' into nested-union
Conflicts: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala
2 parents a00261d + d806ed3 commit d024fff

File tree

17 files changed

+392
-13
lines changed

17 files changed

+392
-13
lines changed

graphx/src/main/scala/org/apache/spark/graphx/lib/StronglyConnectedComponents.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ object StronglyConnectedComponents {
3636
* @return a graph with vertex attributes containing the smallest vertex id in each SCC
3737
*/
3838
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED] = {
39-
39+
require(numIter > 0, s"Number of iterations ${numIter} must be greater than 0.")
4040
// the graph we update with final SCC ids, and the graph we return at the end
4141
var sccGraph = graph.mapVertices { case (vid, _) => vid }
4242
// graph we are going to work with in our iterations

sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ atomExpression
205205
| whenExpression
206206
| (functionName LPAREN) => function
207207
| tableOrColumn
208+
| (LPAREN KW_SELECT) => subQueryExpression
209+
-> ^(TOK_SUBQUERY_EXPR ^(TOK_SUBQUERY_OP) subQueryExpression)
208210
| LPAREN! expression RPAREN!
209211
;
210212

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
667667
UnresolvedAttribute(nameParts :+ cleanIdentifier(attr))
668668
case other => UnresolvedExtractValue(other, Literal(cleanIdentifier(attr)))
669669
}
670+
case Token("TOK_SUBQUERY_EXPR", Token("TOK_SUBQUERY_OP", Nil) :: subquery :: Nil) =>
671+
ScalarSubquery(nodeToPlan(subquery))
670672

671673
/* Stars (*) */
672674
case Token("TOK_ALLCOLREF", Nil) => UnresolvedStar(None)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ class Analyzer(
8080
ResolveGenerate ::
8181
ResolveFunctions ::
8282
ResolveAliases ::
83+
ResolveSubquery ::
8384
ResolveWindowOrder ::
8485
ResolveWindowFrame ::
8586
ResolveNaturalJoin ::
@@ -120,7 +121,14 @@ class Analyzer(
120121
withAlias.getOrElse(relation)
121122
}
122123
substituted.getOrElse(u)
124+
case other =>
125+
// This can't be done in ResolveSubquery because that rule does not know about the CTE relations.
126+
other transformExpressions {
127+
case e: SubqueryExpression =>
128+
e.withNewPlan(substituteCTE(e.query, cteRelations))
129+
}
123130
}
131+
124132
}
125133
}
126134

@@ -716,6 +724,30 @@ class Analyzer(
716724
}
717725
}
718726

727+
/**
728+
* This rule resolves subqueries inside expressions.
729+
*
730+
* Note: CTEs are handled in CTESubstitution.
731+
*/
732+
object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
733+
734+
private def hasSubquery(e: Expression): Boolean = {
735+
e.find(_.isInstanceOf[SubqueryExpression]).isDefined
736+
}
737+
738+
private def hasSubquery(q: LogicalPlan): Boolean = {
739+
q.expressions.exists(hasSubquery)
740+
}
741+
742+
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
743+
case q: LogicalPlan if q.childrenResolved && hasSubquery(q) =>
744+
q transformExpressions {
745+
case e: SubqueryExpression if !e.query.resolved =>
746+
e.withNewPlan(execute(e.query))
747+
}
748+
}
749+
}
750+
719751
/**
720752
* Turns projections that contain aggregate expressions into aggregations.
721753
*/
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.expressions
19+
20+
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
21+
import org.apache.spark.sql.catalyst.plans.QueryPlan
22+
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
23+
import org.apache.spark.sql.types.DataType
24+
25+
/**
26+
* An interface for subqueries that are used in expressions.
27+
*/
28+
abstract class SubqueryExpression extends LeafExpression {
29+
30+
/**
31+
* The logical plan of the query.
32+
*/
33+
def query: LogicalPlan
34+
35+
/**
36+
* Either a logical plan or a physical plan. The generated tree string (explain output) uses this
37+
* field to explain the subquery.
38+
*/
39+
def plan: QueryPlan[_]
40+
41+
/**
42+
* Updates the query with new logical plan.
43+
*/
44+
def withNewPlan(plan: LogicalPlan): SubqueryExpression
45+
}
46+
47+
/**
48+
* A subquery that will return only one row and one column.
49+
*
50+
* This will be converted into [[execution.ScalarSubquery]] during physical planning.
51+
*
52+
* Note: `exprId` is used to give the subquery a unique name in the explain string output.
53+
*/
54+
case class ScalarSubquery(
55+
query: LogicalPlan,
56+
exprId: ExprId = NamedExpression.newExprId)
57+
extends SubqueryExpression with Unevaluable {
58+
59+
override def plan: LogicalPlan = Subquery(toString, query)
60+
61+
override lazy val resolved: Boolean = query.resolved
62+
63+
override def dataType: DataType = query.schema.fields.head.dataType
64+
65+
override def checkInputDataTypes(): TypeCheckResult = {
66+
if (query.schema.length != 1) {
67+
TypeCheckResult.TypeCheckFailure("Scalar subquery must return only one column, but got " +
68+
query.schema.length.toString)
69+
} else {
70+
TypeCheckResult.TypeCheckSuccess
71+
}
72+
}
73+
74+
override def foldable: Boolean = false
75+
override def nullable: Boolean = true
76+
77+
override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan, exprId)
78+
79+
override def toString: String = s"subquery#${exprId.id}"
80+
81+
// TODO: support sql()
82+
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,19 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
9090
Batch("Decimal Optimizations", FixedPoint(100),
9191
DecimalAggregates) ::
9292
Batch("LocalRelation", FixedPoint(100),
93-
ConvertToLocalRelation) :: Nil
93+
ConvertToLocalRelation) ::
94+
Batch("Subquery", Once,
95+
OptimizeSubqueries) :: Nil
96+
}
97+
98+
/**
99+
* Optimizes all the subqueries inside expressions.
100+
*/
101+
object OptimizeSubqueries extends Rule[LogicalPlan] {
102+
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
103+
case subquery: SubqueryExpression =>
104+
subquery.withNewPlan(Optimizer.this.execute(subquery.query))
105+
}
94106
}
95107
}
96108

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark.sql.catalyst.plans
1919

2020
import org.apache.spark.sql.catalyst.expressions._
21+
import org.apache.spark.sql.catalyst.plans.logical.Subquery
2122
import org.apache.spark.sql.catalyst.trees.TreeNode
2223
import org.apache.spark.sql.types.{DataType, StructType}
2324

@@ -226,4 +227,9 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
226227
protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
227228

228229
override def simpleString: String = statePrefix + super.simpleString
230+
231+
override def treeChildren: Seq[PlanType] = {
232+
val subqueries = expressions.flatMap(_.collect {case e: SubqueryExpression => e})
233+
children ++ subqueries.map(e => e.plan.asInstanceOf[PlanType])
234+
}
229235
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
448448
}
449449
}
450450

451+
/**
452+
* All the nodes that will be used to generate the tree string.
453+
*/
454+
protected def treeChildren: Seq[BaseType] = children
455+
451456
/**
452457
* Appends the string representation of this node and its children to the given StringBuilder.
453458
*
@@ -470,9 +475,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
470475
builder.append(simpleString)
471476
builder.append("\n")
472477

473-
if (children.nonEmpty) {
474-
children.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
475-
children.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
478+
if (treeChildren.nonEmpty) {
479+
treeChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
480+
treeChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
476481
}
477482

478483
builder

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis._
2222
import org.apache.spark.sql.catalyst.expressions._
2323
import org.apache.spark.sql.catalyst.plans.PlanTest
2424
import org.apache.spark.sql.catalyst.plans.logical._
25+
import org.apache.spark.sql.types.BooleanType
2526
import org.apache.spark.unsafe.types.CalendarInterval
2627

2728
class CatalystQlSuite extends PlanTest {
@@ -265,4 +266,10 @@ class CatalystQlSuite extends PlanTest {
265266

266267
comparePlans(parsed2, expected2)
267268
}
269+
270+
test("subquery") {
271+
parser.parsePlan("select (select max(b) from s) ss from t")
272+
parser.parsePlan("select * from t where a = (select b from s)")
273+
parser.parsePlan("select * from t group by g having a > (select b from s)")
274+
}
268275
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,17 @@ class AnalysisErrorSuite extends AnalysisTest {
113113

114114
val dateLit = Literal.create(null, DateType)
115115

116+
errorTest(
117+
"scalar subquery with 2 columns",
118+
testRelation.select(
119+
(ScalarSubquery(testRelation.select('a, dateLit.as('b))) + Literal(1)).as('a)),
120+
"Scalar subquery must return only one column, but got 2" :: Nil)
121+
122+
errorTest(
123+
"scalar subquery with no column",
124+
testRelation.select(ScalarSubquery(LocalRelation()).as('a)),
125+
"Scalar subquery must return only one column, but got 0" :: Nil)
126+
116127
errorTest(
117128
"single invalid type, single arg",
118129
testRelation.select(TestFunction(dateLit :: Nil, IntegerType :: Nil).as('a)),

0 commit comments

Comments
 (0)