diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 0c286defb940..44a0d209e6e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -394,7 +394,13 @@ private[parquet] class ParquetFilters( */ def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { val nameToParquetField = getFieldMap(schema) + createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true) + } + private def createFilterHelper( + nameToParquetField: Map[String, ParquetField], + predicate: sources.Filter, + canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = { // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { @@ -488,26 +494,36 @@ private[parquet] class ParquetFilters( .map(_(nameToParquetField(name).fieldName, value)) case sources.And(lhs, rhs) => - // At here, it is not safe to just convert one side if we do not understand the - // other side. Here is an example used to explain the reason. + // At here, it is not safe to just convert one side and remove the other side + // if we do not understand what the parent filters are. + // + // Here is an example used to explain the reason. // Let's say we have NOT(a = 2 AND b in ('1')) and we do not understand how to // convert b in ('1'). If we only convert a = 2, we will end up with a filter // NOT(a = 2), which will generate wrong results. - // Pushing one side of AND down is only safe to do at the top level. - // You can see ParquetRelation's initializeLocalJobFunc method as an example. - for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) - } yield FilterApi.and(lhsFilter, rhsFilter) + // + // Pushing one side of AND down is only safe to do at the top level or in the child + // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate + // can be safely removed. + val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) + val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) + + (lhsFilterOption, rhsFilterOption) match { + case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter)) + case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) + case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + case _ => None + } case sources.Or(lhs, rhs) => for { - lhsFilter <- createFilter(schema, lhs) - rhsFilter <- createFilter(schema, rhs) + lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false) + rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false) } yield FilterApi.or(lhsFilter, rhsFilter) case sources.Not(pred) => - createFilter(schema, pred).map(FilterApi.not) + createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + .map(FilterApi.not) case sources.In(name, values) if canMakeFilterOn(name, values.head) && values.distinct.length <= pushDownInFilterThreshold => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 7ebb75009555..01e41b3c5df3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -750,7 +750,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex } } - test("SPARK-12218 Converting conjunctions into Parquet filter predicates") { + test("SPARK-12218 and SPARK-25559 Converting conjunctions into Parquet filter predicates") { val schema = StructType(Seq( StructField("a", IntegerType, nullable = false), StructField("b", StringType, nullable = true), @@ -770,7 +770,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("c", 1.5D))) } - assertResult(None) { + // Testing when `canRemoveOneSideInAnd == true` + // case sources.And(lhs, rhs) => + // ... + // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) + assertResult(Some(lt(intColumn("a"), 10: Integer))) { parquetFilters.createFilter( parquetSchema, sources.And( @@ -778,6 +782,83 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.StringContains("b", "prefix"))) } + // Testing when `canRemoveOneSideInAnd == true` + // case sources.And(lhs, rhs) => + // ... + // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + assertResult(Some(lt(intColumn("a"), 10: Integer))) { + parquetFilters.createFilter( + parquetSchema, + sources.And( + sources.StringContains("b", "prefix"), + sources.LessThan("a", 10))) + } + + // Testing complex And conditions + assertResult(Some( + FilterApi.and(lt(intColumn("a"), 10: Integer), gt(intColumn("a"), 5: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.And( + sources.And( + sources.LessThan("a", 10), + sources.StringContains("b", "prefix") + ), + sources.GreaterThan("a", 5))) + } + + // Testing complex And conditions + assertResult(Some( + FilterApi.and(gt(intColumn("a"), 5: Integer), lt(intColumn("a"), 10: Integer)))) { + parquetFilters.createFilter( + parquetSchema, + sources.And( + sources.GreaterThan("a", 5), + sources.And( + sources.StringContains("b", "prefix"), + sources.LessThan("a", 10) + ))) + } + + // Testing + // case sources.Or(lhs, rhs) => + // ... + // lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")), + sources.GreaterThan("a", 2))) + } + + // Testing + // case sources.Or(lhs, rhs) => + // ... + // rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Or( + sources.GreaterThan("a", 2), + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")))) + } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing when `canRemoveOneSideInAnd == false` + // case sources.And(lhs, rhs) => + // ... + // case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) assertResult(None) { parquetFilters.createFilter( parquetSchema, @@ -786,6 +867,68 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sources.GreaterThan("a", 1), sources.StringContains("b", "prefix")))) } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing when `canRemoveOneSideInAnd == false` + // case sources.And(lhs, rhs) => + // ... + // case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.StringContains("b", "prefix"), + sources.GreaterThan("a", 1)))) + } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing passing `canRemoveOneSideInAnd = false` into + // case sources.And(lhs, rhs) => + // val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix")), + sources.GreaterThan("a", 2)))) + } + + // Testing + // case sources.Not(pred) => + // createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + // .map(FilterApi.not) + // + // and + // + // Testing passing `canRemoveOneSideInAnd = false` into + // case sources.And(lhs, rhs) => + // val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) + assertResult(None) { + parquetFilters.createFilter( + parquetSchema, + sources.Not( + sources.And( + sources.GreaterThan("a", 2), + sources.And( + sources.GreaterThan("a", 1), + sources.StringContains("b", "prefix"))))) + } } test("SPARK-16371 Do not push down filters when inner name and outer name are the same") {