From 264eed81e33d3af7d7ea50a3a49866dde18f163b Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 21 Jun 2018 12:35:20 +0800 Subject: [PATCH 1/7] Convert IN predicate to Parquet filter push-down --- .../datasources/parquet/ParquetFilters.scala | 4 +++ .../parquet/ParquetFilterSuite.scala | 30 ++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 310626197a76..252523130cbe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -270,6 +270,10 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) { case sources.Not(pred) => createFilter(schema, pred).map(FilterApi.not) + case sources.In(name, values) if canMakeFilterOn(name) && values.length < 100 => + val conditions = values.flatMap(v => makeEq.lift(nameToType(name)).map(_(name, v))) + Some(conditions.reduceLeft(FilterApi.or)) + case _ => None } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 90da7eb8c4fb..283e7f0de528 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.charset.StandardCharsets import java.sql.Date -import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators} +import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators} import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.filter2.predicate.Operators.{Column => _, _} @@ -660,6 +660,34 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex assert(df.where("col > 0").count() === 2) } } + + test("SPARK-17091: Convert IN predicate to Parquet filter push-down") { + val schema = StructType(Seq( + StructField("a", IntegerType, nullable = false) + )) + + assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) { + parquetFilters.createFilter(schema, sources.In("a", Array(10))) + } + + assertResult(Some(or( + FilterApi.eq(intColumn("a"), 10: Integer), + FilterApi.eq(intColumn("a"), 20: Integer))) + ) { + parquetFilters.createFilter(schema, sources.In("a", Array(10, 20))) + } + + assertResult(Some(or(or( + FilterApi.eq(intColumn("a"), 10: Integer), + FilterApi.eq(intColumn("a"), 20: Integer)), + FilterApi.eq(intColumn("a"), 30: Integer))) + ) { + parquetFilters.createFilter(schema, sources.In("a", Array(10, 20, 30))) + } + + assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 100).toArray)).isDefined) + assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 101).toArray)).isEmpty) + } } class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] { From 4f96881af4af6f613c049f3756ee3aba518ceab8 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Thu, 21 Jun 2018 12:49:12 +0800 Subject: [PATCH 2/7] Change threshold to 20. 
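Besides lowering the cap from 100 to 20 values, this switches the fold from
`reduceLeft` to `reduceLeftOption`, so a value list in which nothing converts
yields `None` instead of throwing on an empty collection. The rewrite itself is
unchanged: the IN list still collapses into a left-nested chain of OR'ed EQ
predicates. A standalone sketch of that shape, assuming only parquet-column on
the classpath (`InToOrSketch` and `inToOr` are illustrative names, not code
from this patch):

```scala
import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}

object InToOrSketch {
  // Collapse an IN value list into a left-nested chain of OR'ed EQ predicates,
  // the same way createFilter handles sources.In for an int column.
  def inToOr(column: String, values: Seq[Int]): Option[FilterPredicate] = {
    val col = FilterApi.intColumn(column)
    values
      .map(v => FilterApi.eq(col, Int.box(v)): FilterPredicate)
      .reduceLeftOption(FilterApi.or)
  }

  def main(args: Array[String]): Unit = {
    // Prints Some(or(or(eq(a, 10), eq(a, 20)), eq(a, 30))) -- the same
    // left-nested shape the suite asserts with assertResult.
    println(inToOr("a", Seq(10, 20, 30)))
  }
}
```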
--- .../sql/execution/datasources/parquet/ParquetFilters.scala | 7 ++++--- .../execution/datasources/parquet/ParquetFilterSuite.scala | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 252523130cbe..9ffbf0d7c95a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -270,9 +270,10 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) { case sources.Not(pred) => createFilter(schema, pred).map(FilterApi.not) - case sources.In(name, values) if canMakeFilterOn(name) && values.length < 100 => - val conditions = values.flatMap(v => makeEq.lift(nameToType(name)).map(_(name, v))) - Some(conditions.reduceLeft(FilterApi.or)) + case sources.In(name, values) if canMakeFilterOn(name) && values.length < 20 => + values.flatMap { v => + makeEq.lift(nameToType(name)).map(_(name, v)) + }.reduceLeftOption(FilterApi.or) case _ => None } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 283e7f0de528..28342415040f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -685,8 +685,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex parquetFilters.createFilter(schema, sources.In("a", Array(10, 20, 30))) } - assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 100).toArray)).isDefined) - assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 101).toArray)).isEmpty) + assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 20).toArray)).isDefined) + assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 21).toArray)).isEmpty) } } From b9b3160061ef1e17ae32599ed9fbcfd44b0565b4 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 22 Jun 2018 12:33:11 +0800 Subject: [PATCH 3/7] Add spark.sql.parquet.pushdown.inFilterThreshold --- .../apache/spark/sql/internal/SQLConf.scala | 14 ++++++++ .../parquet/ParquetFileFormat.scala | 12 +++---- .../datasources/parquet/ParquetFilters.scala | 7 ++-- .../parquet/ParquetFilterSuite.scala | 35 +++++++++++++++++-- 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8d2320d8a6ed..8d07a88a72f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -378,6 +378,17 @@ object SQLConf { .booleanConf .createWithDefault(true) + val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD = + buildConf("spark.sql.parquet.pushdown.inFilterThreshold") + .doc("The maximum number of values to filter push-down optimization for IN predicate. " + + "Large threshold will not provide much better performance. 
" + + "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.") + .internal() + .intConf + .checkValue(threshold => threshold > 0 && threshold <= 300, + "The threshold must be greater than 0 and less than 300.") + .createWithDefault(10) + val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat") .doc("Whether to be compatible with the legacy Parquet format adopted by Spark 1.4 and prior " + "versions, when converting Parquet schema to Spark SQL schema and vice versa.") @@ -1420,6 +1431,9 @@ class SQLConf extends Serializable with Logging { def parquetFilterPushDownDate: Boolean = getConf(PARQUET_FILTER_PUSHDOWN_DATE_ENABLED) + def parquetFilterPushDownInFilterThreshold: Int = + getConf(PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD) + def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED) def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 60fc9ec7e1f8..7c71f64e11ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -335,16 +335,14 @@ class ParquetFileFormat val enableVectorizedReader: Boolean = sqlConf.parquetVectorizedReaderEnabled && resultSchema.forall(_.dataType.isInstanceOf[AtomicType]) - val enableRecordFilter: Boolean = - sparkSession.sessionState.conf.parquetRecordFilterEnabled - val timestampConversion: Boolean = - sparkSession.sessionState.conf.isParquetINT96TimestampConversion + val enableRecordFilter: Boolean = sqlConf.parquetRecordFilterEnabled + val timestampConversion: Boolean = sqlConf.isParquetINT96TimestampConversion val capacity = sqlConf.parquetVectorizedReaderBatchSize - val enableParquetFilterPushDown: Boolean = - sparkSession.sessionState.conf.parquetFilterPushDown + val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown // Whole stage codegen (PhysicalRDD) is able to deal with batches directly val returningBatch = supportBatch(sparkSession, resultSchema) val pushDownDate = sqlConf.parquetFilterPushDownDate + val inThreshold = sqlConf.parquetFilterPushDownInFilterThreshold (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) @@ -355,7 +353,7 @@ class ParquetFileFormat // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` // is used here. 
- .flatMap(new ParquetFilters(pushDownDate).createFilter(requiredSchema, _)) + .flatMap(new ParquetFilters(pushDownDate, inThreshold).createFilter(requiredSchema, _)) .reduceOption(FilterApi.and) } else { None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 9ffbf0d7c95a..3ce27636ba11 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.types._ /** * Some utility function to convert Spark data source filters to Parquet filters. */ -private[parquet] class ParquetFilters(pushDownDate: Boolean) { +private[parquet] class ParquetFilters(pushDownDate: Boolean, inFilterThreshold: Int) { private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) @@ -270,8 +270,9 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean) { case sources.Not(pred) => createFilter(schema, pred).map(FilterApi.not) - case sources.In(name, values) if canMakeFilterOn(name) && values.length < 20 => - values.flatMap { v => + case sources.In(name, values) + if canMakeFilterOn(name) && values.distinct.length <= inFilterThreshold => + values.distinct.flatMap { v => makeEq.lift(nameToType(name)).map(_(name, v)) }.reduceLeftOption(FilterApi.or) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 28342415040f..289c14cf9e87 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -55,7 +55,8 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2} */ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext { - private lazy val parquetFilters = new ParquetFilters(conf.parquetFilterPushDownDate) + private lazy val parquetFilters = + new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownInFilterThreshold) override def beforeEach(): Unit = { super.beforeEach() @@ -666,10 +667,19 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex StructField("a", IntegerType, nullable = false) )) + assertResult(Some(FilterApi.eq(intColumn("a"), null: Integer))) { + parquetFilters.createFilter(schema, sources.In("a", Array(null))) + } + assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) { parquetFilters.createFilter(schema, sources.In("a", Array(10))) } + // Remove duplicates + assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) { + parquetFilters.createFilter(schema, sources.In("a", Array(10, 10))) + } + assertResult(Some(or( FilterApi.eq(intColumn("a"), 10: Integer), FilterApi.eq(intColumn("a"), 20: Integer))) @@ -685,8 +695,27 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex parquetFilters.createFilter(schema, sources.In("a", Array(10, 20, 30))) } - assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 20).toArray)).isDefined) - assert(parquetFilters.createFilter(schema, sources.In("a", Range(1, 21).toArray)).isEmpty) + assert(parquetFilters.createFilter(schema, 
sources.In("a", + Range(0, conf.parquetFilterPushDownInFilterThreshold).toArray)).isDefined) + assert(parquetFilters.createFilter(schema, sources.In("a", + Range(0, conf.parquetFilterPushDownInFilterThreshold + 1).toArray)).isEmpty) + + import testImplicits._ + withTempPath { path => + (0 to 1024).toDF("a").coalesce(1) + .write.option("parquet.block.size", 512) + .parquet(path.getAbsolutePath) + val df = spark.read.parquet(path.getAbsolutePath) + Seq(true, false).foreach { pushEnabled => + withSQLConf( + SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushEnabled.toString) { + Seq(1, 5, 10, 11, 1000).foreach { count => + assert(df.where(s"a in(${Range(0, count).mkString(",")})").count() === count) + } + assert(df.where(s"a in(null)").count() === 0) + } + } + } } } From d57f44c8142c7024f98c8e55bb07a7445a063d11 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 22 Jun 2018 20:19:18 +0800 Subject: [PATCH 4/7] ParquetFilters SQLConf.get --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../datasources/parquet/ParquetFileFormat.scala | 4 +--- .../datasources/parquet/ParquetFilters.scala | 10 ++++++++-- .../datasources/parquet/ParquetFilterSuite.scala | 11 ++++++----- 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 8d07a88a72f4..ea0daeab24f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -381,12 +381,12 @@ object SQLConf { val PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD = buildConf("spark.sql.parquet.pushdown.inFilterThreshold") .doc("The maximum number of values to filter push-down optimization for IN predicate. " + - "Large threshold will not provide much better performance. " + + "Large threshold won't necessarily provide much better performance. " + + "The experiment argued that 300 is the limit threshold. " + "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.") .internal() .intConf - .checkValue(threshold => threshold > 0 && threshold <= 300, - "The threshold must be greater than 0 and less than 300.") + .checkValue(threshold => threshold > 0, "The threshold must be greater than 0.") .createWithDefault(10) val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 7c71f64e11ab..21c4a6528cea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -341,8 +341,6 @@ class ParquetFileFormat val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown // Whole stage codegen (PhysicalRDD) is able to deal with batches directly val returningBatch = supportBatch(sparkSession, resultSchema) - val pushDownDate = sqlConf.parquetFilterPushDownDate - val inThreshold = sqlConf.parquetFilterPushDownInFilterThreshold (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) @@ -353,7 +351,7 @@ class ParquetFileFormat // Collects all converted Parquet filter predicates. 
Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` // is used here. - .flatMap(new ParquetFilters(pushDownDate, inThreshold).createFilter(requiredSchema, _)) + .flatMap(new ParquetFilters().createFilter(requiredSchema, _)) .reduceOption(FilterApi.and) } else { None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 3ce27636ba11..5474ff0f9640 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -25,13 +25,19 @@ import org.apache.parquet.io.api.Binary import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources import org.apache.spark.sql.types._ /** * Some utility function to convert Spark data source filters to Parquet filters. */ -private[parquet] class ParquetFilters(pushDownDate: Boolean, inFilterThreshold: Int) { +private[parquet] class ParquetFilters { + + val sqlConf: SQLConf = SQLConf.get + + val pushDownDate = sqlConf.parquetFilterPushDownDate + val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) @@ -271,7 +277,7 @@ private[parquet] class ParquetFilters(pushDownDate: Boolean, inFilterThreshold: createFilter(schema, pred).map(FilterApi.not) case sources.In(name, values) - if canMakeFilterOn(name) && values.distinct.length <= inFilterThreshold => + if canMakeFilterOn(name) && values.distinct.length <= pushDownInFilterThreshold => values.distinct.flatMap { v => makeEq.lift(nameToType(name)).map(_(name, v)) }.reduceLeftOption(FilterApi.or) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 289c14cf9e87..a7df0ac8d780 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -55,8 +55,7 @@ import org.apache.spark.util.{AccumulatorContext, AccumulatorV2} */ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext { - private lazy val parquetFilters = - new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownInFilterThreshold) + private lazy val parquetFilters = new ParquetFilters() override def beforeEach(): Unit = { super.beforeEach() @@ -702,8 +701,8 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex import testImplicits._ withTempPath { path => - (0 to 1024).toDF("a").coalesce(1) - .write.option("parquet.block.size", 512) + (0 to 1024).toDF("a").selectExpr("if (a = 1024, null, a) AS a") // convert 1024 to null + .coalesce(1).write.option("parquet.block.size", 512) .parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) Seq(true, false).foreach { pushEnabled => @@ -712,7 +711,9 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex Seq(1, 5, 10, 11, 1000).foreach { count => assert(df.where(s"a 
in(${Range(0, count).mkString(",")})").count() === count) } - assert(df.where(s"a in(null)").count() === 0) + assert(df.where("a in(null)").count() === 0) + assert(df.where("a = null").count() === 0) + assert(df.where("a is null").count() === 1) } } } From 82185965b4c8cf7f6a802125e3145ac9072fec29 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 25 Jun 2018 11:15:54 +0800 Subject: [PATCH 5/7] DataTypes that support `makeEq` can convert. --- .../datasources/parquet/ParquetFilters.scala | 11 +++++++++-- .../datasources/parquet/ParquetFilterSuite.scala | 5 +++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 5474ff0f9640..275bc32c66c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -213,6 +213,13 @@ private[parquet] class ParquetFilters { // See SPARK-20364. def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".") + // All DataTypes that support `makeEq` can provide better performance. + def shouldConvertInPredicate(name: String): Boolean = nameToType(name) match { + case IntegerType | LongType | FloatType | DoubleType | StringType | BinaryType => true + case DateType if pushDownDate => true + case _ => false + } + // NOTE: // // For any comparison operator `cmp`, both `a cmp NULL` and `NULL cmp a` evaluate to `NULL`, @@ -276,8 +283,8 @@ private[parquet] class ParquetFilters { case sources.Not(pred) => createFilter(schema, pred).map(FilterApi.not) - case sources.In(name, values) - if canMakeFilterOn(name) && values.distinct.length <= pushDownInFilterThreshold => + case sources.In(name, values) if canMakeFilterOn(name) && shouldConvertInPredicate(name) + && values.distinct.length <= pushDownInFilterThreshold => values.distinct.flatMap { v => makeEq.lift(nameToType(name)).map(_(name, v)) }.reduceLeftOption(FilterApi.or) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index a7df0ac8d780..434c332cda56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -698,6 +698,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex Range(0, conf.parquetFilterPushDownInFilterThreshold).toArray)).isDefined) assert(parquetFilters.createFilter(schema, sources.In("a", Range(0, conf.parquetFilterPushDownInFilterThreshold + 1).toArray)).isEmpty) + // These DataType can't provide better performance. + Seq(DecimalType.DoubleDecimal, TimestampType, BooleanType).foreach { t => + val s = StructType(Seq(StructField("a", t))) + assert(parquetFilters.createFilter(s, sources.In("a", Array(1))).isEmpty) + } import testImplicits._ withTempPath { path => From 5e748f99995a0c298c9389561bc5f9feb15a4ca0 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Fri, 6 Jul 2018 14:14:43 +0800 Subject: [PATCH 6/7] Update benchmark result. 
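With the default threshold of 10, the regenerated numbers below show the 5- and
10-value IN lists running roughly 15x faster once pushed down, while the 50-
and 100-value cases are unchanged because the list exceeds the threshold and no
Parquet predicate is generated. A rough way to eyeball the effect outside the
benchmark harness, as a hedged spark-shell sketch (the path, row count, and
threshold value are illustrative; `spark` is the usual shell session):

```scala
val path = "/tmp/in_pushdown_demo"  // scratch location, adjust as needed
spark.range(0, 1024 * 1024).selectExpr("id AS a")
  .write.mode("overwrite").parquet(path)

val df = spark.read.parquet(path)
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
// Internal conf added earlier in this series; IN lists with more distinct
// values than this fall back to a Spark-side filter.
spark.conf.set("spark.sql.parquet.pushdown.inFilterThreshold", "10")

// Under the threshold the IN list reaches Parquet as OR'ed EQ predicates, so
// row groups whose min/max statistics exclude every value are skipped.
spark.time(df.where("a IN (1, 2, 3)").count())
df.where("a IN (1, 2, 3)").explain() // the scan's PushedFilters should list In
```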
--- .../FilterPushdownBenchmark-results.txt | 96 +++++++++---------- .../datasources/parquet/ParquetFilters.scala | 5 +- .../parquet/ParquetFilterSuite.scala | 21 ++-- 3 files changed, 60 insertions(+), 62 deletions(-) diff --git a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt index 29fe4345d69d..980754272603 100644 --- a/sql/core/benchmarks/FilterPushdownBenchmark-results.txt +++ b/sql/core/benchmarks/FilterPushdownBenchmark-results.txt @@ -417,120 +417,120 @@ Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 5, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 7477 / 7587 2.1 475.4 1.0X -Parquet Vectorized (Pushdown) 7862 / 8346 2.0 499.9 1.0X -Native ORC Vectorized 6447 / 7021 2.4 409.9 1.2X -Native ORC Vectorized (Pushdown) 983 / 1003 16.0 62.5 7.6X +Parquet Vectorized 7993 / 8104 2.0 508.2 1.0X +Parquet Vectorized (Pushdown) 507 / 532 31.0 32.2 15.8X +Native ORC Vectorized 6922 / 7163 2.3 440.1 1.2X +Native ORC Vectorized (Pushdown) 1017 / 1058 15.5 64.6 7.9X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 5, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 7107 / 7290 2.2 451.9 1.0X -Parquet Vectorized (Pushdown) 7196 / 7258 2.2 457.5 1.0X -Native ORC Vectorized 6102 / 6222 2.6 388.0 1.2X -Native ORC Vectorized (Pushdown) 926 / 958 17.0 58.9 7.7X +Parquet Vectorized 7855 / 7963 2.0 499.4 1.0X +Parquet Vectorized (Pushdown) 503 / 516 31.3 32.0 15.6X +Native ORC Vectorized 6825 / 6954 2.3 433.9 1.2X +Native ORC Vectorized (Pushdown) 1019 / 1044 15.4 64.8 7.7X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 5, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 7374 / 7692 2.1 468.8 1.0X -Parquet Vectorized (Pushdown) 7771 / 7848 2.0 494.1 0.9X -Native ORC Vectorized 6184 / 6356 2.5 393.2 1.2X -Native ORC Vectorized (Pushdown) 920 / 963 17.1 58.5 8.0X +Parquet Vectorized 7858 / 7928 2.0 499.6 1.0X +Parquet Vectorized (Pushdown) 490 / 519 32.1 31.1 16.0X +Native ORC Vectorized 7079 / 7966 2.2 450.1 1.1X +Native ORC Vectorized (Pushdown) 1276 / 1673 12.3 81.1 6.2X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 10, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 7073 / 7326 2.2 449.7 1.0X -Parquet Vectorized (Pushdown) 7304 / 7647 2.2 464.4 1.0X -Native ORC Vectorized 6222 / 6579 2.5 395.6 1.1X -Native ORC Vectorized (Pushdown) 958 / 994 16.4 60.9 7.4X +Parquet Vectorized 8007 / 11155 2.0 509.0 1.0X +Parquet Vectorized (Pushdown) 519 / 540 30.3 33.0 15.4X +Native ORC Vectorized 6848 / 7072 2.3 435.4 1.2X +Native ORC Vectorized (Pushdown) 1026 / 1050 15.3 65.2 7.8X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> 
InFilters (values count: 10, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 7121 / 7501 2.2 452.7 1.0X -Parquet Vectorized (Pushdown) 7751 / 8334 2.0 492.8 0.9X -Native ORC Vectorized 6225 / 6680 2.5 395.8 1.1X -Native ORC Vectorized (Pushdown) 998 / 1020 15.8 63.5 7.1X +Parquet Vectorized 7876 / 7956 2.0 500.7 1.0X +Parquet Vectorized (Pushdown) 521 / 535 30.2 33.1 15.1X +Native ORC Vectorized 7051 / 7368 2.2 448.3 1.1X +Native ORC Vectorized (Pushdown) 1014 / 1035 15.5 64.5 7.8X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 10, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 7157 / 7399 2.2 455.1 1.0X -Parquet Vectorized (Pushdown) 7806 / 7911 2.0 496.3 0.9X -Native ORC Vectorized 6548 / 6720 2.4 416.3 1.1X -Native ORC Vectorized (Pushdown) 1016 / 1050 15.5 64.6 7.0X +Parquet Vectorized 7897 / 8229 2.0 502.1 1.0X +Parquet Vectorized (Pushdown) 513 / 530 30.7 32.6 15.4X +Native ORC Vectorized 6730 / 6990 2.3 427.9 1.2X +Native ORC Vectorized (Pushdown) 1003 / 1036 15.7 63.8 7.9X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 50, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 7662 / 7805 2.1 487.1 1.0X -Parquet Vectorized (Pushdown) 7590 / 7861 2.1 482.5 1.0X -Native ORC Vectorized 6840 / 8073 2.3 434.9 1.1X -Native ORC Vectorized (Pushdown) 1041 / 1075 15.1 66.2 7.4X +Parquet Vectorized 7967 / 8175 2.0 506.5 1.0X +Parquet Vectorized (Pushdown) 8155 / 8434 1.9 518.5 1.0X +Native ORC Vectorized 7002 / 7107 2.2 445.2 1.1X +Native ORC Vectorized (Pushdown) 1092 / 1139 14.4 69.4 7.3X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 50, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 8230 / 9266 1.9 523.2 1.0X -Parquet Vectorized (Pushdown) 7735 / 7960 2.0 491.8 1.1X -Native ORC Vectorized 6945 / 7109 2.3 441.6 1.2X -Native ORC Vectorized (Pushdown) 1123 / 1144 14.0 71.4 7.3X +Parquet Vectorized 8032 / 8122 2.0 510.7 1.0X +Parquet Vectorized (Pushdown) 8141 / 8908 1.9 517.6 1.0X +Native ORC Vectorized 7140 / 7387 2.2 454.0 1.1X +Native ORC Vectorized (Pushdown) 1156 / 1220 13.6 73.5 6.9X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 50, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 7656 / 8058 2.1 486.7 1.0X -Parquet Vectorized (Pushdown) 7860 / 8247 2.0 499.7 1.0X -Native ORC Vectorized 6684 / 7003 2.4 424.9 1.1X -Native ORC Vectorized (Pushdown) 1085 / 1172 14.5 69.0 7.1X +Parquet Vectorized 8088 / 8350 1.9 514.2 1.0X +Parquet Vectorized (Pushdown) 8629 / 8702 1.8 548.6 0.9X +Native ORC Vectorized 7480 / 7886 2.1 475.6 1.1X +Native ORC 
Vectorized (Pushdown) 1106 / 1145 14.2 70.3 7.3X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 100, distribution: 10): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 7594 / 8128 2.1 482.8 1.0X -Parquet Vectorized (Pushdown) 7845 / 7923 2.0 498.8 1.0X -Native ORC Vectorized 5859 / 6421 2.7 372.5 1.3X -Native ORC Vectorized (Pushdown) 1037 / 1054 15.2 66.0 7.3X +Parquet Vectorized 8028 / 8165 2.0 510.4 1.0X +Parquet Vectorized (Pushdown) 8349 / 8674 1.9 530.8 1.0X +Native ORC Vectorized 7107 / 7354 2.2 451.8 1.1X +Native ORC Vectorized (Pushdown) 1175 / 1207 13.4 74.7 6.8X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 100, distribution: 50): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 6762 / 6775 2.3 429.9 1.0X -Parquet Vectorized (Pushdown) 6911 / 6970 2.3 439.4 1.0X -Native ORC Vectorized 5884 / 5960 2.7 374.1 1.1X -Native ORC Vectorized (Pushdown) 1028 / 1052 15.3 65.4 6.6X +Parquet Vectorized 8041 / 8195 2.0 511.2 1.0X +Parquet Vectorized (Pushdown) 8466 / 8604 1.9 538.2 0.9X +Native ORC Vectorized 7116 / 7286 2.2 452.4 1.1X +Native ORC Vectorized (Pushdown) 1197 / 1214 13.1 76.1 6.7X Java HotSpot(TM) 64-Bit Server VM 1.8.0_151-b12 on Mac OS X 10.12.6 Intel(R) Core(TM) i7-7820HQ CPU @ 2.90GHz InSet -> InFilters (values count: 100, distribution: 90): Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------ -Parquet Vectorized 6718 / 6767 2.3 427.1 1.0X -Parquet Vectorized (Pushdown) 6812 / 6909 2.3 433.1 1.0X -Native ORC Vectorized 5842 / 5883 2.7 371.4 1.1X -Native ORC Vectorized (Pushdown) 1040 / 1058 15.1 66.1 6.5X +Parquet Vectorized 7998 / 8311 2.0 508.5 1.0X +Parquet Vectorized (Pushdown) 9366 / 11257 1.7 595.5 0.9X +Native ORC Vectorized 7856 / 9273 2.0 499.5 1.0X +Native ORC Vectorized (Pushdown) 1350 / 1747 11.7 85.8 5.9X ================================================================================================ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 8548c65eaf5d..07510cbbded8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -227,8 +227,9 @@ private[parquet] class ParquetFilters( // All DataTypes that support `makeEq` can provide better performance. 
def shouldConvertInPredicate(name: String): Boolean = nameToType(name) match { - case IntegerType | LongType | FloatType | DoubleType | StringType | BinaryType => true - case DateType if pushDownDate => true + case ParquetBooleanType | ParquetIntegerType | ParquetLongType | ParquetFloatType | + ParquetDoubleType | ParquetStringType | ParquetBinaryType => true + case ParquetDateType if pushDownDate => true case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 5f10b4c0b1b3..ba1e986787a1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -754,24 +754,26 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex StructField("a", IntegerType, nullable = false) )) + val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema) + assertResult(Some(FilterApi.eq(intColumn("a"), null: Integer))) { - parquetFilters.createFilter(schema, sources.In("a", Array(null))) + parquetFilters.createFilter(parquetSchema, sources.In("a", Array(null))) } assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) { - parquetFilters.createFilter(schema, sources.In("a", Array(10))) + parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10))) } // Remove duplicates assertResult(Some(FilterApi.eq(intColumn("a"), 10: Integer))) { - parquetFilters.createFilter(schema, sources.In("a", Array(10, 10))) + parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 10))) } assertResult(Some(or( FilterApi.eq(intColumn("a"), 10: Integer), FilterApi.eq(intColumn("a"), 20: Integer))) ) { - parquetFilters.createFilter(schema, sources.In("a", Array(10, 20))) + parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 20))) } assertResult(Some(or(or( @@ -779,18 +781,13 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex FilterApi.eq(intColumn("a"), 20: Integer)), FilterApi.eq(intColumn("a"), 30: Integer))) ) { - parquetFilters.createFilter(schema, sources.In("a", Array(10, 20, 30))) + parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 20, 30))) } - assert(parquetFilters.createFilter(schema, sources.In("a", + assert(parquetFilters.createFilter(parquetSchema, sources.In("a", Range(0, conf.parquetFilterPushDownInFilterThreshold).toArray)).isDefined) - assert(parquetFilters.createFilter(schema, sources.In("a", + assert(parquetFilters.createFilter(parquetSchema, sources.In("a", Range(0, conf.parquetFilterPushDownInFilterThreshold + 1).toArray)).isEmpty) - // These DataType can't provide better performance. 
- Seq(DecimalType.DoubleDecimal, TimestampType, BooleanType).foreach { t => - val s = StructType(Seq(StructField("a", t))) - assert(parquetFilters.createFilter(s, sources.In("a", Array(1))).isEmpty) - } import testImplicits._ withTempPath { path => From c386e02c8bfbf1eb43dcf95717a9e2ea30123d57 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 14 Jul 2018 08:51:37 +0800 Subject: [PATCH 7/7] Improvement test --- .../apache/spark/sql/internal/SQLConf.scala | 3 ++- .../parquet/ParquetFileFormat.scala | 6 +++--- .../datasources/parquet/ParquetFilters.scala | 5 +++-- .../parquet/ParquetFilterSuite.scala | 21 ++++++++++--------- 4 files changed, 19 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 692f5f633f92..699e9394f5be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -391,10 +391,11 @@ object SQLConf { .doc("The maximum number of values to filter push-down optimization for IN predicate. " + "Large threshold won't necessarily provide much better performance. " + "The experiment argued that 300 is the limit threshold. " + + "By setting this value to 0 this feature can be disabled. " + "This configuration only has an effect when 'spark.sql.parquet.filterPushdown' is enabled.") .internal() .intConf - .checkValue(threshold => threshold > 0, "The threshold must be greater than 0.") + .checkValue(threshold => threshold >= 0, "The threshold must not be negative.") .createWithDefault(10) val PARQUET_WRITE_LEGACY_FORMAT = buildConf("spark.sql.parquet.writeLegacyFormat") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 702c67348268..efddf8d68eb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -366,13 +366,13 @@ class ParquetFileFormat val pushed = if (enableParquetFilterPushDown) { val parquetSchema = ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS) .getFileMetaData.getSchema + val parquetFilters = new ParquetFilters(pushDownDate, + pushDownStringStartWith, pushDownInFilterThreshold) filters // Collects all converted Parquet filter predicates. Notice that not all predicates can be // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` // is used here. - .flatMap(new ParquetFilters(pushDownDate, pushDownStringStartWith, - pushDownInFilterThreshold) - .createFilter(parquetSchema, _)) + .flatMap(parquetFilters.createFilter(parquetSchema, _)) .reduceOption(FilterApi.and) } else { None diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 0f34d744b0a1..e590c153c415 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -237,8 +237,9 @@ private[parquet] class ParquetFilters( // All DataTypes that support `makeEq` can provide better performance. 
def shouldConvertInPredicate(name: String): Boolean = nameToType(name) match { - case ParquetBooleanType | ParquetIntegerType | ParquetLongType | ParquetFloatType | - ParquetDoubleType | ParquetStringType | ParquetBinaryType => true + case ParquetBooleanType | ParquetByteType | ParquetShortType | ParquetIntegerType + | ParquetLongType | ParquetFloatType | ParquetDoubleType | ParquetStringType + | ParquetBinaryType => true case ParquetDateType if pushDownDate => true case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 02dbdea0665f..00c191f75552 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -825,13 +825,6 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 10))) } - assertResult(Some(or( - FilterApi.eq(intColumn("a"), 10: Integer), - FilterApi.eq(intColumn("a"), 20: Integer))) - ) { - parquetFilters.createFilter(parquetSchema, sources.In("a", Array(10, 20))) - } - assertResult(Some(or(or( FilterApi.eq(intColumn("a"), 10: Integer), FilterApi.eq(intColumn("a"), 20: Integer)), @@ -847,15 +840,23 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex import testImplicits._ withTempPath { path => - (0 to 1024).toDF("a").selectExpr("if (a = 1024, null, a) AS a") // convert 1024 to null + val data = 0 to 1024 + data.toDF("a").selectExpr("if (a = 1024, null, a) AS a") // convert 1024 to null .coalesce(1).write.option("parquet.block.size", 512) .parquet(path.getAbsolutePath) val df = spark.read.parquet(path.getAbsolutePath) Seq(true, false).foreach { pushEnabled => withSQLConf( SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushEnabled.toString) { - Seq(1, 5, 10, 11, 1000).foreach { count => - assert(df.where(s"a in(${Range(0, count).mkString(",")})").count() === count) + Seq(1, 5, 10, 11).foreach { count => + val filter = s"a in(${Range(0, count).mkString(",")})" + assert(df.where(filter).count() === count) + val actual = stripSparkFilter(df.where(filter)).collect().length + if (pushEnabled && count <= conf.parquetFilterPushDownInFilterThreshold) { + assert(actual > 1 && actual < data.length) + } else { + assert(actual === data.length) + } } assert(df.where("a in(null)").count() === 0) assert(df.where("a = null").count() === 0)
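A closing note on the final revision: relaxing the `checkValue` bound to allow
0 gives users a switch to disable the conversion outright, since no non-empty
IN list can satisfy `values.distinct.length <= 0`. A tiny distillation of that
guard (`convertible` is a made-up helper for this note, not a method in the
patch):

```scala
// Mirrors the guard in ParquetFilters.createFilter: dedupe first, then
// compare the distinct count against the threshold.
def convertible(values: Seq[Any], threshold: Int): Boolean =
  values.distinct.length <= threshold

assert(!convertible(Seq(1), 0))      // threshold 0 disables the push-down
assert(convertible(Seq(10, 10), 1))  // duplicates are dropped before the check
assert(!convertible(0 to 10, 10))    // 11 distinct values exceed a threshold of 10
```

The reworked test also strips the Spark-side filter with `stripSparkFilter` and
compares surviving row counts: when the list is within the threshold and
push-down is enabled, fewer rows than the whole file come back from the scan,
which is direct evidence that Parquet skipped row groups rather than Spark
filtering rows after the fact.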