diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala index dbafc468c6c40..2b17b479432fb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala @@ -138,6 +138,23 @@ private[sql] object OrcFilters { dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { + createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) + } + + /** + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the input filter predicates. + * @param builder the input SearchArgument.Builder. + * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed + * down safely. Pushing ONLY one side of AND down is safe to + * do at the top level or none of its ancestors is NOT and OR. + * @return the builder so far. + */ + private def createBuilder( + dataTypeMap: Map[String, DataType], + expression: Filter, + builder: Builder, + canPartialPushDownConjuncts: Boolean): Option[Builder] = { def getType(attribute: String): PredicateLeaf.Type = getPredicateLeafType(dataTypeMap(attribute)) @@ -145,32 +162,52 @@ private[sql] object OrcFilters { expression match { case And(left, right) => - // At here, it is not safe to just convert one side if we do not understand the - // other side. Here is an example used to explain the reason. + // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to // convert b in ('1'). If we only convert a = 2, we will end up with a filter // NOT(a = 2), which will generate wrong results. - // Pushing one side of AND down is only safe to do at the top level. - // You can see ParquetRelation's initializeLocalJobFunc method as an example. - for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) - lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) - rhs <- buildSearchArgument(dataTypeMap, right, lhs) - } yield rhs.end() + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. + val leftBuilderOption = + createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) + val rightBuilderOption = + createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) + (leftBuilderOption, rightBuilderOption) match { + case (Some(_), Some(_)) => + for { + lhs <- createBuilder(dataTypeMap, left, + builder.startAnd(), canPartialPushDownConjuncts) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) + } yield rhs.end() + + case (Some(_), None) if canPartialPushDownConjuncts => + createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) + + case (None, Some(_)) if canPartialPushDownConjuncts => + createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) + + case _ => None + } case Or(left, right) => for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) - lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) - rhs <- buildSearchArgument(dataTypeMap, right, lhs) + _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) + _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) + lhs <- createBuilder(dataTypeMap, left, + builder.startOr(), canPartialPushDownConjuncts = false) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) } yield rhs.end() case Not(child) => for { - _ <- buildSearchArgument(dataTypeMap, child, newBuilder) - negate <- buildSearchArgument(dataTypeMap, child, builder.startNot()) + _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) + negate <- createBuilder(dataTypeMap, + child, builder.startNot(), canPartialPushDownConjuncts = false) } yield negate.end() // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index 8680b86517b19..ee12f30892436 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -358,7 +358,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { } } - test("SPARK-12218 Converting conjunctions into ORC SearchArguments") { + test("SPARK-12218 and SPARK-25699 Converting conjunctions into ORC SearchArguments") { import org.apache.spark.sql.sources._ // The `LessThan` should be converted while the `StringContains` shouldn't val schema = new StructType( @@ -382,5 +382,40 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { )) )).get.toString } + + // Can not remove unsupported `StringContains` predicate since it is under `Or` operator. + assert(OrcFilters.createFilter(schema, Array( + Or( + LessThan("a", 10), + And( + StringContains("b", "prefix"), + GreaterThan("a", 1) + ) + ) + )).isEmpty) + + // Safely remove unsupported `StringContains` predicate and push down `LessThan` + assertResult("leaf-0 = (LESS_THAN a 10), expr = leaf-0") { + OrcFilters.createFilter(schema, Array( + And( + LessThan("a", 10), + StringContains("b", "prefix") + ) + )).get.toString + } + + // Safely remove unsupported `StringContains` predicate, push down `LessThan` and `GreaterThan`. + assertResult("leaf-0 = (LESS_THAN a 10), leaf-1 = (LESS_THAN_EQUALS a 1)," + + " expr = (and leaf-0 (not leaf-1))") { + OrcFilters.createFilter(schema, Array( + And( + And( + LessThan("a", 10), + StringContains("b", "prefix") + ), + GreaterThan("a", 1) + ) + )).get.toString + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index aee9cb58a031e..a82576a233acd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -79,6 +79,23 @@ private[orc] object OrcFilters extends Logging { dataTypeMap: Map[String, DataType], expression: Filter, builder: Builder): Option[Builder] = { + createBuilder(dataTypeMap, expression, builder, canPartialPushDownConjuncts = true) + } + + /** + * @param dataTypeMap a map from the attribute name to its data type. + * @param expression the input filter predicates. + * @param builder the input SearchArgument.Builder. + * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed + * down safely. Pushing ONLY one side of AND down is safe to + * do at the top level or none of its ancestors is NOT and OR. + * @return the builder so far. + */ + private def createBuilder( + dataTypeMap: Map[String, DataType], + expression: Filter, + builder: Builder, + canPartialPushDownConjuncts: Boolean): Option[Builder] = { def isSearchableType(dataType: DataType): Boolean = dataType match { // Only the values in the Spark types below can be recognized by // the `SearchArgumentImpl.BuilderImpl.boxLiteral()` method. @@ -90,32 +107,52 @@ private[orc] object OrcFilters extends Logging { expression match { case And(left, right) => - // At here, it is not safe to just convert one side if we do not understand the - // other side. Here is an example used to explain the reason. + // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to // convert b in ('1'). If we only convert a = 2, we will end up with a filter // NOT(a = 2), which will generate wrong results. - // Pushing one side of AND down is only safe to do at the top level. - // You can see ParquetRelation's initializeLocalJobFunc method as an example. - for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) - lhs <- buildSearchArgument(dataTypeMap, left, builder.startAnd()) - rhs <- buildSearchArgument(dataTypeMap, right, lhs) - } yield rhs.end() + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. + val leftBuilderOption = + createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts) + val rightBuilderOption = + createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts) + (leftBuilderOption, rightBuilderOption) match { + case (Some(_), Some(_)) => + for { + lhs <- createBuilder(dataTypeMap, left, + builder.startAnd(), canPartialPushDownConjuncts) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts) + } yield rhs.end() + + case (Some(_), None) if canPartialPushDownConjuncts => + createBuilder(dataTypeMap, left, builder, canPartialPushDownConjuncts) + + case (None, Some(_)) if canPartialPushDownConjuncts => + createBuilder(dataTypeMap, right, builder, canPartialPushDownConjuncts) + + case _ => None + } case Or(left, right) => for { - _ <- buildSearchArgument(dataTypeMap, left, newBuilder) - _ <- buildSearchArgument(dataTypeMap, right, newBuilder) - lhs <- buildSearchArgument(dataTypeMap, left, builder.startOr()) - rhs <- buildSearchArgument(dataTypeMap, right, lhs) + _ <- createBuilder(dataTypeMap, left, newBuilder, canPartialPushDownConjuncts = false) + _ <- createBuilder(dataTypeMap, right, newBuilder, canPartialPushDownConjuncts = false) + lhs <- createBuilder(dataTypeMap, left, + builder.startOr(), canPartialPushDownConjuncts = false) + rhs <- createBuilder(dataTypeMap, right, lhs, canPartialPushDownConjuncts = false) } yield rhs.end() case Not(child) => for { - _ <- buildSearchArgument(dataTypeMap, child, newBuilder) - negate <- buildSearchArgument(dataTypeMap, child, builder.startNot()) + _ <- createBuilder(dataTypeMap, child, newBuilder, canPartialPushDownConjuncts = false) + negate <- createBuilder(dataTypeMap, + child, builder.startNot(), canPartialPushDownConjuncts = false) } yield negate.end() // NOTE: For all case branches dealing with leaf predicates below, the additional `startAnd()` diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala index 283037caf4a9b..5094763b0cd2a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcFilterSuite.scala @@ -351,7 +351,7 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { } } - test("SPARK-12218 Converting conjunctions into ORC SearchArguments") { + test("SPARK-12218 and SPARK-25699 Converting conjunctions into ORC SearchArguments") { import org.apache.spark.sql.sources._ // The `LessThan` should be converted while the `StringContains` shouldn't val schema = new StructType( @@ -383,5 +383,48 @@ class HiveOrcFilterSuite extends OrcTest with TestHiveSingleton { )) )).get.toString } + + // Can not remove unsupported `StringContains` predicate since it is under `Or` operator. + assert(OrcFilters.createFilter(schema, Array( + Or( + LessThan("a", 10), + And( + StringContains("b", "prefix"), + GreaterThan("a", 1) + ) + ) + )).isEmpty) + + // Safely remove unsupported `StringContains` predicate and push down `LessThan` + assertResult( + """leaf-0 = (LESS_THAN a 10) + |expr = leaf-0 + """.stripMargin.trim + ) { + OrcFilters.createFilter(schema, Array( + And( + LessThan("a", 10), + StringContains("b", "prefix") + ) + )).get.toString + } + + // Safely remove unsupported `StringContains` predicate, push down `LessThan` and `GreaterThan`. + assertResult( + """leaf-0 = (LESS_THAN a 10) + |leaf-1 = (LESS_THAN_EQUALS a 1) + |expr = (and leaf-0 (not leaf-1)) + """.stripMargin.trim + ) { + OrcFilters.createFilter(schema, Array( + And( + And( + LessThan("a", 10), + StringContains("b", "prefix") + ), + GreaterThan("a", 1) + ) + )).get.toString + } } }