From 11eb148866a72eb47a6c097fd3dfa66ba8c37a97 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 3 Apr 2020 16:24:35 +0800
Subject: [PATCH 01/12] save

---
 .../sql/catalyst/analysis/Analyzer.scala     | 76 +++++++++----------
 .../org/apache/spark/sql/SQLQuerySuite.scala | 40 ++++++++++
 2 files changed, 78 insertions(+), 38 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index edcfd6fe8ab6..a6780cbd9a26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -211,44 +211,44 @@ class Analyzer(
       EliminateUnions,
       new SubstituteUnresolvedOrdinals(conf)),
     Batch("Resolution", fixedPoint,
-      ResolveTableValuedFunctions ::
-      ResolveNamespace(catalogManager) ::
-      new ResolveCatalogs(catalogManager) ::
-      ResolveInsertInto ::
-      ResolveRelations ::
-      ResolveTables ::
-      ResolveReferences ::
-      ResolveCreateNamedStruct ::
-      ResolveDeserializer ::
-      ResolveNewInstance ::
-      ResolveUpCast ::
-      ResolveGroupingAnalytics ::
-      ResolvePivot ::
-      ResolveOrdinalInOrderByAndGroupBy ::
-      ResolveAggAliasInGroupBy ::
-      ResolveMissingReferences ::
-      ExtractGenerator ::
-      ResolveGenerate ::
-      ResolveFunctions ::
-      ResolveAliases ::
-      ResolveSubquery ::
-      ResolveSubqueryColumnAliases ::
-      ResolveWindowOrder ::
-      ResolveWindowFrame ::
-      ResolveNaturalAndUsingJoin ::
-      ResolveOutputRelation ::
-      ExtractWindowExpressions ::
-      GlobalAggregates ::
-      ResolveAggregateFunctions ::
-      TimeWindowing ::
-      ResolveInlineTables(conf) ::
-      ResolveHigherOrderFunctions(v1SessionCatalog) ::
-      ResolveLambdaVariables(conf) ::
-      ResolveTimeZone(conf) ::
-      ResolveRandomSeed ::
-      ResolveBinaryArithmetic(conf) ::
-      TypeCoercion.typeCoercionRules(conf) ++
-      extendedResolutionRules : _*),
+      (ResolveTableValuedFunctions +:
+      ResolveNamespace(catalogManager) +:
+      new ResolveCatalogs(catalogManager) +:
+      ResolveInsertInto +:
+      ResolveRelations +:
+      ResolveTables +:
+      ResolveReferences +:
+      ResolveCreateNamedStruct +:
+      ResolveDeserializer +:
+      ResolveNewInstance +:
+      ResolveUpCast +:
+      ResolveGroupingAnalytics +:
+      ResolvePivot +:
+      ResolveOrdinalInOrderByAndGroupBy +:
+      ResolveAggAliasInGroupBy +:
+      ResolveMissingReferences +:
+      ExtractGenerator +:
+      ResolveGenerate +:
+      ResolveFunctions +:
+      ResolveAliases +:
+      ResolveSubquery +:
+      ResolveSubqueryColumnAliases +:
+      ResolveWindowOrder +:
+      ResolveWindowFrame +:
+      ResolveNaturalAndUsingJoin +:
+      ResolveOutputRelation +:
+      ExtractWindowExpressions +:
+      GlobalAggregates +:
+      TypeCoercion.typeCoercionRules(conf) :+
+      ResolveAggregateFunctions :+
+      TimeWindowing :+
+      ResolveInlineTables(conf) :+
+      ResolveHigherOrderFunctions(v1SessionCatalog) :+
+      ResolveLambdaVariables(conf) :+
+      ResolveTimeZone(conf) :+
+      ResolveRandomSeed :+
+      ResolveBinaryArithmetic(conf))
+      .++:(extendedResolutionRules): _*),
     Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index de0f7801a39a..f6610e4a7cdf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3493,6 +3493,46 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       sql("(SELECT map()) UNION ALL (SELECT map(1, 2))"),
       Seq(Row(Map[Int, Int]()), Row(Map(1 -> 2))))
   }
+
+
+  test("SPARK-31334: TypeCoercion should before then ResolveAggregateFunctions") {
+    Seq(
+      (1, 3),
+      (2, 3),
+      (3, 6),
+      (4, 7),
+      (5, 9),
+      (6, 9)
+    ).toDF("a", "b").createOrReplaceTempView("testData1")
+
+    checkAnswer(sql(
+      """
+        | SELECT b, sum(a) as a
+        | FROM testData1
+        | GROUP BY b
+        | HAVING sum(a) > 3
+      """.stripMargin),
+      Row(7, 4) :: Row(9, 11) :: Nil)
+
+    Seq(
+      ("1", 3),
+      ("2", 3),
+      ("3", 6),
+      ("4", 7),
+      ("5", 9),
+      ("6", 9)
+    ).toDF("a", "b").createOrReplaceTempView("testData2")
+
+    checkAnswer(sql(
+      """
+        | SELECT b, sum(a) as a
+        | FROM testData2
+        | GROUP BY b
+        | HAVING sum(a) > 3
+      """.stripMargin),
+      Row(7, 4.0) :: Row(9, 11.0) :: Nil)
+  }
+
 }
 
 case class Foo(bar: Option[String])

From df257df1b2e1d7915c222cae8c3a4589a6d8293d Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 3 Apr 2020 22:22:22 +0800
Subject: [PATCH 02/12] Update Analyzer.scala

---
 .../sql/catalyst/analysis/Analyzer.scala | 109 +++++++++++-------
 1 file changed, 69 insertions(+), 40 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d7fe014d2fc1..e0b9a2bd6659 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -211,44 +211,44 @@ class Analyzer(
       EliminateUnions,
       new SubstituteUnresolvedOrdinals(conf)),
     Batch("Resolution", fixedPoint,
-      (ResolveTableValuedFunctions +:
-      ResolveNamespace(catalogManager) +:
-      new ResolveCatalogs(catalogManager) +:
-      ResolveInsertInto +:
-      ResolveRelations +:
-      ResolveTables +:
-      ResolveReferences +:
-      ResolveCreateNamedStruct +:
-      ResolveDeserializer +:
-      ResolveNewInstance +:
-      ResolveUpCast +:
-      ResolveGroupingAnalytics +:
-      ResolvePivot +:
-      ResolveOrdinalInOrderByAndGroupBy +:
-      ResolveAggAliasInGroupBy +:
-      ResolveMissingReferences +:
-      ExtractGenerator +:
-      ResolveGenerate +:
-      ResolveFunctions +:
-      ResolveAliases +:
-      ResolveSubquery +:
-      ResolveSubqueryColumnAliases +:
-      ResolveWindowOrder +:
-      ResolveWindowFrame +:
-      ResolveNaturalAndUsingJoin +:
-      ResolveOutputRelation +:
-      ExtractWindowExpressions +:
-      GlobalAggregates +:
-      TypeCoercion.typeCoercionRules(conf) :+
-      ResolveAggregateFunctions :+
-      TimeWindowing :+
-      ResolveInlineTables(conf) :+
-      ResolveHigherOrderFunctions(v1SessionCatalog) :+
-      ResolveLambdaVariables(conf) :+
-      ResolveTimeZone(conf) :+
-      ResolveRandomSeed :+
-      ResolveBinaryArithmetic(conf))
-      .++:(extendedResolutionRules): _*),
+      ResolveTableValuedFunctions ::
+        ResolveNamespace(catalogManager) ::
+        new ResolveCatalogs(catalogManager) ::
+        ResolveInsertInto ::
+        ResolveRelations ::
+        ResolveTables ::
+        ResolveReferences ::
+        ResolveCreateNamedStruct ::
+        ResolveDeserializer ::
+        ResolveNewInstance ::
+        ResolveUpCast ::
+        ResolveGroupingAnalytics ::
+        ResolvePivot ::
+        ResolveOrdinalInOrderByAndGroupBy ::
+        ResolveAggAliasInGroupBy ::
+        ResolveMissingReferences ::
+        ExtractGenerator ::
+        ResolveGenerate ::
+        ResolveFunctions ::
+        ResolveAliases ::
+        ResolveSubquery ::
+        ResolveSubqueryColumnAliases ::
+        ResolveWindowOrder ::
+        ResolveWindowFrame ::
+        ResolveNaturalAndUsingJoin ::
+        ResolveOutputRelation ::
+        ExtractWindowExpressions ::
+        GlobalAggregates ::
+        ResolveAggregateFunctions ::
+        TimeWindowing ::
+        ResolveInlineTables(conf) ::
+        ResolveHigherOrderFunctions(v1SessionCatalog) ::
+        ResolveLambdaVariables(conf) ::
+        ResolveTimeZone(conf) ::
+        ResolveRandomSeed ::
+        ResolveBinaryArithmetic(conf) ::
+        TypeCoercion.typeCoercionRules(conf) ++
+        extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,
@@ -1393,7 +1393,24 @@ class Analyzer(
 
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
-        q.mapExpressions(resolveExpressionTopDown(_, q))
+        q.mapExpressions { e =>
+          q match {
+            case _: Filter if containsAggregate(e) =>
+              e
+            case _ =>
+              resolveExpressionTopDown(e, q)
+          }
+        }
+    }
+
+    def containsAggregate(e: Expression): Boolean = {
+      e.find {
+        case func: UnresolvedFunction =>
+          v1SessionCatalog.lookupFunction(func.name, func.arguments)
+            .isInstanceOf[AggregateFunction]
+        case _ =>
+          false
+      }.isDefined || e.find(_.isInstanceOf[AggregateExpression]).isDefined
     }
 
     def resolveAssignments(
@@ -1679,7 +1696,9 @@ class Analyzer(
           Project(child.output, newSort)
       }
 
-      case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved =>
+      case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved
+        && !containsAggregate(cond) =>
+
         val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
         if (child.output == newChild.output) {
           f.copy(condition = newCond.head)
@@ -1690,6 +1709,16 @@ class Analyzer(
 
     }
 
+    def containsAggregate(e: Expression): Boolean = {
+      e.find {
+        case func: UnresolvedFunction =>
+          v1SessionCatalog.lookupFunction(func.name, func.arguments)
+            .isInstanceOf[AggregateFunction]
+        case _ =>
+          false
+      }.isDefined || e.find(_.isInstanceOf[AggregateExpression]).isDefined
+    }
+
     /**
      * This method tries to resolve expressions and find missing attributes recursively. Specially,
      * when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved

From 54df9d330d7d373216878813a0b9fd881b625a19 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 3 Apr 2020 22:28:30 +0800
Subject: [PATCH 03/12] Update Analyzer.scala

---
 .../sql/catalyst/analysis/Analyzer.scala | 74 +++++++++----------
 1 file changed, 37 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e0b9a2bd6659..297c3c937aae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -212,43 +212,43 @@ class Analyzer(
       new SubstituteUnresolvedOrdinals(conf)),
     Batch("Resolution", fixedPoint,
       ResolveTableValuedFunctions ::
-        ResolveNamespace(catalogManager) ::
-        new ResolveCatalogs(catalogManager) ::
-        ResolveInsertInto ::
-        ResolveRelations ::
-        ResolveTables ::
-        ResolveReferences ::
-        ResolveCreateNamedStruct ::
-        ResolveDeserializer ::
-        ResolveNewInstance ::
-        ResolveUpCast ::
-        ResolveGroupingAnalytics ::
-        ResolvePivot ::
-        ResolveOrdinalInOrderByAndGroupBy ::
-        ResolveAggAliasInGroupBy ::
-        ResolveMissingReferences ::
-        ExtractGenerator ::
-        ResolveGenerate ::
-        ResolveFunctions ::
-        ResolveAliases ::
-        ResolveSubquery ::
-        ResolveSubqueryColumnAliases ::
-        ResolveWindowOrder ::
-        ResolveWindowFrame ::
-        ResolveNaturalAndUsingJoin ::
-        ResolveOutputRelation ::
-        ExtractWindowExpressions ::
-        GlobalAggregates ::
-        ResolveAggregateFunctions ::
-        TimeWindowing ::
-        ResolveInlineTables(conf) ::
-        ResolveHigherOrderFunctions(v1SessionCatalog) ::
-        ResolveLambdaVariables(conf) ::
-        ResolveTimeZone(conf) ::
-        ResolveRandomSeed ::
-        ResolveBinaryArithmetic(conf) ::
-        TypeCoercion.typeCoercionRules(conf) ++
-        extendedResolutionRules : _*),
+      ResolveNamespace(catalogManager) ::
+      new ResolveCatalogs(catalogManager) ::
+      ResolveInsertInto ::
+      ResolveRelations ::
+      ResolveTables ::
+      ResolveReferences ::
+      ResolveCreateNamedStruct ::
+      ResolveDeserializer ::
+      ResolveNewInstance ::
+      ResolveUpCast ::
+      ResolveGroupingAnalytics ::
+      ResolvePivot ::
+      ResolveOrdinalInOrderByAndGroupBy ::
+      ResolveAggAliasInGroupBy ::
+      ResolveMissingReferences ::
+      ExtractGenerator ::
+      ResolveGenerate ::
+      ResolveFunctions ::
+      ResolveAliases ::
+      ResolveSubquery ::
+      ResolveSubqueryColumnAliases ::
+      ResolveWindowOrder ::
+      ResolveWindowFrame ::
+      ResolveNaturalAndUsingJoin ::
+      ResolveOutputRelation ::
+      ExtractWindowExpressions ::
+      GlobalAggregates ::
+      ResolveAggregateFunctions ::
+      TimeWindowing ::
+      ResolveInlineTables(conf) ::
+      ResolveHigherOrderFunctions(v1SessionCatalog) ::
+      ResolveLambdaVariables(conf) ::
+      ResolveTimeZone(conf) ::
+      ResolveRandomSeed ::
+      ResolveBinaryArithmetic(conf) ::
+      TypeCoercion.typeCoercionRules(conf) ++
+      extendedResolutionRules : _*),
     Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
     Batch("Normalize Alter Table", Once, ResolveAlterTableChanges),
     Batch("Remove Unresolved Hints", Once,

From 24e9d4db919a6e076058d056d0b270163f13fb48 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 3 Apr 2020 22:44:05 +0800
Subject: [PATCH 04/12] Update Analyzer.scala

---
 .../spark/sql/catalyst/analysis/Analyzer.scala | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 297c3c937aae..0cf76ed59446 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1391,16 +1391,18 @@ class Analyzer(
             notMatchedActions = newNotMatchedActions)
       }
 
+      case f @ Filter(cond, _) if containsAggregate(cond) => f
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
-        q.mapExpressions { e =>
-          q match {
-            case _: Filter if containsAggregate(e) =>
-              e
-            case _ =>
-              resolveExpressionTopDown(e, q)
-          }
-        }
+        // q.mapExpressions { e =>
+        //   q match {
+        //     case _: Filter if containsAggregate(e) =>
+        //       e
+        //     case _ =>
+        //       resolveExpressionTopDown(e, q)
+        //   }
+        // }
+        q.mapExpressions(resolveExpressionTopDown(_, q))
     }
 
     def containsAggregate(e: Expression): Boolean = {

From 0d81b4d538804ba6616f44ab331cdcf359e249c6 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 3 Apr 2020 22:47:16 +0800
Subject: [PATCH 05/12] follow comment

---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala | 10 +---------
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala    |  4 ++--
 2 files changed, 3 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0cf76ed59446..568aa3340071 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1067,7 +1067,7 @@ class Analyzer(
         }
       }
     }
-
+
     /**
      * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
      * a logical plan node's children.
@@ -1394,14 +1394,6 @@ class Analyzer(
       case f @ Filter(cond, _) if containsAggregate(cond) => f
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
-        // q.mapExpressions { e =>
-        //   q match {
-        //     case _: Filter if containsAggregate(e) =>
-        //       e
-        //     case _ =>
-        //       resolveExpressionTopDown(e, q)
-        //   }
-        // }
         q.mapExpressions(resolveExpressionTopDown(_, q))
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 8b321e31e4d8..28a8afd57988 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3494,7 +3494,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       Seq(Row(Map[Int, Int]()), Row(Map(1 -> 2))))
   }
 
-  test("SPARK-31334: TypeCoercion should before then ResolveAggregateFunctions") {
+  test("SPARK-31334: Don't ResolveReference/ResolveMissingReference when " +
+    "Filter condition with aggregate expression") {
     Seq(
       (1, 3),
       (2, 3),
@@ -3542,7 +3543,6 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
       assert(SQLConf.get.getConf(SQLConf.CODEGEN_FALLBACK) === true)
     }
   }
-
 }
 
 case class Foo(bar: Option[String])

From dfda7ff9be387c69f5e2a207bc19e69fb25c53b7 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 3 Apr 2020 22:49:52 +0800
Subject: [PATCH 06/12] Update Analyzer.scala

---
 .../spark/sql/catalyst/analysis/Analyzer.scala | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 568aa3340071..6d2bfd8caf4e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1067,7 +1067,7 @@ class Analyzer(
         }
       }
     }
-
+
     /**
      * Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
     * a logical plan node's children.
@@ -1392,6 +1392,7 @@ class Analyzer(
       }
 
       case f @ Filter(cond, _) if containsAggregate(cond) => f
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))
@@ -1691,7 +1692,7 @@ class Analyzer(
       }
 
       case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved
-        && !containsAggregate(cond) =>
+        && !ResolveReferences.containsAggregate(cond) =>
 
         val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
         if (child.output == newChild.output) {
@@ -1703,16 +1704,6 @@ class Analyzer(
 
     }
 
-    def containsAggregate(e: Expression): Boolean = {
-      e.find {
-        case func: UnresolvedFunction =>
-          v1SessionCatalog.lookupFunction(func.name, func.arguments)
-            .isInstanceOf[AggregateFunction]
-        case _ =>
-          false
-      }.isDefined || e.find(_.isInstanceOf[AggregateExpression]).isDefined
-    }
-
     /**
      * This method tries to resolve expressions and find missing attributes recursively. Specially,
      * when the expressions used in `Sort` or `Filter` contain unresolved attributes or resolved

From a5cf877410a0f408272fe8c6b357efb36b859bd5 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 3 Apr 2020 22:54:04 +0800
Subject: [PATCH 07/12] Update Analyzer.scala

---
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 6d2bfd8caf4e..43f842ff6dce 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1693,7 +1693,6 @@ class Analyzer(
 
       case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved
         && !ResolveReferences.containsAggregate(cond) =>
-
         val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
         if (child.output == newChild.output) {
           f.copy(condition = newCond.head)

From 4a799fe9b538993c5b922a36d8ac2effa12190b5 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Sat, 4 Apr 2020 10:18:28 +0800
Subject: [PATCH 08/12] Update Analyzer.scala

---
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 43f842ff6dce..a3cfc7ef41a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1391,7 +1391,7 @@ class Analyzer(
             notMatchedActions = newNotMatchedActions)
       }
 
-      case f @ Filter(cond, _) if containsAggregate(cond) => f
+      case f @ Filter(cond, agg @ Aggregate(_, _, _)) if containsAggregate(cond) => f
 
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
         q.mapExpressions(resolveExpressionTopDown(_, q))

From 5f6eef135a0cfd6bc548dbe575933bdaef73af71 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Sat, 4 Apr 2020 13:38:26 +0800
Subject: [PATCH 09/12] Update Analyzer.scala

---
 .../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a3cfc7ef41a4..dc8881bcb553 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1692,7 +1692,7 @@ class Analyzer(
       }
 
       case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved
-        && !ResolveReferences.containsAggregate(cond) =>
+        && (!child.isInstanceOf[Aggregate] || !ResolveReferences.containsAggregate(cond)) =>
         val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)
         if (child.output == newChild.output) {
           f.copy(condition = newCond.head)

From cc0b018dead6c07d35a6d54570e3e9f7c55bcec0 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Sat, 4 Apr 2020 17:10:15 +0800
Subject: [PATCH 10/12] solve UDF problem

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index dc8881bcb553..fa948f841305 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1401,8 +1401,12 @@ class Analyzer(
     def containsAggregate(e: Expression): Boolean = {
       e.find {
         case func: UnresolvedFunction =>
-          v1SessionCatalog.lookupFunction(func.name, func.arguments)
-            .isInstanceOf[AggregateFunction]
+          try {
+            v1SessionCatalog.lookupFunction(func.name, func.arguments)
+              .isInstanceOf[AggregateFunction]
+          } catch {
+            case _: Exception => true
+          }
         case _ =>
           false
       }.isDefined || e.find(_.isInstanceOf[AggregateExpression]).isDefined

From b2855bd8bc05f55facbafa045b02f9b9ae7f3e61 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Sat, 4 Apr 2020 21:56:13 +0800
Subject: [PATCH 11/12] add comment

---
 .../spark/sql/catalyst/analysis/Analyzer.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fa948f841305..421b843d28e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1391,6 +1391,11 @@ class Analyzer(
             notMatchedActions = newNotMatchedActions)
       }
 
+      // When the filter condition comes from a HAVING clause, its columns have not been
+      // handled by TypeCoercion yet, so the condition stays unresolved because of the
+      // aggregate function's checkInputDataType method. It then can't be handled by
+      // ResolveAggregateFunctions, which finally causes a column resolution error.
+      // In this situation, we don't resolve the condition's references here.
       case f @ Filter(cond, agg @ Aggregate(_, _, _)) if containsAggregate(cond) => f
 
       case q: LogicalPlan =>
@@ -1400,11 +1405,18 @@ class Analyzer(
 
     def containsAggregate(e: Expression): Boolean = {
       e.find {
+        // In the current iteration the function may still be unresolved,
+        // so we need to determine here whether it is an aggregate function.
         case func: UnresolvedFunction =>
           try {
            v1SessionCatalog.lookupFunction(func.name, func.arguments)
              .isInstanceOf[AggregateFunction]
          } catch {
+            // When the UnresolvedFunction is a UDF, we can't look the function up because
+            // its arguments are still unresolved. If the lookup throws an exception, assume
+            // we should not handle this expression for now; after the next iteration the
+            // function's arguments will have been resolved, and we can then determine
+            // whether it is an aggregate function.
            case _: Exception => true
          }
        case _ =>
@@ -1695,6 +1707,11 @@ class Analyzer(
           Project(child.output, newSort)
       }
 
+      // When the filter condition comes from a HAVING clause, its columns have not been
+      // handled by TypeCoercion yet, so the condition stays unresolved because of the
+      // aggregate function's checkInputDataType method. It then can't be handled by
+      // ResolveAggregateFunctions, which finally causes a column resolution error.
+      // In this situation, we don't resolve the condition's references here.
       case f @ Filter(cond, child) if (!f.resolved || f.missingInput.nonEmpty) && child.resolved
         && (!child.isInstanceOf[Aggregate] || !ResolveReferences.containsAggregate(cond)) =>
         val (newCond, newChild) = resolveExprsAndAddMissingAttrs(Seq(cond), child)

From 20febcefcf88f02c6188a119722be70085a0a17c Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Mon, 6 Apr 2020 17:35:22 +0800
Subject: [PATCH 12/12] Update SQLQuerySuite.scala

---
 .../org/apache/spark/sql/SQLQuerySuite.scala | 18 ------------------
 1 file changed, 18 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 28a8afd57988..5fe4bd42fab4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3496,24 +3496,6 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
   test("SPARK-31334: Don't ResolveReference/ResolveMissingReference when " +
     "Filter condition with aggregate expression") {
-    Seq(
-      (1, 3),
-      (2, 3),
-      (3, 6),
-      (4, 7),
-      (5, 9),
-      (6, 9)
-    ).toDF("a", "b").createOrReplaceTempView("testData1")
-
-    checkAnswer(sql(
-      """
-        | SELECT b, sum(a) as a
-        | FROM testData1
-        | GROUP BY b
-        | HAVING sum(a) > 3
-      """.stripMargin),
-      Row(7, 4) :: Row(9, 11) :: Nil)
-
     Seq(
+      ("1", 3),
+      ("2", 3),
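
The final patch above is truncated mid-hunk. For reference, the query shape that this patch series exercises can be reproduced standalone; the sketch below is illustrative only (object name, local master, and the `show()` call are assumptions, not part of the PR), and it mirrors the `testData2` view from the suite: a HAVING clause whose aggregate runs over a string column that TypeCoercion must cast before `sum(a)` can resolve.

// A minimal, self-contained sketch of the SPARK-31334 scenario (Scala, Spark 3.x assumed).
import org.apache.spark.sql.SparkSession

object Spark31334Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-31334 repro")
      .getOrCreate()
    import spark.implicits._

    // String-typed column `a`: sum(a) only resolves after TypeCoercion casts it to a numeric type.
    Seq(("1", 3), ("2", 3), ("3", 6), ("4", 7), ("5", 9), ("6", 9))
      .toDF("a", "b")
      .createOrReplaceTempView("testData2")

    // Expected rows per the test above: [7, 4.0] and [9, 11.0].
    spark.sql(
      """
        |SELECT b, sum(a) AS a
        |FROM testData2
        |GROUP BY b
        |HAVING sum(a) > 3
      """.stripMargin).show()

    spark.stop()
  }
}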