From 8d6b324ef714f1b450bd3a8e443b165268df7b18 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 6 Mar 2015 21:33:30 +0800 Subject: [PATCH 1/5] fix 6145 completely --- .../spark/sql/catalyst/analysis/Analyzer.scala | 4 +--- .../sql/catalyst/plans/logical/basicOperators.scala | 12 ++++++++++++ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 12 +++++++----- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7753331748d7b..07c09d756000a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.util.collection.OpenHashSet import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ @@ -321,8 +320,7 @@ class Analyzer(catalog: Catalog, if !s.resolved && p.resolved => val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name }) val resolved = unresolved.flatMap(child.resolve(_, resolver)) - val requiredAttributes = - AttributeSet(resolved.flatMap(_.collect { case a: Attribute => a })) + val requiredAttributes = AttributeSet(resolved) val missingInProject = requiredAttributes -- p.output if (missingInProject.nonEmpty) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 20cc8e90a71a3..255ceee8ccf18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.types._ @@ -152,6 +153,17 @@ case class Sort( global: Boolean, child: LogicalPlan) extends UnaryNode { override def output = child.output + + override def resolveChildren(name: String, resolver: Resolver) = { + val input = child match { + case Project(list, c) => list.filter { + case Alias(g: GetField, _) => false + case _ => true + }.map(_.toAttribute) + case _ => child.flatMap(_.output) + } + resolve(name, input, resolver) + } } case class Aggregate( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 4dedcd365f6cc..cdf6d8d2ff9fb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1053,10 +1053,12 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { test("SPARK-6145: ORDER BY test for nested fields") { jsonRDD(sparkContext.makeRDD( """{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil)).registerTempTable("nestedOrder") - // These should be successfully analyzed - sql("SELECT 1 FROM nestedOrder ORDER BY a.b").queryExecution.analyzed - sql("SELECT a.b FROM nestedOrder ORDER BY a.b").queryExecution.analyzed - sql("SELECT 1 FROM nestedOrder ORDER BY a.a.a").queryExecution.analyzed - sql("SELECT 1 FROM nestedOrder ORDER BY c[0].d").queryExecution.analyzed + + checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1)) + checkAnswer(sql("SELECT a.b FROM nestedOrder ORDER BY a.b"), Row(1)) + checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.a.a"), Row(1)) + checkAnswer(sql("SELECT a.a.a FROM nestedOrder ORDER BY a.a.a"), Row(1)) + checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY c[0].d"), Row(1)) + checkAnswer(sql("SELECT c[0].d FROM nestedOrder ORDER BY c[0].d"), Row(1)) } } From e383154ab754dbd648e511602b5decab28a4c9f4 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 6 Mar 2015 22:25:27 +0800 Subject: [PATCH 2/5] handle special cases --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 1 + .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 255ceee8ccf18..b69cda5b459ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -158,6 +158,7 @@ case class Sort( val input = child match { case Project(list, c) => list.filter { case Alias(g: GetField, _) => false + case Alias(g: GetItem, _) => false case _ => true }.map(_.toAttribute) case _ => child.flatMap(_.output) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index cdf6d8d2ff9fb..3e4f34a10a242 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1061,4 +1061,11 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY c[0].d"), Row(1)) checkAnswer(sql("SELECT c[0].d FROM nestedOrder ORDER BY c[0].d"), Row(1)) } + + test("SPARK-6145: special cases") { + jsonRDD(sparkContext.makeRDD( + """{"a": {"b": [1]}, "b": [{"a": 1}], "c0": {"a": 1}}""" :: Nil)).registerTempTable("t") + checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY c0.a"), Row(1)) + checkAnswer(sql("SELECT b[0].a FROM t ORDER BY c0.a"), Row(1)) + } } From 187d97e0b35c351188f60781419e744a8fc5c060 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sat, 7 Mar 2015 08:45:02 +0800 Subject: [PATCH 3/5] feedback for marmbrus --- .../spark/sql/catalyst/analysis/Analyzer.scala | 13 ++++++++++++- .../sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- .../sql/catalyst/plans/logical/basicOperators.scala | 13 ------------- 3 files changed, 13 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 07c09d756000a..4870c73965048 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -256,11 +256,22 @@ class Analyzer(catalog: Catalog, case q: LogicalPlan => logTrace(s"Attempting to resolve ${q.simpleString}") - q transformExpressionsUp { + q transformExpressionsUp { case u @ UnresolvedAttribute(name) if resolver(name, VirtualColumn.groupingIdName) && q.isInstanceOf[GroupingAnalytics] => // Resolve the virtual column GROUPING__ID for the operator GroupingAnalytics q.asInstanceOf[GroupingAnalytics].gid + case u @ UnresolvedAttribute(name) if q.isInstanceOf[Sort] => + val s = q.asInstanceOf[Sort] + val input = s.child match { + case Project(list, c) => list.filter { + case Alias(g: GetField, _) => false + case Alias(g: GetItem, _) => false + case _ => true + }.map(_.toAttribute) + case _ => s.child.flatMap(_.output) + } + s.resolve(name, input, resolver).getOrElse(u) case u @ UnresolvedAttribute(name) => // Leave unchanged if resolution fails. Hopefully will be resolved next round. val result = q.resolveChildren(name, resolver).getOrElse(u) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 8c4f09b58a4f2..98d49b5c1a8a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -155,7 +155,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { } /** Performs attribute resolution given a name and a sequence of possible attributes. */ - protected def resolve( + def resolve( name: String, input: Seq[Attribute], resolver: Resolver): Option[NamedExpression] = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index b69cda5b459ce..20cc8e90a71a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.plans.logical -import org.apache.spark.sql.catalyst.analysis.Resolver import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.types._ @@ -153,18 +152,6 @@ case class Sort( global: Boolean, child: LogicalPlan) extends UnaryNode { override def output = child.output - - override def resolveChildren(name: String, resolver: Resolver) = { - val input = child match { - case Project(list, c) => list.filter { - case Alias(g: GetField, _) => false - case Alias(g: GetItem, _) => false - case _ => true - }.map(_.toAttribute) - case _ => child.flatMap(_.output) - } - resolve(name, input, resolver) - } } case class Aggregate( From eab0b43d42c1ca727827a80067f5b91762a79ff2 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Sun, 8 Mar 2015 21:11:40 +0800 Subject: [PATCH 4/5] fix hive tests caused by a mistake --- .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4870c73965048..99eeb7e4f0e1e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -269,7 +269,7 @@ class Analyzer(catalog: Catalog, case Alias(g: GetItem, _) => false case _ => true }.map(_.toAttribute) - case _ => s.child.flatMap(_.output) + case other => other.output } s.resolve(name, input, resolver).getOrElse(u) case u @ UnresolvedAttribute(name) => From 7b56fb90bc1e6babe7256bfe820b4a08585ee8f2 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 16 Mar 2015 10:14:20 +0800 Subject: [PATCH 5/5] minor enhance --- .../spark/sql/catalyst/analysis/Analyzer.scala | 18 ++++++++++-------- .../catalyst/plans/logical/LogicalPlan.scala | 2 +- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 99eeb7e4f0e1e..ab1a4946b3950 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -263,15 +263,17 @@ class Analyzer(catalog: Catalog, q.asInstanceOf[GroupingAnalytics].gid case u @ UnresolvedAttribute(name) if q.isInstanceOf[Sort] => val s = q.asInstanceOf[Sort] - val input = s.child match { - case Project(list, c) => list.filter { - case Alias(g: GetField, _) => false - case Alias(g: GetItem, _) => false - case _ => true - }.map(_.toAttribute) - case other => other.output + val newChild = s.child match { + case Project(list, c) => + val newList = list.filter { + case Alias(g: GetField, _) => false + case Alias(g: GetItem, _) => false + case _ => true + } + Project(newList, c) + case other => other } - s.resolve(name, input, resolver).getOrElse(u) + Sort(s.order, s.global, newChild).resolveChildren(name, resolver).getOrElse(u) case u @ UnresolvedAttribute(name) => // Leave unchanged if resolution fails. Hopefully will be resolved next round. val result = q.resolveChildren(name, resolver).getOrElse(u) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 98d49b5c1a8a2..8c4f09b58a4f2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -155,7 +155,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { } /** Performs attribute resolution given a name and a sequence of possible attributes. */ - def resolve( + protected def resolve( name: String, input: Seq[Attribute], resolver: Resolver): Option[NamedExpression] = {