diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 06cd9ea2d242c..de5b2e820a50d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -107,7 +107,16 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
       footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
       MessageType fileSchema = footer.getFileMetaData().getSchema();
       FilterCompat.Filter filter = getFilter(configuration);
-      blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+      try {
+        blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
+      } catch (IllegalArgumentException e) {
+        // In the case where a particular parquet file does not contain
+        // the column(s) in the filter, we skip row-group filtering at this
+        // level. PARQUET-389 will resolve this issue in Parquet 1.9, which
+        // may be used by future Spark versions; this is a workaround until
+        // then. The assumption is that the predicates are still applied later.
+        blocks = footer.getBlocks();
+      }
     } else {
       // otherwise we find the row groups that were selected on the client
       footer = readFooter(configuration, file, NO_FILTER);
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index a0d57d79f045a..6fdd0621050b7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -578,4 +578,64 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       // scalastyle:on nonascii
     }
   }
+
+  test("SPARK-18539 - filtered by non-existing parquet column") {
+    Seq("parquet").foreach { format =>
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true") {
+        withTempPath { path =>
+          import testImplicits._
+          Seq((1, "abc"), (2, "hello")).toDF("a", "b").write.format(format).save(path.toString)
+
+          // The user-specified schema contains a nonexistent column "c"
+          val schema = StructType(
+            Seq(StructField("a", IntegerType),
+              StructField("b", StringType),
+              StructField("c", IntegerType)))
+          val readDf1 = spark.read.schema(schema).format(format).load(path.toString)
+
+          // Read the table without any filter
+          checkAnswer(readDf1, Row(1, "abc", null) :: Row(2, "hello", null) :: Nil)
+          // Read the table with a filter on existing columns
+          checkAnswer(readDf1.filter("a < 2"), Row(1, "abc", null) :: Nil)
+
+          // Read the table with filters on the nonexistent column
+          checkAnswer(readDf1.filter("c < 2"), Nil)
+          checkAnswer(readDf1.filter("c is not null"), Nil)
+
+          checkAnswer(readDf1.filter("c is null"),
+            Row(1, "abc", null) :: Row(2, "hello", null) :: Nil)
+
+          checkAnswer(readDf1.filter("c is null and a < 2"),
+            Row(1, "abc", null) :: Nil)
+
+          // Append another parquet file that does contain the column "c"
+          Seq((2, "abc", 3), (3, "hello", 4)).toDF("a", "b", "c")
+            .write.format(format).mode(SaveMode.Append).save(path.toString)
+
+          // Now there are two parquet files: one with the column "c",
+          // the other without it.
+          val readDf2 = spark.read.schema(schema).format(format).load(path.toString)
+          // Read the table without any filter
+          checkAnswer(readDf2,
+            Row(1, "abc", null) :: Row(2, "hello", null) ::
+            Row(2, "abc", 3) :: Row(3, "hello", 4) :: Nil)
+
+          // Read the table with a filter on existing columns
+          checkAnswer(readDf2.filter("a < 2"), Row(1, "abc", null) :: Nil)
+
+          // Column "c" exists in one parquet file but not in the other
+          checkAnswer(readDf2.filter("c < 4"),
+            Row(2, "abc", 3) :: Nil)
+          checkAnswer(readDf2.filter("c is not null"),
+            Row(2, "abc", 3) :: Row(3, "hello", 4) :: Nil)
+
+          checkAnswer(readDf2.filter("c is null"),
+            Row(1, "abc", null) :: Row(2, "hello", null) :: Nil)
+
+          checkAnswer(readDf2.filter("c is null and a < 2"),
+            Row(1, "abc", null) :: Nil)
+        }
+      }
+    }
+  }
 }
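
For reference, the behavior this patch enables can be reproduced outside the test suite. The sketch below is a minimal standalone illustration, not part of the patch: the object name, the local[*] master, and the /tmp scratch path are assumptions made for the example. It writes a parquet file that lacks column "c", then filters on "c" through a user-specified schema with filter pushdown enabled, which is exactly the combination that previously made filterRowGroups throw IllegalArgumentException.

// Standalone reproduction sketch for SPARK-18539 (illustrative only).
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Spark18539Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("SPARK-18539").getOrCreate()
    import spark.implicits._

    // Ensure parquet filter pushdown is on, so "c < 2" reaches the reader.
    spark.conf.set("spark.sql.parquet.filterPushdown", "true")

    val path = "/tmp/spark-18539-demo"  // hypothetical scratch location

    // The file on disk only has columns "a" and "b".
    Seq((1, "abc"), (2, "hello")).toDF("a", "b")
      .write.mode(SaveMode.Overwrite).parquet(path)

    // The user-specified read schema additionally declares "c", which is
    // absent from the file footer.
    val schema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType),
      StructField("c", IntegerType)))

    val df = spark.read.schema(schema).parquet(path)

    // Before the patch: filterRowGroups threw IllegalArgumentException because
    // the file schema has no column "c". With the patch, row-group filtering
    // is skipped for this file and the predicate is evaluated later, so this
    // returns an empty result instead of failing.
    df.filter("c < 2").show()

    spark.stop()
  }
}

Note the design choice this relies on: skipping row-group filtering only loses an optimization, never correctness, because Spark keeps a Filter operator above the scan that re-evaluates the pushed-down predicates.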