From 4f4d7db51ea896d8506bdbee4b764d0c13f994a6 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 30 May 2016 17:04:45 +0800 Subject: [PATCH 1/4] Fix attribute resolution failure on transformed Datasets --- .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++++ .../test/scala/org/apache/spark/sql/DatasetSuite.scala | 9 +++++++++ 2 files changed, 13 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index bf221e0d7cfc4..eb46c0e72ecd9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -524,6 +524,10 @@ class Analyzer( val newVersion = oldVersion.newInstance() (oldVersion, newVersion) + case oldVersion: SerializeFromObject + if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty => + (oldVersion, oldVersion.copy(serializer = oldVersion.serializer.map(_.newInstance()))) + // Handle projects that create conflicting aliases. 
case oldVersion @ Project(projectList, _) if findAliases(projectList).intersect(conflictingAttributes).nonEmpty => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 0b6874e3b8ad3..0ece79d2563fa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -767,6 +767,15 @@ class DatasetSuite extends QueryTest with SharedSQLContext { assertResult(Seq(ClassData("bar", 2))) { ds.filter(_.b > 1).collect().toSeq } + + test("transformed dataset correctly solve the attributes") { + val dataset = Seq(1, 2, 3).toDS() + val ds1 = dataset.map(_ + 1).as("d1") + val ds2 = dataset.map(_ + 2).as("d2") + + checkDataset(ds1.joinWith(ds2, $"d1.value" === $"d2.value"), (3, 3), (4, 4)) + checkDataset(ds1.intersect(ds2), 3, 4) + checkDataset(ds1.except(ds2), 2) } test("SPARK-15441: Dataset outer join") { From e3d0893fb12a3ab4b198356c7718ce0be1777654 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Mon, 30 May 2016 18:59:53 +0800 Subject: [PATCH 2/4] Style fix --- sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 0ece79d2563fa..e0a3454790652 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -767,6 +767,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext { assertResult(Seq(ClassData("bar", 2))) { ds.filter(_.b > 1).collect().toSeq } + } test("transformed dataset correctly solve the attributes") { val dataset = Seq(1, 2, 3).toDS() From 4f5179180fa82c5b9cea674899c333a26c21b9b0 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 1 Jun 2016 14:53:41 +0800 Subject: [PATCH 3/4] Update the unit test --- 
.../test/scala/org/apache/spark/sql/DatasetSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index e0a3454790652..f7f31da7ca2c0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -771,12 +771,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext { test("transformed dataset correctly solve the attributes") { val dataset = Seq(1, 2, 3).toDS() - val ds1 = dataset.map(_ + 1).as("d1") - val ds2 = dataset.map(_ + 2).as("d2") + val ds1 = dataset.map(_ + 1) - checkDataset(ds1.joinWith(ds2, $"d1.value" === $"d2.value"), (3, 3), (4, 4)) - checkDataset(ds1.intersect(ds2), 3, 4) - checkDataset(ds1.except(ds2), 2) + checkDataset(ds1.as("d1").joinWith(ds1.as("d2"), $"d1.value" === $"d2.value"), + (2, 2), (3, 3), (4, 4)) + checkDataset(ds1.as("d1").intersect(ds1.as("d2")), 2, 3, 4) + checkDataset(ds1.as("d1").except(ds1.as("d2"))) } test("SPARK-15441: Dataset outer join") { From dcca095c4a7fa3f2094e72e877a086698bd3046c Mon Sep 17 00:00:00 2001 From: jerryshao Date: Thu, 2 Jun 2016 10:14:04 +0800 Subject: [PATCH 4/4] Address the comments --- .../org/apache/spark/sql/DatasetSuite.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index f7f31da7ca2c0..a3881ff920159 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -769,14 +769,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext { } } - test("transformed dataset correctly solve the attributes") { - val dataset = Seq(1, 2, 3).toDS() - val ds1 = dataset.map(_ + 1) - - 
checkDataset(ds1.as("d1").joinWith(ds1.as("d2"), $"d1.value" === $"d2.value"), - (2, 2), (3, 3), (4, 4)) - checkDataset(ds1.as("d1").intersect(ds1.as("d2")), 2, 3, 4) - checkDataset(ds1.as("d1").except(ds1.as("d2"))) + test("mapped dataset should resolve duplicated attributes for self join") { + val ds = Seq(1, 2, 3).toDS().map(_ + 1) + val ds1 = ds.as("d1") + val ds2 = ds.as("d2") + + checkDataset(ds1.joinWith(ds2, $"d1.value" === $"d2.value"), (2, 2), (3, 3), (4, 4)) + checkDataset(ds1.intersect(ds2), 2, 3, 4) + checkDataset(ds1.except(ds1)) } test("SPARK-15441: Dataset outer join") {