From 1ce61a4362e2cbdc684b254cce1e86bce7d4b45c Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 28 Jan 2016 17:43:03 -0800 Subject: [PATCH 1/3] Better error message when Parquet schema merging fails --- .../apache/spark/sql/types/StructType.scala | 6 ++-- .../datasources/parquet/ParquetRelation.scala | 28 +++++++++++++++++-- .../parquet/ParquetSchemaSuite.scala | 15 ++++++++++ 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index 3bd733fa2d26c..f4f8826a0f078 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -402,13 +402,13 @@ object StructType extends AbstractDataType { if ((leftPrecision == rightPrecision) && (leftScale == rightScale)) { DecimalType(leftPrecision, leftScale) } else if ((leftPrecision != rightPrecision) && (leftScale != rightScale)) { - throw new SparkException("Failed to merge Decimal Tpes with incompatible " + + throw new SparkException("Failed to merge Decimal Types with incompatible " + s"precision $leftPrecision and $rightPrecision & scale $leftScale and $rightScale") } else if (leftPrecision != rightPrecision) { - throw new SparkException("Failed to merge Decimal Tpes with incompatible " + + throw new SparkException("Failed to merge Decimal Types with incompatible " + s"precision $leftPrecision and $rightPrecision") } else { - throw new SparkException("Failed to merge Decimal Tpes with incompatible " + + throw new SparkException("Failed to merge Decimal Types with incompatible " + s"scala $leftScale and $rightScale") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index b460ec1d26047..0fe36601668f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -799,9 +799,31 @@ private[sql] object ParquetRelation extends Logging { assumeInt96IsTimestamp = assumeInt96IsTimestamp, writeLegacyParquetFormat = writeLegacyParquetFormat) - footers.map { footer => - ParquetRelation.readSchemaFromFooter(footer, converter) - }.reduceLeftOption(_ merge _).iterator + if (footers.isEmpty) { + Iterator.empty + } else { + val pathsAndSchemata = footers.map { footer => + footer.getFile -> ParquetRelation.readSchemaFromFooter(footer, converter) + } + + pathsAndSchemata.foldLeft(StructType(Nil)) { case (mergedSoFar, (file, schema)) => + try { + mergedSoFar.merge(schema) + } catch { + case cause: Throwable => + throw new SparkException( + s"""Failed merging schema of file $file: + |${schema.treeString} + """.stripMargin, + cause + ) + } + } + + footers.map { footer => + ParquetRelation.readSchemaFromFooter(footer, converter) + }.reduceLeftOption(_ merge _).iterator + } }.collect() partiallyMergedSchemas.reduceLeftOption(_ merge _) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 60fa81b1ab819..7f82bd6d3a2da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag import org.apache.parquet.schema.MessageTypeParser +import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -449,6 +450,20 @@ class ParquetSchemaSuite extends ParquetSchemaTest { }.getMessage.contains("detected conflicting schemas")) } + test("schema merging failure error message") { + withTempPath { dir => + val path = dir.getCanonicalPath + sqlContext.range(3).write.parquet(s"$path/p=1") + sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2") + + val message = intercept[SparkException] { + sqlContext.read.option("mergeSchema", "true").parquet(path).schema + }.getMessage + + assert(message.contains("Failed merging schema of file")) + } + } + // ======================================================= // Tests for converting Parquet LIST to Catalyst ArrayType // ======================================================= From bfe49879aa822c6ea6424dc550c298de510d580d Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 28 Jan 2016 18:30:17 -0800 Subject: [PATCH 2/3] Addresses PR comments --- .../org/apache/spark/sql/types/StructType.scala | 6 +++--- .../datasources/parquet/ParquetRelation.scala | 17 ++++++----------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index f4f8826a0f078..90f3f298ec810 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -402,13 +402,13 @@ object StructType extends AbstractDataType { if ((leftPrecision == rightPrecision) && (leftScale == rightScale)) { DecimalType(leftPrecision, leftScale) } else if ((leftPrecision != rightPrecision) && (leftScale != rightScale)) { - throw new SparkException("Failed to merge Decimal Types with incompatible " + + throw new SparkException("Failed to merge decimal types with incompatible " + s"precision $leftPrecision and $rightPrecision & scale $leftScale and $rightScale") } else if (leftPrecision != rightPrecision) { - throw new SparkException("Failed to merge Decimal Types with incompatible " + + throw new SparkException("Failed to merge decimal types with incompatible " + s"precision $leftPrecision and $rightPrecision") } else { - throw new SparkException("Failed to merge Decimal Types with incompatible " + + throw new SparkException("Failed to merge decimal types with incompatible " + s"scala $leftScale and $rightScale") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 0fe36601668f0..df47070042df6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -802,27 +802,22 @@ private[sql] object ParquetRelation extends Logging { if (footers.isEmpty) { Iterator.empty } else { - val pathsAndSchemata = footers.map { footer => - footer.getFile -> ParquetRelation.readSchemaFromFooter(footer, converter) - } - - pathsAndSchemata.foldLeft(StructType(Nil)) { case (mergedSoFar, (file, schema)) => + var mergedSchema = StructType(Nil) + footers.foreach { footer => + val schema = ParquetRelation.readSchemaFromFooter(footer, converter) try { - mergedSoFar.merge(schema) + mergedSchema = mergedSchema.merge(schema) } catch { case cause: Throwable => throw new SparkException( - s"""Failed merging schema of file $file: + s"""Failed merging schema of file ${footer.getFile}: |${schema.treeString} """.stripMargin, cause ) } } - - footers.map { footer => - ParquetRelation.readSchemaFromFooter(footer, converter) - }.reduceLeftOption(_ merge _).iterator + Iterator.single(mergedSchema) } }.collect() From 2e5eddbdadf58e49ca7374e4ffe4163013596dcb Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 28 Jan 2016 22:57:12 -0800 Subject: [PATCH 3/3] Addresses PR comments --- .../datasources/parquet/ParquetRelation.scala | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index df47070042df6..b444de801d1c1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -807,14 +807,9 @@ private[sql] object ParquetRelation extends Logging { val schema = ParquetRelation.readSchemaFromFooter(footer, converter) try { mergedSchema = mergedSchema.merge(schema) - } catch { - case cause: Throwable => - throw new SparkException( - s"""Failed merging schema of file ${footer.getFile}: - |${schema.treeString} - """.stripMargin, - cause - ) + } catch { case cause: SparkException => + throw new SparkException( + s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause) } } Iterator.single(mergedSchema)