Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SchemaRDD}
/**
* A test suite that tests Parquet filter2 API based filter pushdown optimization.
*
* Notice that `!(a cmp b)` are always transformed to its negated form `a cmp' b` by the
* `BooleanSimplification` optimization rule whenever possible. As a result, predicate `!(a < 1)`
* results a `GtEq` filter predicate rather than a `Not`.
* NOTE:
*
* @todo Add test cases for `IsNull` and `IsNotNull` after merging PR #3367
* 1. `!(a cmp b)` is always transformed to its negated form `a cmp' b` by the
* `BooleanSimplification` optimization rule whenever possible. As a result, predicate `!(a < 1)`
* results in a `GtEq` filter predicate rather than a `Not`.
*
* 2. `Tuple1(Option(x))` is used together with `AnyVal` types like `Int` to ensure the inferred
* data type is nullable.
*/
class ParquetFilterSuite extends QueryTest with ParquetTest {
val sqlContext = TestSQLContext
Expand Down Expand Up @@ -85,14 +88,26 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
}

test("filter pushdown - boolean") {
withParquetRDD((true :: false :: Nil).map(Tuple1.apply)) { rdd =>
withParquetRDD((true :: false :: Nil).map(b => Tuple1.apply(Option(b)))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Boolean]])(Seq.empty[Row])
checkFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Boolean]]) {
Seq(Row(true), Row(false))
}

checkFilterPushdown(rdd, '_1)('_1 === true, classOf[Eq[java.lang.Boolean]])(true)
checkFilterPushdown(rdd, '_1)('_1 !== true, classOf[Operators.NotEq[java.lang.Boolean]])(false)
checkFilterPushdown(rdd, '_1)('_1 !== true, classOf[Operators.NotEq[java.lang.Boolean]]) {
false
}
}
}

test("filter pushdown - integer") {
withParquetRDD((1 to 4).map(Tuple1.apply)) { rdd =>
withParquetRDD((1 to 4).map(i => Tuple1(Option(i)))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[Integer]])(Seq.empty[Row])
checkFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[Integer]]) {
(1 to 4).map(Row.apply(_))
}

checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[Integer]])(1)
checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[Integer]]) {
(2 to 4).map(Row.apply(_))
Expand All @@ -118,7 +133,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
}

test("filter pushdown - long") {
withParquetRDD((1 to 4).map(i => Tuple1(i.toLong))) { rdd =>
withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toLong)))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Long]])(Seq.empty[Row])
checkFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Long]]) {
(1 to 4).map(Row.apply(_))
}

checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Long]])(1)
checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Long]]) {
(2 to 4).map(Row.apply(_))
Expand All @@ -144,7 +164,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
}

test("filter pushdown - float") {
withParquetRDD((1 to 4).map(i => Tuple1(i.toFloat))) { rdd =>
withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toFloat)))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Float]])(Seq.empty[Row])
checkFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Float]]) {
(1 to 4).map(Row.apply(_))
}

checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Float]])(1)
checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Float]]) {
(2 to 4).map(Row.apply(_))
Expand All @@ -170,7 +195,12 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
}

test("filter pushdown - double") {
withParquetRDD((1 to 4).map(i => Tuple1(i.toDouble))) { rdd =>
withParquetRDD((1 to 4).map(i => Tuple1(Option(i.toDouble)))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.Double]])(Seq.empty[Row])
checkFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.Double]]) {
(1 to 4).map(Row.apply(_))
}

checkFilterPushdown(rdd, '_1)('_1 === 1, classOf[Eq[java.lang.Double]])(1)
checkFilterPushdown(rdd, '_1)('_1 !== 1, classOf[Operators.NotEq[java.lang.Double]]) {
(2 to 4).map(Row.apply(_))
Expand All @@ -197,6 +227,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {

test("filter pushdown - string") {
withParquetRDD((1 to 4).map(i => Tuple1(i.toString))) { rdd =>
checkFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.String]])(Seq.empty[Row])
checkFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.String]]) {
(1 to 4).map(i => Row.apply(i.toString))
}

checkFilterPushdown(rdd, '_1)('_1 === "1", classOf[Eq[String]])("1")
checkFilterPushdown(rdd, '_1)('_1 !== "1", classOf[Operators.NotEq[String]]) {
(2 to 4).map(i => Row.apply(i.toString))
Expand Down Expand Up @@ -227,6 +262,11 @@ class ParquetFilterSuite extends QueryTest with ParquetTest {
}

withParquetRDD((1 to 4).map(i => Tuple1(i.b))) { rdd =>
checkBinaryFilterPushdown(rdd, '_1)('_1.isNull, classOf[Eq[java.lang.String]])(Seq.empty[Row])
checkBinaryFilterPushdown(rdd, '_1)('_1.isNotNull, classOf[NotEq[java.lang.String]]) {
(1 to 4).map(i => Row.apply(i.b)).toSeq
}

checkBinaryFilterPushdown(rdd, '_1)('_1 === 1.b, classOf[Eq[Array[Byte]]])(1.b)
checkBinaryFilterPushdown(rdd, '_1)('_1 !== 1.b, classOf[Operators.NotEq[Array[Byte]]]) {
(2 to 4).map(i => Row.apply(i.b)).toSeq
Expand Down