Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class Analyzer(
PullOutNondeterministic),
Batch("UDF", Once,
HandleNullInputsForUDF),
Batch("FixNullability", Once,
FixNullability),
Batch("Cleanup", fixedPoint,
CleanupAliases)
)
Expand Down Expand Up @@ -1447,6 +1449,40 @@ class Analyzer(
}
}

/**
* Fixes nullability of Attributes in a resolved LogicalPlan by using the nullability of
* corresponding Attributes of its children output Attributes. This step is needed because
* users can use a resolved AttributeReference in the Dataset API and outer joins
* can change the nullability of an AttribtueReference. Without the fix, a nullable column's
* nullable field can be actually set as non-nullable, which cause illegal optimization
* (e.g., NULL propagation) and wrong answers.
* See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
*/
object FixNullability extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p if !p.resolved => p // Skip unresolved nodes.
case p: LogicalPlan if p.resolved =>
val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
case (exprId, attributes) =>
// If there are multiple Attributes having the same ExprId, we need to resolve
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with our current implementation, it will not happen.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then should we just put an assert/require here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure we should add assert. Even when we hit that case, it is still fine to pass at here, right?

// the conflict of nullable field. We do not really expect this happen.
val nullable = attributes.exists(_.nullable)
attributes.map(attr => attr.withNullability(nullable))
}.toSeq
// At here, we create an AttributeMap that only compare the exprId for the lookup
// operation. So, we can find the corresponding input attribute's nullability.
val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
// For an Attribute used by the current LogicalPlan, if it is from its children,
// we fix the nullable field by using the nullability setting of the corresponding
// output Attribute from the children.
Copy link
Contributor

@liancheng liancheng May 31, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe also explain that AttributeMap only compares attributes by expression ID. It's not quite intuitive why and how the attribute map helps here.

p.transformExpressions {
case attr: Attribute if attributeMap.contains(attr) =>
attr.withNullability(attributeMap(attr).nullable)
}
}
}

/**
* Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and
* aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]]
Expand Down Expand Up @@ -2127,4 +2163,3 @@ object TimeWindowing extends Rule[LogicalPlan] {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
val naturalPlan = r3.join(r4, NaturalJoin(FullOuter), None)
val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq(UnresolvedAttribute("b"))), None)
val expected = r3.join(r4, FullOuter, Some(EqualTo(bNotNull, bNotNull))).select(
Alias(Coalesce(Seq(bNotNull, bNotNull)), "b")(), a, c)
Alias(Coalesce(Seq(b, b)), "b")(), a, c)
checkAnalysis(naturalPlan, expected)
checkAnalysis(usingPlan, expected)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,25 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
leftJoin2Inner,
Row(1, 2, "1", 1, 3, "1") :: Nil)
}

test("process outer join results using the non-nullable columns in the join input") {
// Filter data using a non-nullable column from a right table
val df1 = Seq((0, 0), (1, 0), (2, 0), (3, 0), (4, 0)).toDF("id", "count")
val df2 = Seq(Tuple1(0), Tuple1(1)).toDF("id").groupBy("id").count
checkAnswer(
df1.join(df2, df1("id") === df2("id"), "left_outer").filter(df2("count").isNull),
Row(2, 0, null, null) ::
Row(3, 0, null, null) ::
Row(4, 0, null, null) :: Nil
)

// Coalesce data using non-nullable columns in input tables
val df3 = Seq((1, 1)).toDF("a", "b")
val df4 = Seq((2, 2)).toDF("a", "b")
checkAnswer(
df3.join(df4, df3("a") === df4("a"), "outer")
.select(coalesce(df3("a"), df3("b")), coalesce(df4("a"), df4("b"))),
Row(1, null) :: Row(null, 2) :: Nil
)
}
}