From 276879ca2bd8d2966b829b7e41e140362c4e4160 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 17 Aug 2018 02:37:18 +0800
Subject: [PATCH 1/6] insert datasource table may all null when select from
 view

---
 .../sql/execution/datasources/rules.scala     |  3 ++-
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 24 +++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 949aa665527a..d1d40a57d3f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -490,7 +490,8 @@ object DDLPreprocessingUtils {
       case (expected, actual) =>
         if (expected.dataType.sameType(actual.dataType) &&
           expected.name == actual.name &&
-          expected.metadata == actual.metadata) {
+          expected.metadata == actual.metadata &&
+          expected.exprId.id == actual.exprId.id) {
           actual
         } else {
           // Renaming is needed for handling the following cases like
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 84efd2b7a1dc..854f792af88f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2852,4 +2852,28 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
         spark.sql(s"select * from spark_25084 distribute by ($distributeExprs)").count === count)
     }
   }
+
+  test("SPARK-25135: insert datasource table may all null when select from view") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val cnt = 30
+      spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id as bigint) as col2")
+        .write.mode(SaveMode.Overwrite).parquet(path)
+      withTable("table1", "table2") {
+        spark.sql("CREATE TABLE table1(col1 bigint, col2 bigint) using parquet " +
+          s"location '$path'")
+
+        withView("view1") {
+          spark.sql("create view view1 as select col1, col2 from table1 where col1 > -20")
+
+          spark.sql("create table table2 (COL1 BIGINT, COL2 BIGINT) using parquet")
+
+          spark.sql("insert overwrite table table2 select COL1, COL2 from view1")
+
+          assert(spark.table("table2").count() === cnt)
+          checkAnswer(spark.table("table1"), spark.table("table2"))
+        }
+      }
+    }
+  }
 }

From bb93ca0e5202809bbd610f11091cfc8c493784e6 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 17 Aug 2018 15:33:33 +0800
Subject: [PATCH 2/6] Add new rule to handle Project contains View

---
 .../sql/catalyst/analysis/Analyzer.scala      |  6 +++++
 .../sql/execution/datasources/rules.scala     |  3 +--
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 27 +++++++++++++------
 3 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d00b82d35d7d..5c5994dae339 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -901,6 +901,12 @@ class Analyzer(
       // If the projection list contains Stars, expand it.
       case p: Project if containsStar(p.projectList) =>
         p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
+      case p @ Project(projectList, _ @ SubqueryAlias(_, view: View))
+        if projectList.forall(_.resolved) =>
+        val newProjectList = projectList.map { col =>
+          view.output.find(_.name.equalsIgnoreCase(col.name)).get
+        }
+        p.copy(projectList = newProjectList)
       // If the aggregate function argument contains Stars, expand it.
       case a: Aggregate if containsStar(a.aggregateExpressions) =>
         if (a.groupingExpressions.exists(_.isInstanceOf[UnresolvedOrdinal])) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index d1d40a57d3f9..949aa665527a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -490,8 +490,7 @@ object DDLPreprocessingUtils {
       case (expected, actual) =>
         if (expected.dataType.sameType(actual.dataType) &&
           expected.name == actual.name &&
-          expected.metadata == actual.metadata &&
-          expected.exprId.id == actual.exprId.id) {
+          expected.metadata == actual.metadata) {
           actual
         } else {
           // Renaming is needed for handling the following cases like
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 854f792af88f..2afc2bc71ed8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2859,19 +2859,30 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       val cnt = 30
       spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id as bigint) as col2")
        .write.mode(SaveMode.Overwrite).parquet(path)
-      withTable("table1", "table2") {
-        spark.sql("CREATE TABLE table1(col1 bigint, col2 bigint) using parquet " +
-          s"location '$path'")
+      withTable("table1", "table2", "table3", "table4") {
+        spark.sql(s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$path'")
 
         withView("view1") {
-          spark.sql("create view view1 as select col1, col2 from table1 where col1 > -20")
-
-          spark.sql("create table table2 (COL1 BIGINT, COL2 BIGINT) using parquet")
-
-          spark.sql("insert overwrite table table2 select COL1, COL2 from view1")
+          spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
 
+          spark.sql("CREATE TABLE table2 (COL1 BIGINT, COL2 BIGINT) using parquet")
+          spark.sql("INSERT OVERWRITE TABLE table2 select COL1, COL2 from view1")
           assert(spark.table("table2").count() === cnt)
           checkAnswer(spark.table("table1"), spark.table("table2"))
+
+          spark.sql("CREATE TABLE table3 (COL1 BIGINT) using parquet")
+          spark.sql("INSERT OVERWRITE TABLE table3 select COL1 from view1")
+          assert(spark.table("table3").count() === cnt)
+          checkAnswer(spark.table("table1").select("COL1"), spark.table("table3"))
+
+          spark.sql("CREATE TABLE table4 (COL1 BIGINT, COL2 BIGINT, COL3 BIGINT) using parquet")
+          spark.sql("INSERT OVERWRITE TABLE table4 select COL1, COL1, COL2 from view1")
+          assert(spark.table("table4").count() === cnt)
+          checkAnswer(spark.table("table1").select("col1", "col1", "col2"), spark.table("table4"))
+
+          assertThrows[AnalysisException] {
+            spark.sql("INSERT OVERWRITE TABLE table4 select COL1, COL3, COL2 from view1")
+          }
         }
       }
     }

From 9b16ff0f0581366f587db735658e3110237ceef0 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sat, 18 Aug 2018 18:09:41 +0800
Subject: [PATCH 3/6] Fix test error

---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   | 9 ++++++---
 .../test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 4 ++++
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5c5994dae339..f079266237ce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -901,10 +901,13 @@ class Analyzer(
       // If the projection list contains Stars, expand it.
       case p: Project if containsStar(p.projectList) =>
         p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
-      case p @ Project(projectList, _ @ SubqueryAlias(_, view: View))
+      case p @ Project(projectList, _ @ SubqueryAlias(_, _ @ View(_, output, _)))
         if projectList.forall(_.resolved) =>
-        val newProjectList = projectList.map { col =>
-          view.output.find(_.name.equalsIgnoreCase(col.name)).get
+        val newProjectList = projectList.map { n =>
+          output.find(_.exprId == n.exprId) match {
+            case Some(a) => a
+            case _ => n
+          }
         }
         p.copy(projectList = newProjectList)
       // If the aggregate function argument contains Stars, expand it.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2afc2bc71ed8..961d80f110e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2880,6 +2880,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
           assert(spark.table("table4").count() === cnt)
           checkAnswer(spark.table("table1").select("col1", "col1", "col2"), spark.table("table4"))
 
+          spark.sql("INSERT OVERWRITE TABLE table4 select 1, COL1, COL2 from view1")
+          assert(spark.table("table4").count() === cnt)
+          checkAnswer(spark.table("table1").selectExpr("1", "col1", "col2"), spark.table("table4"))
+
           assertThrows[AnalysisException] {
             spark.sql("INSERT OVERWRITE TABLE table4 select COL1, COL3, COL2 from view1")
           }

From c5a015cbee4c5d8ac745a291fc3543da5f16114f Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 20 Aug 2018 15:32:34 +0800
Subject: [PATCH 4/6] Do not remove redundant aliases if plan is a Command

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  9 ---------
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  7 ++++++-
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 14 ++++++++++++--
 3 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f079266237ce..d00b82d35d7d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -901,15 +901,6 @@ class Analyzer(
       // If the projection list contains Stars, expand it.
       case p: Project if containsStar(p.projectList) =>
         p.copy(projectList = buildExpandedProjectList(p.projectList, p.child))
-      case p @ Project(projectList, _ @ SubqueryAlias(_, _ @ View(_, output, _)))
-        if projectList.forall(_.resolved) =>
-        val newProjectList = projectList.map { n =>
-          output.find(_.exprId == n.exprId) match {
-            case Some(a) => a
-            case _ => n
-          }
-        }
-        p.copy(projectList = newProjectList)
       // If the aggregate function argument contains Stars, expand it.
       case a: Aggregate if containsStar(a.aggregateExpressions) =>
         if (a.groupingExpressions.exists(_.isInstanceOf[UnresolvedOrdinal])) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2ff67689c349..134d78ebab13 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -384,7 +384,12 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
     }
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = removeRedundantAliases(plan, AttributeSet.empty)
+  def apply(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case c: Command => c
+      case _ => removeRedundantAliases(plan, AttributeSet.empty)
+    }
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 961d80f110e3..89e5f6ccbe26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2853,7 +2853,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-25135: insert datasource table may all null when select from view") {
+  test("SPARK-25135: insert table may all null when select from view") {
     withTempDir { dir =>
       val path = dir.getCanonicalPath
       val cnt = 30
@@ -2862,7 +2862,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       withTable("table1", "table2", "table3", "table4") {
         spark.sql(s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$path'")
 
-        withView("view1") {
+        withView("view1", "view2") {
           spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
 
           spark.sql("CREATE TABLE table2 (COL1 BIGINT, COL2 BIGINT) using parquet")
@@ -2887,6 +2887,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
           assertThrows[AnalysisException] {
             spark.sql("INSERT OVERWRITE TABLE table4 select COL1, COL3, COL2 from view1")
           }
+
+          spark.sql("CREATE TEMP VIEW view2 as select col1, 1 as col2 from view1")
+
+          spark.sql("INSERT OVERWRITE TABLE table2 select COL1, COL2 from view2")
+          assert(spark.table("table2").count() === cnt)
+          checkAnswer(spark.table("table1").selectExpr("col1", "1"), spark.table("table2"))
+
+          spark.sql("INSERT OVERWRITE TABLE table2 select col1, COL2 from view2")
+          assert(spark.table("table2").count() === cnt)
+          checkAnswer(spark.table("table1").selectExpr("col1", "1"), spark.table("table2"))
         }
       }
     }

From 419a87461735a604a7856703efd089da2b8148ae Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Mon, 20 Aug 2018 17:22:28 +0800
Subject: [PATCH 5/6] Don't remove redundant aliases in the top project.

---
 .../sql/catalyst/optimizer/Optimizer.scala    | 69 +++++++++++--------
 1 file changed, 41 insertions(+), 28 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 134d78ebab13..fc00021cb08c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -325,6 +325,33 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
     case a => a
   }
 
+  private def removeSpecialRedundantAliases(
+      plan: LogicalPlan,
+      currentNextAttrPairs: mutable.Buffer[(Attribute, Attribute)],
+      newNode: LogicalPlan,
+      blacklist: AttributeSet): LogicalPlan = {
+    // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
+    // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
+    // case we use the the first mapping (which should be provided by the first child).
+    val mapping = AttributeMap(currentNextAttrPairs)
+
+    // Create a an expression cleaning function for nodes that can actually produce redundant
+    // aliases, use identity otherwise.
+    val clean: Expression => Expression = plan match {
+      case _: Project => removeRedundantAlias(_, blacklist)
+      case _: Aggregate => removeRedundantAlias(_, blacklist)
+      case _: Window => removeRedundantAlias(_, blacklist)
+      case _ => identity[Expression]
+    }
+
+    // Transform the expressions.
+    newNode.mapExpressions { expr =>
+      clean(expr.transform {
+        case a: Attribute => mapping.getOrElse(a, a)
+      })
+    }
+  }
+
   /**
    * Remove redundant alias expression from a LogicalPlan and its subtree. A blacklist is used to
    * prevent the removal of seemingly redundant aliases used to deduplicate the input for a (self)
@@ -346,12 +373,23 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
         val newRight = removeRedundantAliases(right, blacklist ++ newLeft.outputSet)
         val mapping = AttributeMap(
           createAttributeMapping(left, newLeft) ++
-          createAttributeMapping(right, newRight))
+            createAttributeMapping(right, newRight))
         val newCondition = condition.map(_.transform {
           case a: Attribute => mapping.getOrElse(a, a)
         })
         Join(newLeft, newRight, joinType, newCondition)
 
+      case command: Command =>
+        // Add child.outputSet to blacklist otherwise
+        // the schema written in the file may not match the schema of the table.
+        val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
+        val newNode = command.mapChildren { child =>
+          val newChild = removeRedundantAliases(child, blacklist ++ child.outputSet)
+          currentNextAttrPairs ++= createAttributeMapping(child, newChild)
+          newChild
+        }
+        removeSpecialRedundantAliases(plan, currentNextAttrPairs, newNode, blacklist)
+
       case _ =>
         // Remove redundant aliases in the subtree(s).
        val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
@@ -360,36 +398,11 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
           currentNextAttrPairs ++= createAttributeMapping(child, newChild)
           newChild
         }
-
-        // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
-        // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
-        // case we use the the first mapping (which should be provided by the first child).
-        val mapping = AttributeMap(currentNextAttrPairs)
-
-        // Create a an expression cleaning function for nodes that can actually produce redundant
-        // aliases, use identity otherwise.
-        val clean: Expression => Expression = plan match {
-          case _: Project => removeRedundantAlias(_, blacklist)
-          case _: Aggregate => removeRedundantAlias(_, blacklist)
-          case _: Window => removeRedundantAlias(_, blacklist)
-          case _ => identity[Expression]
-        }
-
-        // Transform the expressions.
-        newNode.mapExpressions { expr =>
-          clean(expr.transform {
-            case a: Attribute => mapping.getOrElse(a, a)
-          })
-        }
+        removeSpecialRedundantAliases(plan, currentNextAttrPairs, newNode, blacklist)
     }
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = {
-    plan match {
-      case c: Command => c
-      case _ => removeRedundantAliases(plan, AttributeSet.empty)
-    }
-  }
+  def apply(plan: LogicalPlan): LogicalPlan = removeRedundantAliases(plan, AttributeSet.empty)
 }
 
 /**

From 72bde208e191483c2151bc9d4952488d41d2f35d Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Tue, 21 Aug 2018 16:18:01 +0800
Subject: [PATCH 6/6] Use has been resolved Attribute instead of create new one.

---
 .../sql/catalyst/expressions/package.scala    |  2 +-
 .../sql/catalyst/optimizer/Optimizer.scala    | 62 +++++++------------
 .../analysis/DataSourceV2AnalysisSuite.scala  |  2 +-
 .../command/DataWritingCommand.scala          |  2 +-
 .../results/order-by-nulls-ordering.sql.out   |  8 +--
 .../results/query_regex_column.sql.out        |  4 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 49 ---------------
 .../parquet/ParquetQuerySuite.scala           | 25 ++++++++
 8 files changed, 56 insertions(+), 98 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index 11dcc3ebf798..61e40b1eba65 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -177,7 +177,7 @@ package object expressions {
     // Collect matching attributes given a name and a lookup.
     def collectMatches(name: String, candidates: Option[Seq[Attribute]]): Seq[Attribute] = {
       candidates.toSeq.flatMap(_.collect {
-        case a if resolver(a.name, name) => a.withName(name)
+        case a if resolver(a.name, name) => a
       })
     }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 5b5311f62f89..63a62cd0cbfe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -326,33 +326,6 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
     case a => a
   }
 
-  private def removeSpecialRedundantAliases(
-      plan: LogicalPlan,
-      currentNextAttrPairs: mutable.Buffer[(Attribute, Attribute)],
-      newNode: LogicalPlan,
-      blacklist: AttributeSet): LogicalPlan = {
-    // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
-    // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
-    // case we use the the first mapping (which should be provided by the first child).
-    val mapping = AttributeMap(currentNextAttrPairs)
-
-    // Create a an expression cleaning function for nodes that can actually produce redundant
-    // aliases, use identity otherwise.
-    val clean: Expression => Expression = plan match {
-      case _: Project => removeRedundantAlias(_, blacklist)
-      case _: Aggregate => removeRedundantAlias(_, blacklist)
-      case _: Window => removeRedundantAlias(_, blacklist)
-      case _ => identity[Expression]
-    }
-
-    // Transform the expressions.
-    newNode.mapExpressions { expr =>
-      clean(expr.transform {
-        case a: Attribute => mapping.getOrElse(a, a)
-      })
-    }
-  }
-
   /**
    * Remove redundant alias expression from a LogicalPlan and its subtree. A blacklist is used to
    * prevent the removal of seemingly redundant aliases used to deduplicate the input for a (self)
@@ -374,23 +347,12 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
         val newRight = removeRedundantAliases(right, blacklist ++ newLeft.outputSet)
         val mapping = AttributeMap(
           createAttributeMapping(left, newLeft) ++
-            createAttributeMapping(right, newRight))
+          createAttributeMapping(right, newRight))
         val newCondition = condition.map(_.transform {
           case a: Attribute => mapping.getOrElse(a, a)
         })
         Join(newLeft, newRight, joinType, newCondition)
 
-      case command: Command =>
-        // Add child.outputSet to blacklist otherwise
-        // the schema written in the file may not match the schema of the table.
-        val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
-        val newNode = command.mapChildren { child =>
-          val newChild = removeRedundantAliases(child, blacklist ++ child.outputSet)
-          currentNextAttrPairs ++= createAttributeMapping(child, newChild)
-          newChild
-        }
-        removeSpecialRedundantAliases(plan, currentNextAttrPairs, newNode, blacklist)
-
       case _ =>
         // Remove redundant aliases in the subtree(s).
         val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
@@ -399,7 +361,27 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
           currentNextAttrPairs ++= createAttributeMapping(child, newChild)
           newChild
         }
-        removeSpecialRedundantAliases(plan, currentNextAttrPairs, newNode, blacklist)
+
+        // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate
+        // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this
+        // case we use the the first mapping (which should be provided by the first child).
+        val mapping = AttributeMap(currentNextAttrPairs)
+
+        // Create a an expression cleaning function for nodes that can actually produce redundant
+        // aliases, use identity otherwise.
+        val clean: Expression => Expression = plan match {
+          case _: Project => removeRedundantAlias(_, blacklist)
+          case _: Aggregate => removeRedundantAlias(_, blacklist)
+          case _: Window => removeRedundantAlias(_, blacklist)
+          case _ => identity[Expression]
+        }
+
+        // Transform the expressions.
+        newNode.mapExpressions { expr =>
+          clean(expr.transform {
+            case a: Attribute => mapping.getOrElse(a, a)
+          })
+        }
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
index 6c899b610ac5..9c0576b29d42 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala
@@ -87,7 +87,7 @@ class DataSourceV2AnalysisSuite extends AnalysisTest {
     val parsedPlan = AppendData.byName(table, query)
     val expectedPlan = AppendData.byName(table,
       Project(Seq(
-        Alias(Cast(toLower(X), FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
+        Alias(Cast(X, FloatType, Some(conf.sessionLocalTimeZone)), "x")(),
         Alias(Cast(y, FloatType, Some(conf.sessionLocalTimeZone)), "y")()),
       query))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
index e11dbd201004..e5075404b4d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.util.SerializableConfiguration
 
 /**
diff --git a/sql/core/src/test/resources/sql-tests/results/order-by-nulls-ordering.sql.out b/sql/core/src/test/resources/sql-tests/results/order-by-nulls-ordering.sql.out
index c1b63dfb8cae..2dcf1be47259 100644
--- a/sql/core/src/test/resources/sql-tests/results/order-by-nulls-ordering.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/order-by-nulls-ordering.sql.out
@@ -102,7 +102,7 @@ struct
 -- !query 6
 SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 ASC NULLS FIRST, COL2
 -- !query 6 schema
-struct
+struct
 -- !query 6 output
 6 10 NULL
 6 13 NULL
@@ -118,7 +118,7 @@ struct
 -- !query 7
 SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 NULLS LAST, COL2
 -- !query 7 schema
-struct
+struct
 -- !query 7 output
 6 7 4
 6 11 4
@@ -134,7 +134,7 @@ struct
 -- !query 8
 SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 DESC NULLS FIRST, COL2
 -- !query 8 schema
-struct
+struct
 -- !query 8 output
 6 10 NULL
 6 13 NULL
@@ -150,7 +150,7 @@ struct
 -- !query 9
 SELECT COL1, COL2, COL3 FROM spark_10747 ORDER BY COL3 DESC NULLS LAST, COL2
 -- !query 9 schema
-struct
+struct
 -- !query 9 output
 6 9 10
 6 12 10
diff --git a/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out b/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out
index 2dade86f35df..72bd46cc7705 100644
--- a/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/query_regex_column.sql.out
@@ -183,7 +183,7 @@ struct<>
 -- !query 20
 SELECT p.`(KEY)?+.+`, b, testdata2.`(b)?+.+` FROM testData p join testData2 ON p.key = testData2.a WHERE key < 3
 -- !query 20 schema
-struct
+struct
 -- !query 20 output
 1 11 1 1 1 2
 1 11 2 1 1 2
 
 
 -- !query 21
 SELECT p.`(key)?+.+`, b, testdata2.`(b)?+.+` FROM testData p join testData2 ON p.key = testData2.a WHERE key < 3
 -- !query 21 schema
-struct
+struct
 -- !query 21 output
 1 11 1 1 1 2
 1 11 2 1 1 2
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1e9283ef99dc..01dc28d70184 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -2853,55 +2853,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("SPARK-25135: insert table may all null when select from view") {
-    withTempDir { dir =>
-      val path = dir.getCanonicalPath
-      val cnt = 30
-      spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id as bigint) as col2")
-        .write.mode(SaveMode.Overwrite).parquet(path)
-      withTable("table1", "table2", "table3", "table4") {
-        spark.sql(s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$path'")
-
-        withView("view1", "view2") {
-          spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
-
-          spark.sql("CREATE TABLE table2 (COL1 BIGINT, COL2 BIGINT) using parquet")
-          spark.sql("INSERT OVERWRITE TABLE table2 select COL1, COL2 from view1")
-          assert(spark.table("table2").count() === cnt)
-          checkAnswer(spark.table("table1"), spark.table("table2"))
-
-          spark.sql("CREATE TABLE table3 (COL1 BIGINT) using parquet")
-          spark.sql("INSERT OVERWRITE TABLE table3 select COL1 from view1")
-          assert(spark.table("table3").count() === cnt)
-          checkAnswer(spark.table("table1").select("COL1"), spark.table("table3"))
-
-          spark.sql("CREATE TABLE table4 (COL1 BIGINT, COL2 BIGINT, COL3 BIGINT) using parquet")
-          spark.sql("INSERT OVERWRITE TABLE table4 select COL1, COL1, COL2 from view1")
-          assert(spark.table("table4").count() === cnt)
-          checkAnswer(spark.table("table1").select("col1", "col1", "col2"), spark.table("table4"))
-
-          spark.sql("INSERT OVERWRITE TABLE table4 select 1, COL1, COL2 from view1")
-          assert(spark.table("table4").count() === cnt)
-          checkAnswer(spark.table("table1").selectExpr("1", "col1", "col2"), spark.table("table4"))
-
-          assertThrows[AnalysisException] {
-            spark.sql("INSERT OVERWRITE TABLE table4 select COL1, COL3, COL2 from view1")
-          }
-
-          spark.sql("CREATE TEMP VIEW view2 as select col1, 1 as col2 from view1")
-
-          spark.sql("INSERT OVERWRITE TABLE table2 select COL1, COL2 from view2")
-          assert(spark.table("table2").count() === cnt)
-          checkAnswer(spark.table("table1").selectExpr("col1", "1"), spark.table("table2"))
-
-          spark.sql("INSERT OVERWRITE TABLE table2 select col1, COL2 from view2")
-          assert(spark.table("table2").count() === cnt)
-          checkAnswer(spark.table("table1").selectExpr("col1", "1"), spark.table("table2"))
-        }
-      }
-    }
-  }
-
   test("SPARK-25144 'distinct' causes memory leak") {
     val ds = List(Foo(Some("bar"))).toDS
     val result = ds.flatMap(_.bar).distinct
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index dbf637783e6d..1cf6136fb077 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -891,6 +891,31 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-25135: insert parquet table may all null when select from view") {
+    withTempDir { dir =>
+      val path = dir.getCanonicalPath
+      val cnt = 30
+      val table1Path = s"$path/table1"
+      val table2Path = s"$path/table2"
+      spark.range(cnt).selectExpr("cast(id as bigint) as col1")
+        .write.mode(SaveMode.Overwrite).parquet(table1Path)
+      withTable("table1", "table2") {
+        spark.sql(s"CREATE TABLE table1(col1 bigint) using parquet location '$table1Path/'")
+        spark.sql(s"CREATE TABLE table2(COL1 bigint) using parquet location '$table2Path/'")
+
+        withView("view1") {
+          spark.sql("CREATE VIEW view1 as select col1 from table1 where col1 > -20")
+          spark.sql("INSERT OVERWRITE TABLE table2 select COL1 from view1")
+          assert(spark.table("table2").count() === cnt)
+          spark.read.parquet(table2Path).schema.zip(
+            spark.table("table2").schema).foreach { case (actual, table) =>
+            assert(actual.name.equals(table.name))
+          }
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {