@@ -166,7 +166,14 @@ private[parquet] object ParquetFilters {
* Converts data sources filters to Parquet filter predicates.
*/
def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = {
val dataTypeOf = getFieldMap(schema)
val nameToType = getFieldMap(schema)

// Parquet does not allow dots in the column name because dots are used as a column path
// delimiter. Since Parquet 1.8.2 (PARQUET-389), Parquet accepts the filter predicates
// with missing columns. However, Parquet can return incorrect results when we push down
// filters for columns whose names contain dots. Thus, we do not push down such filters.
Member:
It seems to me that a missing column is treated like a NULL value. The results will change only for some predicates, e.g., IsNull and IsNotNull. For the other predicates, can we still push them down?

Member Author (@HyukjinKwon, May 17, 2017):

Yes, but the problem is that it (almost) always evaluates to NULL when a column name contains dots, because the column path becomes nested (a.b rather than a literal `a.b`) in the Parquet filter predicate, as far as I know.

You are right about IsNull. I pointed this out in #17680 (comment): it looks like Parquet (almost) always evaluates it to true, but the rows are then filtered out on the Spark side. So input/output is not an issue in that case, but I believe we should disable pushdown for it too.

I think this example illustrates the case:

import spark.implicits._  // needed for toDF when not running in spark-shell

val dfs = Seq(
  Seq(Some(1), None).toDF("col.dots"),
  Seq(Some(1L), None).toDF("col.dots"),
  Seq(Some(1.0F), None).toDF("col.dots"),
  Seq(Some(1.0D), None).toDF("col.dots"),
  Seq(true, false).toDF("col.dots"),
  Seq("apple", null).toDF("col.dots"),
  Seq("apple", null).toDF("col.dots")
)
 
val predicates = Seq(
  "`col.dots` > 0",
  "`col.dots` >= 1L",
  "`col.dots` < 2.0",
  "`col.dots` <= 1.0D",
  "`col.dots` == true",
  "`col.dots` IS NOT NULL",
  "`col.dots` IS NULL"
)

dfs.zip(predicates).zipWithIndex.foreach { case ((df, predicate), i) =>
  val path = s"/tmp/abcd$i"
  df.write.mode("overwrite").parquet(path)
  spark.read.parquet(path).where(predicate).show()
}
+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
+--------+

+--------+
|col.dots|
+--------+
|    null|
+--------+
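(An illustrative aside, a minimal sketch of the mechanism described above: parquet-mr's FilterApi builds its column path by splitting the given name on dots, so a field literally named `col.dots` is read as a nested path. The column name here is illustrative, and the FilterApi/ColumnPath behavior is assumed from parquet-mr 1.8.x.)

import org.apache.parquet.filter2.predicate.FilterApi

// The column path is built by splitting the name on dots, so "col.dots" is
// interpreted as field "dots" nested under "col", not as one flat field.
val column = FilterApi.intColumn("col.dots")
println(column.getColumnPath.toDotString)             // col.dots
println(column.getColumnPath.toArray.mkString(", "))  // col, dots -- two path segments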

Member Author:

In any case, I believe we should disable it, because the pushed Parquet filter appears to point to a different column.

// See SPARK-20364.
def canMakeFilterOn(name: String): Boolean = nameToType.contains(name) && !name.contains(".")

// NOTE:
//
@@ -184,30 +191,30 @@ private[parquet] object ParquetFilters {
// Probably I missed something and obviously this should be changed.

predicate match {
case sources.IsNull(name) if dataTypeOf.contains(name) =>
makeEq.lift(dataTypeOf(name)).map(_(name, null))
case sources.IsNotNull(name) if dataTypeOf.contains(name) =>
makeNotEq.lift(dataTypeOf(name)).map(_(name, null))

case sources.EqualTo(name, value) if dataTypeOf.contains(name) =>
makeEq.lift(dataTypeOf(name)).map(_(name, value))
case sources.Not(sources.EqualTo(name, value)) if dataTypeOf.contains(name) =>
makeNotEq.lift(dataTypeOf(name)).map(_(name, value))

case sources.EqualNullSafe(name, value) if dataTypeOf.contains(name) =>
makeEq.lift(dataTypeOf(name)).map(_(name, value))
case sources.Not(sources.EqualNullSafe(name, value)) if dataTypeOf.contains(name) =>
makeNotEq.lift(dataTypeOf(name)).map(_(name, value))

case sources.LessThan(name, value) if dataTypeOf.contains(name) =>
makeLt.lift(dataTypeOf(name)).map(_(name, value))
case sources.LessThanOrEqual(name, value) if dataTypeOf.contains(name) =>
makeLtEq.lift(dataTypeOf(name)).map(_(name, value))

case sources.GreaterThan(name, value) if dataTypeOf.contains(name) =>
makeGt.lift(dataTypeOf(name)).map(_(name, value))
case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) =>
makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
case sources.IsNull(name) if canMakeFilterOn(name) =>
makeEq.lift(nameToType(name)).map(_(name, null))
case sources.IsNotNull(name) if canMakeFilterOn(name) =>
makeNotEq.lift(nameToType(name)).map(_(name, null))

case sources.EqualTo(name, value) if canMakeFilterOn(name) =>
makeEq.lift(nameToType(name)).map(_(name, value))
case sources.Not(sources.EqualTo(name, value)) if canMakeFilterOn(name) =>
makeNotEq.lift(nameToType(name)).map(_(name, value))

case sources.EqualNullSafe(name, value) if canMakeFilterOn(name) =>
makeEq.lift(nameToType(name)).map(_(name, value))
case sources.Not(sources.EqualNullSafe(name, value)) if canMakeFilterOn(name) =>
makeNotEq.lift(nameToType(name)).map(_(name, value))

case sources.LessThan(name, value) if canMakeFilterOn(name) =>
makeLt.lift(nameToType(name)).map(_(name, value))
case sources.LessThanOrEqual(name, value) if canMakeFilterOn(name) =>
makeLtEq.lift(nameToType(name)).map(_(name, value))

case sources.GreaterThan(name, value) if canMakeFilterOn(name) =>
makeGt.lift(nameToType(name)).map(_(name, value))
case sources.GreaterThanOrEqual(name, value) if canMakeFilterOn(name) =>
makeGtEq.lift(nameToType(name)).map(_(name, value))

case sources.And(lhs, rhs) =>
// At here, it is not safe to just convert one side if we do not understand the
@@ -538,6 +538,21 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext {
// scalastyle:on nonascii
}
}

test("SPARK-20364: Disable Parquet predicate pushdown for fields having dots in the names") {
Member:
Looks much better now.

import testImplicits._

Seq(true, false).foreach { vectorized =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString,
SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> true.toString) {
withTempPath { path =>
Seq(Some(1), None).toDF("col.dots").write.parquet(path.getAbsolutePath)
val readBack = spark.read.parquet(path.getAbsolutePath).where("`col.dots` IS NOT NULL")
assert(readBack.count() == 1)
}
}
}
}
}

class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
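(A hedged usage sketch, not part of the PR: one way to confirm the behavior is to check PushedFilters in the physical plan. The path and column name below are illustrative, and a SparkSession named `spark` is assumed, as in spark-shell.)

import spark.implicits._  // needed for toDF when not running in spark-shell

val path = "/tmp/spark-20364-example"  // illustrative path
Seq(Some(1), None).toDF("col.dots").write.mode("overwrite").parquet(path)

val df = spark.read.parquet(path).where("`col.dots` IS NOT NULL")
df.explain()             // with this change, PushedFilters should not list a filter on `col.dots`
assert(df.count() == 1)  // the non-null row is still returned; filtering happens on the Spark side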