Skip to content

Commit 00a2e01

Browse files
CodingCathvanhovell
authored andcommitted
[SPARK-18058][SQL] [BRANCH-2.0]Comparing column types ignoring Nullability in Union and SetOperation
## What changes were proposed in this pull request? The PR tries to fix [SPARK-18058](https://issues.apache.org/jira/browse/SPARK-18058) which refers to a bug that the column types are compared with the extra care about Nullability in Union and SetOperation. This PR converts the columns types by setting all fields as nullable before comparison ## How was this patch tested? regular unit test cases Author: CodingCat <zhunansjtu@gmail.com> Closes #15602 from CodingCat/branch-2.0.
1 parent 0e0d83a commit 00a2e01

File tree

2 files changed

+24
-19
lines changed

2 files changed

+24
-19
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ case class Filter(condition: Expression, child: LogicalPlan)
116116

117117
abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {
118118

119+
def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
120+
119121
protected def leftConstraints: Set[Expression] = left.constraints
120122

121123
protected def rightConstraints: Set[Expression] = {
@@ -125,6 +127,13 @@ abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends Binar
125127
case a: Attribute => attributeRewrites(a)
126128
})
127129
}
130+
131+
override lazy val resolved: Boolean =
132+
childrenResolved &&
133+
left.output.length == right.output.length &&
134+
left.output.zip(right.output).forall {
135+
case (l, r) => l.dataType.asNullable == r.dataType.asNullable } &&
136+
duplicateResolved
128137
}
129138

130139
object SetOperation {
@@ -133,8 +142,6 @@ object SetOperation {
133142

134143
case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
135144

136-
def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
137-
138145
override def output: Seq[Attribute] =
139146
left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
140147
leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
@@ -143,14 +150,6 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation
143150
override protected def validConstraints: Set[Expression] =
144151
leftConstraints.union(rightConstraints)
145152

146-
// Intersect are only resolved if they don't introduce ambiguous expression ids,
147-
// since the Optimizer will convert Intersect to Join.
148-
override lazy val resolved: Boolean =
149-
childrenResolved &&
150-
left.output.length == right.output.length &&
151-
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
152-
duplicateResolved
153-
154153
override def maxRows: Option[Long] = {
155154
if (children.exists(_.maxRows.isEmpty)) {
156155
None
@@ -171,19 +170,11 @@ case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation
171170

172171
case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
173172

174-
def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
175-
176173
/** We don't use right.output because those rows get excluded from the set. */
177174
override def output: Seq[Attribute] = left.output
178175

179176
override protected def validConstraints: Set[Expression] = leftConstraints
180177

181-
override lazy val resolved: Boolean =
182-
childrenResolved &&
183-
left.output.length == right.output.length &&
184-
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
185-
duplicateResolved
186-
187178
override lazy val statistics: Statistics = {
188179
left.statistics.copy()
189180
}
@@ -218,7 +209,7 @@ case class Union(children: Seq[LogicalPlan]) extends LogicalPlan {
218209
child.output.length == children.head.output.length &&
219210
// compare the data types with the first child
220211
child.output.zip(children.head.output).forall {
221-
case (l, r) => l.dataType == r.dataType }
212+
case (l, r) => l.dataType.asNullable == r.dataType.asNullable }
222213
)
223214

224215
children.length > 1 && childrenResolved && allChildrenCompatible

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,4 +377,18 @@ class AnalysisSuite extends AnalysisTest {
377377
assertExpressionType(sum(Divide(Decimal(1), 2.0)), DoubleType)
378378
assertExpressionType(sum(Divide(1.0, Decimal(2.0))), DoubleType)
379379
}
380+
381+
test("SPARK-18058: union and set operations shall not care about the nullability" +
382+
" when comparing column types") {
383+
val firstTable = LocalRelation(
384+
AttributeReference("a",
385+
StructType(Seq(StructField("a", IntegerType, nullable = true))), nullable = false)())
386+
val secondTable = LocalRelation(
387+
AttributeReference("a",
388+
StructType(Seq(StructField("a", IntegerType, nullable = false))), nullable = false)())
389+
390+
assertAnalysisSuccess(Union(firstTable, secondTable))
391+
assertAnalysisSuccess(Except(firstTable, secondTable))
392+
assertAnalysisSuccess(Intersect(firstTable, secondTable))
393+
}
380394
}

0 commit comments

Comments
 (0)