From e2dee098b2efb80238e38068d2ad8681504e03e0 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 10 Oct 2018 15:00:13 +0800 Subject: [PATCH 1/5] push more predicates --- .../datasources/orc/OrcFilters.scala | 68 ++++++++++++++----- .../datasources/orc/OrcFilterSuite.scala | 35 ++++++++++ 2 files changed, 87 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index dbafc468c6c40..7297fd9cb0335 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -138,6 +138,23 @@ private[sql] object OrcFilters { dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { + createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) + } + + /** + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the input filter predicates. + * @param builder the input SearchArgument.Builder. + * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed + * down safely. Pushing ONLY one side of AND down is safe only + * at the top level or when none of its ancestors is NOT or OR. + * @return the builder so far. + */ + private def createBuilder( + dataTypeMap: Map[String, DataType], + expression: Filter, + builder: Builder, + canPartialPushDownConjuncts: Boolean): Option[Builder] = { def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) @@ -145,32 +162,51 @@ private[sql] object OrcFilters { expression match { case And(left, right) => - // At here, it is not safe to just convert one side if we do not understand the - // other side. Here is an example used to explain the reason. 
+ // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to // convert b in ('1'). If we only convert a = 2, we will end up with a filter // NOT(a = 2), which will generate wrong results. - // Pushing one side of AND down is only safe to do at the top level. - // You can see ParquetRelation's initializeLocalJobFunc method as an example. - for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) - lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) - rhs <- buildSearchArgument(dataTypeMap, right, lhs) - } yield rhs.end() + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. 
+ val leftBuilderOption = createBuilder(dataTypeMap, left, + newBuilder, canPartialPushDownConjuncts) + val rightBuilderOption = + createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) + (leftBuilderOption, rightBuilderOption) match { + case (Some(_), Some(_)) => + for { + lhs <- createBuilder(dataTypeMap, left, + builder.startAnd(), canPartialPushDownConjuncts) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) + } yield rhs.end() + + case (Some(_), None) if canPartialPushDownConjuncts => + createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) + + case (None, Some(_)) if canPartialPushDownConjuncts => + createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) + + case _ => None + } case Or(left, right) => for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) - lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) - rhs <- buildSearchArgument(dataTypeMap, right, lhs) + _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) + _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) + lhs <- createBuilder(dataTypeMap, left, + builder.startOr(), canPartialPushDownConjuncts = false) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) } yield rhs.end() case Not(child) => for { - _ <- buildSearchArgument(dataTypeMap, child, newBuilder) - negate <- buildSearchArgument(dataTypeMap, child, builder.startNot()) + _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) + negate <- createBuilder(dataTypeMap, child, builder.startNot(), false) } yield negate.end() // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 8680b86517b19..a667d08111f46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -382,5 +382,40 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { )) )).get.toString } + + // Can not remove unsupported `StringContains` predicate since it is under `Or` operator. + assert(OrcFilters.createFilter(schema, Array( + Or( + LessThan("a", 10), + And( + StringContains("b", "prefix"), + GreaterThan("a", 1) + ) + ) + )).isEmpty) + + // Safely remove unsupported `StringContains` predicate and push down `LessThan` + assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") { + OrcFilters.createFilter(schema, Array( + And( + LessThan("a", 10), + StringContains("b", "prefix") + ) + )).get.toString + } + + // Safely remove unsupported `StringContains` predicate and push down `LessThan` + assertResult("leaf-0 = (LESS_THAN a 10), leaf-1 = (LESS_THAN_EQUALS a 1)," + + " expr = (and leaf-0 (not leaf-1))") { + OrcFilters.createFilter(schema, Array( + And( + And( + LessThan("a", 10), + StringContains("b", "prefix") + ), + GreaterThan("a", 1) + ) + )).get.toString + } } } From 798ed5317f953c0aaaf6c5e92126b26f15fe66c0 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 10 Oct 2018 16:34:34 +0800 Subject: [PATCH 2/5] handle hive orc --- .../spark/sql/hive/orc/OrcFilters.scala | 68 ++++++++++++++----- .../sql/hive/orc/HiveOrcFilterSuite.scala | 43 ++++++++++++ 2 files changed, 95 insertions(+), 16 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index aee9cb58a031e..c1fb0c6a9181d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -79,6 +79,23 @@ private[orc] object OrcFilters extends Logging { dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { + createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) + } + + /** + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the input filter predicates. + * @param builder the input SearchArgument.Builder. + * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed + * down safely. Pushing ONLY one side of AND down is safe only + * at the top level or when none of its ancestors is NOT or OR. + * @return the builder so far. + */ + private def createBuilder( + dataTypeMap: Map[String, DataType], + expression: Filter, + builder: Builder, + canPartialPushDownConjuncts: Boolean): Option[Builder] = { def isSearchableType(dataType: DataType): Boolean = dataType match { // Only the values in the Spark types below can be recognized by // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. @@ -90,32 +107,51 @@ private[orc] object OrcFilters extends Logging { expression match { case And(left, right) => - // At here, it is not safe to just convert one side if we do not understand the - // other side. Here is an example used to explain the reason. + // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to // convert b in ('1'). If we only convert a = 2, we will end up with a filter // NOT(a = 2), which will generate wrong results. - // Pushing one side of AND down is only safe to do at the top level. - // You can see ParquetRelation's initializeLocalJobFunc method as an example. 
- for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) - lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) - rhs <- buildSearchArgument(dataTypeMap, right, lhs) - } yield rhs.end() + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. + val leftBuilderOption = createBuilder(dataTypeMap, left, + newBuilder, canPartialPushDownConjuncts) + val rightBuilderOption = + createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) + (leftBuilderOption, rightBuilderOption) match { + case (Some(_), Some(_)) => + for { + lhs <- createBuilder(dataTypeMap, left, + builder.startAnd(), canPartialPushDownConjuncts) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) + } yield rhs.end() + + case (Some(_), None) if canPartialPushDownConjuncts => + createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) + + case (None, Some(_)) if canPartialPushDownConjuncts => + createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) + + case _ => None + } case Or(left, right) => for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) - lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) - rhs <- buildSearchArgument(dataTypeMap, right, lhs) + _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) + _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) + lhs <- createBuilder(dataTypeMap, left, + builder.startOr(), canPartialPushDownConjuncts = false) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) } yield rhs.end() case Not(child) => for { - _ <- buildSearchArgument(dataTypeMap, child, newBuilder) - negate <- 
buildSearchArgument(dataTypeMap, child, builder.startNot()) + _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) + negate <- createBuilder(dataTypeMap, child, builder.startNot(), false) } yield negate.end() // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala index 283037caf4a9b..bcbefec8bb305 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -383,5 +383,48 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { )) )).get.toString } + + // Can not remove unsupported `StringContains` predicate since it is under `Or` operator. + assert(OrcFilters.createFilter(schema, Array( + Or( + LessThan("a", 10), + And( + StringContains("b", "prefix"), + GreaterThan("a", 1) + ) + ) + )).isEmpty) + + // Safely remove unsupported `StringContains` predicate and push down `LessThan` + assertResult( + """leaf-0 = (LESS_THAN a 10) + |expr = leaf-0 + """.stripMargin.trim + ) { + OrcFilters.createFilter(schema, Array( + And( + LessThan("a", 10), + StringContains("b", "prefix") + ) + )).get.toString + } + + // Safely remove unsupported `StringContains` predicate and push down `LessThan` + assertResult( + """leaf-0 = (LESS_THAN a 10) + |leaf-1 = (LESS_THAN_EQUALS a 1) + |expr = (and leaf-0 (not leaf-1)) + """.stripMargin.trim + ) { + OrcFilters.createFilter(schema, Array( + And( + And( + LessThan("a", 10), + StringContains("b", "prefix") + ), + GreaterThan("a", 1) + ) + )).get.toString + } } } From 28322b2efacd5fd1787c0ca1fe64e5db4a5b25fc Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 10 Oct 2018 16:43:29 +0800 Subject: [PATCH 3/5] add jira number in test case --- 
.../spark/sql/execution/datasources/orc/OrcFilterSuite.scala | 2 +- .../org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index a667d08111f46..28787e4e1e242 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -358,7 +358,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { } } - test("SPARK-12218 Converting conjunctions into ORC SearchArguments") { + test("SPARK-12218 and SPARK-25699 Converting conjunctions into ORC SearchArguments") { import org.apache.spark.sql.sources._ // The `LessThan` should be converted while the `StringContains` shouldn't val schema = new StructType( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala index bcbefec8bb305..2fd24b336ad28 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -351,7 +351,7 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { } } - test("SPARK-12218 Converting conjunctions into ORC SearchArguments") { + test("SPARK-12218 and SPARK-25699 Converting conjunctions into ORC SearchArguments") { import org.apache.spark.sql.sources._ // The `LessThan` should be converted while the `StringContains` shouldn't val schema = new StructType( From c2eb87ac8310a7045eb55ba7851a38ebf0498300 Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Wed, 10 Oct 2018 21:14:45 +0800 Subject: [PATCH 4/5] address comment --- 
.../spark/sql/execution/datasources/orc/OrcFilterSuite.scala | 2 +- .../org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 28787e4e1e242..ee12f30892436 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -404,7 +404,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { )).get.toString } - // Safely remove unsupported `StringContains` predicate and push down `LessThan` + // Safely remove unsupported `StringContains` predicate, push down `LessThan` and `GreaterThan`. assertResult("leaf-0 = (LESS_THAN a 10), leaf-1 = (LESS_THAN_EQUALS a 1)," + " expr = (and leaf-0 (not leaf-1))") { OrcFilters.createFilter(schema, Array( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala index 2fd24b336ad28..5094763b0cd2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -409,7 +409,7 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { )).get.toString } - // Safely remove unsupported `StringContains` predicate and push down `LessThan` + // Safely remove unsupported `StringContains` predicate, push down `LessThan` and `GreaterThan`. 
assertResult( """leaf-0 = (LESS_THAN a 10) |leaf-1 = (LESS_THAN_EQUALS a 1) From 9d9ed2f8295e2d61ff880860abf0e8551afce04b Mon Sep 17 00:00:00 2001 From: Gengliang Wang Date: Thu, 11 Oct 2018 02:15:06 +0800 Subject: [PATCH 5/5] address comments --- .../spark/sql/execution/datasources/orc/OrcFilters.scala | 7 ++++--- .../scala/org/apache/spark/sql/hive/orc/OrcFilters.scala | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index 7297fd9cb0335..2b17b479432fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -173,8 +173,8 @@ private[sql] object OrcFilters { // Pushing one side of AND down is only safe to do at the top level or in the child // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate // can be safely removed. 
- val leftBuilderOption = createBuilder(dataTypeMap, left, - newBuilder, canPartialPushDownConjuncts) + val leftBuilderOption = + createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) val rightBuilderOption = createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) (leftBuilderOption, rightBuilderOption) match { @@ -206,7 +206,8 @@ private[sql] object OrcFilters { case Not(child) => for { _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, child, builder.startNot(), false) + negate <- createBuilder(dataTypeMap, + child, builder.startNot(), canPartialPushDownConjuncts = false) } yield negate.end() // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index c1fb0c6a9181d..a82576a233acd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -118,8 +118,8 @@ private[orc] object OrcFilters extends Logging { // Pushing one side of AND down is only safe to do at the top level or in the child // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate // can be safely removed. 
- val leftBuilderOption = createBuilder(dataTypeMap, left, - newBuilder, canPartialPushDownConjuncts) + val leftBuilderOption = + createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) val rightBuilderOption = createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) (leftBuilderOption, rightBuilderOption) match { @@ -151,7 +151,8 @@ private[orc] object OrcFilters extends Logging { case Not(child) => for { _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) - negate <- createBuilder(dataTypeMap, child, builder.startNot(), false) + negate <- createBuilder(dataTypeMap, + child, builder.startNot(), canPartialPushDownConjuncts = false) } yield negate.end() // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()`