From 2327ad14b0a1f9b1a54098dc0d4ab105686ec8fa Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 17 Oct 2017 23:41:00 +0200 Subject: [PATCH 1/6] [SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the optimizer --- .../sql/catalyst/expressions/predicates.scala | 1 + .../sql/catalyst/optimizer/expressions.scala | 2 ++ .../sql/catalyst/optimizer/OptimizeInSuite.scala | 16 ++++++++++++++++ .../columnar/InMemoryTableScanExec.scala | 3 ++- 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index efcd45fad779..452b0a616f22 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -204,6 +204,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { override def children: Seq[Expression] = value +: list lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal]) + lazy val isListEmpty = list.isEmpty private lazy val ordering = TypeUtils.getInterpretedOrdering(value.dataType) override def nullable: Boolean = children.exists(_.nullable) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 273bc6ce27c5..5606044824d7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -176,6 +176,8 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { object OptimizeIn extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { + case expr @ In(v, _) if expr.isListEmpty => + If(IsNull(v), Literal.create(null, BooleanType), FalseLiteral) case expr @ In(v, list) if expr.inSetConvertible => val newList = ExpressionSet(list).toSeq if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index eaad1e32a8ab..74651e1eae6b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -175,4 +175,20 @@ class OptimizeInSuite extends PlanTest { } } } + + test("OptimizedIn test: In empty list gets transformed to If(IsNull(value), null, false)") { + val originalQuery = + testRelation + .where(In(UnresolvedAttribute("a"), Nil)) + .analyze + + val optimized = Optimize.execute(originalQuery) + val correctAnswer = + testRelation + .where(If(IsNull(UnresolvedAttribute("a")), + Literal.create(null, BooleanType), Literal(false))) + .analyze + + comparePlans(optimized, correctAnswer) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 846ec03e46a1..27fcb0568038 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -102,7 +102,8 @@ case class InMemoryTableScanExec( case IsNull(a: Attribute) => statsFor(a).nullCount > 0 case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0 - case In(_: AttributeReference, list: Seq[Expression]) if list.isEmpty => Literal.FalseLiteral + // We rely on the optimizations in org.apache.spark.sql.catalyst.optimizer.OptimizeIn + // to be sure that the list cannot be empty case In(a: AttributeReference, list: Seq[Expression]) if list.forall(_.isInstanceOf[Literal]) => list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] && l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _) From ac819010cd1acbf360052d76321ca770b511c41a Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 17 Oct 2017 23:58:37 +0200 Subject: [PATCH 2/6] add check to list in InMemoryTableScanExec --- .../spark/sql/execution/columnar/InMemoryTableScanExec.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 27fcb0568038..139da1c519da 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -102,9 +102,8 @@ case class InMemoryTableScanExec( case IsNull(a: Attribute) => statsFor(a).nullCount > 0 case IsNotNull(a: Attribute) => statsFor(a).count - statsFor(a).nullCount > 0 - // We rely on the optimizations in org.apache.spark.sql.catalyst.optimizer.OptimizeIn - // to be sure that the list cannot be empty - case In(a: AttributeReference, list: Seq[Expression]) if list.forall(_.isInstanceOf[Literal]) => + case In(a: AttributeReference, list: Seq[Expression]) + if list.forall(_.isInstanceOf[Literal]) && list.nonEmpty => list.map(l => statsFor(a).lowerBound <= l.asInstanceOf[Literal] && l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _) } From c99032334bcbfb0682f300605af804f1eec1abce Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 18 Oct 2017 00:00:49 +0200 Subject: [PATCH 3/6] fix comment --- .../apache/spark/sql/catalyst/optimizer/expressions.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 5606044824d7..ea00528e63ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -169,8 +169,10 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { /** * Optimize IN predicates: - * 1. Removes literal repetitions. - * 2. Replaces [[In (value, seq[Literal])]] with optimized version + * 1. Converts the predicate to [[If (IsNull(value), null, false)]] + * when the list is empty + * 2. Removes literal repetitions. + * 3. Replaces [[In (value, seq[Literal])]] with optimized version * [[InSet (value, HashSet[Literal])]] which is much faster. */ object OptimizeIn extends Rule[LogicalPlan] { From 55d84e6488a991c9af65f256870639eaed7a34b2 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 18 Oct 2017 00:53:41 +0200 Subject: [PATCH 4/6] optimizein only when attribute is not nullable --- .../apache/spark/sql/catalyst/optimizer/expressions.scala | 7 +++---- .../spark/sql/catalyst/optimizer/OptimizeInSuite.scala | 8 ++++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index ea00528e63ea..80719fd53b91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -169,8 +169,8 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { /** * Optimize IN predicates: - * 1. Converts the predicate to [[If (IsNull(value), null, false)]] - * when the list is empty + * 1. Converts the predicate to false when the list is empty and + * the value is not nullable. * 2. Removes literal repetitions. * 3. Replaces [[In (value, seq[Literal])]] with optimized version * [[InSet (value, HashSet[Literal])]] which is much faster. @@ -178,8 +178,7 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { object OptimizeIn extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case expr @ In(v, _) if expr.isListEmpty => - If(IsNull(v), Literal.create(null, BooleanType), FalseLiteral) + case expr @ In(v, _) if expr.isListEmpty && !v.nullable => FalseLiteral case expr @ In(v, list) if expr.inSetConvertible => val newList = ExpressionSet(list).toSeq if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index 74651e1eae6b..d7acd139225c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -176,17 +176,17 @@ class OptimizeInSuite extends PlanTest { } } - test("OptimizedIn test: In empty list gets transformed to If(IsNull(value), null, false)") { + test("OptimizedIn test: In empty list gets transformed to FalseLiteral " + + "when value is not nullable") { val originalQuery = testRelation - .where(In(UnresolvedAttribute("a"), Nil)) + .where(In(Literal("a"), Nil)) .analyze val optimized = Optimize.execute(originalQuery) val correctAnswer = testRelation - .where(If(IsNull(UnresolvedAttribute("a")), - Literal.create(null, BooleanType), Literal(false))) + .where(Literal(false)) .analyze comparePlans(optimized, correctAnswer) From 8594231e4aa08100774dc9d8f06cd6eac03222fa Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 18 Oct 2017 02:04:21 +0200 Subject: [PATCH 5/6] add UT to check buildFilters behavior --- .../columnar/InMemoryColumnarQuerySuite.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala index 75d17bc79477..2f249c850a08 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala @@ -21,8 +21,9 @@ import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} import org.apache.spark.sql.{DataFrame, QueryTest, Row} -import org.apache.spark.sql.catalyst.expressions.AttributeSet +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, AttributeSet, In} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning +import org.apache.spark.sql.execution.LocalTableScanExec import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext @@ -444,4 +445,13 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext { assert(dfNulls.filter($"id".isin(2, 3)).count() == 0) dfNulls.unpersist() } + + test("SPARK-22249: buildFilter should not throw exception when In contains an empty list") { + val attribute = AttributeReference("a", IntegerType)() + val testRelation = InMemoryRelation(false, 1, MEMORY_ONLY, + LocalTableScanExec(Seq(attribute), Nil), None) + val tableScanExec = InMemoryTableScanExec(Seq(attribute), + Seq(In(attribute, Nil)), testRelation) + assert(tableScanExec.partitionFilters.isEmpty) + } } From e95bc7b395e027aa3d1e719d987b4f5a4461c34b Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 18 Oct 2017 02:06:06 +0200 Subject: [PATCH 6/6] Revert "[SPARK-22249][FOLLOWUP][SQL] Check if list of value for IN is empty in the optimizer" --- .../sql/catalyst/expressions/predicates.scala | 1 - .../sql/catalyst/optimizer/expressions.scala | 7 ++----- .../sql/catalyst/optimizer/OptimizeInSuite.scala | 16 ---------------- 3 files changed, 2 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 452b0a616f22..efcd45fad779 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -204,7 +204,6 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { override def children: Seq[Expression] = value +: list lazy val inSetConvertible = list.forall(_.isInstanceOf[Literal]) - lazy val isListEmpty = list.isEmpty private lazy val ordering = TypeUtils.getInterpretedOrdering(value.dataType) override def nullable: Boolean = children.exists(_.nullable) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 80719fd53b91..273bc6ce27c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -169,16 +169,13 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { /** * Optimize IN predicates: - * 1. Converts the predicate to false when the list is empty and - * the value is not nullable. - * 2. Removes literal repetitions. - * 3. Replaces [[In (value, seq[Literal])]] with optimized version + * 1. Removes literal repetitions. + * 2. Replaces [[In (value, seq[Literal])]] with optimized version * [[InSet (value, HashSet[Literal])]] which is much faster. */ object OptimizeIn extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case expr @ In(v, _) if expr.isListEmpty && !v.nullable => FalseLiteral case expr @ In(v, list) if expr.inSetConvertible => val newList = ExpressionSet(list).toSeq if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index d7acd139225c..eaad1e32a8ab 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -175,20 +175,4 @@ class OptimizeInSuite extends PlanTest { } } } - - test("OptimizedIn test: In empty list gets transformed to FalseLiteral " + - "when value is not nullable") { - val originalQuery = - testRelation - .where(In(Literal("a"), Nil)) - .analyze - - val optimized = Optimize.execute(originalQuery) - val correctAnswer = - testRelation - .where(Literal(false)) - .analyze - - comparePlans(optimized, correctAnswer) - } }