diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 19800ad88c03..b0c6bb668037 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -33,9 +33,7 @@ import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.hadoop.mapreduce.{InputSplit, Job, JobContext}
-
 import parquet.filter2.predicate.FilterApi
-import parquet.format.converter.ParquetMetadataConverter
 import parquet.hadoop.metadata.CompressionCodecName
 import parquet.hadoop.util.ContextUtil
 import parquet.hadoop.{ParquetInputFormat, _}
@@ -226,7 +224,7 @@ private[sql] case class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    var footers: Map[FileStatus, Footer] = _
+    var footers: Map[Path, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -292,11 +290,20 @@ private[sql] case class ParquetRelation2(
       commonMetadataStatuses =
         leaves.filter(_.getPath.getName == ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)
 
-      footers = (dataStatuses ++ metadataStatuses ++ commonMetadataStatuses).par.map { f =>
-        val parquetMetadata = ParquetFileReader.readFooter(
-          sparkContext.hadoopConfiguration, f, ParquetMetadataConverter.NO_FILTER)
-        f -> new Footer(f.getPath, parquetMetadata)
-      }.seq.toMap
+      footers = {
+        val taskSideMetaData =
+          sparkContext.hadoopConfiguration.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)
+
+        val rawFooters = if (shouldMergeSchemas) {
+          ParquetFileReader.readAllFootersInParallel(
+            sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
+        } else {
+          ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(
+            sparkContext.hadoopConfiguration, seqAsJavaList(leaves), taskSideMetaData)
+        }
+
+        rawFooters.map(footer => footer.getFile -> footer).toMap
+      }
 
       partitionSpec = maybePartitionSpec.getOrElse {
         val partitionDirs = leaves
@@ -381,7 +388,7 @@ private[sql] case class ParquetRelation2(
             .toSeq
         }
 
-      ParquetRelation2.readSchema(filesToTouch.map(footers.apply), sqlContext)
+      ParquetRelation2.readSchema(filesToTouch.map(f => footers.apply(f.getPath)), sqlContext)
     }
   }
 
@@ -426,7 +433,7 @@ private[sql] case class ParquetRelation2(
      } else {
        metadataCache.dataStatuses.toSeq
      }
-      val selectedFooters = selectedFiles.map(metadataCache.footers)
+      val selectedFooters = selectedFiles.map(f => metadataCache.footers(f.getPath))
 
       // FileInputFormat cannot handle empty lists.
       if (selectedFiles.nonEmpty) {
@@ -774,7 +781,7 @@ private[sql] object ParquetRelation2 extends Logging {
     val ordinalMap = metastoreSchema.zipWithIndex.map {
       case (field, index) => field.name.toLowerCase -> index
     }.toMap
-    val reorderedParquetSchema = mergedParquetSchema.sortBy(f => 
+    val reorderedParquetSchema = mergedParquetSchema.sortBy(f =>
       ordinalMap.getOrElse(f.name.toLowerCase, metastoreSchema.size + 1))
 
     StructType(metastoreSchema.zip(reorderedParquetSchema).map {
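
Context for reviewers: the patch replaces the per-file `ParquetFileReader.readFooter` loop with parquet-mr's bulk footer readers, and re-keys the footer cache by `Path` because the `Footer` objects returned by the bulk API only expose the file path (`Footer.getFile`), not the originating `FileStatus`. Below is a minimal standalone sketch of the resulting pattern; the names `FooterCacheSketch` and `buildFooterCache` are illustrative and not part of the patch.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import parquet.hadoop.{Footer, ParquetFileReader, ParquetInputFormat}

import scala.collection.JavaConversions._

// Illustrative sketch: build a footer cache keyed by Path, as the patch does.
object FooterCacheSketch {
  def buildFooterCache(conf: Configuration, leaves: Seq[FileStatus]): Map[Path, Footer] = {
    // Task-side metadata reading defaults to true, matching the patch; when it is
    // enabled, row-group metadata can be skipped while reading footers on the driver.
    val taskSideMetaData = conf.getBoolean(ParquetInputFormat.TASK_SIDE_METADATA, true)

    // One parallel pass over all part-files instead of one readFooter call per file.
    val rawFooters = ParquetFileReader.readAllFootersInParallel(
      conf, seqAsJavaList(leaves), taskSideMetaData)

    // Footer.getFile returns a Path, hence the Map[Path, Footer] key type.
    rawFooters.map(footer => footer.getFile -> footer).toMap
  }
}
```

Lookups against the cache then go through the path, e.g. `cache(status.getPath)`, mirroring the `footers(f.getPath)` call sites in the diff.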