From 87fc62ab80324552ab6811aa05b7d897402a5464 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Sat, 13 Oct 2018 19:17:18 +0200
Subject: [PATCH 1/5] [SPARK-25691][SQL] Use semantic equality in order to compare attributes

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  6 +++---
 .../apache/spark/sql/catalyst/analysis/view.scala  |  8 ++++----
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  7 ++-----
 .../sql/catalyst/plans/logical/LogicalPlan.scala   | 14 ++++++++++++++
 4 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7f641ace4629..8f1ca932a263 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1179,7 +1179,7 @@ class Analyzer(
           if (!s.resolved || s.missingInput.nonEmpty) && child.resolved =>
         val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child)
         val ordering = newOrder.map(_.asInstanceOf[SortOrder])
-        if (child.output == newChild.output) {
+        if (child.sameOutput(newChild)) {
           s.copy(order = ordering)
         } else {
           // Add missing attributes and then project them away.
@@ -1189,7 +1189,7 @@
       case f @ Filter(cond, child)
           if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
         val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
-        if (child.output == newChild.output) {
+        if (child.sameOutput(newChild)) {
           f.copy(condition = newCond.head)
         } else {
           // Add missing attributes and then project them away.
@@ -2087,7 +2087,7 @@ class Analyzer(
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
-      case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
+      case p: UnaryNode if p.sameOutput(p.child) && p.expressions.exists(!_.deterministic) =>
         val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
           nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index af74693000c4..6134d54531a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -49,7 +49,7 @@ import org.apache.spark.sql.internal.SQLConf
  */
 case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-    case v @ View(desc, output, child) if child.resolved && output != child.output =>
+    case v @ View(desc, output, child) if child.resolved && !v.sameOutput(child) =>
       val resolver = conf.resolver
       val queryColumnNames = desc.viewQueryColumnNames
       val queryOutput = if (queryColumnNames.nonEmpty) {
@@ -70,7 +70,7 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp
       }
       // Map the attributes in the query output to the attributes in the view output by index.
       val newOutput = output.zip(queryOutput).map {
-        case (attr, originAttr) if attr != originAttr =>
+        case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
           // The dataType of the output attributes may be not the same with that of the view
           // output, so we should cast the attribute to the dataType of the view output attribute.
           // Will throw an AnalysisException if the cast can't perform or might truncate.
@@ -112,8 +112,8 @@ object EliminateView extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // The child should have the same output attributes with the View operator, so we simply
     // remove the View operator.
-    case View(_, output, child) =>
-      assert(output == child.output,
+    case v @ View(_, output, child) =>
+      assert(v.sameOutput(child),
         s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
           s"view output ${output.mkString("[", ",", "]")}")
       child
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index da8009d50b5e..0dd5e99b210d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -405,7 +405,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
  */
 object RemoveRedundantProject extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case p @ Project(_, child) if p.output == child.output => child
+    case p @ Project(_, child) if p.sameOutput(child) => child
   }
 }

@@ -530,9 +530,6 @@ object PushProjectionThroughUnion extends Rule[LogicalPlan] with PredicateHelper
  * p2 is usually inserted by this rule and useless, p1 could prune the columns anyway.
  */
 object ColumnPruning extends Rule[LogicalPlan] {
-  private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
-    output1.size == output2.size &&
-      output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))

   def apply(plan: LogicalPlan): LogicalPlan = removeProjectBeforeFilter(plan transform {
     // Prunes the unused columns from project list of Project/Aggregate/Expand
@@ -607,7 +604,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
     case w: Window if w.windowExpressions.isEmpty => w.child

     // Eliminate no-op Projects
-    case p @ Project(_, child) if sameOutput(child.output, p.output) => child
+    case p @ Project(_, child) if child.sameOutput(p) => child

     // Can't prune the columns on LeafNode
     case p @ Project(_, _: LeafNode) => p
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 5f136629eb15..1bf1254814a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -130,6 +130,20 @@ abstract class LogicalPlan
    * Returns the output ordering that this plan generates.
    */
   def outputOrdering: Seq[SortOrder] = Nil
+
+  /**
+   * Returns true iff `other`'s output is semantically the same, i.e.:
+   *  - it contains the same number of `Attribute`s;
+   *  - the references are the same;
+   *  - they appear in the same order.
+   */
+  def sameOutput(other: LogicalPlan): Boolean = {
+    val thisOutput = this.output
+    val otherOutput = other.output
+    thisOutput.length == otherOutput.length && thisOutput.zip(otherOutput).forall {
+      case (a1, a2) => a1.semanticEquals(a2)
+    }
+  }
 }

 /**

From 64aafc5fa08d995f508065d3d33e9b29bb4da6c5 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Sun, 14 Oct 2018 10:48:45 +0200
Subject: [PATCH 2/5] revert changes in Analyzer + add UT

---
 .../sql/catalyst/analysis/Analyzer.scala           | 22 ++++++++++++++++------
 .../spark/sql/catalyst/dsl/package.scala           |  3 +++
 .../RemoveRedundantAliasAndProjectSuite.scala      |  7 +++++++
 3 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8f1ca932a263..ced5d84426ef 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -40,18 +40,28 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

 /**
- * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
+ * Trivial [[Analyzer]]s with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
  * Used for testing when all relations are already filled in and the analyzer needs only
  * to resolve attribute references.
  */
-object SimpleAnalyzer extends Analyzer(
+sealed class BaseSimpleAnalyzer(caseSensitive: Boolean) extends Analyzer(
   new SessionCatalog(
     new InMemoryCatalog,
     EmptyFunctionRegistry,
-    new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) {
+    new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive)) {
     override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean) {}
   },
-  new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
+  new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive))
+
+/**
+ * A trivial analyzer which uses case-sensitive resolution.
+ */
+object SimpleAnalyzer extends BaseSimpleAnalyzer(true)
+
+/**
+ * A trivial analyzer which uses case-insensitive resolution.
+ */
+object SimpleCaseInsensitiveAnalyzer extends BaseSimpleAnalyzer(false)

 /**
  * Provides a way to keep state during the analysis, this enables us to decouple the concerns
@@ -1189,7 +1199,7 @@
       case f @ Filter(cond, child)
           if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
         val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
-        if (child.sameOutput(newChild)) {
+        if (child.output == newChild.output) {
           f.copy(condition = newCond.head)
         } else {
           // Add missing attributes and then project them away.
@@ -2087,7 +2097,7 @@ class Analyzer(
       // todo: It's hard to write a general rule to pull out nondeterministic expressions
       // from LogicalPlan, currently we only do it for UnaryNode which has same output
       // schema with its child.
-      case p: UnaryNode if p.sameOutput(p.child) && p.expressions.exists(!_.deterministic) =>
+      case p: UnaryNode if p.output == p.child.output && p.expressions.exists(!_.deterministic) =>
         val nondeterToAttr = getNondeterToAttr(p.expressions)
         val newPlan = p.transformExpressions { case e =>
           nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 176ea823b1fc..405da3124d26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -401,6 +401,9 @@ package object dsl {
     def analyze: LogicalPlan =
       EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))

+    def analyzeCaseInsensitive: LogicalPlan =
+      EliminateSubqueryAliases(analysis.SimpleCaseInsensitiveAnalyzer.execute(logicalPlan))
+
     def hint(name: String, parameters: Any*): LogicalPlan =
       UnresolvedHint(name, parameters, logicalPlan)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
index 1973b5abb462..5b333098ffcc 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -124,4 +124,11 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper
     val expected = Subquery(relation.select('a as "a", 'b).where('b < 10).select('a).analyze)
     comparePlans(optimized, expected)
   }
+
+  test("SPARK-25691: RemoveRedundantProject also works with different cases") {
+    val relation = LocalRelation('a.int, 'b.int)
+    val query = relation.select('A, 'b).analyzeCaseInsensitive
+    val optimized = Optimize.execute(query)
+    comparePlans(optimized, relation)
+  }
 }

From c4aaa8d07042eebb3a3425011fdef913230364a6 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Mon, 15 Oct 2018 11:19:05 +0200
Subject: [PATCH 3/5] narrow down to view only

---
 .../sql/catalyst/analysis/Analyzer.scala           | 20 +++++---------------
 .../spark/sql/catalyst/dsl/package.scala           |  3 ---
 .../sql/catalyst/optimizer/Optimizer.scala         |  2 +-
 .../RemoveRedundantAliasAndProjectSuite.scala      |  7 -------
 4 files changed, 6 insertions(+), 26 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ced5d84426ef..7f641ace4629 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -40,28 +40,18 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._

 /**
- * Trivial [[Analyzer]]s with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
+ * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
  * Used for testing when all relations are already filled in and the analyzer needs only
  * to resolve attribute references.
  */
-sealed class BaseSimpleAnalyzer(caseSensitive: Boolean) extends Analyzer(
+object SimpleAnalyzer extends Analyzer(
   new SessionCatalog(
     new InMemoryCatalog,
     EmptyFunctionRegistry,
-    new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive)) {
+    new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) {
     override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean) {}
   },
-  new SQLConf().copy(SQLConf.CASE_SENSITIVE -> caseSensitive))
-
-/**
- * A trivial analyzer which uses case-sensitive resolution.
- */
-object SimpleAnalyzer extends BaseSimpleAnalyzer(true)
-
-/**
- * A trivial analyzer which uses case-insensitive resolution.
- */
-object SimpleCaseInsensitiveAnalyzer extends BaseSimpleAnalyzer(false)
+  new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))

 /**
  * Provides a way to keep state during the analysis, this enables us to decouple the concerns
@@ -1189,7 +1179,7 @@
           if (!s.resolved || s.missingInput.nonEmpty) && child.resolved =>
         val (newOrder, newChild) = resolveExprsAndAddMissingAttrs(order, child)
         val ordering = newOrder.map(_.asInstanceOf[SortOrder])
-        if (child.sameOutput(newChild)) {
+        if (child.output == newChild.output) {
           s.copy(order = ordering)
         } else {
           // Add missing attributes and then project them away.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 405da3124d26..176ea823b1fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -401,9 +401,6 @@ package object dsl {
     def analyze: LogicalPlan =
       EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))

-    def analyzeCaseInsensitive: LogicalPlan =
-      EliminateSubqueryAliases(analysis.SimpleCaseInsensitiveAnalyzer.execute(logicalPlan))
-
     def hint(name: String, parameters: Any*): LogicalPlan =
       UnresolvedHint(name, parameters, logicalPlan)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0dd5e99b210d..95455ffc0495 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -405,7 +405,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
  */
 object RemoveRedundantProject extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case p @ Project(_, child) if p.sameOutput(child) => child
+    case p @ Project(_, child) if p.output == child.output => child
   }
 }

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
index 5b333098ffcc..1973b5abb462 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -124,11 +124,4 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper
     val expected = Subquery(relation.select('a as "a", 'b).where('b < 10).select('a).analyze)
     comparePlans(optimized, expected)
   }
-
-  test("SPARK-25691: RemoveRedundantProject also works with different cases") {
-    val relation = LocalRelation('a.int, 'b.int)
-    val query = relation.select('A, 'b).analyzeCaseInsensitive
-    val optimized = Optimize.execute(query)
-    comparePlans(optimized, relation)
-  }
 }

From 0d334e33dcbbfbbf3c69cd0c26b5ce497a77675c Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Tue, 16 Oct 2018 12:31:57 +0200
Subject: [PATCH 4/5] add ut

---
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 28 +++++++++++++++++++++++++++++++-
 1 file changed, 27 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index cf76c92b093b..e296d71e6b42 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.analysis

-import java.util.TimeZone
+import java.util.{Locale, TimeZone}

 import scala.reflect.ClassTag

@@ -25,6 +25,7 @@ import org.scalatest.Matchers

 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
@@ -32,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.{Cross, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning,
   RangePartitioning, RoundRobinPartitioning}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -604,4 +606,28 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       checkAnalysis(input, expected)
     }
   }
+
+  test("SPARK-25691: AliasViewChild with different nullabilities") {
+    object ViewAnalyzer extends RuleExecutor[LogicalPlan] {
+      val batches = Batch("View", Once, AliasViewChild(conf), EliminateView) :: Nil
+    }
+    def intNotNullableAttr(name: String): Attribute = {
+      AttributeReference(name, IntegerType, nullable = false)()
+    }
+    val relation = LocalRelation(intNotNullableAttr("a"), 'b.string)
+    val view = View(CatalogTable(
+      identifier = TableIdentifier("v1"),
+      tableType = CatalogTableType.VIEW,
+      storage = CatalogStorageFormat.empty,
+      schema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))),
+      output = Seq('a.int, 'b.string),
+      child = relation)
+    val tz = Option(conf.sessionLocalTimeZone)
+    val expected = Project(Seq(
+      Alias(Cast(intNotNullableAttr("a"), IntegerType, tz), "a")(),
+      Alias(Cast('b.string, StringType, tz), "b")()),
+      relation)
+    val res = ViewAnalyzer.execute(view)
+    comparePlans(res, expected)
+  }
 }

From 0e49e91acd6a9ab18993fb9c0ea34ddc200cc902 Mon Sep 17 00:00:00 2001
From: Marco Gaido
Date: Tue, 30 Oct 2018 15:13:49 +0100
Subject: [PATCH 5/5] address comments

---
 .../apache/spark/sql/catalyst/analysis/AnalysisSuite.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index e296d71e6b42..5447a45fe721 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -611,10 +611,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
     object ViewAnalyzer extends RuleExecutor[LogicalPlan] {
       val batches = Batch("View", Once, AliasViewChild(conf), EliminateView) :: Nil
     }
-    def intNotNullableAttr(name: String): Attribute = {
-      AttributeReference(name, IntegerType, nullable = false)()
-    }
-    val relation = LocalRelation(intNotNullableAttr("a"), 'b.string)
+    val relation = LocalRelation('a.int.notNull, 'b.string)
     val view = View(CatalogTable(
       identifier = TableIdentifier("v1"),
       tableType = CatalogTableType.VIEW,
@@ -624,7 +621,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       child = relation)
     val tz = Option(conf.sessionLocalTimeZone)
     val expected = Project(Seq(
-      Alias(Cast(intNotNullableAttr("a"), IntegerType, tz), "a")(),
+      Alias(Cast('a.int.notNull, IntegerType, tz), "a")(),
       Alias(Cast('b.string, StringType, tz), "b")()),
       relation)
     val res = ViewAnalyzer.execute(view)
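
Background for readers of the series: the whole patch hinges on the fact that two attributes produced by the same resolution share an exprId but may differ in name case when the analyzer is case insensitive, so `==` over plan outputs is stricter than intended, while `semanticEquals` compares only the canonicalized form. Below is a minimal standalone sketch of that difference; it is not part of the patch, it assumes a 2.4-era spark-catalyst artifact on the classpath, and the object name `SameOutputDemo` is made up for the illustration.

  import org.apache.spark.sql.catalyst.expressions.AttributeReference
  import org.apache.spark.sql.types.IntegerType

  object SameOutputDemo extends App {
    // One resolved attribute; withName keeps the exprId, which mirrors what
    // case-insensitive resolution produces when 'A matches the column "a".
    val a = AttributeReference("a", IntegerType)()
    val upperA = a.withName("A")

    println(a == upperA)               // false: equality also compares the attribute name
    println(a.semanticEquals(upperA))  // true: canonicalization compares by exprId, not name

    // The same comparison lifted to Seq[Attribute], i.e. what the old
    // `p.output == child.output` checks did vs. what LogicalPlan.sameOutput does:
    println(Seq(a) == Seq(upperA))                                          // false
    println(Seq(a).zip(Seq(upperA)).forall(p => p._1.semanticEquals(p._2))) // true
  }

This is why patch 1 swaps `==` for `sameOutput`, and why patch 3 keeps the semantic check only where case differences can legitimately occur (views and redundant-project elimination) while restoring the cheaper `==` in the analyzer paths that were reverted.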