From d45eab253484418852b5639feb42beb9e1372b1f Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 11 Jul 2023 08:37:52 +0800 Subject: [PATCH 01/12] [SPARK-43781][SQL] Fix IllegalStateException when cogrouping two datasets derived from the same source --- .../analysis/DeduplicateRelations.scala | 49 ++++++++++++++++++- .../org/apache/spark/sql/DatasetSuite.scala | 26 ++++++++++ 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 39dda9a13dad..b7a3dff4c5a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable -import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeSet, NamedExpression, OuterReference, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeSet, NamedExpression, OuterReference, SortOrder, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ @@ -220,7 +220,52 @@ object DeduplicateRelations extends Rule[LogicalPlan] { if (attrMap.isEmpty) { planWithNewChildren } else { - planWithNewChildren.rewriteAttrs(attrMap) + def rewriteAttrsMatchWithSubPlan( + attrs: Seq[Attribute], + attrMap: AttributeMap[Attribute], + planOutput: Seq[Attribute]): Seq[Attribute] = { + val canRewriteAttrs = attrMap.filter(a => planOutput.contains(a._2)) + attrs.map(attr => canRewriteAttrs.getOrElse(attr, attr)) + } + + def rewriteOrderMatchWithSubPlan( + attrs: Seq[SortOrder], + attrMap: AttributeMap[Attribute], + planOutput: 
Seq[Attribute]): Seq[SortOrder] = { + val canRewriteAttrs = attrMap.filter(a => planOutput.contains(a._2)) + attrs.filter(_.child.isInstanceOf[Attribute]).map(attr => { + if (canRewriteAttrs.contains(attr.child.asInstanceOf[Attribute])) { + attr.copy(child = canRewriteAttrs(attr.child.asInstanceOf[Attribute])) + } else { + attr + } + }) + } + + planWithNewChildren match { + case c @ CoGroup(_, keyDeserializer, leftDeserializer, rightDeserializer, + leftGroup, rightGroup, leftAttr, rightAttr, leftOrder, rightOrder, _, left, + right) => + // SPARK-43781: CoGroup is a special case, as it has different output attributes + // from its children. We need to update the output attributes of CoGroup manually. + val newLeftAttr = rewriteAttrsMatchWithSubPlan(leftAttr, attrMap, left.output) + val newRightAttr = rewriteAttrsMatchWithSubPlan(rightAttr, attrMap, right.output) + val newLeftGroup = rewriteAttrsMatchWithSubPlan(leftGroup, attrMap, left.output) + c.copy( + keyDeserializer = keyDeserializer.asInstanceOf[UnresolvedDeserializer] + .copy(inputAttributes = newLeftGroup), + leftDeserializer = leftDeserializer.asInstanceOf[UnresolvedDeserializer] + .copy(inputAttributes = newLeftAttr), + rightDeserializer = rightDeserializer.asInstanceOf[UnresolvedDeserializer] + .copy(inputAttributes = newRightAttr), + leftGroup = newLeftGroup, + rightGroup = rewriteAttrsMatchWithSubPlan(rightGroup, attrMap, right.output), + leftAttr = newLeftAttr, + rightAttr = newRightAttr, + leftOrder = rewriteOrderMatchWithSubPlan(leftOrder, attrMap, left.output), + rightOrder = rewriteOrderMatchWithSubPlan(rightOrder, attrMap, right.output)) + case _ => planWithNewChildren.rewriteAttrs(attrMap) + } } } else { planWithNewSubquery.withNewChildren(newChildren.toSeq) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index c09c6d18b665..4580d3ac264d 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -916,6 +916,32 @@ class DatasetSuite extends QueryTest } } + test("SPARK-43781: cogroup two datasets derived from the same source") { + val inputType = StructType(Array(StructField("id", LongType, false), + StructField("type", StringType, false))) + val keyType = StructType(Array(StructField("id", LongType, false))) + + val inputRows = new java.util.ArrayList[Row]() + inputRows.add(Row(1L, "foo")) + inputRows.add(Row(1L, "bar")) + inputRows.add(Row(2L, "foo")) + val input = spark.createDataFrame(inputRows, inputType) + val fooGroups = input.filter("type = 'foo'").groupBy("id").as(RowEncoder(keyType), + RowEncoder(inputType)) + val barGroups = input.filter("type = 'bar'").groupBy("id").as(RowEncoder(keyType), + RowEncoder(inputType)) + + val result = fooGroups.cogroup(barGroups) { case (row, iterator, iterator1) => + iterator.toSeq ++ iterator1.toSeq + }(RowEncoder(inputType)).collect() + assert(result.length == 3) + + val result2 = fooGroups.cogroupSorted(barGroups)($"id")($"id") { + case (row, iterator, iterator1) => iterator.toSeq ++ iterator1.toSeq + }(RowEncoder(inputType)).collect() + assert(result2.length == 3) + } + test("SPARK-34806: observation on datasets") { val namedObservation = Observation("named") val unnamedObservation = Observation() From f45b8c6083958a6b4d904414b73698ad979d29fd Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 27 Jul 2023 19:15:51 +0800 Subject: [PATCH 02/12] fix merger problem --- .../scala/org/apache/spark/sql/DatasetSuite.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 024b64bea687..44b7c577bac7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -926,19 +926,19 @@ class DatasetSuite extends QueryTest inputRows.add(Row(1L, "bar")) inputRows.add(Row(2L, "foo")) val input = spark.createDataFrame(inputRows, inputType) - val fooGroups = input.filter("type = 'foo'").groupBy("id").as(RowEncoder(keyType), - RowEncoder(inputType)) - val barGroups = input.filter("type = 'bar'").groupBy("id").as(RowEncoder(keyType), - RowEncoder(inputType)) + val fooGroups = input.filter("type = 'foo'").groupBy("id").as(ExpressionEncoder(keyType), + ExpressionEncoder(inputType)) + val barGroups = input.filter("type = 'bar'").groupBy("id").as(ExpressionEncoder(keyType), + ExpressionEncoder(inputType)) val result = fooGroups.cogroup(barGroups) { case (row, iterator, iterator1) => iterator.toSeq ++ iterator1.toSeq - }(RowEncoder(inputType)).collect() + }(ExpressionEncoder(inputType)).collect() assert(result.length == 3) val result2 = fooGroups.cogroupSorted(barGroups)($"id")($"id") { case (row, iterator, iterator1) => iterator.toSeq ++ iterator1.toSeq - }(RowEncoder(inputType)).collect() + }(ExpressionEncoder(inputType)).collect() assert(result2.length == 3) } From e111a40480e37e517482cc16cd1c470c447ad4a0 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Mon, 31 Jul 2023 16:42:54 +0800 Subject: [PATCH 03/12] fix review --- .../analysis/DeduplicateRelations.scala | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index b7a3dff4c5a3..4201733de5cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -243,27 +243,26 @@ object DeduplicateRelations extends Rule[LogicalPlan] { } planWithNewChildren match 
{ - case c @ CoGroup(_, keyDeserializer, leftDeserializer, rightDeserializer, - leftGroup, rightGroup, leftAttr, rightAttr, leftOrder, rightOrder, _, left, - right) => + case c: CoGroup => // SPARK-43781: CoGroup is a special case, as it has different output attributes // from its children. We need to update the output attributes of CoGroup manually. - val newLeftAttr = rewriteAttrsMatchWithSubPlan(leftAttr, attrMap, left.output) - val newRightAttr = rewriteAttrsMatchWithSubPlan(rightAttr, attrMap, right.output) - val newLeftGroup = rewriteAttrsMatchWithSubPlan(leftGroup, attrMap, left.output) + val newLeftAttr = c.leftAttr.map(attr => attrMap.getOrElse(attr, attr)) + val newRightAttr = rewriteAttrsMatchWithSubPlan(c.rightAttr, attrMap, + c.right.output) + val newLeftGroup = rewriteAttrsMatchWithSubPlan(c.leftGroup, attrMap, c.left.output) c.copy( - keyDeserializer = keyDeserializer.asInstanceOf[UnresolvedDeserializer] + keyDeserializer = c.keyDeserializer.asInstanceOf[UnresolvedDeserializer] .copy(inputAttributes = newLeftGroup), - leftDeserializer = leftDeserializer.asInstanceOf[UnresolvedDeserializer] + leftDeserializer = c.leftDeserializer.asInstanceOf[UnresolvedDeserializer] .copy(inputAttributes = newLeftAttr), - rightDeserializer = rightDeserializer.asInstanceOf[UnresolvedDeserializer] + rightDeserializer = c.rightDeserializer.asInstanceOf[UnresolvedDeserializer] .copy(inputAttributes = newRightAttr), leftGroup = newLeftGroup, - rightGroup = rewriteAttrsMatchWithSubPlan(rightGroup, attrMap, right.output), + rightGroup = rewriteAttrsMatchWithSubPlan(c.rightGroup, attrMap, c.right.output), leftAttr = newLeftAttr, rightAttr = newRightAttr, - leftOrder = rewriteOrderMatchWithSubPlan(leftOrder, attrMap, left.output), - rightOrder = rewriteOrderMatchWithSubPlan(rightOrder, attrMap, right.output)) + leftOrder = rewriteOrderMatchWithSubPlan(c.leftOrder, attrMap, c.left.output), + rightOrder = rewriteOrderMatchWithSubPlan(c.rightOrder, attrMap, c.right.output)) case 
_ => planWithNewChildren.rewriteAttrs(attrMap) } } From 45f0505d901b4efe1b5d4cdcdfb9436ddc8bdc30 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Tue, 8 Aug 2023 20:32:44 +0800 Subject: [PATCH 04/12] update --- .../analysis/DeduplicateRelations.scala | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 4201733de5cc..3050107b6118 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeSet, NamedExpression, OuterReference, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, NamedExpression, OuterReference, SortOrder, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ @@ -233,12 +233,12 @@ object DeduplicateRelations extends Rule[LogicalPlan] { attrMap: AttributeMap[Attribute], planOutput: Seq[Attribute]): Seq[SortOrder] = { val canRewriteAttrs = attrMap.filter(a => planOutput.contains(a._2)) - attrs.filter(_.child.isInstanceOf[Attribute]).map(attr => { - if (canRewriteAttrs.contains(attr.child.asInstanceOf[Attribute])) { - attr.copy(child = canRewriteAttrs(attr.child.asInstanceOf[Attribute])) - } else { - attr + attrs.map(attr => { + attr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) { + case a: AttributeReference => + canRewriteAttrs.getOrElse(a, a) } + attr }) } @@ -250,19 
+250,22 @@ object DeduplicateRelations extends Rule[LogicalPlan] { val newRightAttr = rewriteAttrsMatchWithSubPlan(c.rightAttr, attrMap, c.right.output) val newLeftGroup = rewriteAttrsMatchWithSubPlan(c.leftGroup, attrMap, c.left.output) - c.copy( - keyDeserializer = c.keyDeserializer.asInstanceOf[UnresolvedDeserializer] - .copy(inputAttributes = newLeftGroup), - leftDeserializer = c.leftDeserializer.asInstanceOf[UnresolvedDeserializer] - .copy(inputAttributes = newLeftAttr), - rightDeserializer = c.rightDeserializer.asInstanceOf[UnresolvedDeserializer] - .copy(inputAttributes = newRightAttr), - leftGroup = newLeftGroup, - rightGroup = rewriteAttrsMatchWithSubPlan(c.rightGroup, attrMap, c.right.output), - leftAttr = newLeftAttr, - rightAttr = newRightAttr, - leftOrder = rewriteOrderMatchWithSubPlan(c.leftOrder, attrMap, c.left.output), - rightOrder = rewriteOrderMatchWithSubPlan(c.rightOrder, attrMap, c.right.output)) + val newRightGroup = rewriteAttrsMatchWithSubPlan(c.rightGroup, attrMap, + c.right.output) + val newLeftOrder = rewriteOrderMatchWithSubPlan(c.leftOrder, attrMap, + c.left.output) + val newRightOrder = rewriteOrderMatchWithSubPlan(c.rightOrder, attrMap, + c.right.output) + val newKeyDes = c.keyDeserializer.asInstanceOf[UnresolvedDeserializer] + .copy(inputAttributes = newLeftGroup) + val newLeftDes = c.leftDeserializer.asInstanceOf[UnresolvedDeserializer] + .copy(inputAttributes = newLeftAttr) + val newRightDes = c.rightDeserializer.asInstanceOf[UnresolvedDeserializer] + .copy(inputAttributes = newRightAttr) + c.copy(keyDeserializer = newKeyDes, leftDeserializer = newLeftDes, + rightDeserializer = newRightDes, leftGroup = newLeftGroup, + rightGroup = newRightGroup, leftAttr = newLeftAttr, rightAttr = newRightAttr, + leftOrder = newLeftOrder, rightOrder = newRightOrder) case _ => planWithNewChildren.rewriteAttrs(attrMap) } } From 3a9bad5ad79f851171c6d6ebeea5a20a669a5ba0 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 9 Aug 2023 20:57:49 +0800 
Subject: [PATCH 05/12] update description --- .../spark/sql/catalyst/analysis/DeduplicateRelations.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 3050107b6118..cac5275a4312 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -244,8 +244,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] { planWithNewChildren match { case c: CoGroup => - // SPARK-43781: CoGroup is a special case, as it has different output attributes - // from its children. We need to update the output attributes of CoGroup manually. + // SPARK-43781: CoGroup is a special case, `rewriteAttrs` will incorrectly update + // some fields that do not need to be updated. We need to update the output + // attributes of CoGroup manually. 
val newLeftAttr = c.leftAttr.map(attr => attrMap.getOrElse(attr, attr)) val newRightAttr = rewriteAttrsMatchWithSubPlan(c.rightAttr, attrMap, c.right.output) From f8b0a284a9d25db2d9859e99ec1f8ba20829cac1 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 9 Aug 2023 21:38:23 +0800 Subject: [PATCH 06/12] update --- .../analysis/DeduplicateRelations.scala | 23 ++++++------------- 1 file changed, 7 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index cac5275a4312..3cf99f983f5d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import scala.collection.mutable -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, NamedExpression, OuterReference, SortOrder, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, NamedExpression, OuterReference, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ @@ -220,25 +220,16 @@ object DeduplicateRelations extends Rule[LogicalPlan] { if (attrMap.isEmpty) { planWithNewChildren } else { - def rewriteAttrsMatchWithSubPlan( - attrs: Seq[Attribute], + def rewriteAttrsMatchWithSubPlan[T <: Expression]( + attrs: Seq[T], attrMap: AttributeMap[Attribute], - planOutput: Seq[Attribute]): Seq[Attribute] = { - val canRewriteAttrs = attrMap.filter(a => planOutput.contains(a._2)) - attrs.map(attr => canRewriteAttrs.getOrElse(attr, attr)) - } - - def 
rewriteOrderMatchWithSubPlan( - attrs: Seq[SortOrder], - attrMap: AttributeMap[Attribute], - planOutput: Seq[Attribute]): Seq[SortOrder] = { + planOutput: Seq[Attribute]): Seq[T] = { val canRewriteAttrs = attrMap.filter(a => planOutput.contains(a._2)) attrs.map(attr => { attr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) { case a: AttributeReference => canRewriteAttrs.getOrElse(a, a) - } - attr + }.asInstanceOf[T] }) } @@ -253,9 +244,9 @@ object DeduplicateRelations extends Rule[LogicalPlan] { val newLeftGroup = rewriteAttrsMatchWithSubPlan(c.leftGroup, attrMap, c.left.output) val newRightGroup = rewriteAttrsMatchWithSubPlan(c.rightGroup, attrMap, c.right.output) - val newLeftOrder = rewriteOrderMatchWithSubPlan(c.leftOrder, attrMap, + val newLeftOrder = rewriteAttrsMatchWithSubPlan(c.leftOrder, attrMap, c.left.output) - val newRightOrder = rewriteOrderMatchWithSubPlan(c.rightOrder, attrMap, + val newRightOrder = rewriteAttrsMatchWithSubPlan(c.rightOrder, attrMap, c.right.output) val newKeyDes = c.keyDeserializer.asInstanceOf[UnresolvedDeserializer] .copy(inputAttributes = newLeftGroup) From 8bf5fb3b5a9823fc8322e14b1c039305020aaaf7 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 9 Aug 2023 23:09:08 +0800 Subject: [PATCH 07/12] update --- .../analysis/DeduplicateRelations.scala | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 3cf99f983f5d..970374cac9e6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -222,13 +222,11 @@ object DeduplicateRelations extends Rule[LogicalPlan] { } else { def rewriteAttrsMatchWithSubPlan[T <: Expression]( attrs: Seq[T], - attrMap: 
AttributeMap[Attribute], - planOutput: Seq[Attribute]): Seq[T] = { - val canRewriteAttrs = attrMap.filter(a => planOutput.contains(a._2)) + attrMap: Map[Attribute, Attribute]): Seq[T] = { attrs.map(attr => { attr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) { case a: AttributeReference => - canRewriteAttrs.getOrElse(a, a) + attrMap.getOrElse(a, a) }.asInstanceOf[T] }) } @@ -238,16 +236,14 @@ object DeduplicateRelations extends Rule[LogicalPlan] { // SPARK-43781: CoGroup is a special case, `rewriteAttrs` will incorrectly update // some fields that do not need to be updated. We need to update the output // attributes of CoGroup manually. + val leftAttrMap = attrMap.filter(a => c.left.output.contains(a._2)) + val rightAttrMap = attrMap.filter(a => c.right.output.contains(a._2)) val newLeftAttr = c.leftAttr.map(attr => attrMap.getOrElse(attr, attr)) - val newRightAttr = rewriteAttrsMatchWithSubPlan(c.rightAttr, attrMap, - c.right.output) - val newLeftGroup = rewriteAttrsMatchWithSubPlan(c.leftGroup, attrMap, c.left.output) - val newRightGroup = rewriteAttrsMatchWithSubPlan(c.rightGroup, attrMap, - c.right.output) - val newLeftOrder = rewriteAttrsMatchWithSubPlan(c.leftOrder, attrMap, - c.left.output) - val newRightOrder = rewriteAttrsMatchWithSubPlan(c.rightOrder, attrMap, - c.right.output) + val newRightAttr = rewriteAttrsMatchWithSubPlan(c.rightAttr, rightAttrMap) + val newLeftGroup = rewriteAttrsMatchWithSubPlan(c.leftGroup, leftAttrMap) + val newRightGroup = rewriteAttrsMatchWithSubPlan(c.rightGroup, rightAttrMap) + val newLeftOrder = rewriteAttrsMatchWithSubPlan(c.leftOrder, leftAttrMap) + val newRightOrder = rewriteAttrsMatchWithSubPlan(c.rightOrder, rightAttrMap) val newKeyDes = c.keyDeserializer.asInstanceOf[UnresolvedDeserializer] .copy(inputAttributes = newLeftGroup) val newLeftDes = c.leftDeserializer.asInstanceOf[UnresolvedDeserializer] From b7d4ece3d504a595ce952338ab57bc8b615cba63 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Wed, 9 Aug 2023 
23:49:42 +0800 Subject: [PATCH 08/12] update --- .../analysis/DeduplicateRelations.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 970374cac9e6..aa516cc5f586 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -220,11 +220,11 @@ object DeduplicateRelations extends Rule[LogicalPlan] { if (attrMap.isEmpty) { planWithNewChildren } else { - def rewriteAttrsMatchWithSubPlan[T <: Expression]( - attrs: Seq[T], + def rewriteAttrs[T <: Expression]( + exprs: Seq[T], attrMap: Map[Attribute, Attribute]): Seq[T] = { - attrs.map(attr => { - attr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) { + exprs.map(expr => { + expr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) { case a: AttributeReference => attrMap.getOrElse(a, a) }.asInstanceOf[T] @@ -238,12 +238,12 @@ object DeduplicateRelations extends Rule[LogicalPlan] { // attributes of CoGroup manually. 
val leftAttrMap = attrMap.filter(a => c.left.output.contains(a._2)) val rightAttrMap = attrMap.filter(a => c.right.output.contains(a._2)) - val newLeftAttr = c.leftAttr.map(attr => attrMap.getOrElse(attr, attr)) - val newRightAttr = rewriteAttrsMatchWithSubPlan(c.rightAttr, rightAttrMap) - val newLeftGroup = rewriteAttrsMatchWithSubPlan(c.leftGroup, leftAttrMap) - val newRightGroup = rewriteAttrsMatchWithSubPlan(c.rightGroup, rightAttrMap) - val newLeftOrder = rewriteAttrsMatchWithSubPlan(c.leftOrder, leftAttrMap) - val newRightOrder = rewriteAttrsMatchWithSubPlan(c.rightOrder, rightAttrMap) + val newLeftAttr = rewriteAttrs(c.leftAttr, leftAttrMap) + val newRightAttr = rewriteAttrs(c.rightAttr, rightAttrMap) + val newLeftGroup = rewriteAttrs(c.leftGroup, leftAttrMap) + val newRightGroup = rewriteAttrs(c.rightGroup, rightAttrMap) + val newLeftOrder = rewriteAttrs(c.leftOrder, leftAttrMap) + val newRightOrder = rewriteAttrs(c.rightOrder, rightAttrMap) val newKeyDes = c.keyDeserializer.asInstanceOf[UnresolvedDeserializer] .copy(inputAttributes = newLeftGroup) val newLeftDes = c.leftDeserializer.asInstanceOf[UnresolvedDeserializer] From f88d1ea4bb25c8118b34cdd6a940bc7dbb82f0a3 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 10 Aug 2023 13:26:25 +0800 Subject: [PATCH 09/12] update --- .../spark/sql/catalyst/analysis/DeduplicateRelations.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index aa516cc5f586..790ed9c76614 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -223,12 +223,11 @@ object DeduplicateRelations extends Rule[LogicalPlan] { def rewriteAttrs[T <: Expression]( exprs: Seq[T], attrMap: 
Map[Attribute, Attribute]): Seq[T] = { - exprs.map(expr => { + exprs.map(expr => expr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) { case a: AttributeReference => attrMap.getOrElse(a, a) - }.asInstanceOf[T] - }) + }.asInstanceOf[T]) } planWithNewChildren match { From 410b2369e22b9e8606df0bbe4083433ccbd9a5d9 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 10 Aug 2023 14:26:28 +0800 Subject: [PATCH 10/12] update --- .../spark/sql/catalyst/analysis/DeduplicateRelations.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 790ed9c76614..7d34de551558 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -223,11 +223,12 @@ object DeduplicateRelations extends Rule[LogicalPlan] { def rewriteAttrs[T <: Expression]( exprs: Seq[T], attrMap: Map[Attribute, Attribute]): Seq[T] = { - exprs.map(expr => + exprs.map { expr => expr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) { case a: AttributeReference => attrMap.getOrElse(a, a) - }.asInstanceOf[T]) + }.asInstanceOf[T] + } } planWithNewChildren match { From acb9455ab4e64bc8fb5d79ec26480175edad8cb9 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 10 Aug 2023 14:54:42 +0800 Subject: [PATCH 11/12] update --- .../spark/sql/catalyst/analysis/DeduplicateRelations.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 7d34de551558..2f91b56cc0d0 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -225,13 +225,13 @@ object DeduplicateRelations extends Rule[LogicalPlan] { attrMap: Map[Attribute, Attribute]): Seq[T] = { exprs.map { expr => expr.transformWithPruning(_.containsPattern(ATTRIBUTE_REFERENCE)) { - case a: AttributeReference => - attrMap.getOrElse(a, a) + case a: AttributeReference => attrMap.getOrElse(a, a) }.asInstanceOf[T] } } planWithNewChildren match { + // TODO: we should handle all special cases here. SPARK-44754 case c: CoGroup => // SPARK-43781: CoGroup is a special case, `rewriteAttrs` will incorrectly update // some fields that do not need to be updated. We need to update the output From 7f54c5c17f2bca6d198101b185305b5d77d626a7 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 10 Aug 2023 14:55:40 +0800 Subject: [PATCH 12/12] update --- .../spark/sql/catalyst/analysis/DeduplicateRelations.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 2f91b56cc0d0..153cdb5c69a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -231,7 +231,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] { } planWithNewChildren match { - // TODO: we should handle all special cases here. SPARK-44754 + // TODO (SPARK-44754): we should handle all special cases here. case c: CoGroup => // SPARK-43781: CoGroup is a special case, `rewriteAttrs` will incorrectly update // some fields that do not need to be updated. We need to update the output