@@ -107,7 +107,16 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
MessageType fileSchema = footer.getFileMetaData().getSchema();
FilterCompat.Filter filter = getFilter(configuration);
blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
try {
blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
} catch (IllegalArgumentException e) {
// In the case where a particular parquet file does not contain
// the column(s) in the filter, we don't do filtering at this level.
// PARQUET-389 will resolve this issue in Parquet 1.9, which may be used
// by future Spark versions. This is a workaround for the current Spark version.
// Also the assumption here is that the predicates will be applied later
Member: What does this mean? The PushedFilter seems to be silently dropped here.


Contributor: Whether or not filters are pushed down to the Parquet reader, Spark always applies all the filters again at a higher level to ensure that every filter takes effect. Spark treats data source filter push-down as a "best effort" optimization.

Member: I see! Thank you, @liancheng.

Contributor Author: Filters that are pushed down to the Parquet reader but fail Parquet's schema validation are ignored at this level. However, once Spark SQL's Parquet reader has been created with all the row-group blocks of this particular Parquet file, the parent Filter operator in the physical plan re-applies the filters anyway.

Example:

== Physical Plan ==
*Project [a#2805, b#2806, c#2807]
+- *Filter ((isnotnull(a#2805) && isnull(c#2807)) && (a#2805 < 2))
  +- *FileScan parquet [a#2805,b#2806,c#2807] Batched: true, Format: ParquetFormat, Location: InMemoryFileIndex[file:/Users/xinwu/spark/target/tmp/spark-ed6f0c12-6494-4ac5-b485-5b986ef475cc], PartitionFilters: [], PushedFilters: [IsNotNull(a), IsNull(c), LessThan(a,2)], ReadSchema: struct<a:int,b:string,c:int>

This is why my test case still works.

Member: Thank you, @xwu0226!

Member: @liancheng I just wonder whether it is safe to rely on exception handling here, since reading multiple Parquet files with a merged schema may be a common case. As I understand it, we already know the schema of the target Parquet file from its footer and the column(s) referenced by the filter, so could we check that explicitly with a simple if-else condition instead?

blocks = footer.getBlocks();
Contributor: Could you please add a TODO item to the comment so that we don't forget to remove this after upgrading to 1.9? Thanks!

// TODO Remove this hack after upgrading to parquet-mr 1.9.

Contributor Author: Yes, will do. Thanks!

Member: What about creating the JIRA issue first and including the JIRA number in the comment here?

Contributor: Thanks. We can just mention SPARK-18140 here; it tracks upgrading parquet-mr to 1.9.
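For illustration only (not necessarily the exact wording that ended up in the patch), combining the two suggestions would give a comment along the lines of:

// TODO (SPARK-18140): remove this workaround once parquet-mr is upgraded to 1.9,
// where PARQUET-389 resolves filtering on columns that are missing from the file schema.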

Member: +1

}
} else {
// otherwise we find the row groups that were selected on the client
footer = readFooter(configuration, file, NO_FILTER);
@@ -578,4 +578,66 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
// scalastyle:on nonascii
}
}

test("SPARK-18539 - filtered by non-existing parquet column") {
Seq("parquet").foreach { format =>
Member: Since this is a Parquet-only test case, we can just set the format to "parquet" directly.

Contributor Author: OK, thanks!

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { path =>
import testImplicits._
Seq((1, "abc"), (2, "hello")).toDF("a", "b").write.format(format).save(path.toString)

// The user-specified schema contains a nonexistent column "c"
val schema = StructType(
Seq(StructField("a", IntegerType),
StructField("b", StringType),
StructField("c", IntegerType)))
val readDf1 = spark.read.schema(schema).format(format).load(path.toString)

// Read the table without any filter
checkAnswer(readDf1, Row(1, "abc", null) :: Row(2, "hello", null) :: Nil)
// Read the table with a filter on existing columns
checkAnswer(readDf1.filter("a < 2"), Row(1, "abc", null) :: Nil)

// Read the table with a filter on nonexistent columns
checkAnswer(readDf1.filter("c < 2"), Nil)
checkAnswer(readDf1.filter("c is not null"), Nil)

checkAnswer(readDf1.filter("c is null"),
Row(1, "abc", null) :: Row(2, "hello", null) :: Nil)

checkAnswer(readDf1.filter("c is null and a < 2"),
Row(1, "abc", null) :: Nil)

// Append another parquet file that contains the column "c"
Seq((2, "abc", 3), (3, "hello", 4)).toDF("a", "b", "c")
.write.format(format).mode(SaveMode.Append).save(path.toString)

// Right now there are two parquet files: one contains the column "c",
// the other does not.
val readDf2 = spark.read.schema(schema).format(format).load(path.toString)
Contributor: By default, Parquet schema merging is not enabled. Could you please reorganize this test case into something like the following so that both code paths are tested?

Seq(true, false).foreach { merge =>
  test("SPARK-18539 - filtered by non-existing parquet column - schema merging enabled: $merge") {
    // ...
    val readDf2 = spark.read
      .option("mergeSchema", merge.toString)
      .schema(schema)
      .format(format)
      .load(path.toString)
    // ...
  }
}

Contributor Author: I will add it. Thanks!

Contributor: In fact, we probably also want to test both the normal Parquet reader and the vectorized Parquet reader, using a similar trick with SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key.
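For illustration, a minimal sketch of how both reader code paths could be exercised (reusing this suite's helpers withSQLConf, withTempPath, checkAnswer and the same user-specified schema idea as above; this is only a sketch, not necessarily the code that ends up in the PR):

Seq(true, false).foreach { vectorized =>
  withSQLConf(
      SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) {
    withTempPath { path =>
      import testImplicits._
      Seq((1, "abc"), (2, "hello")).toDF("a", "b").write.parquet(path.toString)
      val schema = StructType(Seq(
        StructField("a", IntegerType),
        StructField("b", StringType),
        StructField("c", IntegerType)))
      val readDf = spark.read.schema(schema).parquet(path.toString)
      // Filtering on the nonexistent column "c" should work with both the
      // normal and the vectorized Parquet reader.
      checkAnswer(readDf.filter("c is null and a < 2"), Row(1, "abc", null) :: Nil)
    }
  }
}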

Contributor Author: Since the code change is in SpecificParquetRecordReaderBase.initialize, which is a common code path for both Parquet readers, I did not include it originally. But it is safe to test both; I will add it. Thanks!

Contributor: Yeah, my concern is that we use the vectorized Parquet reader by default now and may not have enough test coverage of the normal Parquet reader. It would be good to have test cases that prevent regressions in the normal Parquet reader code path.

Member: Please add a test case. Thanks!

// Read the table without any filter
checkAnswer(readDf2,
Row(1, "abc", null) :: Row(2, "hello", null) ::
Row(2, "abc", 3) :: Row(3, "hello", 4) :: Nil)

// Read the table with a filter on existing columns
checkAnswer(readDf2.filter("a < 2"), Row(1, "abc", null) :: Nil)

// column "c" is in one parquet file, not in the other parquet file
checkAnswer(readDf2.filter("c < 4"),
Row(2, "abc", 3) :: Nil)
checkAnswer(readDf2.filter("c is not null"),
Row(2, "abc", 3) :: Row(3, "hello", 4) :: Nil)

checkAnswer(readDf2.filter("c is null"),
Row(1, "abc", null) :: Row(2, "hello", null) :: Nil)

checkAnswer(readDf2.filter("c is null and a < 2"),
Row(1, "abc", null) :: Nil)

readDf2.filter("c is null and a < 2").explain(true)
Member: Nit: remove it. Thanks!

Contributor Author: Oh yeah, I forgot to remove it. Will do. Thanks!

}
}
}
}
}