From b38a21ef6146784e4b93ef4ce8c899f1eee14572 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 16 Nov 2015 18:30:26 -0800 Subject: [PATCH 01/13] SPARK-11633 --- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 ++- .../spark/sql/hive/execution/SQLQuerySuite.scala | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 2f4670b55bdb..5a5b71e52dd7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -425,7 +425,8 @@ class Analyzer( */ j case Some((oldRelation, newRelation)) => - val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) + val attributeRewrites = + AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2)) val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 3427152b2da0..5e00546a74c0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -51,6 +51,8 @@ case class Order( state: String, month: Int) +case class Individual(F1: Integer, F2: Integer) + case class WindowData( month: Int, area: String, @@ -1479,4 +1481,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } + + test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") { + val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1))) + val df = hiveContext.createDataFrame(rdd1) + df.registerTempTable("foo") + val df2 = sql("select f1, F2 as F2 from foo") + df2.registerTempTable("foo2") + df2.registerTempTable("foo3") + + checkAnswer(sql( + """ + SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2 + """.stripMargin), Row(2) :: Row(1) :: Nil) + } } From 0546772f151f83d6d3cf4d000cbe341f52545007 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 20 Nov 2015 10:56:45 -0800 Subject: [PATCH 02/13] converge --- .../spark/sql/catalyst/analysis/Analyzer.scala | 3 +-- .../spark/sql/hive/execution/SQLQuerySuite.scala | 15 --------------- 2 files changed, 1 insertion(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7c9512fbd00a..47962ebe6ef8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -417,8 +417,7 @@ class Analyzer( */ j case Some((oldRelation, newRelation)) => - val attributeRewrites = - AttributeMap(oldRelation.output.zip(newRelation.output).filter(x => x._1 != x._2)) + val attributeRewrites = AttributeMap(oldRelation.output.zip(newRelation.output)) val newRight = right transformUp { case r if r == oldRelation => newRelation } transformUp { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 5e00546a74c0..61d9dcd37572 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -51,8 +51,6 @@ case class Order( state: String, month: Int) -case class Individual(F1: Integer, F2: Integer) - case class WindowData( month: Int, area: String, @@ -1481,18 +1479,5 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } - - test ("SPARK-11633: HiveContext throws TreeNode Exception : Failed to Copy Node") { - val rdd1 = sparkContext.parallelize(Seq( Individual(1,3), Individual(2,1))) - val df = hiveContext.createDataFrame(rdd1) - df.registerTempTable("foo") - val df2 = sql("select f1, F2 as F2 from foo") - df2.registerTempTable("foo2") - df2.registerTempTable("foo3") - - checkAnswer(sql( - """ - SELECT a.F1 FROM foo2 a INNER JOIN foo3 b ON a.F2=b.F2 - """.stripMargin), Row(2) :: Row(1) :: Nil) } } From b37a64f13956b6ddd0e38ddfd9fe1caee611f1a8 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Fri, 20 Nov 2015 10:58:37 -0800 Subject: [PATCH 03/13] converge --- .../org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 61d9dcd37572..3427152b2da0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1479,5 +1479,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { |FROM (SELECT '{"f1": "value1", "f2": 12}' json, 'hello' as str) test """.stripMargin), Row("value1", "12", 3.14, "hello")) } - } } From 95f25a6eb688a2cf3e3efa6ec7b7715884b1fa7b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 19 Mar 2016 21:00:32 -0700 Subject: [PATCH 04/13] group by ordinals --- .../spark/sql/catalyst/CatalystConf.scala | 11 ++- .../sql/catalyst/analysis/Analyzer.scala | 40 +++++++++- .../sql/catalyst/planning/patterns.scala | 14 ++++ .../sql/catalyst/analysis/AnalysisTest.scala | 4 +- .../analysis/DecimalPrecisionSuite.scala | 2 +- .../BooleanSimplificationSuite.scala | 5 +- .../apache/spark/sql/internal/SQLConf.scala | 6 ++ .../org/apache/spark/sql/SQLQuerySuite.scala | 75 +++++++++++++++++-- 8 files changed, 145 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala index 35884139b6be..24fa8ad3f8cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala @@ -22,6 +22,8 @@ import org.apache.spark.sql.catalyst.analysis._ private[spark] trait CatalystConf { def caseSensitiveAnalysis: Boolean + def groupByOrdinal: Boolean + /** * Returns the [[Resolver]] for the current configuration, which can be used to determin if two * identifiers are equal. @@ -43,8 +45,15 @@ object EmptyConf extends CatalystConf { override def caseSensitiveAnalysis: Boolean = { throw new UnsupportedOperationException } + + override def groupByOrdinal: Boolean = { + throw new UnsupportedOperationException + } } /** A CatalystConf that can be used for local testing. */ -case class SimpleCatalystConf(caseSensitiveAnalysis: Boolean) extends CatalystConf { +case class SimpleCatalystConf( + caseSensitiveAnalysis: Boolean, + groupByOrdinal: Boolean = true) + extends CatalystConf { } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e4e934a01541..d6243620aa9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatal import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.planning.IntegerIndex import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -40,7 +41,10 @@ import org.apache.spark.sql.types._ * references. */ object SimpleAnalyzer - extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(true)) + extends Analyzer( + EmptyCatalog, + EmptyFunctionRegistry, + new SimpleCatalystConf(caseSensitiveAnalysis = true)) /** * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and @@ -80,6 +84,7 @@ class Analyzer( ResolveGroupingAnalytics :: ResolvePivot :: ResolveUpCast :: + ResolveOrdinal :: ResolveSortReferences :: ResolveGenerate :: ResolveFunctions :: @@ -613,6 +618,39 @@ class Analyzer( } } + /** + * In many dialects of SQL it is valid to use ordinal positions in group by clauses. This rule is + * to convert ordinal positions to the corresponding expressions in the select list. + * + * Before the release of Spark 2.0, the literals in group by clauses have no effect on the results. + */ + object ResolveOrdinal extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + // Replace the index with the corresponding expression in aggregateExpressions. The index is + // a 1-base position of aggregateExpressions, which is output columns (select expression) + case a @ Aggregate(groups, aggs, child) + if conf.groupByOrdinal && child.resolved && aggs.forall(_.resolved) && + groups.exists(IntegerIndex.unapply(_).nonEmpty) => + val newGroups = groups.map { + case IntegerIndex(index) if index > 0 && index <= aggs.size => + aggs(index - 1) match { + case Alias(c, _) if c.isInstanceOf[AggregateExpression] => + throw new UnresolvedException(a, + s"Group by position: the '$index'th column in the select is " + + s"an aggregate function: ${c.sql}") + // Group by clause is unable to use the alias defined in aggregateExpressions + case Alias(c, _) => c + case o => o + } + case IntegerIndex(index) => + throw new UnresolvedException(a, + s"Group by position: '$index' exceeds the size of the select list '${aggs.size}'.") + case o => o + } + Aggregate(newGroups, aggs, child) + } + } + /** * In many dialects of SQL it is valid to sort by attributes that are not present in the SELECT * clause. This rule detects such queries and adds the required attributes to the original diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 62d54df98ecc..a3690b656262 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -24,6 +24,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types.IntegerType /** * A pattern that matches any number of project or filter operations on top of another relational @@ -202,3 +203,16 @@ object Unions { } } } + +/** + * Extractor for retrieving Int value. + */ +object IntegerIndex { + def unapply(a: Any): Option[Int] = a match { + case Literal(a: Int, IntegerType) => Some(a) + // When resolving ordinal in Sort and Group By, negative values are extracted + // for issuing error messages. + case UnaryMinus(IntegerLiteral(v)) => Some(-v) + case _ => None + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index ef825e606202..39166c4f8ef7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ trait AnalysisTest extends PlanTest { val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = { - val caseSensitiveConf = new SimpleCatalystConf(true) - val caseInsensitiveConf = new SimpleCatalystConf(false) + val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true) + val caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false) val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf) val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index b2613e490928..9aa685e1e8f5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.types._ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { - val conf = new SimpleCatalystConf(true) + val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) val catalog = new SimpleCatalog(conf) val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index da43751b0a31..47b79fe46245 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -110,7 +110,10 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { } private val caseInsensitiveAnalyzer = - new Analyzer(EmptyCatalog, EmptyFunctionRegistry, new SimpleCatalystConf(false)) + new Analyzer( + EmptyCatalog, + EmptyFunctionRegistry, + new SimpleCatalystConf(caseSensitiveAnalysis = false)) test("(a && b) || (a && c) => a && (b || c) when case insensitive") { val plan = caseInsensitiveAnalyzer.execute( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 473cde56fdd3..5c8b0346736c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -435,6 +435,11 @@ object SQLConf { defaultValue = Some(true), doc = "When false, we will treat bucketed table as normal table") + val GROUP_BY_ORDINAL = booleanConf("spark.sql.groupByOrdinal", + defaultValue = Some(true), + doc = "When true, the ordinal numbers in group by clauses are treated as the position " + + "in the select list. When false, the ordinal numbers are ignored.") + // The output committer class used by HadoopFsRelation. The specified class needs to be a // subclass of org.apache.hadoop.mapreduce.OutputCommitter. // @@ -634,6 +639,7 @@ class SQLConf extends Serializable with CatalystConf with ParserConf with Loggin def supportSQL11ReservedKeywords: Boolean = getConf(PARSER_SUPPORT_SQL11_RESERVED_KEYWORDS) + override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL) /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6716982118fe..abc7dbcd5f3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -21,6 +21,8 @@ import java.math.MathContext import java.sql.Timestamp import org.apache.spark.AccumulatorSuite +import org.apache.spark.sql.catalyst.analysis.UnresolvedException +import org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, CartesianProduct, SortMergeJoin} import org.apache.spark.sql.functions._ @@ -457,25 +459,86 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { Seq(Row(1, 3), Row(2, 3), Row(3, 3))) } - test("literal in agg grouping expressions") { + test("Group By Ordinal - basic") { checkAnswer( - sql("SELECT a, count(1) FROM testData2 GROUP BY a, 1"), - Seq(Row(1, 2), Row(2, 2), Row(3, 2))) - checkAnswer( - sql("SELECT a, count(2) FROM testData2 GROUP BY a, 2"), - Seq(Row(1, 2), Row(2, 2), Row(3, 2))) + sql("SELECT a, sum(b) FROM testData2 GROUP BY 1"), + sql("SELECT a, sum(b) FROM testData2 GROUP BY a")) + // duplicate group-by columns checkAnswer( sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1"), sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) + + checkAnswer( + sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY 1, 2"), + sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) + } + + test("Group By Ordinal - non aggregate expressions") { + checkAnswer( + sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, 2"), + sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2")) + + checkAnswer( + sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2"), + Seq(Row(1, 3, 1), Row(1, 4, 1), Row(2, 3, 1), Row(2, 4, 1), Row(3, 3, 1), Row(3, 4, 1))) + } + + test("Group By Ordinal - non-foldable constant expression") { + checkAnswer( + sql("SELECT a, b, sum(b) FROM testData2 GROUP BY a, b, 1 + 0"), + sql("SELECT a, b, sum(b) FROM testData2 GROUP BY a, b")) + checkAnswer( sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a, 1 + 2"), sql("SELECT a, 1, sum(b) FROM testData2 GROUP BY a")) + } + + test("Group By Ordinal - alias") { + checkAnswer( + sql("SELECT a, (b + 2) as c, count(2) FROM testData2 GROUP BY a, 2"), + sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2")) + + checkAnswer( + sql("SELECT a as b, b as a, sum(b) FROM testData2 GROUP BY 1, 2"), + sql("SELECT a, b, sum(b) FROM testData2 GROUP BY a, b")) + } + + test("Group By Ordinal - constants") { checkAnswer( sql("SELECT 1, 2, sum(b) FROM testData2 GROUP BY 1, 2"), sql("SELECT 1, 2, sum(b) FROM testData2")) } + test("Group By Ordinal - negative cases") { + intercept[UnresolvedException[Aggregate]] { + sql("SELECT * FROM testData2 GROUP BY -1").collect() + } + + intercept[UnresolvedException[Aggregate]] { + sql("SELECT * FROM testData2 GROUP BY 3").collect() + } + + val e = intercept[UnresolvedException[Aggregate]]( + sql("SELECT SUM(a) FROM testData2 GROUP BY 1").queryExecution.analyzed) + assert(e.getMessage contains + "Invalid call to Group by position: the '1'th column in the select is an aggregate function") + + val ae = intercept[AnalysisException]( + sql("SELECT a, rand(0), sum(b) FROM testData2 GROUP BY a, 2").queryExecution.analyzed) + assert(ae.getMessage contains + "nondeterministic expression rand(0) should not appear in grouping expression") + } + + test("Group By Ordinal: spark.sql.orderByOrdinal=false") { + withSQLConf(SQLConf.GROUP_BY_ORDINAL.key -> "false") { + // If spark.sql.groupByOrdinal=false, ignore the position number. + intercept[AnalysisException] { + sql("SELECT a, sum(b) FROM testData2 GROUP BY 1").collect() + } + } + } + test("aggregates with nulls") { checkAnswer( sql("SELECT SKEWNESS(a), KURTOSIS(a), MIN(a), MAX(a)," + From b10d076a71d863255a901861f5ca571816d8fca7 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 19 Mar 2016 21:11:34 -0700 Subject: [PATCH 05/13] fix messages. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d6243620aa9e..ce88d12c2e0e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -636,8 +636,8 @@ class Analyzer( aggs(index - 1) match { case Alias(c, _) if c.isInstanceOf[AggregateExpression] => throw new UnresolvedException(a, - s"Group by position: the '$index'th column in the select is " + - s"an aggregate function: ${c.sql}") + s"Group by position: the '$index'th column in the select is an aggregate " + + s"function: ${c.sql}. Aggregate functions are not allowed in GROUP BY") // Group by clause is unable to use the alias defined in aggregateExpressions case Alias(c, _) => c case o => o From 79a537aecdd788a80948aa22f61cca4901e8d0ee Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sat, 19 Mar 2016 21:45:33 -0700 Subject: [PATCH 06/13] '*' is not allowed to use in the select list when users specify ordinals in group by --- .../spark/sql/catalyst/analysis/Analyzer.scala | 10 ++++++++-- .../org/apache/spark/sql/SQLQuerySuite.scala | 16 +++++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index ce88d12c2e0e..5df58a1117ac 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -495,9 +495,15 @@ class Analyzer( // If the aggregate function argument contains Stars, expand it. case a: Aggregate if containsStar(a.aggregateExpressions) => - val expanded = expandStarExpressions(a.aggregateExpressions, a.child) + if (conf.groupByOrdinal && a.groupingExpressions.exists(IntegerIndex.unapply(_).nonEmpty)) { + failAnalysis( + "Group by position: star is not allowed to use in the select list " + + "when using ordinals in group by") + } else { + val expanded = expandStarExpressions(a.aggregateExpressions, a.child) .map(_.asInstanceOf[NamedExpression]) - a.copy(aggregateExpressions = expanded) + a.copy(aggregateExpressions = expanded) + } // To resolve duplicate expression IDs for Join and Intersect case j @ Join(left, right, _, _) if !j.duplicateResolved => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index abc7dbcd5f3c..330d0e400043 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -512,11 +512,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("Group By Ordinal - negative cases") { intercept[UnresolvedException[Aggregate]] { - sql("SELECT * FROM testData2 GROUP BY -1").collect() + sql("SELECT a, b FROM testData2 GROUP BY -1").collect() } intercept[UnresolvedException[Aggregate]] { - sql("SELECT * FROM testData2 GROUP BY 3").collect() + sql("SELECT a, b FROM testData2 GROUP BY 3").collect() } val e = intercept[UnresolvedException[Aggregate]]( @@ -524,10 +524,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { assert(e.getMessage contains "Invalid call to Group by position: the '1'th column in the select is an aggregate function") - val ae = intercept[AnalysisException]( + var ae = intercept[AnalysisException]( sql("SELECT a, rand(0), sum(b) FROM testData2 GROUP BY a, 2").queryExecution.analyzed) assert(ae.getMessage contains "nondeterministic expression rand(0) should not appear in grouping expression") + + ae = intercept[AnalysisException]( + sql("SELECT * FROM testData2 GROUP BY a, b, 1").queryExecution.analyzed) + assert(ae.getMessage contains + "Group by position: star is not allowed to use in the select list " + + "when using ordinals in group by") } test("Group By Ordinal: spark.sql.orderByOrdinal=false") { @@ -536,6 +542,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { intercept[AnalysisException] { sql("SELECT a, sum(b) FROM testData2 GROUP BY 1").collect() } + // '*' is not allowed to use in the select list when users specify ordinals in group by + checkAnswer( + sql("SELECT * FROM testData2 GROUP BY a, b, 1"), + sql("SELECT * FROM testData2 GROUP BY a, b")) } } From a1835e5b567d64bbd71632395679acb563282f45 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Sun, 20 Mar 2016 22:25:14 -0700 Subject: [PATCH 07/13] address comments. --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 330d0e400043..21a7f5d9242d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -512,35 +512,35 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("Group By Ordinal - negative cases") { intercept[UnresolvedException[Aggregate]] { - sql("SELECT a, b FROM testData2 GROUP BY -1").collect() + sql("SELECT a, b FROM testData2 GROUP BY -1") } intercept[UnresolvedException[Aggregate]] { - sql("SELECT a, b FROM testData2 GROUP BY 3").collect() + sql("SELECT a, b FROM testData2 GROUP BY 3") } val e = intercept[UnresolvedException[Aggregate]]( - sql("SELECT SUM(a) FROM testData2 GROUP BY 1").queryExecution.analyzed) + sql("SELECT SUM(a) FROM testData2 GROUP BY 1")) assert(e.getMessage contains "Invalid call to Group by position: the '1'th column in the select is an aggregate function") var ae = intercept[AnalysisException]( - sql("SELECT a, rand(0), sum(b) FROM testData2 GROUP BY a, 2").queryExecution.analyzed) + sql("SELECT a, rand(0), sum(b) FROM testData2 GROUP BY a, 2")) assert(ae.getMessage contains "nondeterministic expression rand(0) should not appear in grouping expression") ae = intercept[AnalysisException]( - sql("SELECT * FROM testData2 GROUP BY a, b, 1").queryExecution.analyzed) + sql("SELECT * FROM testData2 GROUP BY a, b, 1")) assert(ae.getMessage contains "Group by position: star is not allowed to use in the select list " + "when using ordinals in group by") } - test("Group By Ordinal: spark.sql.orderByOrdinal=false") { + test("Group By Ordinal: spark.sql.groupByOrdinal=false") { withSQLConf(SQLConf.GROUP_BY_ORDINAL.key -> "false") { // If spark.sql.groupByOrdinal=false, ignore the position number. intercept[AnalysisException] { - sql("SELECT a, sum(b) FROM testData2 GROUP BY 1").collect() + sql("SELECT a, sum(b) FROM testData2 GROUP BY 1") } // '*' is not allowed to use in the select list when users specify ordinals in group by checkAnswer( From 960ead769dbd8bdb44c1820180d7edb1552efd89 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 21 Mar 2016 13:07:55 -0700 Subject: [PATCH 08/13] merge two ordinal resolution into the same one. --- .../sql/catalyst/analysis/Analyzer.scala | 55 ++++++++++--------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 886133fa22c1..1b9b18881e0d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -628,13 +628,38 @@ class Analyzer( } /** - * In many dialects of SQL it is valid to use ordinal positions in group by clauses. This rule is - * to convert ordinal positions to the corresponding expressions in the select list. + * In many dialects of SQL it is valid to use ordinal positions in order/sort by and group by + * clauses. This rule is to convert ordinal positions to the corresponding expressions in the + * select list. This support is introduced in Spark 2.0. * - * Before the release of Spark 2.0, the literals in group by clauses have no effect on the results. + * - When the sort references or group by expressions are not integer but foldable expressions, + * just ignore them. + * - When spark.sql.orderByOrdinal/spark.sql.groupByOrdinal is set to false, ignore the position + * numbers too. + * + * Before the release of Spark 2.0, the literals in order/sort by and group by clauses + * have no effect on the results. */ object ResolveOrdinal extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + // Replace the index with the related attribute for ORDER BY, + // which is a 1-base position of the projection list. + case s @ Sort(orders, global, child) + if conf.orderByOrdinal && child.resolved && + orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => + val newOrders = orders map { + case s @ SortOrder(IntegerIndex(index), direction) => + if (index > 0 && index <= child.output.size) { + SortOrder(child.output(index - 1), direction) + } else { + throw new UnresolvedException(s, + s"Order/sort By position: $index does not exist " + + s"The Select List is indexed from 1 to ${child.output.size}") + } + case o => o + } + Sort(newOrders, global, child) + // Replace the index with the corresponding expression in aggregateExpressions. The index is // a 1-base position of aggregateExpressions, which is output columns (select expression) case a @ Aggregate(groups, aggs, child) @@ -646,7 +671,7 @@ class Analyzer( case Alias(c, _) if c.isInstanceOf[AggregateExpression] => throw new UnresolvedException(a, s"Group by position: the '$index'th column in the select is an aggregate " + - s"function: ${c.sql}. Aggregate functions are not allowed in GROUP BY") + s"function: ${c.sql}. Aggregate functions are not allowed in GROUP BY") // Group by clause is unable to use the alias defined in aggregateExpressions case Alias(c, _) => c case o => o @@ -665,32 +690,10 @@ class Analyzer( * clause. This rule detects such queries and adds the required attributes to the original * projection, so that they will be available during sorting. Another projection is added to * remove these attributes after sorting. - * - * This rule also resolves the position number in sort references. This support is introduced - * in Spark 2.0. Before Spark 2.0, the integers in Order By has no effect on output sorting. - * - When the sort references are not integer but foldable expressions, ignore them. - * - When spark.sql.orderByOrdinal is set to false, ignore the position numbers too. */ object ResolveSortReferences extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case s: Sort if !s.child.resolved => s - // Replace the index with the related attribute for ORDER BY - // which is a 1-base position of the projection list. - case s @ Sort(orders, global, child) - if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => - val newOrders = orders map { - case s @ SortOrder(IntegerIndex(index), direction) => - if (index > 0 && index <= child.output.size) { - SortOrder(child.output(index - 1), direction) - } else { - throw new UnresolvedException(s, - s"Order/sort By position: $index does not exist " + - s"The Select List is indexed from 1 to ${child.output.size}") - } - case o => o - } - Sort(newOrders, global, child) - // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions case sa @ Sort(_, _, child: Aggregate) => sa From b61345b962cce9f4981672156d1396db78f17cbc Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 21 Mar 2016 15:53:28 -0700 Subject: [PATCH 09/13] address comments. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 1b9b18881e0d..9bda801d72dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -84,7 +84,7 @@ class Analyzer( ResolveGroupingAnalytics :: ResolvePivot :: ResolveUpCast :: - ResolveOrdinal :: + ResolveOrdinalInOrderByAndGroupBy :: ResolveSortReferences :: ResolveGenerate :: ResolveFunctions :: @@ -640,7 +640,7 @@ class Analyzer( * Before the release of Spark 2.0, the literals in order/sort by and group by clauses * have no effect on the results. */ - object ResolveOrdinal extends Rule[LogicalPlan] { + object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { // Replace the index with the related attribute for ORDER BY, // which is a 1-base position of the projection list. From dacf2d897a6682a54b2b4f58690a6193051c7c69 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 21 Mar 2016 21:08:15 -0700 Subject: [PATCH 10/13] temp fix. --- .../sql/catalyst/analysis/Analyzer.scala | 24 ++++++++++++------- .../org/apache/spark/sql/SQLQuerySuite.scala | 19 ++++++++++----- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 616f1a34bdc7..b8214b84aa36 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -396,12 +396,18 @@ class Analyzer( Project(projectList = expanded, p.child) // If the aggregate function argument contains Stars, expand it. case a: Aggregate if containsStar(a.aggregateExpressions) => - val expanded = a.aggregateExpressions.flatMap { - case s: Star => s.expand(a.child, resolver) - case o if containsStar(o :: Nil) => expandStarExpression(o, a.child) :: Nil - case o => o :: Nil - }.map(_.asInstanceOf[NamedExpression]) - a.copy(aggregateExpressions = expanded) + if (conf.groupByOrdinal && a.groupingExpressions.exists(IntegerIndex.unapply(_).nonEmpty)) { + failAnalysis( + "Group by position: star is not allowed to use in the select list " + + "when using ordinals in group by") + } else { + val expanded = a.aggregateExpressions.flatMap { + case s: Star => s.expand(a.child, resolver) + case o if containsStar(o :: Nil) => expandStarExpression(o, a.child) :: Nil + case o => o :: Nil + }.map(_.asInstanceOf[NamedExpression]) + a.copy(aggregateExpressions = expanded) + } // If the script transformation input contains Stars, expand it. case t: ScriptTransformation if containsStar(t.input) => t.copy( @@ -670,10 +676,10 @@ class Analyzer( val newGroups = groups.map { case IntegerIndex(index) if index > 0 && index <= aggs.size => aggs(index - 1) match { - case Alias(c, _) if c.isInstanceOf[AggregateExpression] => + case e if ResolveAggregateFunctions.containsAggregate(e) => throw new UnresolvedException(a, - s"Group by position: the '$index'th column in the select is an aggregate " + - s"function: ${c.sql}. Aggregate functions are not allowed in GROUP BY") + s"Group by position: the '$index'th column in the select contains an " + + s"aggregate function: ${e.sql}. Aggregate functions are not allowed in GROUP BY") // Group by clause is unable to use the alias defined in aggregateExpressions case Alias(c, _) => c case o => o diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4b95822eeb30..49267de2f71d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -520,10 +520,17 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sql("SELECT a, b FROM testData2 GROUP BY 3") } - val e = intercept[UnresolvedException[Aggregate]]( + var e = intercept[UnresolvedException[Aggregate]]( sql("SELECT SUM(a) FROM testData2 GROUP BY 1")) assert(e.getMessage contains - "Invalid call to Group by position: the '1'th column in the select is an aggregate function") + "Invalid call to Group by position: the '1'th column in the select contains " + + "an aggregate function") + + e = intercept[UnresolvedException[Aggregate]]( + sql("SELECT SUM(a) + 1 FROM testData2 GROUP BY 1")) + assert(e.getMessage contains + "Invalid call to Group by position: the '1'th column in the select contains " + + "an aggregate function") var ae = intercept[AnalysisException]( sql("SELECT a, rand(0), sum(b) FROM testData2 GROUP BY a, 2")) @@ -540,10 +547,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("Group By Ordinal: spark.sql.groupByOrdinal=false") { withSQLConf(SQLConf.GROUP_BY_ORDINAL.key -> "false") { // If spark.sql.groupByOrdinal=false, ignore the position number. - intercept[AnalysisException] { - sql("SELECT a, sum(b) FROM testData2 GROUP BY 1") - } - // '*' is not allowed to use in the select list when users specify ordinals in group by + // intercept[AnalysisException] { + // sql("SELECT a, sum(b) FROM testData2 GROUP BY 1") + // } + // // '*' is not allowed to use in the select list when users specify ordinals in group by checkAnswer( sql("SELECT * FROM testData2 GROUP BY a, b, 1"), sql("SELECT * FROM testData2 GROUP BY a, b")) From b19b73ce731a34c2051d8c103ae0070036fe1a97 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Mon, 21 Mar 2016 22:20:24 -0700 Subject: [PATCH 11/13] fixed an issue in star expansion for group by --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 + .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b8214b84aa36..64591761a2b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -403,6 +403,7 @@ class Analyzer( } else { val expanded = a.aggregateExpressions.flatMap { case s: Star => s.expand(a.child, resolver) + case u @ UnresolvedAlias(_: Star, _) => expandStarExpression(u.child, a.child) :: Nil case o if containsStar(o :: Nil) => expandStarExpression(o, a.child) :: Nil case o => o :: Nil }.map(_.asInstanceOf[NamedExpression]) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 49267de2f71d..1692b12a96c4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -547,10 +547,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { test("Group By Ordinal: spark.sql.groupByOrdinal=false") { withSQLConf(SQLConf.GROUP_BY_ORDINAL.key -> "false") { // If spark.sql.groupByOrdinal=false, ignore the position number. - // intercept[AnalysisException] { - // sql("SELECT a, sum(b) FROM testData2 GROUP BY 1") - // } - // // '*' is not allowed to use in the select list when users specify ordinals in group by + intercept[AnalysisException] { + sql("SELECT a, sum(b) FROM testData2 GROUP BY 1") + } + // '*' is not allowed to use in the select list when users specify ordinals in group by checkAnswer( sql("SELECT * FROM testData2 GROUP BY a, b, 1"), sql("SELECT * FROM testData2 GROUP BY a, b")) From 74a16be1ca2bfaf74a184eacbb7cf0f1fa53049b Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 22 Mar 2016 19:44:28 -0700 Subject: [PATCH 12/13] address comments. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 5 +---- .../src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++-- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 64591761a2b8..44eb585e1930 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -681,8 +681,6 @@ class Analyzer( throw new UnresolvedException(a, s"Group by position: the '$index'th column in the select contains an " + s"aggregate function: ${e.sql}. Aggregate functions are not allowed in GROUP BY") - // Group by clause is unable to use the alias defined in aggregateExpressions - case Alias(c, _) => c case o => o } case IntegerIndex(index) => @@ -702,11 +700,10 @@ class Analyzer( */ object ResolveSortReferences extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case s: Sort if !s.child.resolved => s // Skip sort with aggregate. This will be handled in ResolveAggregateFunctions case sa @ Sort(_, _, child: Aggregate) => sa - case s @ Sort(order, _, child) if !s.resolved => + case s @ Sort(order, _, child) if !s.resolved && child.resolved => try { val newOrder = order.map(resolveExpressionRecursively(_, child).asInstanceOf[SortOrder]) val requiredAttrs = AttributeSet(newOrder).filter(_.resolved) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1692b12a96c4..a6043efea3dc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -481,8 +481,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2")) checkAnswer( - sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2"), - Seq(Row(1, 3, 1), Row(1, 4, 1), Row(2, 3, 1), Row(2, 4, 1), Row(3, 3, 1), Row(3, 4, 1))) + sql("SELECT a, b + 2 as c, count(2) FROM testData2 GROUP BY a, 2"), + sql("SELECT a, b + 2, count(2) FROM testData2 GROUP BY a, b + 2")) } test("Group By Ordinal - non-foldable constant expression") { From 6d080095db685735ab2a5d7af62195fa072efeb2 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 24 Mar 2016 19:21:05 -0700 Subject: [PATCH 13/13] address comments. --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b26c2ff27fa0..d0a31e7620bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -656,11 +656,11 @@ class Analyzer( */ object ResolveOrdinalInOrderByAndGroupBy extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case p if !p.childrenResolved => p // Replace the index with the related attribute for ORDER BY, // which is a 1-base position of the projection list. case s @ Sort(orders, global, child) - if conf.orderByOrdinal && child.resolved && - orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => + if conf.orderByOrdinal && orders.exists(o => IntegerIndex.unapply(o.child).nonEmpty) => val newOrders = orders map { case s @ SortOrder(IntegerIndex(index), direction) => if (index > 0 && index <= child.output.size) { @@ -677,7 +677,7 @@ class Analyzer( // Replace the index with the corresponding expression in aggregateExpressions. The index is // a 1-base position of aggregateExpressions, which is output columns (select expression) case a @ Aggregate(groups, aggs, child) - if conf.groupByOrdinal && child.resolved && aggs.forall(_.resolved) && + if conf.groupByOrdinal && aggs.forall(_.resolved) && groups.exists(IntegerIndex.unapply(_).nonEmpty) => val newGroups = groups.map { case IntegerIndex(index) if index > 0 && index <= aggs.size =>