Analyzer.scala
@@ -197,8 +197,8 @@ class Analyzer(
      PullOutNondeterministic),
    Batch("UDF", Once,
      HandleNullInputsForUDF),
-   Batch("FixNullability", Once,
-     FixNullability),
+   Batch("UpdateNullability", Once,
+     UpdateAttributeNullability),
    Batch("Subquery", Once,
      UpdateOuterReferences),
    Batch("Cleanup", fixedPoint,
@@ -1821,40 +1821,6 @@ class Analyzer(
}
}

-  /**
-   * Fixes nullability of Attributes in a resolved LogicalPlan by using the nullability of
-   * the corresponding Attributes of its children's output Attributes. This step is needed because
-   * users can use a resolved AttributeReference in the Dataset API and outer joins
-   * can change the nullability of an AttributeReference. Without the fix, a nullable column's
-   * nullable field can actually be set as non-nullable, which causes illegal optimizations
-   * (e.g., NULL propagation) and wrong answers.
-   * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
-   */
-  object FixNullability extends Rule[LogicalPlan] {
Comment (Contributor Author): move this rule out of the Analyzer, so that it can be used in other places.

Comment (Member): +1

-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
-      case p if !p.resolved => p // Skip unresolved nodes.
-      case p: LogicalPlan if p.resolved =>
-        val childrenOutput = p.children.flatMap(c => c.output).groupBy(_.exprId).flatMap {
-          case (exprId, attributes) =>
-            // If there are multiple Attributes having the same ExprId, we need to resolve
-            // the conflict of the nullable field. We do not really expect this to happen.
-            val nullable = attributes.exists(_.nullable)
-            attributes.map(attr => attr.withNullability(nullable))
-        }.toSeq
-        // Here, we create an AttributeMap that only compares the exprId for the lookup
-        // operation, so we can find the corresponding input attribute's nullability.
-        val attributeMap = AttributeMap[Attribute](childrenOutput.map(attr => attr -> attr))
-        // For an Attribute used by the current LogicalPlan, if it is from its children,
-        // we fix the nullable field by using the nullability setting of the corresponding
-        // output Attribute from the children.
-        p.transformExpressions {
-          case attr: Attribute if attributeMap.contains(attr) =>
-            attr.withNullability(attributeMap(attr).nullable)
-        }
-    }
-  }

  /**
   * Extracts [[WindowExpression]]s from the projectList of a [[Project]] operator and
   * aggregateExpressions of an [[Aggregate]] operator and creates individual [[Window]]

UpdateAttributeNullability.scala (new file)
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis

Comment (Member): It still looks weird to me if we call an analyzer rule in the optimizer. Our codegen impl depends on the correctness of nullability fields. I am wondering which rule could break it? Join reordering?

I think our existing test cases might already have such a case. Could you throw an exception if this rule changes the nullability in the optimizer stage? I want to know the exact case why we need to run this in the optimizer stage.
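
A hypothetical way to act on this suggestion (not part of the PR; the rule name and error message are made up): wrap the fix-up rule and fail loudly whenever it actually changes something during optimization. Catalyst plans are case classes with structural equality, and `AttributeReference` equality includes the nullable flag, so any rewrite makes the plans compare unequal.

```scala
import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Debugging-only sketch: surface the exact plan that needs a nullability
// fix-up at the optimization stage instead of silently repairing it.
object AssertNullabilityUnchanged extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = {
    val updated = UpdateAttributeNullability(plan)
    if (updated != plan) {
      throw new IllegalStateException(
        s"nullability changed during optimization:\n== Before ==\n$plan\n== After ==\n$updated")
    }
    updated
  }
}
```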

Comment (Contributor Author): In the future, if we introduce a fixed-size array type, then CreateArray returns a fixed-size array, and GetArrayItem can determine its nullability more precisely when the input is a fixed-size array.

Comment (Contributor Author): @maropu do you have more use cases? If it's the only use case, maybe we can simply remove this optimization as its use case is rare. And we can optimize it in a better way in the future.

Comment (Member): Yes, we need to understand which cases are improved and then update the nullability at the right place.

Comment (Member): @cloud-fan Removing it from the optimizer looks ok to me, but I remember the rule seems to be related to the existing tests? See: #18576 (comment)

Comment (Contributor Author): How about we accept this patch and think about removing this optimization later?

Comment (Member): yea, that sounds good to me. Thanks!

Comment (Member): When removing it in a follow-up PR, could you reopen the JIRA, too? https://issues.apache.org/jira/browse/SPARK-21351

Comment (Contributor Author): sure, feel free to merge this PR if you think it's ready to go. thanks!

+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule

+/**
+ * Updates nullability of Attributes in a resolved LogicalPlan by using the nullability of
+ * the corresponding Attributes of its children's output Attributes. This step is needed because
+ * users can use a resolved AttributeReference in the Dataset API, and outer joins
+ * can change the nullability of an AttributeReference. Without this rule, a nullable column's
+ * nullable field can actually be set as non-nullable, which causes illegal optimizations
+ * (e.g., NULL propagation) and wrong answers.
+ * See SPARK-13484 and SPARK-13801 for the concrete queries of this case.
+ *
+ * This rule should be executed again at the end of the optimization phase, as the optimizer may change

Comment (Member): It sounds weird that in the optimization phase we make correct nullability wrong again. Can we find another way to solve it?

Comment (Contributor Author): it's not about wrong nullability, it's an optimization. See https://github.com/apache/spark/pull/23390/files#r244326583

Comment (Member): ah, I see. Then it sounds like removing this optimization (https://github.com/apache/spark/pull/23390/files#r244326906) is ok.

+ * some expressions and their nullabilities as well. See SPARK-21351 for more details.
+ */
+object UpdateAttributeNullability extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+    // Skip unresolved nodes.
+    case p if !p.resolved => p

Comment (Member): Do we need this check even after the resolution batch?

Comment (Contributor Author): No, we don't. But it does no harm, and it helps us merge these two rules.

+    // Skip leaf nodes, as they have no children and thus no nullability to update.
+    case p: LeafNode => p
+    case p: LogicalPlan =>
+      val nullabilities = p.children.flatMap(c => c.output).groupBy(_.exprId).map {
+        // If there are multiple Attributes having the same ExprId, we need to resolve
+        // the conflict of the nullable field. We do not really expect this to happen.
+        case (exprId, attributes) => exprId -> attributes.exists(_.nullable)
+      }
+      // For an Attribute used by the current LogicalPlan, if it comes from its children,
+      // we fix the nullable field by using the nullability setting of the corresponding
+      // output Attribute from the children.
+      p.transformExpressions {
+        case attr: Attribute if nullabilities.contains(attr.exprId) =>
+          attr.withNullability(nullabilities(attr.exprId))
+      }
+  }
+}
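
To make the doc comment concrete, here is a minimal sketch (not part of the PR) of the outer-join situation it describes, written against the Catalyst test DSL; the relations and variable names are illustrative:

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.plans.LeftOuter
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

val a = 'a.int.notNull
val b = 'b.int.notNull
val left = LocalRelation(a)
val right = LocalRelation(b)

// For a left outer join, Join.output marks right-side attributes as nullable,
// but the previously resolved reference `b` still claims nullable = false.
val joined = left.join(right, LeftOuter, Some(a === b))
val stale = joined.where(b > 1) // the filter condition still carries nullable = false

// The rule looks up `b` in the child's output by exprId and rewrites the
// reference with the merged nullability (true).
val fixed = UpdateAttributeNullability(stale)
```

Since the rule only rewrites the nullable flag on attribute references and never changes the shape of the plan, it is cheap and safe to run both after analysis and again at the end of optimization.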

Optimizer.scala
@@ -179,8 +179,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
      ColumnPruning,
      CollapseProject,
      RemoveNoopOperators) :+
-   Batch("UpdateAttributeReferences", Once,
-     UpdateNullabilityInAttributeReferences) :+
+   Batch("UpdateNullability", Once, UpdateAttributeNullability) :+
    // This batch must be executed after the `RewriteSubquery` batch, which creates joins.
    Batch("NormalizeFloatingNumbers", Once, NormalizeFloatingNumbers)
  }
@@ -1647,18 +1646,3 @@ object RemoveRepetitionFromGroupExpressions extends Rule[LogicalPlan] {
}
}
}

-/**
- * Updates nullability in [[AttributeReference]]s if nullability is different between
- * non-leaf plan's expressions and the children output.
- */
-object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case p if !p.isInstanceOf[LeafNode] =>
-      val nullabilityMap = AttributeMap(p.children.flatMap(_.output).map { x => x -> x.nullable })
-      p transformExpressions {
-        case ar: AttributeReference if nullabilityMap.contains(ar) =>
-          ar.withNullability(nullabilityMap(ar))
-      }
-  }
-}

UpdateNullabilityInAttributeReferencesSuite.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst.optimizer

+import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{CreateArray, GetArrayItem}
@@ -25,7 +26,7 @@
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor


-class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
+class UpdateAttributeNullabilityInOptimizerSuite extends PlanTest {

  object Optimizer extends RuleExecutor[LogicalPlan] {
    val batches =
@@ -36,8 +37,8 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
        SimplifyConditionals,
        SimplifyBinaryComparison,
        SimplifyExtractValueOps) ::
-     Batch("UpdateAttributeReferences", Once,
-       UpdateNullabilityInAttributeReferences) :: Nil
+     Batch("UpdateNullability", Once,
+       UpdateAttributeNullability) :: Nil
}

  test("update nullability in AttributeReference") {
@@ -46,7 +47,7 @@ class UpdateNullabilityInAttributeReferencesSuite extends PlanTest {
    // nullable AttributeReference to `b`, because both array indexing and map lookup are
    // nullable expressions. After optimization, the same attribute is now non-nullable,
    // but the AttributeReference is not updated to reflect this. So, we need to update nullability
-   // by the `UpdateNullabilityInAttributeReferences` rule.
+   // by the `UpdateAttributeNullability` rule.
    val original = rel
      .select(GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) as "b")
      .groupBy($"b")("1")
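
The remainder of the test body is collapsed in the diff. As a hedged sketch of how such a suite would finish the check (the `rel` relation and the expected plan below are assumptions, not shown in this diff):

```scala
// Hypothetical continuation. After SimplifyExtractValueOps rewrites
// GetArrayItem(CreateArray(Seq('a, 'a + 1L)), 0) to just 'a, the attribute `b`
// produced by the Project becomes non-nullable, and UpdateAttributeNullability
// must propagate that into the Aggregate's reference to `b`.
val expected = rel
  .select('a as "b")
  .groupBy($"b")("1")
  .analyze
val optimized = Optimizer.execute(original.analyze)
comparePlans(optimized, expected)
```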