@@ -402,13 +402,13 @@ object StructType extends AbstractDataType {
         if ((leftPrecision == rightPrecision) && (leftScale == rightScale)) {
           DecimalType(leftPrecision, leftScale)
         } else if ((leftPrecision != rightPrecision) && (leftScale != rightScale)) {
-          throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          throw new SparkException("Failed to merge decimal types with incompatible " +
             s"precision $leftPrecision and $rightPrecision & scale $leftScale and $rightScale")
         } else if (leftPrecision != rightPrecision) {
-          throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          throw new SparkException("Failed to merge decimal types with incompatible " +
             s"precision $leftPrecision and $rightPrecision")
         } else {
-          throw new SparkException("Failed to merge Decimal Tpes with incompatible " +
+          throw new SparkException("Failed to merge decimal types with incompatible " +
             s"scala $leftScale and $rightScale")
         }
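
For context (not part of the diff): a minimal sketch showing how the branch above can be exercised. StructType.merge is private[sql] in this code base, so the sketch assumes it is compiled under an org.apache.spark.sql sub-package; the package and object names here are hypothetical.

    package org.apache.spark.sql.demo // hypothetical package, used only to reach private[sql] merge

    import org.apache.spark.sql.types._

    object DecimalMergeDemo {
      def main(args: Array[String]): Unit = {
        // Same field name and scale, different precision: hits the middle branch above.
        val left = StructType(StructField("amount", DecimalType(10, 2)) :: Nil)
        val right = StructType(StructField("amount", DecimalType(12, 2)) :: Nil)
        try {
          left.merge(right)
        } catch {
          case e: Exception =>
            // Expected to mention the incompatible precisions 10 and 12.
            println(e.getMessage)
        }
      }
    }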

@@ -799,9 +799,21 @@ private[sql] object ParquetRelation extends Logging {
           assumeInt96IsTimestamp = assumeInt96IsTimestamp,
           writeLegacyParquetFormat = writeLegacyParquetFormat)

-        footers.map { footer =>
-          ParquetRelation.readSchemaFromFooter(footer, converter)
-        }.reduceLeftOption(_ merge _).iterator
+        if (footers.isEmpty) {
+          Iterator.empty
+        } else {
+          var mergedSchema = StructType(Nil)
+          footers.foreach { footer =>
+            val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
+            try {
+              mergedSchema = mergedSchema.merge(schema)
Member commented:

Can we get the schema from the first footer and then go through this loop for the remaining footers? Because you merge the first schema with an empty schema, I think all the fields in the merged schema will be optional in their metadata, so filter push-down will not work normally.

Contributor Author replied:

Yeah, you're right, filter push-down can be affected due to #9940 (which I just merged today). Thanks for pointing this out!

+            } catch { case cause: SparkException =>
+              throw new SparkException(
+                s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
+            }
+          }
+          Iterator.single(mergedSchema)
+        }
       }.collect()

       partiallyMergedSchemas.reduceLeftOption(_ merge _)
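
A minimal sketch of the variant the reviewer suggests above (hypothetical; this is not the code the PR merges): seed the fold with the first footer's schema instead of StructType(Nil), so the very first merge cannot widen every field's nullability and metadata. The surrounding context (footers, converter, ParquetRelation, SparkException) is assumed to be the same as in the hunk above.

        // Hedged sketch of the reviewer's suggestion, not the merged change.
        if (footers.isEmpty) {
          Iterator.empty
        } else {
          // Start from a real file schema rather than an empty one, so field
          // nullability and metadata come from actual data.
          var mergedSchema = ParquetRelation.readSchemaFromFooter(footers.head, converter)
          footers.tail.foreach { footer =>
            val schema = ParquetRelation.readSchemaFromFooter(footer, converter)
            try {
              mergedSchema = mergedSchema.merge(schema)
            } catch { case cause: SparkException =>
              throw new SparkException(
                s"Failed merging schema of file ${footer.getFile}:\n${schema.treeString}", cause)
            }
          }
          Iterator.single(mergedSchema)
        }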
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag

 import org.apache.parquet.schema.MessageTypeParser

+import org.apache.spark.SparkException
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -449,6 +450,20 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
     }.getMessage.contains("detected conflicting schemas"))
   }

+  test("schema merging failure error message") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      sqlContext.range(3).write.parquet(s"$path/p=1")
+      sqlContext.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
+
+      val message = intercept[SparkException] {
+        sqlContext.read.option("mergeSchema", "true").parquet(path).schema
+      }.getMessage
+
+      assert(message.contains("Failed merging schema of file"))
+    }
+  }

   // =======================================================
   // Tests for converting Parquet LIST to Catalyst ArrayType
   // =======================================================
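
A note on the test's construction: range(3) writes id as bigint under p=1, while the CAST writes it as int under p=2. LongType and IntegerType cannot be merged, so reading with mergeSchema set to true fails, and the assertion checks that the error message names the file whose schema could not be merged, per the ParquetRelation change above.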