From 9fff5a165de2ef257e7879e66bf44582f28de3d4 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 22 Dec 2023 11:13:23 +0800 Subject: [PATCH 1/4] init --- .../spark/sql/catalyst/plans/logical/AnalysisHelper.scala | 5 ++++- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 6 +++++- .../org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 4 ++++ 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala index 56f6b116759a7..0f51a892c37ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala @@ -165,7 +165,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan => ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[LogicalPlan, LogicalPlan]) : LogicalPlan = { if (!analyzed && cond.apply(self) && !isRuleIneffective(ruleId)) { - AnalysisHelper.allowInvokingTransformsInAnalyzer { + val newPlan = AnalysisHelper.allowInvokingTransformsInAnalyzer { val afterRule = CurrentOrigin.withOrigin(origin) { rule.applyOrElse(self, identity[LogicalPlan]) } @@ -183,6 +183,9 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan => afterRule.mapChildren(_.resolveOperatorsDownWithPruning(cond, ruleId)(rule)) } } + self.getTagValue(LogicalPlan.PLAN_ID_TAG) + .foreach(id => newPlan.setTagValue(LogicalPlan.PLAN_ID_TAG, id)) + newPlan } else { self } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index cce385e8d9d16..80d443f04174f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -102,7 +102,11 @@ abstract class LogicalPlan */ lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved - override protected def statePrefix = if (!resolved) "'" else super.statePrefix + override protected def statePrefix = { + var prefix = if (!resolved) "'" else super.statePrefix + this.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach(id => prefix += s"(planId=$id)") + prefix + } /** * Returns true if all its children of this query plan have been resolved. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index d5cd5a90e3382..50b9695e61520 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -45,6 +45,7 @@ object RuleExecutor { } } +// scalastyle:off println class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { private val logLevel = SQLConf.get.planChangeLogLevel @@ -63,6 +64,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { """.stripMargin } + println(message()) logBasedOnLevel(message()) } } @@ -81,6 +83,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { } } + println(message()) logBasedOnLevel(message()) } } @@ -97,6 +100,7 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { |Total time of effective runs: $totalTimeEffective seconds """.stripMargin + println(message) logBasedOnLevel(message) } From d8b6c4630d43ce74f80e4a9e1f2545d3822ce596 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 22 Dec 2023 11:27:17 +0800 Subject: [PATCH 2/4] add test --- python/pyspark/sql/tests/test_dataframe.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/pyspark/sql/tests/test_dataframe.py b/python/pyspark/sql/tests/test_dataframe.py index 692cf77d9afb9..c5c7dae4aeecb 100644 --- a/python/pyspark/sql/tests/test_dataframe.py +++ b/python/pyspark/sql/tests/test_dataframe.py @@ -963,6 +963,16 @@ def test_unpivot_negative(self): ): df.unpivot("id", ["int", "str"], "var", "val").collect() + def test_melt_groupby(self): + df = self.spark.createDataFrame( + [(1, 2, 3, 4, 5, 6)], + ["f1", "f2", "label", "pred", "model_version", "ts"], + ) + self.assertEqual( + df.melt("model_version", ["label", "f2"], "f1", "f2").groupby("f1").count().count(), + 2, + ) + def test_observe(self): # SPARK-36263: tests the DataFrame.observe(Observation, *Column) method from pyspark.sql import Observation From 6b201051c45171518939cfb8a0426ed1197845e9 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 22 Dec 2023 11:58:52 +0800 Subject: [PATCH 3/4] remove debug code --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 6 +----- .../org/apache/spark/sql/catalyst/rules/RuleExecutor.scala | 4 ---- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 80d443f04174f..cce385e8d9d16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -102,11 +102,7 @@ abstract class LogicalPlan */ lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved - override protected def statePrefix = { - var prefix = if (!resolved) "'" else super.statePrefix - this.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach(id => prefix += s"(planId=$id)") - prefix - } + override protected def statePrefix = if (!resolved) "'" else super.statePrefix /** * Returns true if all its children of this query plan have been resolved. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala index 50b9695e61520..d5cd5a90e3382 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala @@ -45,7 +45,6 @@ object RuleExecutor { } } -// scalastyle:off println class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { private val logLevel = SQLConf.get.planChangeLogLevel @@ -64,7 +63,6 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { """.stripMargin } - println(message()) logBasedOnLevel(message()) } } @@ -83,7 +81,6 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { } } - println(message()) logBasedOnLevel(message()) } } @@ -100,7 +97,6 @@ class PlanChangeLogger[TreeType <: TreeNode[_]] extends Logging { |Total time of effective runs: $totalTimeEffective seconds """.stripMargin - println(message) logBasedOnLevel(message) } From eb5f2b42f59a8c454055e629cd9d2eb5bc91b3f6 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 22 Dec 2023 12:11:34 +0800 Subject: [PATCH 4/4] helper function --- .../catalyst/plans/logical/AnalysisHelper.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala index 0f51a892c37ea..3403e3c5b71cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/AnalysisHelper.scala @@ -165,7 +165,7 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan => ruleId: RuleId = UnknownRuleId)(rule: PartialFunction[LogicalPlan, LogicalPlan]) : LogicalPlan = { if (!analyzed && cond.apply(self) && !isRuleIneffective(ruleId)) { - val newPlan = AnalysisHelper.allowInvokingTransformsInAnalyzer { + AnalysisHelper.allowInvokingTransformsInAnalyzer { val afterRule = CurrentOrigin.withOrigin(origin) { rule.applyOrElse(self, identity[LogicalPlan]) } @@ -177,20 +177,24 @@ trait AnalysisHelper extends QueryPlan[LogicalPlan] { self: LogicalPlan => self.markRuleAsIneffective(ruleId) self } else { - rewritten_plan + copyPlanIdTag(self, rewritten_plan) } } else { - afterRule.mapChildren(_.resolveOperatorsDownWithPruning(cond, ruleId)(rule)) + copyPlanIdTag(self, + afterRule.mapChildren(_.resolveOperatorsDownWithPruning(cond, ruleId)(rule))) } } - self.getTagValue(LogicalPlan.PLAN_ID_TAG) - .foreach(id => newPlan.setTagValue(LogicalPlan.PLAN_ID_TAG, id)) - newPlan } else { self } } + def copyPlanIdTag(oldPlan: LogicalPlan, newPlan: LogicalPlan): LogicalPlan = { + oldPlan.getTagValue(LogicalPlan.PLAN_ID_TAG) + .foreach(id => newPlan.setTagValue(LogicalPlan.PLAN_ID_TAG, id)) + newPlan + } + /** * A variant of `transformUpWithNewOutput`, which skips touching already analyzed plan. */