From 13f54a93c0cf31a38455e90aec722e890af980c6 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 12 Jan 2017 10:59:22 -0800 Subject: [PATCH 1/3] Rewrite Alias in StreamExecution if necessary --- .../execution/streaming/StreamExecution.scala | 7 ++++- .../spark/sql/streaming/StreamSuite.scala | 26 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index a35950e2dc17..86668f8265f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.ExplainCommand @@ -495,8 +495,13 @@ class StreamExecution( // Rewire the plan to use the new attributes that were returned by the source. val replacementMap = AttributeMap(replacements) + val exprIdMap = + replacements.map { case (oldAttr, newAttr) => (oldAttr.exprId, newAttr.exprId)}.toMap val triggerLogicalPlan = withNewSources transformAllExpressions { case a: Attribute if replacementMap.contains(a) => replacementMap(a) + case a: Alias if exprIdMap.contains(a.exprId) => + // Also rewrite `Alias`s as they may use the same `exprId` of `Attribute`s. + Alias(a.child, a.name)(exprIdMap(a.exprId), a.qualifier, a.explicitMetadata, a.isGenerated) case ct: CurrentTimestamp => CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, ct.dataType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index e964e646d22a..5bb4d063040a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -304,6 +304,32 @@ class StreamSuite extends StreamTest { q.stop() } } + + test("SPARK-19065 Alia should be replaced as well") { + withTempPath { testPath => + val data = Seq((1, 2), (2, 3), (3, 4)) + data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath) + val schema = spark.read.json(testPath.getCanonicalPath).schema + val query = spark + .readStream + .schema(schema) + .json(testPath.getCanonicalPath) + .dropDuplicates("_1") // dropDuplicates will create an Alias using the same exprId. + .writeStream + .format("memory") + .queryName("testquery") + .outputMode("complete") + .start() + try { + query.processAllAvailable() + if (query.exception.isDefined) { + throw query.exception.get + } + } finally { + query.stop() + } + } + } } /** From 4f38e3b6fb753f2662b90228b332b51f1dab43d5 Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Thu, 12 Jan 2017 14:03:06 -0800 Subject: [PATCH 2/3] Don't inherit expression id in dropDuplicates --- .../scala/org/apache/spark/sql/Dataset.scala | 5 +--- .../execution/streaming/StreamExecution.scala | 7 +---- .../org/apache/spark/sql/DatasetSuite.scala | 15 +++++++---- .../spark/sql/streaming/StreamSuite.scala | 26 ------------------- 4 files changed, 12 insertions(+), 41 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 1a7a5ba79807..24b9b810fc5c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2003,10 +2003,7 @@ class Dataset[T] private[sql]( if (groupColExprIds.contains(attr.exprId)) { attr } else { - // Removing duplicate rows should not change output attributes. We should keep - // the original exprId of the attribute. Otherwise, to select a column in original - // dataset will cause analysis exception due to unresolved attribute. - Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId) + Alias(new First(attr).toAggregateExpression(), attr.name)() } } Aggregate(groupCols, aggCols, logicalPlan) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index 86668f8265f7..a35950e2dc17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.RowEncoder -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.ExplainCommand @@ -495,13 +495,8 @@ class StreamExecution( // Rewire the plan to use the new attributes that were returned by the source. val replacementMap = AttributeMap(replacements) - val exprIdMap = - replacements.map { case (oldAttr, newAttr) => (oldAttr.exprId, newAttr.exprId)}.toMap val triggerLogicalPlan = withNewSources transformAllExpressions { case a: Attribute if replacementMap.contains(a) => replacementMap(a) - case a: Alias if exprIdMap.contains(a.exprId) => - // Also rewrite `Alias`s as they may use the same `exprId` of `Attribute`s. - Alias(a.child, a.name)(exprIdMap(a.exprId), a.qualifier, a.explicitMetadata, a.isGenerated) case ct: CurrentTimestamp => CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs, ct.dataType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 731a28c237ba..a8cc92405277 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -21,6 +21,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} +import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SortExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange} @@ -898,11 +899,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { (1, 2), (1, 1), (2, 1), (2, 2)) } - test("dropDuplicates should not change child plan output") { - val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS() - checkDataset( - ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]), - ("a", 1), ("b", 1)) + test("SPARK-19065 dropDuplicates should not create expressions using the same id") { + val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS().dropDuplicates("_1") + var exprs = Set.empty[NamedExpression] + ds.logicalPlan.transformAllExpressions { case e: NamedExpression => + exprs += e + e + } + val duplicatedExprs = exprs.groupBy(expr => expr.exprId).filter(_._2.size > 1).values + assert(duplicatedExprs.isEmpty) } test("SPARK-16097: Encoders.tuple should handle null object correctly") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index 5bb4d063040a..e964e646d22a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -304,32 +304,6 @@ class StreamSuite extends StreamTest { q.stop() } } - - test("SPARK-19065 Alia should be replaced as well") { - withTempPath { testPath => - val data = Seq((1, 2), (2, 3), (3, 4)) - data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath) - val schema = spark.read.json(testPath.getCanonicalPath).schema - val query = spark - .readStream - .schema(schema) - .json(testPath.getCanonicalPath) - .dropDuplicates("_1") // dropDuplicates will create an Alias using the same exprId. - .writeStream - .format("memory") - .queryName("testquery") - .outputMode("complete") - .start() - try { - query.processAllAvailable() - if (query.exception.isDefined) { - throw query.exception.get - } - } finally { - query.stop() - } - } - } } /** From 26652a09be891de4a26fe54e4d3755b1cd42094f Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Mon, 16 Jan 2017 23:27:08 -0800 Subject: [PATCH 3/3] Add a regression test --- .../org/apache/spark/sql/DatasetSuite.scala | 12 --------- .../spark/sql/streaming/StreamSuite.scala | 26 +++++++++++++++++++ 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index a8cc92405277..b37bf131e8dc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -21,7 +21,6 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} -import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SortExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchange} @@ -899,17 +898,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext { (1, 2), (1, 1), (2, 1), (2, 2)) } - test("SPARK-19065 dropDuplicates should not create expressions using the same id") { - val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS().dropDuplicates("_1") - var exprs = Set.empty[NamedExpression] - ds.logicalPlan.transformAllExpressions { case e: NamedExpression => - exprs += e - e - } - val duplicatedExprs = exprs.groupBy(expr => expr.exprId).filter(_._2.size > 1).values - assert(duplicatedExprs.isEmpty) - } - test("SPARK-16097: Encoders.tuple should handle null object correctly") { val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING) val data = Seq((("a", "b"), "c"), (null, "d")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index e964e646d22a..f31dc8add48d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -304,6 +304,32 @@ class StreamSuite extends StreamTest { q.stop() } } + + test("SPARK-19065: dropDuplicates should not create expressions using the same id") { + withTempPath { testPath => + val data = Seq((1, 2), (2, 3), (3, 4)) + data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath) + val schema = spark.read.json(testPath.getCanonicalPath).schema + val query = spark + .readStream + .schema(schema) + .json(testPath.getCanonicalPath) + .dropDuplicates("_1") + .writeStream + .format("memory") + .queryName("testquery") + .outputMode("complete") + .start() + try { + query.processAllAvailable() + if (query.exception.isDefined) { + throw query.exception.get + } + } finally { + query.stop() + } + } + } } /**