From 85dadbcb458eec1067b6bae838ca930b8d958f4a Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 28 Oct 2015 17:30:15 +0900
Subject: [PATCH 1/2] [SPARK-11103][SQL] Disable predicate push down when using merged schema for Parquet.

---
 .../sql/execution/datasources/parquet/ParquetRelation.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 77d851ca486b..4713b52ff5f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -292,6 +292,10 @@ private[sql] class ParquetRelation(
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
 
+    // When using merged schema and the column of the given filter does not exist, Parquet emits
+    // an error which is an issue of Parquet (PARQUET-389).
+    val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
+
     // Parquet row group size. We will use this value as the value for
     // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
     // of these flags are smaller than the parquet row group size.
@@ -305,7 +309,7 @@ private[sql] class ParquetRelation(
       dataSchema,
       parquetBlockSize,
       useMetadataCache,
-      parquetFilterPushDown,
+      safeParquetFilterPushDown,
       assumeBinaryIsString,
       assumeInt96IsTimestamp) _
 

From 7007c21779537844691d87d71ec8e86e60835860 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 30 Oct 2015 14:39:47 +0900
Subject: [PATCH 2/2] [SPARK-11103][SQL] Add test code and update comments

---
 .../datasources/parquet/ParquetRelation.scala |  4 ++--
 .../parquet/ParquetFilterSuite.scala          | 20 +++++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 4713b52ff5f6..44649a68b3c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -292,8 +292,8 @@ private[sql] class ParquetRelation(
     val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
     val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp
 
-    // When using merged schema and the column of the given filter does not exist, Parquet emits
-    // an error which is an issue of Parquet (PARQUET-389).
+    // When merging schemas is enabled and the column of the given filter does not exist,
+    // Parquet emits an exception. This is a known Parquet issue (PARQUET-389).
     val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown
 
     // Parquet row group size. We will use this value as the value for
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 7a23f57f4039..66dfc7973dfa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -314,4 +314,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
       }
     }
   }
+
+  test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
+    import testImplicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
+      withTempPath { dir =>
+        val pathOne = s"${dir.getCanonicalPath}/table1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
+        val pathTwo = s"${dir.getCanonicalPath}/table2"
+        (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
+
+        // If the "c = 1" filter gets pushed down, this query throws an exception that
+        // Parquet emits. This is a known Parquet issue (PARQUET-389).
+        checkAnswer(
+          sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1"),
+          (1 to 1).map(i => Row(i, i.toString, null)))
+      }
+    }
+  }
 }
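Note (not part of the patches above): a minimal sketch of the user-facing workaround on Spark builds without this fix, assuming an existing SQLContext named sqlContext and two Parquet directories pathOne and pathTwo written with different schemas as in the test. Turning off filter push-down while schema merging is enabled avoids the PARQUET-389 exception; Spark then evaluates the filter itself after the scan.

    // Hypothetical usage sketch; the two config keys are the standard Spark SQL ones.
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "false")
    val merged = sqlContext.read.parquet(pathOne, pathTwo)
    // Column "c" exists in only one of the two schemas; with push-down disabled,
    // filtering on it no longer triggers the Parquet-side exception.
    merged.filter("c = 1").show()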