From e24529d0e1fc61aff5dbd331444b20dfc4d09c09 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 25 Nov 2015 01:14:59 +0800 Subject: [PATCH 1/7] Mark one side fields in merging schema for safely pushdowning filters in parquet. --- .../apache/spark/sql/types/StructType.scala | 15 +++++++++++--- .../datasources/parquet/ParquetFilters.scala | 20 ++++++++++--------- .../datasources/parquet/ParquetRelation.scala | 6 +----- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 9778df271ddd5..bdbd328cb80cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -352,24 +352,33 @@ object StructType extends AbstractDataType { case (StructType(leftFields), StructType(rightFields)) => val newFields = ArrayBuffer.empty[StructField] + val oneSide = new MetadataBuilder() val rightMapped = fieldsMap(rightFields) leftFields.foreach { case leftField @ StructField(leftName, leftType, leftNullable, _) => rightMapped.get(leftName) .map { case rightField @ StructField(_, rightType, rightNullable, _) => + oneSide.putBoolean("oneSide", false) leftField.copy( dataType = merge(leftType, rightType), - nullable = leftNullable || rightNullable) + nullable = leftNullable || rightNullable, + metadata = oneSide.build()) } - .orElse(Some(leftField)) + .orElse { + oneSide.putBoolean("oneSide", true) + Some(leftField.copy(metadata = oneSide.build())) + } .foreach(newFields += _) } val leftMapped = fieldsMap(leftFields) rightFields .filterNot(f => leftMapped.get(f.name).nonEmpty) - .foreach(newFields += _) + .foreach { f => + oneSide.putBoolean("oneSide", true) + newFields += f.copy(metadata = oneSide.build()) + } StructType(newFields) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 07714329370a5..0abe982a59c75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -211,7 +211,9 @@ private[sql] object ParquetFilters { * Converts data sources filters to Parquet filter predicates. */ def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = { - val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap + val dataTypeOf = schema.filter { f => + !f.metadata.contains("oneSide") || f.metadata.getBoolean("oneSide") + }.map(f => f.name -> f.dataType).toMap relaxParquetValidTypeMap @@ -231,29 +233,29 @@ private[sql] object ParquetFilters { // Probably I missed something and obviously this should be changed. 
predicate match { - case sources.IsNull(name) => + case sources.IsNull(name) if dataTypeOf.contains(name) => makeEq.lift(dataTypeOf(name)).map(_(name, null)) case sources.IsNotNull(name) => makeNotEq.lift(dataTypeOf(name)).map(_(name, null)) - case sources.EqualTo(name, value) => + case sources.EqualTo(name, value) if dataTypeOf.contains(name) => makeEq.lift(dataTypeOf(name)).map(_(name, value)) case sources.Not(sources.EqualTo(name, value)) => makeNotEq.lift(dataTypeOf(name)).map(_(name, value)) - case sources.EqualNullSafe(name, value) => + case sources.EqualNullSafe(name, value) if dataTypeOf.contains(name) => makeEq.lift(dataTypeOf(name)).map(_(name, value)) - case sources.Not(sources.EqualNullSafe(name, value)) => + case sources.Not(sources.EqualNullSafe(name, value)) if dataTypeOf.contains(name) => makeNotEq.lift(dataTypeOf(name)).map(_(name, value)) - case sources.LessThan(name, value) => + case sources.LessThan(name, value) if dataTypeOf.contains(name) => makeLt.lift(dataTypeOf(name)).map(_(name, value)) - case sources.LessThanOrEqual(name, value) => + case sources.LessThanOrEqual(name, value) if dataTypeOf.contains(name) => makeLtEq.lift(dataTypeOf(name)).map(_(name, value)) - case sources.GreaterThan(name, value) => + case sources.GreaterThan(name, value) if dataTypeOf.contains(name) => makeGt.lift(dataTypeOf(name)).map(_(name, value)) - case sources.GreaterThanOrEqual(name, value) => + case sources.GreaterThanOrEqual(name, value) if dataTypeOf.contains(name) => makeGtEq.lift(dataTypeOf(name)).map(_(name, value)) case sources.And(lhs, rhs) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index fdd745f48e973..acd5d128f0abe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -292,10 +292,6 @@ private[sql] class ParquetRelation( val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp - // When merging schemas is enabled and the column of the given filter does not exist, - // Parquet emits an exception which is an issue of Parquet (PARQUET-389). - val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown - // Parquet row group size. We will use this value as the value for // mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value // of these flags are smaller than the parquet row group size. @@ -309,7 +305,7 @@ private[sql] class ParquetRelation( dataSchema, parquetBlockSize, useMetadataCache, - safeParquetFilterPushDown, + parquetFilterPushDown, assumeBinaryIsString, assumeInt96IsTimestamp) _ From ff4ef4cd1991ae7b2bbbcaf79a412fe2ab9c3d9b Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 25 Nov 2015 10:22:16 +0800 Subject: [PATCH 2/7] Fix test. 
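
The merged schema now carries field metadata, so the suite's strict === comparisons no
longer hold; the assertions switch to DataType.equalsIgnoreCompatibleNullability, which
compares field names, types and nullability but not metadata. A minimal sketch of the
difference (illustration only, not part of the patch; "oneSide" is the marker key
introduced in the previous commit):

    import org.apache.spark.sql.types._

    val plain  = StructField("a", IntegerType, nullable = true)
    val tagged = plain.copy(
      metadata = new MetadataBuilder().putBoolean("oneSide", true).build())

    // StructField is a case class, so equality includes the metadata field.
    plain == tagged                                           // false
    // equalsIgnoreCompatibleNullability (package-private, callable from this suite's
    // package) only looks at names, types and nullability.
    DataType.equalsIgnoreCompatibleNullability(
      StructType(Seq(plain)), StructType(Seq(tagged)))        // true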
--- .../scala/org/apache/spark/sql/types/DataTypeSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala index 706ecd29d1355..e98993e1c9cad 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala @@ -122,7 +122,7 @@ class DataTypeSuite extends SparkFunSuite { val right = StructType(List()) val merged = left.merge(right) - assert(merged === left) + assert(DataType.equalsIgnoreCompatibleNullability(merged, left)) } test("merge where left is empty") { @@ -135,7 +135,7 @@ class DataTypeSuite extends SparkFunSuite { val merged = left.merge(right) - assert(right === merged) + assert(DataType.equalsIgnoreCompatibleNullability(merged, right)) } @@ -154,7 +154,7 @@ class DataTypeSuite extends SparkFunSuite { val merged = left.merge(right) - assert(merged === expected) + assert(DataType.equalsIgnoreCompatibleNullability(merged, expected)) } test("merge where right contains type conflict") { From 4536b72e450c28da8556d5214ebe8c749448e26d Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 29 Nov 2015 16:07:35 +0800 Subject: [PATCH 3/7] Fix wrong indentation. --- .../org/apache/spark/sql/types/StructType.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index bdbd328cb80cb..8c69af44b64a0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -359,12 +359,12 @@ object StructType extends AbstractDataType { case leftField @ StructField(leftName, leftType, leftNullable, _) => rightMapped.get(leftName) .map { case rightField @ StructField(_, rightType, rightNullable, _) => - oneSide.putBoolean("oneSide", false) - leftField.copy( - dataType = merge(leftType, rightType), - nullable = leftNullable || rightNullable, - metadata = oneSide.build()) - } + oneSide.putBoolean("oneSide", false) + leftField.copy( + dataType = merge(leftType, rightType), + nullable = leftNullable || rightNullable, + metadata = oneSide.build()) + } .orElse { oneSide.putBoolean("oneSide", true) Some(leftField.copy(metadata = oneSide.build())) From 2a4e471db486dac3b7f25bf3a864c96be117ac44 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 30 Nov 2015 19:35:25 +0800 Subject: [PATCH 4/7] Use particular instead of oneSide. 
--- .../org/apache/spark/sql/types/StructType.scala | 15 ++++++++------- .../datasources/parquet/ParquetFilters.scala | 2 +- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 8c69af44b64a0..3d901f890055d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -352,22 +352,23 @@ object StructType extends AbstractDataType { case (StructType(leftFields), StructType(rightFields)) => val newFields = ArrayBuffer.empty[StructField] - val oneSide = new MetadataBuilder() + // This metadata will record the fields that only exist in one of merged StructTypes + val particularMeta = new MetadataBuilder() val rightMapped = fieldsMap(rightFields) leftFields.foreach { case leftField @ StructField(leftName, leftType, leftNullable, _) => rightMapped.get(leftName) .map { case rightField @ StructField(_, rightType, rightNullable, _) => - oneSide.putBoolean("oneSide", false) + particularMeta.putBoolean("particular", false) leftField.copy( dataType = merge(leftType, rightType), nullable = leftNullable || rightNullable, - metadata = oneSide.build()) + metadata = particularMeta.build()) } .orElse { - oneSide.putBoolean("oneSide", true) - Some(leftField.copy(metadata = oneSide.build())) + particularMeta.putBoolean("particular", true) + Some(leftField.copy(metadata = particularMeta.build())) } .foreach(newFields += _) } @@ -376,8 +377,8 @@ object StructType extends AbstractDataType { rightFields .filterNot(f => leftMapped.get(f.name).nonEmpty) .foreach { f => - oneSide.putBoolean("oneSide", true) - newFields += f.copy(metadata = oneSide.build()) + particularMeta.putBoolean("particular", true) + newFields += f.copy(metadata = particularMeta.build()) } StructType(newFields) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 0abe982a59c75..0c45b80da0d71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -212,7 +212,7 @@ private[sql] object ParquetFilters { */ def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = { val dataTypeOf = schema.filter { f => - !f.metadata.contains("oneSide") || f.metadata.getBoolean("oneSide") + !f.metadata.contains("particular") || f.metadata.getBoolean("particular") }.map(f => f.name -> f.dataType).toMap relaxParquetValidTypeMap From db8ffa3b73c80ec8f311ece5b1b9e17d5d7257ba Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sun, 6 Dec 2015 01:16:09 +0800 Subject: [PATCH 5/7] Clear the temporary metadata before writing to Parquet file. 
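
Otherwise the temporary marker added by StructType.merge would be saved into the written
files' schema and show up again when those files are read back. A rough sketch of the
intended flow (not part of the patch; it uses the removeMetadata helper added below, and
"optional" is the marker key used from this commit on):

    import org.apache.spark.sql.types._

    val left  = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
    val right = StructType(Seq(StructField("b", StringType), StructField("c", LongType)))

    // merge is package-private; after merging, "a" and "c" carry the "optional" marker.
    val merged = left.merge(right)

    // Strip the marker before handing the schema to the Parquet write path.
    val toWrite = StructType.removeMetadata("optional", merged).asInstanceOf[StructType]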
--- .../org/apache/spark/sql/types/Metadata.scala | 5 ++++ .../apache/spark/sql/types/StructType.scala | 28 +++++++++++++------ .../datasources/parquet/ParquetFilters.scala | 6 ++-- .../datasources/parquet/ParquetRelation.scala | 7 ++++- .../parquet/ParquetFilterSuite.scala | 18 +++++++++++- 5 files changed, 50 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala index 6ee24ee0c1913..416329b8bc58c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala @@ -268,4 +268,9 @@ class MetadataBuilder { map.put(key, value) this } + + def remove(key: String): this.type = { + map.remove(key) + this + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 3d901f890055d..da2e036ae6bdb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -335,6 +335,18 @@ object StructType extends AbstractDataType { protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType = StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata))) + def removeMetadata(key: String, dt: DataType): DataType = + dt match { + case StructType(fields) => + val newFields = fields.map { f => + val mb = new MetadataBuilder() + f.copy(dataType = removeMetadata(key, f.dataType), + metadata = mb.withMetadata(f.metadata).remove(key).build()) + } + StructType(newFields) + case _ => dt + } + private[sql] def merge(left: DataType, right: DataType): DataType = (left, right) match { case (ArrayType(leftElementType, leftContainsNull), @@ -352,23 +364,21 @@ object StructType extends AbstractDataType { case (StructType(leftFields), StructType(rightFields)) => val newFields = ArrayBuffer.empty[StructField] - // This metadata will record the fields that only exist in one of merged StructTypes - val particularMeta = new MetadataBuilder() + // This metadata will record the fields that only exist in one of two StructTypes + val optionalMeta = new MetadataBuilder() val rightMapped = fieldsMap(rightFields) leftFields.foreach { case leftField @ StructField(leftName, leftType, leftNullable, _) => rightMapped.get(leftName) .map { case rightField @ StructField(_, rightType, rightNullable, _) => - particularMeta.putBoolean("particular", false) leftField.copy( dataType = merge(leftType, rightType), - nullable = leftNullable || rightNullable, - metadata = particularMeta.build()) + nullable = leftNullable || rightNullable) } .orElse { - particularMeta.putBoolean("particular", true) - Some(leftField.copy(metadata = particularMeta.build())) + optionalMeta.putBoolean("optional", true) + Some(leftField.copy(metadata = optionalMeta.build())) } .foreach(newFields += _) } @@ -377,8 +387,8 @@ object StructType extends AbstractDataType { rightFields .filterNot(f => leftMapped.get(f.name).nonEmpty) .foreach { f => - particularMeta.putBoolean("particular", true) - newFields += f.copy(metadata = particularMeta.build()) + optionalMeta.putBoolean("optional", true) + newFields += f.copy(metadata = optionalMeta.build()) } StructType(newFields) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 0c45b80da0d71..8a4dd06967712 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -212,7 +212,7 @@ private[sql] object ParquetFilters { */ def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = { val dataTypeOf = schema.filter { f => - !f.metadata.contains("particular") || f.metadata.getBoolean("particular") + !f.metadata.contains("optional") || !f.metadata.getBoolean("optional") }.map(f => f.name -> f.dataType).toMap relaxParquetValidTypeMap @@ -235,12 +235,12 @@ private[sql] object ParquetFilters { predicate match { case sources.IsNull(name) if dataTypeOf.contains(name) => makeEq.lift(dataTypeOf(name)).map(_(name, null)) - case sources.IsNotNull(name) => + case sources.IsNotNull(name) if dataTypeOf.contains(name) => makeNotEq.lift(dataTypeOf(name)).map(_(name, null)) case sources.EqualTo(name, value) if dataTypeOf.contains(name) => makeEq.lift(dataTypeOf(name)).map(_(name, value)) - case sources.Not(sources.EqualTo(name, value)) => + case sources.Not(sources.EqualTo(name, value)) if dataTypeOf.contains(name) => makeNotEq.lift(dataTypeOf(name)).map(_(name, value)) case sources.EqualNullSafe(name, value) if dataTypeOf.contains(name) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 04040bc88a18c..fe572f2dc5441 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -249,7 +249,12 @@ private[sql] class ParquetRelation( job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]]) ParquetOutputFormat.setWriteSupportClass(job, classOf[CatalystWriteSupport]) - CatalystWriteSupport.setSchema(dataSchema, conf) + + // We want to clear this temporary metadata from saving into Parquet file. + // This metadata is only useful for detecting optional columns when pushdowning filters. + val dataSchemaToWrite = + StructType.removeMetadata("optional", dataSchema).asInstanceOf[StructType] + CatalystWriteSupport.setSchema(dataSchemaToWrite, conf) // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema) // and `CatalystWriteSupport` (writing actual rows to Parquet files). diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index cc5aae03d5516..83680133516c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -330,9 +330,25 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // If the "c = 1" filter gets pushed down, this query will throw an exception which // Parquet emits. This is a Parquet issue (PARQUET-389). 
+ val df = sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a") checkAnswer( - sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a"), + df, (1 to 1).map(i => Row(i, i.toString, null))) + + // The fields "a" and "c" only exist in one Parquet file. + df.schema.fields.foreach { f => + if (f.name == "a" || f.name == "c") { + assert(f.metadata.contains("optional")) + } + } + + val pathThree = s"${dir.getCanonicalPath}/table3" + df.write.parquet(pathThree) + + // We will remove the temporary metadata when writing Parquet file. + sqlContext.read.parquet(pathThree).schema.fields.foreach { f => + assert(!f.metadata.contains("optional")) + } } } } From 40533a79de07f1159d5f5e2a6327a160f278be0e Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 7 Dec 2015 00:52:53 +0800 Subject: [PATCH 6/7] Recursively get optional fields. --- .../datasources/parquet/ParquetFilters.scala | 12 +++++-- .../parquet/ParquetFilterSuite.scala | 32 +++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 8a4dd06967712..3491ff3bae556 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -207,13 +207,19 @@ private[sql] object ParquetFilters { */ } + private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match { + case StructType(fields) => + fields.filter { f => + !f.metadata.contains("optional") || !f.metadata.getBoolean("optional") + }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) } + case _ => Array.empty[(String, DataType)] + } + /** * Converts data sources filters to Parquet filter predicates. 
*/ def createFilter(schema: StructType, predicate: sources.Filter): Option[FilterPredicate] = { - val dataTypeOf = schema.filter { f => - !f.metadata.contains("optional") || !f.metadata.getBoolean("optional") - }.map(f => f.name -> f.dataType).toMap + val dataTypeOf = getFieldMap(schema).toMap relaxParquetValidTypeMap diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 83680133516c2..d68ff924203bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SharedSQLContext /** @@ -349,6 +350,37 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex sqlContext.read.parquet(pathThree).schema.fields.foreach { f => assert(!f.metadata.contains("optional")) } + + val pathFour = s"${dir.getCanonicalPath}/table4" + val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b") + dfStruct.select(struct("a").as("s")).write.parquet(pathFour) + + val pathFive = s"${dir.getCanonicalPath}/table5" + val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b") + dfStruct2.select(struct("c").as("s")).write.parquet(pathFive) + + // If the "s.c = 1" filter gets pushed down, this query will throw an exception which + // Parquet emits. + val dfStruct3 = sqlContext.read.parquet(pathFour, pathFive).filter("s.c = 1") + .selectExpr("s.c", "s.a") + checkAnswer( + dfStruct3, + (1 to 1).map(i => Row(i, null))) + + // The fields "s.a" and "s.c" only exist in one Parquet file. + dfStruct3.schema.fields.foreach { f => + if (f.name == "s.a" || f.name == "s.c") { + assert(f.metadata.contains("optional")) + } + } + + val pathSix = s"${dir.getCanonicalPath}/table6" + dfStruct3.write.parquet(pathSix) + + // We will remove the temporary metadata when writing Parquet file. + sqlContext.read.parquet(pathSix).schema.fields.foreach { f => + assert(!f.metadata.contains("optional")) + } } } } From 1a11770a19b8a6ec263ca3c34def5b03ec4483c9 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Wed, 27 Jan 2016 09:51:31 +0000 Subject: [PATCH 7/7] For comments. 
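
Review follow-up: the marker key becomes the named constant
StructType.metadataKeyForOptionalField, the field-map construction gets a SPARK-11955
comment, and the tests are tightened. A small sketch of the resulting behaviour of
ParquetFilters.createFilter (illustration only; these are package-private APIs and the
field names are arbitrary):

    import org.apache.spark.sql.sources
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters

    // "c" is tagged as existing in only one of the merged files.
    val tagged = StructField("c", IntegerType, nullable = true,
      metadata = new MetadataBuilder()
        .putBoolean(StructType.metadataKeyForOptionalField, true).build())
    val schema = StructType(Seq(StructField("b", StringType), tagged))

    // No predicate is built for the optional column, so Spark evaluates that filter
    // itself instead of pushing it down to Parquet (avoiding PARQUET-389).
    ParquetFilters.createFilter(schema, sources.EqualTo("c", 1))    // None
    ParquetFilters.createFilter(schema, sources.EqualTo("b", "x"))  // Some(FilterPredicate)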
--- .../apache/spark/sql/types/StructType.scala | 6 ++-- .../spark/sql/types/DataTypeSuite.scala | 8 ++++- .../datasources/parquet/ParquetFilters.scala | 9 ++++- .../datasources/parquet/ParquetRelation.scala | 4 +-- .../parquet/ParquetFilterSuite.scala | 33 +++++++------------ 5 files changed, 33 insertions(+), 27 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 743f1a7d68108..da0c92864e9bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -334,6 +334,8 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru object StructType extends AbstractDataType { + private[sql] val metadataKeyForOptionalField = "_OPTIONAL_" + override private[sql] def defaultConcreteType: DataType = new StructType override private[sql] def acceptsType(other: DataType): Boolean = { @@ -401,7 +403,7 @@ object StructType extends AbstractDataType { nullable = leftNullable || rightNullable) } .orElse { - optionalMeta.putBoolean("optional", true) + optionalMeta.putBoolean(metadataKeyForOptionalField, true) Some(leftField.copy(metadata = optionalMeta.build())) } .foreach(newFields += _) @@ -411,7 +413,7 @@ object StructType extends AbstractDataType { rightFields .filterNot(f => leftMapped.get(f.name).nonEmpty) .foreach { f => - optionalMeta.putBoolean("optional", true) + optionalMeta.putBoolean(metadataKeyForOptionalField, true) newFields += f.copy(metadata = optionalMeta.build()) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala index e98993e1c9cad..c2bbca7c33f28 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala @@ -123,6 +123,8 @@ class DataTypeSuite extends SparkFunSuite { val merged = left.merge(right) assert(DataType.equalsIgnoreCompatibleNullability(merged, left)) + assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField)) } test("merge where left is empty") { @@ -136,7 +138,8 @@ class DataTypeSuite extends SparkFunSuite { val merged = left.merge(right) assert(DataType.equalsIgnoreCompatibleNullability(merged, right)) - + assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField)) } test("merge where both are non-empty") { @@ -155,6 +158,9 @@ class DataTypeSuite extends SparkFunSuite { val merged = left.merge(right) assert(DataType.equalsIgnoreCompatibleNullability(merged, expected)) + assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(merged("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) } test("merge where right contains type conflict") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 55225d66b7ce9..5a5cb5cf03d4a 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -207,10 +207,17 @@ private[sql] object ParquetFilters { */ } + /** + * SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField. + * These fields only exist in one side of merged schemas. Due to that, we can't push down filters + * using such fields, otherwise Parquet library will throw exception. Here we filter out such + * fields. + */ private def getFieldMap(dataType: DataType): Array[(String, DataType)] = dataType match { case StructType(fields) => fields.filter { f => - !f.metadata.contains("optional") || !f.metadata.getBoolean("optional") + !f.metadata.contains(StructType.metadataKeyForOptionalField) || + !f.metadata.getBoolean(StructType.metadataKeyForOptionalField) }.map(f => f.name -> f.dataType) ++ fields.flatMap { f => getFieldMap(f.dataType) } case _ => Array.empty[(String, DataType)] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 1d1faaa576d0e..f87590095d344 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -261,8 +261,8 @@ private[sql] class ParquetRelation( // We want to clear this temporary metadata from saving into Parquet file. // This metadata is only useful for detecting optional columns when pushdowning filters. - val dataSchemaToWrite = - StructType.removeMetadata("optional", dataSchema).asInstanceOf[StructType] + val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField, + dataSchema).asInstanceOf[StructType] CatalystWriteSupport.setSchema(dataSchemaToWrite, conf) // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index c60a50bd785ed..1796b3af0e37a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -383,22 +383,18 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex val df = sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a") checkAnswer( df, - (1 to 1).map(i => Row(i, i.toString, null))) + Row(1, "1", null)) // The fields "a" and "c" only exist in one Parquet file. - df.schema.fields.foreach { f => - if (f.name == "a" || f.name == "c") { - assert(f.metadata.contains("optional")) - } - } + assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) val pathThree = s"${dir.getCanonicalPath}/table3" df.write.parquet(pathThree) // We will remove the temporary metadata when writing Parquet file. 
- sqlContext.read.parquet(pathThree).schema.fields.foreach { f => - assert(!f.metadata.contains("optional")) - } + val schema = sqlContext.read.parquet(pathThree).schema + assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) val pathFour = s"${dir.getCanonicalPath}/table4" val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b") @@ -411,25 +407,20 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // If the "s.c = 1" filter gets pushed down, this query will throw an exception which // Parquet emits. val dfStruct3 = sqlContext.read.parquet(pathFour, pathFive).filter("s.c = 1") - .selectExpr("s.c", "s.a") - checkAnswer( - dfStruct3, - (1 to 1).map(i => Row(i, null))) + .selectExpr("s") + checkAnswer(dfStruct3, Row(Row(null, 1))) // The fields "s.a" and "s.c" only exist in one Parquet file. - dfStruct3.schema.fields.foreach { f => - if (f.name == "s.a" || f.name == "s.c") { - assert(f.metadata.contains("optional")) - } - } + val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType] + assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField)) + assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField)) val pathSix = s"${dir.getCanonicalPath}/table6" dfStruct3.write.parquet(pathSix) // We will remove the temporary metadata when writing Parquet file. - sqlContext.read.parquet(pathSix).schema.fields.foreach { f => - assert(!f.metadata.contains("optional")) - } + val forPathSix = sqlContext.read.parquet(pathSix).schema + assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField))) } } }
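
End-to-end, the behaviour this series fixes looks roughly like the following (paths and
column names are hypothetical; schema merging must be enabled, here via the reader option):

    // Two Parquet directories written with different but compatible schemas:
    //   /tmp/t1 -> columns (a, b), /tmp/t2 -> columns (b, c)
    val df = sqlContext.read
      .option("mergeSchema", "true")
      .parquet("/tmp/t1", "/tmp/t2")

    // Previously, pushing "c = 1" down to files that lack column "c" made Parquet
    // throw (PARQUET-389); now the predicate is simply not pushed for such
    // one-side-only columns and the query returns the expected rows.
    df.filter("c = 1").show()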