@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

@@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._


/**
* A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]].
* Used for testing when all relations are already filled in and the analyzer needs only
@@ -145,6 +147,8 @@ class Analyzer(
ResolveHints.RemoveAllHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("DeduplicateAliases", Once,
DeduplicateAliases),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
@@ -284,6 +288,80 @@ class Analyzer(
}
}

/**
* Replaces [[Alias]] with the same exprId but different references with [[Alias]] having
* different exprIds. This is a rare situation which can cause incorrect results.
*/
object DeduplicateAliases extends Rule[LogicalPlan] {
Contributor

What problem does this rule try to solve?

Contributor Author

The added UT fails because FoldablePropagation replaces every attribute that it considers to be the same foldable alias. If an alias with the same exprId appears in different parts of the plan (and actually refers to different things, despite sharing the id), the wrong attributes can be replaced. In the added UT, the plan is:

== Analyzed Logical Plan ==
a: int, b: int, n: bigint
Union
:- Project [a#5, b#17, n#19L]
:  +- Project [a#5, b#17, n#19L, n#19L]
:     +- Window [count(1) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n#19L]
:        +- Project [a#5, b#6 AS b#17]
:           +- Project [_1#2 AS a#5, _2#3 AS b#6]
:              +- LocalRelation [_1#2, _2#3]
+- Project [a#12, b#17, n#19L]
   +- Project [a#12, b#17, n#19L, n#19L]
      +- Window [count(1) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS n#19L]
         +- Project [a#12, b#14 AS b#17]
            +- Project [a#12, 0 AS b#14]
               +- Project [value#10 AS a#12]
                  +- LocalRelation [value#10]

Please note that both branches of the union contain the same b#17 attribute, but the two occurrences refer to different things. Since the lower one is a foldable value that evaluates to 0, every b#17 is replaced with the literal 0, which produces a wrong result.

Although we could fix this specific problem in the related Optimizer rule, I think that in general we assume that items with the same id are the same. So I proposed this solution to fix all the issues that may arise from this unexpected situation.
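
The failure mode described above can be reproduced in miniature. The sketch below is only an illustration under stated assumptions: `Expr`, `Plan`, `foldableAliases` and `propagate` are hypothetical toy definitions, not Catalyst classes, and the propagation is deliberately simplified to a single plan-wide map keyed by id. The plan is also reduced to the two `Project` layers that matter in each union branch.

```scala
// Toy model for illustration only: these case classes are hypothetical
// stand-ins, not Catalyst's Expression/LogicalPlan classes.
object SharedExprIdDemo {
  sealed trait Expr
  final case class Lit(value: Int) extends Expr
  final case class Attr(name: String, id: Int) extends Expr
  final case class Alias(child: Expr, name: String, id: Int) extends Expr

  sealed trait Plan
  final case class Relation(output: Seq[Attr]) extends Plan
  final case class Project(exprs: Seq[Expr], child: Plan) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan

  // Collect id -> literal for every alias of a literal, anywhere in the plan.
  def foldableAliases(plan: Plan): Map[Int, Lit] = plan match {
    case Project(exprs, child) =>
      foldableAliases(child) ++ exprs.collect { case Alias(l: Lit, _, id) => id -> l }
    case Union(children) => children.flatMap(c => foldableAliases(c)).toMap
    case _: Relation => Map.empty[Int, Lit]
  }

  // Replace every attribute whose id is in the map, regardless of the branch it lives in.
  def propagate(plan: Plan, foldables: Map[Int, Lit]): Plan = plan match {
    case Project(exprs, child) =>
      val rewritten: Seq[Expr] = exprs.map {
        case Attr(name, id) if foldables.contains(id) => Alias(foldables(id), name, id)
        case other => other
      }
      Project(rewritten, propagate(child, foldables))
    case Union(children) => Union(children.map(c => propagate(c, foldables)))
    case r: Relation => r
  }

  // Upper branch: b#17 aliases a real column. Lower branch: b#17 aliases the literal 0.
  val upper: Plan = Project(Seq(Attr("a", 5), Attr("b", 17)),
    Project(Seq(Attr("a", 5), Alias(Attr("b", 6), "b", 17)),
      Relation(Seq(Attr("a", 5), Attr("b", 6)))))
  val lower: Plan = Project(Seq(Attr("a", 12), Attr("b", 17)),
    Project(Seq(Attr("a", 12), Alias(Lit(0), "b", 17)),
      Relation(Seq(Attr("a", 12)))))

  val rewritten: Plan = {
    val plan = Union(Seq(upper, lower))
    propagate(plan, foldableAliases(plan))
  }

  // Output expressions of the upper branch after propagation.
  val upperOutput: Seq[Expr] = rewritten match {
    case Union(Seq(Project(exprs, _), _)) => exprs
    case _ => Nil
  }
}
```

In this toy model the upper branch ends up projecting `Alias(Lit(0), "b", 17)` instead of the column it originally referenced, which mirrors the wrong `b = 0` values seen in the test.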

Contributor

I feel the root cause is in FoldablePropagation. We should only replace attributes with literals that come from children, not from siblings.
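
A minimal sketch of what "children, not siblings" could look like, assuming a toy plan model. All names here (`Expr`, `Plan`, `propagate`, `topLevelOutputs`) are hypothetical and this is not the actual FoldablePropagation code. Each node only sees the foldable aliases exposed by its own child, and a `Union` exposes none, so a literal in one branch cannot leak into the other.

```scala
// Toy model for illustration only: hypothetical stand-ins, not Catalyst classes.
object ScopedPropagationDemo {
  sealed trait Expr
  final case class Lit(value: Int) extends Expr
  final case class Attr(name: String, id: Int) extends Expr
  final case class Alias(child: Expr, name: String, id: Int) extends Expr

  sealed trait Plan
  final case class Relation(output: Seq[Attr]) extends Plan
  final case class Project(exprs: Seq[Expr], child: Plan) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan

  // Returns the rewritten plan together with the foldable aliases visible in its output.
  def propagate(plan: Plan): (Plan, Map[Int, Lit]) = plan match {
    case r: Relation => (r, Map.empty[Int, Lit])
    case Project(exprs, child) =>
      val (newChild, fromChild) = propagate(child)
      // Only literals coming from this node's own child are candidates.
      val rewritten: Seq[Expr] = exprs.map {
        case Attr(name, id) if fromChild.contains(id) => Alias(fromChild(id), name, id)
        case other => other
      }
      val own = rewritten.collect { case Alias(l: Lit, _, id) => id -> l }.toMap
      (Project(rewritten, newChild), own)
    case Union(children) =>
      // Branches are rewritten independently and nothing is exposed upwards.
      (Union(children.map(c => propagate(c)._1)), Map.empty[Int, Lit])
  }

  // Output expressions of each top-level branch.
  def topLevelOutputs(plan: Plan): Seq[Seq[Expr]] = plan match {
    case Union(children) => children.collect { case Project(exprs, _) => exprs }
    case Project(exprs, _) => Seq(exprs)
    case _: Relation => Nil
  }

  val upper: Plan = Project(Seq(Attr("a", 5), Attr("b", 17)),
    Project(Seq(Attr("a", 5), Alias(Attr("b", 6), "b", 17)),
      Relation(Seq(Attr("a", 5), Attr("b", 6)))))
  val lower: Plan = Project(Seq(Attr("a", 12), Attr("b", 17)),
    Project(Seq(Attr("a", 12), Alias(Lit(0), "b", 17)),
      Relation(Seq(Attr("a", 12)))))

  val outputs: Seq[Seq[Expr]] = topLevelOutputs(propagate(Union(Seq(upper, lower)))._1)
}
```

With this scoping the upper branch keeps `Attr("b", 17)` untouched, while the lower branch still gets its literal folded in, even though both branches share the id 17.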

Contributor Author

Yes, that is also true. But in many places in the codebase we compare attributes using semanticEquals or, in some cases, even equals. If we accept that different attributes can have the same exprId, all of these places would need to be checked to make sure the same problem cannot happen there too. Moreover (this is more of a nit), semanticEquals and sameRef would themselves be wrong with respect to their semantics, as they may return true even when two attributes do not refer to the same thing. This is why I opted for this solution, which seems cleaner to me because it solves the root cause of the problem. What do you think?
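
For reference, the idea behind the proposed rule can be sketched on a toy plan model. This is a simplified illustration, not the rule in this PR: the case classes and the helpers `allAliases`, `conflictingIds`, `renew` and `counter` are hypothetical, ids are plain integers, and the id mapping is threaded through return values instead of the shared mutable map used with `transformUp`.

```scala
// Toy model for illustration only: hypothetical stand-ins, not Catalyst classes.
object DeduplicateAliasesSketch {
  sealed trait Expr
  final case class Lit(value: Int) extends Expr
  final case class Attr(name: String, id: Int) extends Expr
  final case class Alias(child: Expr, name: String, id: Int) extends Expr

  sealed trait Plan
  final case class Relation(output: Seq[Attr]) extends Plan
  final case class Project(exprs: Seq[Expr], child: Plan) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan

  def allAliases(plan: Plan): Seq[Alias] = plan match {
    case Project(exprs, child) => exprs.collect { case a: Alias => a } ++ allAliases(child)
    case Union(children) => children.flatMap(c => allAliases(c))
    case _: Relation => Nil
  }

  // Ids shared by aliases that are not structurally identical.
  def conflictingIds(plan: Plan): Set[Int] =
    allAliases(plan).groupBy(_.id).collect {
      case (id, aliases) if aliases.distinct.size > 1 => id
    }.toSet

  // Bottom-up rewrite: a conflicting alias gets a fresh id, and references
  // above it in the same branch are updated to that fresh id.
  def renew(plan: Plan, conflicts: Set[Int], freshId: () => Int): (Plan, Map[Int, Int]) =
    plan match {
      case r: Relation => (r, Map.empty[Int, Int])
      case Project(exprs, child) =>
        val (newChild, renamed) = renew(child, conflicts, freshId)
        val fresh = exprs.collect {
          case Alias(_, _, id) if conflicts(id) => id -> freshId()
        }.toMap
        val rewritten: Seq[Expr] = exprs.map {
          case Alias(c, name, id) if fresh.contains(id) => Alias(c, name, fresh(id))
          case Attr(name, id) if renamed.contains(id) => Attr(name, renamed(id))
          case other => other
        }
        (Project(rewritten, newChild), renamed ++ fresh)
      case Union(children) =>
        // Each branch is renewed on its own, so every branch gets distinct fresh ids.
        (Union(children.map(c => renew(c, conflicts, freshId)._1)), Map.empty[Int, Int])
    }

  // Hypothetical id generator, standing in for a fresh-ExprId source.
  def counter(start: Int): () => Int = {
    var next = start - 1
    () => { next += 1; next }
  }

  val upper: Plan = Project(Seq(Attr("a", 5), Attr("b", 17)),
    Project(Seq(Attr("a", 5), Alias(Attr("b", 6), "b", 17)),
      Relation(Seq(Attr("a", 5), Attr("b", 6)))))
  val lower: Plan = Project(Seq(Attr("a", 12), Attr("b", 17)),
    Project(Seq(Attr("a", 12), Alias(Lit(0), "b", 17)),
      Relation(Seq(Attr("a", 12)))))

  val plan: Plan = Union(Seq(upper, lower))
  val deduplicated: Plan = renew(plan, conflictingIds(plan), counter(100))._1
}
```

After the rewrite, the two branches no longer share an alias id, so any later comparison that relies on ids alone sees two distinct attributes. The sketch does not rewrite references nested inside alias children, which a complete rule would also need to consider.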

Contributor Author

kindly ping @cloud-fan

def apply(plan: LogicalPlan): LogicalPlan = {
val allAliases = collectAllAliasesInPlan(plan)
val dupAliases = allAliases.groupBy(_.exprId).collect {
case (eId, aliases) if containsDifferentAliases(aliases) => eId
}.toSeq
if (dupAliases.nonEmpty) {
val exprIdsDictionary = mutable.HashMap[ExprId, ExprId]()
resolveConflictingAliases(plan, dupAliases, exprIdsDictionary)
} else {
plan
}
}

def containsDifferentAliases(aliases: Seq[Alias]): Boolean = {
aliases.exists(a1 => aliases.exists(a2 => !a1.fastEquals(a2)))
}

def collectAllAliasesInPlan(plan: LogicalPlan): Seq[Alias] = {
plan.flatMap {
case Project(projectList, _) => projectList.collect { case a: Alias => a }
case AnalysisBarrier(child) => collectAllAliasesInPlan(child)
case _ => Nil
}
}

def containsExprIds(
projectList: Seq[NamedExpression],
exprIds: Seq[ExprId]): Boolean = {
projectList.exists {
case a: Alias => exprIds.contains(a.exprId)
case a: AttributeReference => exprIds.contains(a.exprId)
case _ => false
}
}

def renewConflictingAliases(
exprs: Seq[NamedExpression],
exprIds: Seq[ExprId],
exprIdsDictionary: mutable.HashMap[ExprId, ExprId]): Seq[NamedExpression] = {
exprs.map {
case a: Alias if exprIds.contains(a.exprId) =>
val newAlias = Alias(a.child, a.name)()
// update the map with the new id to replace
// since we are in a transformUp, all the parent nodes will see the updated map
exprIdsDictionary(a.exprId) = newAlias.exprId
newAlias
case a: AttributeReference if exprIds.contains(a.exprId) =>
// replace with the new id
a.withExprId(exprIdsDictionary(a.exprId))
case other => other
}
}

def resolveConflictingAliases(
plan: LogicalPlan,
dupAliases: Seq[ExprId],
exprIdsDictionary: mutable.HashMap[ExprId, ExprId]): LogicalPlan = {
plan.transformUp {
case p @ Project(projectList, _) if containsExprIds(projectList, dupAliases) =>
p.copy(renewConflictingAliases(projectList, dupAliases, exprIdsDictionary))
case a @ Aggregate(_, aggs, _) if containsExprIds(aggs, dupAliases) =>
a.copy(aggregateExpressions =
renewConflictingAliases(aggs, dupAliases, exprIdsDictionary))
case AnalysisBarrier(child) =>
AnalysisBarrier(resolveConflictingAliases(child, dupAliases, exprIdsDictionary))
}
}
}

object ResolveGroupingAnalytics extends Rule[LogicalPlan] {
/*
* GROUP BY a, b, c WITH ROLLUP
12 changes: 12 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Unio
import org.apache.spark.sql.execution.{FilterExec, QueryExecution, WholeStageCodegenExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext}
@@ -2265,4 +2266,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
val df = spark.range(1).select($"id", new Column(Uuid()))
checkAnswer(df, df.collect())
}

test("SPARK-24051: using the same alias can produce incorrect result") {
Member

Does this test case fail without your changes?

Contributor Author

Yes, without the change the result is:

+---+---+---+
|  a|  b|  n|
+---+---+---+
|  1|  0|  2|
|  2|  0|  2|
|  3|  0|  1|
+---+---+---+

val ds1 = Seq((1, 42), (2, 99)).toDF("a", "b")
val ds2 = Seq(3).toDF("a").withColumn("b", lit(0))

val cols = Seq(col("a"), col("b").alias("b"),
count(lit(1)).over(Window.partitionBy()).alias("n"))

val df = ds1.select(cols: _*).union(ds2.select(cols: _*))
checkAnswer(df, Row(1, 42, 2) :: Row(2, 99, 2) :: Row(3, 0, 1) :: Nil)
}
}