Commit 9f2851b

Address comments here first

1 parent 45ac9c7 commit 9f2851b

2 files changed: +25 -30 lines

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala

Lines changed: 2 additions & 1 deletion

@@ -243,7 +243,8 @@ private[parquet] object ParquetFilters {
    * Note that, this is a hacky workaround to allow dots in column names. Currently, column APIs
    * in Parquet's `FilterApi` only allows dot-separated names so here we resemble those columns
    * but only allow single column path that allows dots in the names as we don't currently push
-   * down filters with nested fields.
+   * down filters with nested fields. The functions in this object are based on
+   * the codes in `org.apache.parquet.filter2.predicate`.
    */
   private[parquet] object ParquetColumns {
     def intColumn(columnPath: String): Column[Integer] with SupportsLtGt = {
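
For context on why the workaround is needed: Parquet's stock `FilterApi` column factories parse their string argument as a dot-separated path. A minimal sketch of the difference, using only the public parquet-mr APIs `FilterApi.intColumn` and `ColumnPath.get` (the `col.dots` name is just an illustration, not code from this commit):

    import org.apache.parquet.filter2.predicate.FilterApi
    import org.apache.parquet.hadoop.metadata.ColumnPath

    // FilterApi.intColumn parses "col.dots" as a nested two-segment path,
    // ["col", "dots"], i.e. a field `dots` inside a group `col`, so a filter
    // built from it cannot match a flat column literally named "col.dots".
    val nested = FilterApi.intColumn("col.dots")

    // ColumnPath.get takes explicit segments, so a single segment may itself
    // contain dots -- the shape the ParquetColumns helpers above construct.
    val flat = ColumnPath.get("col.dots")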

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala

Lines changed: 23 additions & 29 deletions

@@ -545,35 +545,29 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
     Seq(true, false).foreach { vectorized =>
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
-        withTempPath { path =>
-          Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
-          assert(spark.read.parquet(path.getAbsolutePath).where("`col.dots` > 0").count() == 1)
-        }
-
-        withTempPath { path =>
-          Seq(Some(1L), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
-          assert(spark.read.parquet(path.getAbsolutePath).where("`col.dots` >= 1L").count() == 1)
-        }
-
-        withTempPath { path =>
-          Seq(Some(1.0F), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
-          assert(spark.read.parquet(path.getAbsolutePath).where("`col.dots` < 2.0").count() == 1)
-        }
-
-        withTempPath { path =>
-          Seq(Some(1.0D), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
-          assert(spark.read.parquet(path.getAbsolutePath).where("`col.dots` <= 1.0D").count() == 1)
-        }
-
-        withTempPath { path =>
-          Seq(true, false).toDF("col.dots").write.parquet(path.getAbsolutePath)
-          assert(spark.read.parquet(path.getAbsolutePath).where("`col.dots` == true").count() == 1)
-        }
-
-        withTempPath { path =>
-          Seq("apple", null).toDF("col.dots").write.parquet(path.getAbsolutePath)
-          assert(
-            spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL").count() == 1)
+        val dfs = Seq(
+          Seq(Some(1), None).toDF("col.dots"),
+          Seq(Some(1L), None).toDF("col.dots"),
+          Seq(Some(1.0F), None).toDF("col.dots"),
+          Seq(Some(1.0D), None).toDF("col.dots"),
+          Seq(true, false).toDF("col.dots"),
+          Seq("apple", null).toDF("col.dots")
+        )
+
+        val predicates = Seq(
+          "`col.dots` > 0",
+          "`col.dots` >= 1L",
+          "`col.dots` < 2.0",
+          "`col.dots` <= 1.0D",
+          "`col.dots` == true",
+          "`col.dots` IS NOT NULL"
+        )
+
+        dfs.zip(predicates).foreach { case (df, predicate) =>
+          withTempPath { path =>
+            df.write.parquet(path.getAbsolutePath)
+            assert(spark.read.parquet(path.getAbsolutePath).where(predicate).count() == 1)
+          }
         }
       }
     }
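
A note on the refactored test: `dfs.zip(predicates)` pairs the two Seqs positionally, so they must stay in the same order and of equal length. An equivalent layout that keeps each DataFrame next to its predicate is sketched below (hypothetical `cases` name; assumes the suite's test implicits are in scope for `toDF`, as in the test above):

    // Hypothetical alternative layout: a Seq of (DataFrame, predicate) pairs
    // removes the positional coupling between two parallel Seqs.
    val cases = Seq(
      Seq(Some(1), None).toDF("col.dots") -> "`col.dots` > 0",
      Seq(Some(1L), None).toDF("col.dots") -> "`col.dots` >= 1L"
    )
    cases.foreach { case (df, predicate) =>
      withTempPath { path =>
        df.write.parquet(path.getAbsolutePath)
        assert(spark.read.parquet(path.getAbsolutePath).where(predicate).count() == 1)
      }
    }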
