Conversation

@xwu0226 (Contributor) commented Dec 5, 2016

What changes were proposed in this pull request?

When spark.sql.parquet.filterPushdown is on, SparkSQL's Parquet reader pushes filters down to the Parquet file when creating the reader, in order to start with filtered blocks. However, when the Parquet file does not contain the predicate column(s), parquet-mr throws an exception complaining that the filter column does not exist. This issue will be fixed in parquet-mr 1.9, but Spark 2.1 is still on Parquet 1.8.

This PR tolerates such exceptions thrown by parquet-mr and simply returns all the blocks of the current Parquet file to the created SparkSQL Parquet reader. The filters will be applied again anyway in a later physical plan operation, as shown in the following example physical plan:

== Physical Plan ==
*Project [a#2805, b#2806, c#2807]
+- *Filter ((isnotnull(a#2805) && isnull(c#2807)) && (a#2805 < 2))
  +- *FileScan parquet [a#2805,b#2806,c#2807] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/Users/xinwu/spark/target/tmp/spark-ed6f0c12-6494-4ac5-b485-5b986ef475cc], PartitionFilters: [], PushedFilters: [IsNotNull(a), IsNull(c), LessThan(a,2)], ReadSchema: struct<a:int,b:string,c:int>

How was this patch tested?

A unit test case is added.
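
Roughly, the scenario that test covers can be sketched standalone as follows (the path, values, and session setup here are illustrative, not the actual test code):

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[2]").appName("SPARK-18539-sketch").getOrCreate()
import spark.implicits._

// Hypothetical output directory.
val path = "/tmp/spark-18539-sketch"

// The first file has columns a, b, c; the second file is missing column c.
Seq((1, "abc", 2)).toDF("a", "b", "c").write.mode("overwrite").parquet(path)
Seq((1, "abc")).toDF("a", "b").write.mode("append").parquet(path)

val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", StringType),
  StructField("c", IntegerType)))

// With spark.sql.parquet.filterPushdown on (the default), IsNull(c) and
// LessThan(a, 2) are pushed down to both files, including the one without c.
val readDf = spark.read.schema(schema).parquet(path)
val result = readDf.filter("c is null and a < 2").collect()
assert(result.toSeq == Seq(Row(1, "abc", null)))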

@xwu0226 (Contributor, Author) commented Dec 5, 2016

@gatorsmile @liancheng Thanks!

// PARQUET-389 will resolve this issue in Parquet 1.9, which may be used
// by future Spark versions. This is a workaround for current Spark version.
// Also the assumption here is that the predicates will be applied later
blocks = footer.getBlocks();
Contributor:

Could you please add a TODO item in the comment so that we won't forget to remove this after upgrading to 1.9? Thanks!

// TODO Remove this hack after upgrading to parquet-mr 1.9.

Contributor (Author):

Yes. Will do. Thanks!

Member:

What about creating the JIRA issue first and including the JIRA number here in the comment?

Contributor:

Thanks. We may just mention SPARK-18140 here. It's for upgrading parquet-mr to 1.9.

Member:

+1

}

test("SPARK-18539 - filtered by non-existing parquet column") {
Seq("parquet").foreach { format =>
Member:

Since this is a Parquet-only test case, we can just set the format to parquet directly.

Contributor (Author):

Ok. Thanks!

checkAnswer(readDf2.filter("c is null and a < 2"),
  Row(1, "abc", null) :: Nil)

readDf2.filter("c is null and a < 2").explain(true)
Member:

Nit: remove it. Thanks!

Contributor (Author):

Oh, yeah, I forgot to remove it. Will do. Thanks!


// Right now, there are 2 parquet files: one with the column "c",
// the other without it.
val readDf2 = spark.read.schema(schema).format(format).load(path.toString)
Contributor:

By default, we don't enable Parquet schema merging. Could you please reorganize this test case into something like this to have both code paths tested?

Seq(true, false).foreach { merge =>
  test(s"SPARK-18539 - filtered by non-existing parquet column - schema merging enabled: $merge") {
    // ...
    val readDf2 = spark.read
      .option("mergeSchema", merge.toString)
      .schema(schema)
      .format(format)
      .load(path.toString)
    // ...
  }
}

Contributor (Author):

I will add it. Thanks!

Contributor:

In fact, we probably also want to test both the normal Parquet reader and the vectorized Parquet reader using a similar trick, by setting SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key.
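
For illustration, here is a rough sketch of how both reader paths could be exercised, reusing the spark, schema, and path values from the sketch in the description above (the actual test would use the withSQLConf test helper rather than mutating the session config):

// Run the same check under both the vectorized and the normal (parquet-mr) readers.
// "spark.sql.parquet.enableVectorizedReader" is the key behind
// SQLConf.PARQUET_VECTORIZED_READER_ENABLED.
Seq(true, false).foreach { vectorized =>
  spark.conf.set("spark.sql.parquet.enableVectorizedReader", vectorized.toString)
  val rows = spark.read.schema(schema).parquet(path).filter("c is null and a < 2").collect()
  assert(rows.length == 1, s"unexpected result with vectorized=$vectorized")
}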

Contributor (Author):

Since the code change is in SpecificParquetRecordReaderBase.initialize, which is a common code path for both Parquet readers, I did not include it originally. But it is safe to test both. I will add it. Thanks!

Contributor:

Yeah, my concern is that we are using the vectorized Parquet reader by default now and may not have enough test coverage of the normal Parquet reader. It would be good to have test cases to prevent regressions in the normal Parquet reader code path.

Member:

Please add a test case. Thanks!

@liancheng (Contributor) commented Dec 5, 2016

Actually, PR #9940 should have already fixed this issue. I'm checking why it doesn't work under 2.0.1 and 2.0.2.

@liancheng (Contributor):

BTW, I think this PR is a cleaner fix than #9940, which introduces temporary metadata while merging two StructTypes and erases it in a later phase. We may want to remove the hack done in #9940 later if possible. But for now, let's make the fix as surgical as possible to lower the risk for the 2.1 release.

// the column(s) in the filter, we don't do filtering at this level
// PARQUET-389 will resolve this issue in Parquet 1.9, which may be used
// by future Spark versions. This is a workaround for current Spark version.
// Also the assumption here is that the predicates will be applied later
Member:

What does this mean? PushedFilter seems to be eaten silently here.

// Also the assumption here is that the predicates will be applied later

Contributor:

Whether or not filters are pushed down to the Parquet reader, Spark will always apply all of them again at a higher level to ensure that every filter is applied as expected. Spark treats data source filter push-down as "best effort".

Member:

I see! Thank you, @liancheng .

Contributor (Author):

The filters that are pushed down to the Parquet reader and fail Parquet's schema validation will be ignored at this level. But after SparkSQL's Parquet reader is created with all the blocks of this particular Parquet file, the parent Filter physical plan operation will re-apply the filters anyway.

Example:

== Physical Plan ==
*Project [a#2805, b#2806, c#2807]
+- *Filter ((isnotnull(a#2805) && isnull(c#2807)) && (a#2805 < 2))
  +- *FileScan parquet [a#2805,b#2806,c#2807] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/Users/xinwu/spark/target/tmp/spark-ed6f0c12-6494-4ac5-b485-5b986ef475cc], PartitionFilters: [], PushedFilters: [IsNotNull(a), IsNull(c), LessThan(a,2)], ReadSchema: struct<a:int,b:string,c:int>

This is why my test case still works.

Member:

Thank you, @xwu0226 !

Member:

@liancheng I just wonder if it is safe to rely on exception handling here, since reading multiple Parquet files with a merged schema might be a common case. To my understanding, we know the schema of the target Parquet file from its footer and the referenced column(s) from the filter. Could we check this and simply use an if-else condition?
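
For illustration, a rough sketch of that if-else idea (the names below are hypothetical; the real check would live in the Java class SpecificParquetRecordReaderBase):

import org.apache.parquet.schema.MessageType

// Hypothetical helper: `referencedColumns` would be collected from the pushed
// data source filters, and `fileSchema` comes from the footer of the file
// being read. Note that containsField only checks top-level field names;
// nested columns would need a path-aware check.
def safeToFilterRowGroups(fileSchema: MessageType, referencedColumns: Seq[String]): Boolean =
  referencedColumns.forall(c => fileSchema.containsField(c))

// if (safeToFilterRowGroups(fileSchema, referencedColumns)) filter the row groups
// else fall back to footer.getBlocks()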

@liancheng (Contributor) commented Dec 5, 2016

@xwu0226 Just verified that this issue also affects the normal Parquet reader (by setting spark.sql.parquet.enableVectorizedReader to false). That's also why #9940 couldn't take a similar approach to this one: ParquetRecordReader is a 3rd-party class provided by parquet-mr.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DataTypes._
import org.apache.spark.sql.types.{StructField, StructType}

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

val jsonRDD = spark.sparkContext.parallelize(Seq("""{"a":1}"""))

spark.read
  .schema(StructType(Seq(StructField("a", IntegerType))))
  .json(jsonRDD)
  .write
  .mode("overwrite")
  .parquet("/tmp/test")

spark.read
  .schema(StructType(Seq(StructField("a", IntegerType), StructField("b", IntegerType, nullable = true))))
  .load("/tmp/test")
  .createOrReplaceTempView("table")

spark.sql("select b from table where b is not null").show()

@xwu0226 (Contributor, Author) commented Dec 5, 2016

@liancheng I see. For the normal Parquet reader, ParquetFileFormat uses parquet-mr's ParquetRecordReader, to which we cannot add such toleration code.

@xwu0226 (Contributor, Author) commented Dec 5, 2016

For the normal Parquet reader case, we have the following code:

} else {
  logDebug(s"Falling back to parquet-mr")
  // ParquetRecordReader returns UnsafeRow
  val reader = pushed match {
    case Some(filter) =>
      new ParquetRecordReader[UnsafeRow](
        new ParquetReadSupport,
        FilterCompat.get(filter, null))
    case _ =>
      new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
  }
  reader.initialize(split, hadoopAttemptContext)
  reader
}

I am wondering if we could try-catch reader.initialize, recreate the ParquetRecordReader without the filter, and initialize again. What do you think?
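
For illustration, a rough sketch of that idea (not the change that was merged), assuming it stays in the same package as ParquetFileFormat so the package-private ParquetReadSupport is visible, and assuming parquet-mr rejects a filter on a missing column with an IllegalArgumentException:

import org.apache.hadoop.mapreduce.{InputSplit, TaskAttemptContext}
import org.apache.parquet.filter2.compat.FilterCompat
import org.apache.parquet.filter2.predicate.FilterPredicate
import org.apache.parquet.hadoop.ParquetRecordReader
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

def createInitializedReader(
    pushed: Option[FilterPredicate],
    split: InputSplit,
    hadoopAttemptContext: TaskAttemptContext): ParquetRecordReader[UnsafeRow] = {
  def newReader(filter: Option[FilterPredicate]): ParquetRecordReader[UnsafeRow] = filter match {
    case Some(f) =>
      new ParquetRecordReader[UnsafeRow](new ParquetReadSupport, FilterCompat.get(f, null))
    case None =>
      new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
  }
  val reader = newReader(pushed)
  try {
    reader.initialize(split, hadoopAttemptContext)
    reader
  } catch {
    // Assumption: parquet-mr rejects a filter on a missing column with an
    // IllegalArgumentException ("Column [...] was not found in schema").
    case _: IllegalArgumentException if pushed.isDefined =>
      val fallback = newReader(None)
      fallback.initialize(split, hadoopAttemptContext)
      fallback
  }
}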

@liancheng (Contributor) commented Dec 5, 2016

Hey @xwu0226 and @gatorsmile, I did some investigation, and I don't think this is a bug now. Please refer to my JIRA comment for more details.

@HyukjinKwon (Member):

Would there be another way to avoid the try-catch? This is normal read-path logic, and it seems it might not be safe to rely on exception handling.

@HyukjinKwon (Member):

@liancheng Ah, thank you. I should have tested this first.

@SparkQA commented Dec 6, 2016

Test build #69688 has finished for PR 16156 at commit 096ab18.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 1, 2017

Test build #72253 has finished for PR 16156 at commit 096ab18.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xwu0226 (Contributor, Author) commented Feb 4, 2017

https://issues.apache.org/jira/browse/SPARK-19409 has been resolved to upgrade to Parquet 1.8.2, which fixes this issue.

xwu0226 closed this Feb 4, 2017