From f947e38f42b7ace2aeefb094424ee2c7447afac5 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 16 Nov 2023 00:45:04 -0800 Subject: [PATCH] [SPARK-45920][SQL] group by ordinal should be idempotent GROUP BY ordinal is not idempotent today. If the ordinal points to another integer literal and the plan get analyzed again, we will re-do the ordinal resolution which can lead to wrong result or index out-of-bound error. This PR fixes it by using a hack: if the ordinal points to another integer literal, don't replace the ordinal. For advanced users or Spark plugins, they may manipulate the logical plans directly. We need to make the framework more reliable. No new test no Closes #43797 from cloud-fan/group. Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/analysis/Analyzer.scala | 14 ++++++++++- .../SubstituteUnresolvedOrdinalsSuite.scala | 23 +++++++++++++++++-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index b7d174089bc7..c2efac4c84ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1993,7 +1993,19 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor throw QueryCompilationErrors.groupByPositionRefersToAggregateFunctionError( index, ordinalExpr) } else { - ordinalExpr + trimAliases(ordinalExpr) match { + // HACK ALERT: If the ordinal expression is also an integer literal, don't use it + // but still keep the ordinal literal. The reason is we may repeatedly + // analyze the plan. Using a different integer literal may lead to + // a repeat GROUP BY ordinal resolution which is wrong. GROUP BY + // constant is meaningless so whatever value does not matter here. + // TODO: (SPARK-45932) GROUP BY ordinal should pull out grouping expressions to + // a Project, then the resolved ordinal expression is always + // `AttributeReference`. + case Literal(_: Int, IntegerType) => + Literal(index) + case _ => ordinalExpr + } } } else { throw QueryCompilationErrors.groupByPositionRangeError(index, aggs.size) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinalsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinalsSuite.scala index b0d7ace646e2..953b2c8bb101 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinalsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/SubstituteUnresolvedOrdinalsSuite.scala @@ -17,10 +17,11 @@ package org.apache.spark.sql.catalyst.analysis -import org.apache.spark.sql.catalyst.analysis.TestRelations.testRelation2 +import org.apache.spark.sql.catalyst.analysis.TestRelations.{testRelation, testRelation2} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal} +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.internal.SQLConf class SubstituteUnresolvedOrdinalsSuite extends AnalysisTest { @@ -67,4 +68,22 @@ class SubstituteUnresolvedOrdinalsSuite extends AnalysisTest { testRelation2.groupBy(Literal(1), Literal(2))($"a", $"b")) } } + + test("SPARK-45920: group by ordinal repeated analysis") { + val plan = testRelation.groupBy(Literal(1))(Literal(100).as("a")).analyze + comparePlans( + plan, + testRelation.groupBy(Literal(1))(Literal(100).as("a")) + ) + + val testRelationWithData = testRelation.copy(data = Seq(new GenericInternalRow(Array(1: Any)))) + // Copy the plan to reset its `analyzed` flag, so that analyzer rules will re-apply. + val copiedPlan = plan.transform { + case _: LocalRelation => testRelationWithData + } + comparePlans( + copiedPlan.analyze, // repeated analysis + testRelationWithData.groupBy(Literal(1))(Literal(100).as("a")) + ) + } }