From 7d9ead88cc514948564445ec9d5ef51c74776c22 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Wed, 30 Apr 2025 16:29:37 +0800 Subject: [PATCH 01/11] Reduce HDFS NameNode RPC on vectorized Parquet reader --- .../parquet/ParquetFooterReader.java | 16 +---- .../SpecificParquetRecordReaderBase.java | 18 +++--- .../VectorizedParquetRecordReader.java | 10 ++- .../sql/execution/PartitionedFileUtil.scala | 4 +- .../execution/datasources/FileFormat.scala | 4 +- .../sql/execution/datasources/FileIndex.scala | 2 +- .../execution/datasources/FileScanRDD.scala | 4 +- .../parquet/ParquetFileFormat.scala | 38 +++++++++--- .../ParquetPartitionReaderFactory.scala | 61 ++++++++++++++----- .../datasources/FileSourceStrategySuite.scala | 18 ++++-- .../HadoopFileLinesReaderSuite.scala | 7 ++- .../ParquetInteroperabilitySuite.scala | 5 +- .../datasources/parquet/ParquetTest.scala | 4 +- .../sql/hive/execution/HiveDDLSuite.scala | 4 +- 14 files changed, 129 insertions(+), 66 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java index 438fffa11784..afeb4a2b1396 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java @@ -20,8 +20,6 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.format.converter.ParquetMetadataConverter; @@ -63,20 +61,10 @@ public static ParquetMetadata readFooter( .build() .getMetadataFilter(); } - return readFooter(configuration, file.toPath(), filter); + return readFooter(HadoopInputFile.fromStatus(file.fileStatus(), configuration), filter); } - public static ParquetMetadata readFooter(Configuration configuration, - Path file, ParquetMetadataConverter.MetadataFilter filter) throws IOException { - return readFooter(HadoopInputFile.fromPath(file, configuration), filter); - } - - public static ParquetMetadata readFooter(Configuration configuration, - FileStatus fileStatus, ParquetMetadataConverter.MetadataFilter filter) throws IOException { - return readFooter(HadoopInputFile.fromStatus(fileStatus, configuration), filter); - } - - private static ParquetMetadata readFooter(HadoopInputFile inputFile, + public static ParquetMetadata readFooter(HadoopInputFile inputFile, ParquetMetadataConverter.MetadataFilter filter) throws IOException { ParquetReadOptions readOptions = HadoopReadOptions.builder(inputFile.getConfiguration(), inputFile.getPath()) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index d3716ef18447..038112086e47 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -49,6 +49,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.hadoop.util.HadoopInputFile; +import 
org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Types; @@ -89,24 +90,27 @@ public abstract class SpecificParquetRecordReaderBase extends RecordReader inputFile, + Option inputStream, Option fileFooter) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); FileSplit split = (FileSplit) inputSplit; this.file = split.getPath(); + ParquetReadOptions options = HadoopReadOptions + .builder(configuration, file) + .withRange(split.getStart(), split.getStart() + split.getLength()) + .build(); ParquetFileReader fileReader; - if (fileFooter.isDefined()) { - fileReader = new ParquetFileReader(configuration, file, fileFooter.get()); + if (inputFile.isDefined() && fileFooter.isDefined() && inputStream.isDefined()) { + fileReader = new ParquetFileReader( + inputFile.get(), fileFooter.get(), options, inputStream.get()); } else { - ParquetReadOptions options = HadoopReadOptions - .builder(configuration, file) - .withRange(split.getStart(), split.getStart() + split.getLength()) - .build(); fileReader = new ParquetFileReader( HadoopInputFile.fromPath(file, configuration), options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 9010c6e30be0..b15f79df527e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Set; -import org.apache.spark.SparkUnsupportedOperationException; -import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; import scala.Option; import scala.jdk.javaapi.CollectionConverters; @@ -35,11 +33,15 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; import org.apache.spark.sql.execution.vectorized.ConstantColumnVector; @@ -190,9 +192,11 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext, + Option inputFile, + Option inputStream, Option fileFooter) throws IOException, InterruptedException, UnsupportedOperationException { - super.initialize(inputSplit, taskAttemptContext, fileFooter); + super.initialize(inputSplit, taskAttemptContext, inputFile, inputStream, fileFooter); initializeInternal(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala index 1411eae1aace..14a2c49015f7 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala @@ -48,8 +48,8 @@ object PartitionedFileUtil { start: Long, length: Long): PartitionedFile = { val hosts = getBlockHosts(getBlockLocations(file.fileStatus), start, length) - PartitionedFile(partitionValues, SparkPath.fromPath(filePath), start, length, hosts, - file.getModificationTime, file.getLen, file.metadata) + PartitionedFile(partitionValues, SparkPath.fromPath(filePath), start, length, + file.fileStatus, hosts, file.getModificationTime, file.getLen, file.metadata) } private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 8a254b464da7..9e54ae17ed9b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -317,7 +317,8 @@ object FileFormat { fieldNames: Seq[String], filePath: SparkPath, fileSize: Long, - fileModificationTime: Long): InternalRow = { + fileModificationTime: Long, + fileStatus: FileStatus): InternalRow = { // When scanning files directly from the filesystem, we only support file-constant metadata // fields whose values can be derived from a file status. In particular, we don't have accurate // file split information yet, nor do we have a way to provide custom metadata column values. @@ -330,6 +331,7 @@ object FileFormat { filePath = filePath, start = 0L, length = fileSize, + fileStatus = fileStatus, locations = Array.empty, modificationTime = fileModificationTime, fileSize = fileSize, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala index 0291a5fd28a7..3948dca7700b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -85,7 +85,7 @@ class FilePruningRunner(filters: Seq[Expression]) { boundedFilterMetadataStructOpt.forall { boundedFilter => val row = FileFormat.createMetadataInternalRow(partitionValues, requiredMetadataColumnNames.toSeq, - SparkPath.fromFileStatus(f), f.getLen, f.getModificationTime) + SparkPath.fromFileStatus(f), f.getLen, f.getModificationTime, f) boundedFilter.eval(row) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 5dc13ccee9ce..2097ad1b39ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources import java.io.{Closeable, FileNotFoundException} import java.net.URI -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.hdfs.BlockMissingException import org.apache.hadoop.security.AccessControlException @@ -50,6 +50,7 @@ import org.apache.spark.util.NextIterator * @param filePath URI of the file to read * @param start the beginning offset (in bytes) of the block. 
* @param length number of bytes to read. + * @param fileStatus The FileStatus instance of the file to read. * @param modificationTime The modification time of the input file, in milliseconds. * @param fileSize The length of the input file (not the block), in bytes. * @param otherConstantMetadataColumnValues The values of any additional constant metadata columns. @@ -59,6 +60,7 @@ case class PartitionedFile( filePath: SparkPath, start: Long, length: Long, + fileStatus: FileStatus, @transient locations: Array[String] = Array.empty, modificationTime: Long = 0L, fileSize: Long = 0L, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index be6e5d188667..efd7f319a202 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -26,10 +26,13 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapred.FileSplit import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.HadoopReadOptions import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop._ +import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -207,15 +210,31 @@ class ParquetFileFormat val sharedConf = broadcastedHadoopConf.value.value - val fileFooter = if (enableVectorizedReader) { - // When there are vectorized reads, we can avoid reading the footer twice by reading - // all row groups in advance and filter row groups according to filters that require - // push down (no need to read the footer metadata again). - ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS) + // When there are vectorized reads, we can avoid + // 1. opening the file twice by transfering the SeekableInputStream + // 2. 
reading the footer twice by reading all row groups in advance and filter row groups + // according to filters that require push down + val metadataFilter = if (enableVectorizedReader) { + HadoopReadOptions.builder(sharedConf, filePath) + .withRange(file.start, file.start + file.length) + .build.getMetadataFilter } else { - ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS) + ParquetMetadataConverter.SKIP_ROW_GROUPS } + val readOptions = HadoopReadOptions.builder(sharedConf, filePath) + .withMetadataFilter(metadataFilter).build + + val inputFile = HadoopInputFile.fromStatus(file.fileStatus, sharedConf) + val inputStream = inputFile.newStream() + val fileReader = ParquetFileReader.open(inputFile, readOptions, inputStream) + val fileFooter = fileReader.getFooter + if (enableVectorizedReader) { + // Keep the file input stream open so it can be reused later + fileReader.detachFileInputStream() + } + fileReader.close() + val footerFileMetaData = fileFooter.getFileMetaData val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( footerFileMetaData.getKeyValueMetaData.get, @@ -289,7 +308,8 @@ class ParquetFileFormat // Instead, we use FileScanRDD's task completion listener to close this iterator. val iter = new RecordReaderIterator(vectorizedReader) try { - vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter)) + vectorizedReader.initialize( + split, hadoopAttemptContext, Some(inputFile), Some(inputStream), Some(fileFooter)) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { @@ -447,9 +467,9 @@ object ParquetFileFormat extends Logging { // Skips row group information since we only need the schema. // ParquetFileReader.readFooter throws RuntimeException, instead of IOException, // when it can't read the footer. 
- Some(new Footer(currentFile.getPath(), + Some(new Footer(currentFile.getPath, ParquetFooterReader.readFooter( - conf, currentFile, SKIP_ROW_GROUPS))) + HadoopInputFile.fromStatus(currentFile, conf), SKIP_ROW_GROUPS))) } catch { case e: RuntimeException => if (ignoreCorruptFiles) { logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, currentFile)}", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 70ae8068a03a..ab6845d0819d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -21,10 +21,14 @@ import java.time.ZoneId import org.apache.hadoop.mapred.FileSplit import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl +import org.apache.parquet.HadoopReadOptions import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} -import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader} +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.util.HadoopInputFile +import org.apache.parquet.io.SeekableInputStream import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast @@ -86,17 +90,43 @@ case class ParquetPartitionReaderFactory( private val parquetReaderCallback = new ParquetReaderCallback() - private def getFooter(file: PartitionedFile): ParquetMetadata = { - val conf = broadcastedConf.value.value - if (aggregation.isDefined || enableVectorizedReader) { - // There are two purposes for reading footer with row groups: - // 1. When there are aggregates to push down, we get max/min/count from footer statistics. - // 2. When there are vectorized reads, we can avoid reading the footer twice by reading - // all row groups in advance and filter row groups according to filters that require - // push down (no need to read the footer metadata again). - ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.WITH_ROW_GROUPS) + private def getFooter(file: PartitionedFile): + (Option[HadoopInputFile], Option[SeekableInputStream], ParquetMetadata) = { + val hadoopConf = broadcastedConf.value.value + if (aggregation.isDefined) { + // When there are aggregates to push down, we get max/min/count from footer statistics. + val footer = ParquetFooterReader.readFooter( + hadoopConf, file, ParquetFooterReader.WITH_ROW_GROUPS) + (None, None, footer) } else { - ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.SKIP_ROW_GROUPS) + // When there are vectorized reads, we can avoid + // 1. opening the file twice by transfering the SeekableInputStream + // 2. 
reading the footer twice by reading all row groups in advance and filter row groups + // according to filters that require push down + val metadataFilter = if (enableVectorizedReader) { + HadoopReadOptions.builder(hadoopConf, file.toPath) + .withRange(file.start, file.start + file.length) + .build.getMetadataFilter + } else { + ParquetMetadataConverter.SKIP_ROW_GROUPS + } + val readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath) + .withMetadataFilter(metadataFilter).build + + val inputFile = HadoopInputFile.fromStatus(file.fileStatus, hadoopConf) + val inputStream = inputFile.newStream() + val fileReader = ParquetFileReader.open(inputFile, readOptions, inputStream) + val fileFooter = fileReader.getFooter + if (enableVectorizedReader) { + // Keep the file input stream open so it can be reused later + fileReader.detachFileInputStream() + } + fileReader.close() + if (enableVectorizedReader) { + (Some(inputFile), Some(inputStream), fileFooter) + } else { + (None, None, fileFooter) + } } } @@ -130,7 +160,7 @@ case class ParquetPartitionReaderFactory( new PartitionReader[InternalRow] { private var hasNext = true private lazy val row: InternalRow = { - val footer = getFooter(file) + val (_, _, footer) = getFooter(file) if (footer != null && footer.getBlocks.size > 0) { ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, @@ -175,7 +205,7 @@ case class ParquetPartitionReaderFactory( new PartitionReader[ColumnarBatch] { private var hasNext = true private val batch: ColumnarBatch = { - val footer = getFooter(file) + val (_, _, footer) = getFooter(file) if (footer != null && footer.getBlocks.size > 0) { val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, dataSchema, partitionSchema, aggregation.get, readDataSchema, file.partitionValues, @@ -213,7 +243,7 @@ case class ParquetPartitionReaderFactory( val filePath = file.toPath val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - val fileFooter = getFooter(file) + val (inputFile, inputStream, fileFooter) = getFooter(file) val footerFileMetaData = fileFooter.getFileMetaData val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData) // Try to push down filters when filter push-down is enabled. 
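A minimal standalone sketch of the open-once pattern these hunks implement, using only the Parquet/Hadoop calls the patch itself relies on (HadoopInputFile.fromStatus, ParquetFileReader.open with an explicit SeekableInputStream, and detachFileInputStream); the helper name readFooterOnce and its parameters are illustrative, not part of the patch:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.FileStatus
  import org.apache.parquet.HadoopReadOptions
  import org.apache.parquet.hadoop.ParquetFileReader
  import org.apache.parquet.hadoop.metadata.ParquetMetadata
  import org.apache.parquet.hadoop.util.HadoopInputFile
  import org.apache.parquet.io.SeekableInputStream

  // Sketch only: readFooterOnce is an illustrative name, not an API added by this patch.
  def readFooterOnce(
      conf: Configuration,
      status: FileStatus,
      start: Long,
      length: Long): (HadoopInputFile, SeekableInputStream, ParquetMetadata) = {
    // Reuse the FileStatus already held by the scan, so no extra getFileStatus RPC is issued.
    val inputFile = HadoopInputFile.fromStatus(status, conf)
    val options = HadoopReadOptions.builder(conf, inputFile.getPath)
      .withRange(start, start + length) // keep only row group metadata overlapping this split
      .build()
    // Open the stream explicitly so it can outlive the footer read.
    val inputStream = inputFile.newStream()
    val reader = ParquetFileReader.open(inputFile, options, inputStream)
    try {
      val footer = reader.getFooter
      // Detach the stream before close() so the same open file handle can be
      // handed to the vectorized record reader for decoding the row groups.
      reader.detachFileInputStream()
      (inputFile, inputStream, footer)
    } finally {
      reader.close()
    }
  }

The caller then passes the input file, the detached stream, and the footer to VectorizedParquetRecordReader.initialize and remains responsible for closing the stream, mirroring what both the DSv1 and DSv2 paths in this series do.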
@@ -274,7 +304,8 @@ case class ParquetPartitionReaderFactory( ) { reader => reader match { case vectorizedReader: VectorizedParquetRecordReader => - vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter)) + vectorizedReader.initialize( + split, hadoopAttemptContext, inputFile, inputStream, Some(fileFooter)) case _ => reader.initialize(split, hadoopAttemptContext) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index afeca756208e..e5b4dc444904 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -286,10 +286,14 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession { test("Locality support for FileScanRDD") { val partition = FilePartition(0, Array( - PartitionedFile(InternalRow.empty, sp("fakePath0"), 0, 10, Array("host0", "host1")), - PartitionedFile(InternalRow.empty, sp("fakePath0"), 10, 20, Array("host1", "host2")), - PartitionedFile(InternalRow.empty, sp("fakePath1"), 0, 5, Array("host3")), - PartitionedFile(InternalRow.empty, sp("fakePath2"), 0, 5, Array("host4")) + PartitionedFile(InternalRow.empty, sp("fakePath0"), 0, 10, + new FileStatus(20, false, 3, 0, 0, sp("fakePath0").toPath), Array("host0", "host1")), + PartitionedFile(InternalRow.empty, sp("fakePath0"), 10, 20, + new FileStatus(20, false, 3, 0, 0, sp("fakePath0").toPath), Array("host1", "host2")), + PartitionedFile(InternalRow.empty, sp("fakePath1"), 0, 5, + new FileStatus(5, false, 3, 0, 0, sp("fakePath1").toPath), Array("host3")), + PartitionedFile(InternalRow.empty, sp("fakePath2"), 0, 5, + new FileStatus(5, false, 3, 0, 0, sp("fakePath2").toPath), Array("host4")) )) val fakeRDD = new FileScanRDD( @@ -605,8 +609,10 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession { } test(s"SPARK-44021: Test ${SQLConf.FILES_MAX_PARTITION_NUM.key} works as expected") { - val files = - Range(0, 300000).map(p => PartitionedFile(InternalRow.empty, sp(s"$p"), 0, 50000000)) + val files = Range(0, 300000).map { p => + PartitionedFile(InternalRow.empty, sp(s"$p"), 0, 50000000, + new FileStatus(0, false, 1, 0, 0, sp(s"$p").toPath)) + } val maxPartitionBytes = conf.filesMaxPartitionBytes val defaultPartitions = FilePartition.getFilePartitions(spark, files, maxPartitionBytes) assert(defaultPartitions.size === 150000) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala index b6b89ab30439..942f974a3710 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets import java.nio.file.Files import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus import org.apache.spark.paths.SparkPath import org.apache.spark.sql.catalyst.InternalRow @@ -38,11 +39,13 @@ class HadoopFileLinesReaderSuite extends SharedSparkSession { Files.write(path.toPath, text.getBytes(StandardCharsets.UTF_8)) val lines = ranges.flatMap { case (start, length) => + val sp = 
SparkPath.fromPathString(path.getCanonicalPath) val file = PartitionedFile( InternalRow.empty, - SparkPath.fromPathString(path.getCanonicalPath), + sp, start, - length) + length, + new FileStatus(path.length(), false, 1, 0, 0, sp.toPath)) val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf()) val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 257a89754f4e..044f6ce202d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -22,6 +22,7 @@ import java.time.ZoneOffset import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER +import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.spark.sql.Row @@ -229,8 +230,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS // sure the test is configured correctly. assert(parts.size == 2) parts.foreach { part => - val oneFooter = - ParquetFooterReader.readFooter(hadoopConf, part.getPath, NO_FILTER) + val oneFooter = ParquetFooterReader.readFooter( + HadoopInputFile.fromStatus(part, hadoopConf), NO_FILTER) assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 1) val typeName = oneFooter .getFileMetaData.getSchema.getColumns.get(0).getPrimitiveType.getPrimitiveTypeName diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index b7b082e32965..12daef65eabc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -144,8 +144,8 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { protected def readFooter(path: Path, configuration: Configuration): ParquetMetadata = { ParquetFooterReader.readFooter( - configuration, - new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE), + HadoopInputFile.fromPath( + new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE), configuration), ParquetMetadataConverter.NO_FILTER) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d3f625542d96..769b633a9c52 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER +import org.apache.parquet.hadoop.util.HadoopInputFile import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{spy, times, verify} import org.scalatest.BeforeAndAfterEach @@ -2710,8 +2711,9 @@ class HiveDDLSuite OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name case "parquet" => + val hadoopConf = 
sparkContext.hadoopConfiguration val footer = ParquetFooterReader.readFooter( - sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath), NO_FILTER) + HadoopInputFile.fromPath(new Path(maybeFile.get.getPath), hadoopConf), NO_FILTER) footer.getBlocks.get(0).getColumns.get(0).getCodec.toString } From 7f5889aa88fa1220781a0882bbc93082f206ee05 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Tue, 9 Sep 2025 16:18:35 +0800 Subject: [PATCH 02/11] address comments --- .../sql/execution/DataSourceScanExec.scala | 3 +-- .../sql/execution/PartitionedFileUtil.scala | 13 +++++------ .../execution/datasources/FileFormat.scala | 9 +------- .../sql/execution/datasources/FileIndex.scala | 4 +--- .../execution/datasources/FileScanRDD.scala | 9 +++----- .../parquet/ParquetFileFormat.scala | 18 ++++++++------- .../execution/datasources/v2/FileScan.scala | 4 +--- .../ParquetPartitionReaderFactory.scala | 22 ++++++++++--------- .../datasources/FileSourceStrategySuite.scala | 10 ++++----- .../HadoopFileLinesReaderSuite.scala | 1 - 10 files changed, 39 insertions(+), 54 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 2488b6aa5115..a434065af88c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -640,7 +640,7 @@ trait FileSourceScanLike extends DataSourceScanExec with SessionStateHelper { override def toPartitionArray: Array[PartitionedFile] = { partitionDirectories.flatMap { p => p.files.map { f => - PartitionedFileUtil.getPartitionedFile(f, f.getPath, p.values, 0, f.getLen) + PartitionedFileUtil.getPartitionedFile(f, p.values, 0, f.getLen) } } } @@ -876,7 +876,6 @@ case class FileSourceScanExec( relation.sparkSession, relation.options, filePath) PartitionedFileUtil.splitFiles( file = file, - filePath = filePath, isSplitable = isSplitable, maxSplitBytes = maxSplitBytes, partitionValues = partitionVals diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala index 14a2c49015f7..2dfe2a29b3cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala @@ -17,16 +17,14 @@ package org.apache.spark.sql.execution -import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path} +import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus} -import org.apache.spark.paths.SparkPath import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.datasources._ object PartitionedFileUtil { def splitFiles( file: FileStatusWithMetadata, - filePath: Path, isSplitable: Boolean, maxSplitBytes: Long, partitionValues: InternalRow): Seq[PartitionedFile] = { @@ -34,22 +32,21 @@ object PartitionedFileUtil { (0L until file.getLen by maxSplitBytes).map { offset => val remaining = file.getLen - offset val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining - getPartitionedFile(file, filePath, partitionValues, offset, size) + getPartitionedFile(file, partitionValues, offset, size) } } else { - Seq(getPartitionedFile(file, filePath, partitionValues, 0, file.getLen)) + Seq(getPartitionedFile(file, partitionValues, 0, file.getLen)) } } def getPartitionedFile( 
file: FileStatusWithMetadata, - filePath: Path, partitionValues: InternalRow, start: Long, length: Long): PartitionedFile = { val hosts = getBlockHosts(getBlockLocations(file.fileStatus), start, length) - PartitionedFile(partitionValues, SparkPath.fromPath(filePath), start, length, - file.fileStatus, hosts, file.getModificationTime, file.getLen, file.metadata) + PartitionedFile(partitionValues, start, length, + file.fileStatus, hosts, file.metadata) } private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala index 9e54ae17ed9b..1f13ca0feac9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala @@ -22,7 +22,6 @@ import org.apache.hadoop.fs._ import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.hadoop.mapreduce.Job -import org.apache.spark.paths.SparkPath import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -315,9 +314,6 @@ object FileFormat { def createMetadataInternalRow( partitionValues: InternalRow, fieldNames: Seq[String], - filePath: SparkPath, - fileSize: Long, - fileModificationTime: Long, fileStatus: FileStatus): InternalRow = { // When scanning files directly from the filesystem, we only support file-constant metadata // fields whose values can be derived from a file status. In particular, we don't have accurate @@ -328,13 +324,10 @@ object FileFormat { assert(fieldNames.forall(validFieldNames.contains)) val pf = PartitionedFile( partitionValues = partitionValues, - filePath = filePath, start = 0L, - length = fileSize, + length = fileStatus.getLen, fileStatus = fileStatus, locations = Array.empty, - modificationTime = fileModificationTime, - fileSize = fileSize, otherConstantMetadataColumnValues = Map.empty) updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames, pf, extractors) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala index 3948dca7700b..e786153d4469 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -21,7 +21,6 @@ import scala.collection.mutable import org.apache.hadoop.fs._ -import org.apache.spark.paths.SparkPath import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.StructType @@ -84,8 +83,7 @@ class FilePruningRunner(filters: Seq[Expression]) { // use option.forall, so if there is no filter no metadata struct, return true boundedFilterMetadataStructOpt.forall { boundedFilter => val row = - FileFormat.createMetadataInternalRow(partitionValues, requiredMetadataColumnNames.toSeq, - SparkPath.fromFileStatus(f), f.getLen, f.getModificationTime, f) + FileFormat.createMetadataInternalRow(partitionValues, requiredMetadataColumnNames.toSeq, f) boundedFilter.eval(row) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala index 2097ad1b39ab..621004e28f8a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala @@ -47,28 +47,25 @@ import org.apache.spark.util.NextIterator * that need to be prepended to each row. * * @param partitionValues value of partition columns to be prepended to each row. - * @param filePath URI of the file to read * @param start the beginning offset (in bytes) of the block. * @param length number of bytes to read. * @param fileStatus The FileStatus instance of the file to read. - * @param modificationTime The modification time of the input file, in milliseconds. - * @param fileSize The length of the input file (not the block), in bytes. * @param otherConstantMetadataColumnValues The values of any additional constant metadata columns. */ case class PartitionedFile( partitionValues: InternalRow, - filePath: SparkPath, start: Long, length: Long, fileStatus: FileStatus, @transient locations: Array[String] = Array.empty, - modificationTime: Long = 0L, - fileSize: Long = 0L, otherConstantMetadataColumnValues: Map[String, Any] = Map.empty) { + @transient lazy val filePath: SparkPath = SparkPath.fromFileStatus(fileStatus) def pathUri: URI = filePath.toUri def toPath: Path = filePath.toPath def urlEncodedPath: String = filePath.urlEncoded + def modificationTime: Long = fileStatus.getModificationTime + def fileSize: Long = fileStatus.getLen override def toString: String = { s"path: $filePath, range: $start-${start + length}, partition values: $partitionValues" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index efd7f319a202..fd702391dad3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapC import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils} class ParquetFileFormat extends FileFormat @@ -227,13 +227,15 @@ class ParquetFileFormat val inputFile = HadoopInputFile.fromStatus(file.fileStatus, sharedConf) val inputStream = inputFile.newStream() - val fileReader = ParquetFileReader.open(inputFile, readOptions, inputStream) - val fileFooter = fileReader.getFooter - if (enableVectorizedReader) { - // Keep the file input stream open so it can be reused later - fileReader.detachFileInputStream() + val fileFooter = Utils.tryWithResource( + ParquetFileReader.open(inputFile, readOptions, inputStream)) { fileReader => + val footer = fileReader.getFooter + if (enableVectorizedReader) { + // Keep the file input stream open so it can be reused later + fileReader.detachFileInputStream() + } + footer } - fileReader.close() val footerFileMetaData = fileFooter.getFileMetaData val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( @@ -309,7 +311,7 @@ class ParquetFileFormat val iter = new RecordReaderIterator(vectorizedReader) try { 
vectorizedReader.initialize( - split, hadoopAttemptContext, Some(inputFile), Some(inputStream), Some(fileFooter)) + split, hadoopAttemptContext, Option(inputFile), Option(inputStream), Option(fileFooter)) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala index 5348f9ab6df6..6c5595f75b0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScan.scala @@ -159,11 +159,9 @@ trait FileScan extends Scan partition.values } partition.files.flatMap { file => - val filePath = file.getPath PartitionedFileUtil.splitFiles( file = file, - filePath = filePath, - isSplitable = isSplitable(filePath), + isSplitable = isSplitable(file.getPath), maxSplitBytes = maxSplitBytes, partitionValues = partitionValues ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index ab6845d0819d..3c780e6a28bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -90,7 +90,7 @@ case class ParquetPartitionReaderFactory( private val parquetReaderCallback = new ParquetReaderCallback() - private def getFooter(file: PartitionedFile): + private def openFileAndReadFooter(file: PartitionedFile): (Option[HadoopInputFile], Option[SeekableInputStream], ParquetMetadata) = { val hadoopConf = broadcastedConf.value.value if (aggregation.isDefined) { @@ -115,13 +115,15 @@ case class ParquetPartitionReaderFactory( val inputFile = HadoopInputFile.fromStatus(file.fileStatus, hadoopConf) val inputStream = inputFile.newStream() - val fileReader = ParquetFileReader.open(inputFile, readOptions, inputStream) - val fileFooter = fileReader.getFooter - if (enableVectorizedReader) { - // Keep the file input stream open so it can be reused later - fileReader.detachFileInputStream() + val fileFooter = Utils.tryWithResource( + ParquetFileReader.open(inputFile, readOptions, inputStream)) { fileReader => + val footer = fileReader.getFooter + if (enableVectorizedReader) { + // Keep the file input stream open so it can be reused later + fileReader.detachFileInputStream() + } + footer } - fileReader.close() if (enableVectorizedReader) { (Some(inputFile), Some(inputStream), fileFooter) } else { @@ -160,7 +162,7 @@ case class ParquetPartitionReaderFactory( new PartitionReader[InternalRow] { private var hasNext = true private lazy val row: InternalRow = { - val (_, _, footer) = getFooter(file) + val (_, _, footer) = openFileAndReadFooter(file) if (footer != null && footer.getBlocks.size > 0) { ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, @@ -205,7 +207,7 @@ case class ParquetPartitionReaderFactory( new PartitionReader[ColumnarBatch] { private var hasNext = true private val batch: ColumnarBatch = { - val (_, _, footer) = getFooter(file) + val (_, _, footer) = openFileAndReadFooter(file) if (footer != null && footer.getBlocks.size > 0) { val row = 
ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, dataSchema, partitionSchema, aggregation.get, readDataSchema, file.partitionValues, @@ -243,7 +245,7 @@ case class ParquetPartitionReaderFactory( val filePath = file.toPath val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - val (inputFile, inputStream, fileFooter) = getFooter(file) + val (inputFile, inputStream, fileFooter) = openFileAndReadFooter(file) val footerFileMetaData = fileFooter.getFileMetaData val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData) // Try to push down filters when filter push-down is enabled. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index e5b4dc444904..df113790e0af 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -286,13 +286,13 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession { test("Locality support for FileScanRDD") { val partition = FilePartition(0, Array( - PartitionedFile(InternalRow.empty, sp("fakePath0"), 0, 10, + PartitionedFile(InternalRow.empty, 0, 10, new FileStatus(20, false, 3, 0, 0, sp("fakePath0").toPath), Array("host0", "host1")), - PartitionedFile(InternalRow.empty, sp("fakePath0"), 10, 20, + PartitionedFile(InternalRow.empty, 10, 20, new FileStatus(20, false, 3, 0, 0, sp("fakePath0").toPath), Array("host1", "host2")), - PartitionedFile(InternalRow.empty, sp("fakePath1"), 0, 5, + PartitionedFile(InternalRow.empty, 0, 5, new FileStatus(5, false, 3, 0, 0, sp("fakePath1").toPath), Array("host3")), - PartitionedFile(InternalRow.empty, sp("fakePath2"), 0, 5, + PartitionedFile(InternalRow.empty, 0, 5, new FileStatus(5, false, 3, 0, 0, sp("fakePath2").toPath), Array("host4")) )) @@ -610,7 +610,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession { test(s"SPARK-44021: Test ${SQLConf.FILES_MAX_PARTITION_NUM.key} works as expected") { val files = Range(0, 300000).map { p => - PartitionedFile(InternalRow.empty, sp(s"$p"), 0, 50000000, + PartitionedFile(InternalRow.empty, 0, 50000000, new FileStatus(0, false, 1, 0, 0, sp(s"$p").toPath)) } val maxPartitionBytes = conf.filesMaxPartitionBytes diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala index 942f974a3710..095363f107d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala @@ -42,7 +42,6 @@ class HadoopFileLinesReaderSuite extends SharedSparkSession { val sp = SparkPath.fromPathString(path.getCanonicalPath) val file = PartitionedFile( InternalRow.empty, - sp, start, length, new FileStatus(path.length(), false, 1, 0, 0, sp.toPath)) From a88f032bd0889226946e6821a475734142b99c26 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 11 Sep 2025 15:09:15 +0800 Subject: [PATCH 03/11] extract common code to ParquetFooterReader --- .../parquet/ParquetFooterReader.java | 77 ------------- .../parquet/ParquetFileFormat.scala | 34 +----- .../parquet/ParquetFooterReader.scala | 106 ++++++++++++++++++ 
.../ParquetPartitionReaderFactory.scala | 36 +----- 4 files changed, 116 insertions(+), 137 deletions(-) delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java deleted file mode 100644 index afeb4a2b1396..000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.datasources.parquet; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.parquet.HadoopReadOptions; -import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.format.converter.ParquetMetadataConverter; -import org.apache.parquet.hadoop.ParquetFileReader; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.hadoop.util.HadoopInputFile; - -import org.apache.spark.sql.execution.datasources.PartitionedFile; - -/** - * `ParquetFooterReader` is a util class which encapsulates the helper - * methods of reading parquet file footer - */ -public class ParquetFooterReader { - - public static final boolean SKIP_ROW_GROUPS = true; - public static final boolean WITH_ROW_GROUPS = false; - - /** - * Reads footer for the input Parquet file 'split'. If 'skipRowGroup' is true, - * this will skip reading the Parquet row group metadata. - * - * @param file a part (i.e. 
"block") of a single file that should be read - * @param configuration hadoop configuration of file - * @param skipRowGroup If true, skip reading row groups; - * if false, read row groups according to the file split range - */ - public static ParquetMetadata readFooter( - Configuration configuration, - PartitionedFile file, - boolean skipRowGroup) throws IOException { - long fileStart = file.start(); - ParquetMetadataConverter.MetadataFilter filter; - if (skipRowGroup) { - filter = ParquetMetadataConverter.SKIP_ROW_GROUPS; - } else { - filter = HadoopReadOptions.builder(configuration, file.toPath()) - .withRange(fileStart, fileStart + file.length()) - .build() - .getMetadataFilter(); - } - return readFooter(HadoopInputFile.fromStatus(file.fileStatus(), configuration), filter); - } - - public static ParquetMetadata readFooter(HadoopInputFile inputFile, - ParquetMetadataConverter.MetadataFilter filter) throws IOException { - ParquetReadOptions readOptions = - HadoopReadOptions.builder(inputFile.getConfiguration(), inputFile.getPath()) - .withMetadataFilter(filter).build(); - // Use try-with-resources to ensure fd is closed. - try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) { - return fileReader.getFooter(); - } - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index fd702391dad3..fd36753d7ca3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -26,10 +26,8 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.mapred.FileSplit import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.parquet.HadoopReadOptions import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.FilterApi -import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.util.HadoopInputFile @@ -50,7 +48,7 @@ import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapC import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils} +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} class ParquetFileFormat extends FileFormat @@ -205,37 +203,15 @@ class ParquetFileFormat (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) - val filePath = file.toPath - val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - + val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String]) val sharedConf = broadcastedHadoopConf.value.value // When there are vectorized reads, we can avoid // 1. opening the file twice by transfering the SeekableInputStream // 2. 
reading the footer twice by reading all row groups in advance and filter row groups // according to filters that require push down - val metadataFilter = if (enableVectorizedReader) { - HadoopReadOptions.builder(sharedConf, filePath) - .withRange(file.start, file.start + file.length) - .build.getMetadataFilter - } else { - ParquetMetadataConverter.SKIP_ROW_GROUPS - } - - val readOptions = HadoopReadOptions.builder(sharedConf, filePath) - .withMetadataFilter(metadataFilter).build - - val inputFile = HadoopInputFile.fromStatus(file.fileStatus, sharedConf) - val inputStream = inputFile.newStream() - val fileFooter = Utils.tryWithResource( - ParquetFileReader.open(inputFile, readOptions, inputStream)) { fileReader => - val footer = fileReader.getFooter - if (enableVectorizedReader) { - // Keep the file input stream open so it can be reused later - fileReader.detachFileInputStream() - } - footer - } + val (inputFileOpt, inputStreamOpt, fileFooter) = + ParquetFooterReader.openFileAndReadFooter(sharedConf, file, enableVectorizedReader) val footerFileMetaData = fileFooter.getFileMetaData val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( @@ -311,7 +287,7 @@ class ParquetFileFormat val iter = new RecordReaderIterator(vectorizedReader) try { vectorizedReader.initialize( - split, hadoopAttemptContext, Option(inputFile), Option(inputStream), Option(fileFooter)) + split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Option(fileFooter)) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala new file mode 100644 index 000000000000..c1420d784df9 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.hadoop.conf.Configuration +import org.apache.parquet.HadoopReadOptions +import org.apache.parquet.format.converter.ParquetMetadataConverter +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.hadoop.metadata.ParquetMetadata +import org.apache.parquet.hadoop.util.HadoopInputFile +import org.apache.parquet.io.SeekableInputStream + +import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.util.Utils + +object ParquetFooterReader { + + val SKIP_ROW_GROUPS = true + val WITH_ROW_GROUPS = false + + /** + * Build a filter for reading footer of the input Parquet file 'split'. 
+ * If 'skipRowGroup' is true, this will skip reading the Parquet row group metadata. + * + * @param hadoopConf hadoop configuration of file + * @param file a part (i.e. "block") of a single file that should be read + * @param skipRowGroup If true, skip reading row groups; + * if false, read row groups according to the file split range + */ + def buildFilter( + hadoopConf: Configuration, + file: PartitionedFile, + skipRowGroup: Boolean): ParquetMetadataConverter.MetadataFilter = { + if (skipRowGroup) { + ParquetMetadataConverter.SKIP_ROW_GROUPS + } else { + HadoopReadOptions.builder(hadoopConf, file.toPath) + .withRange(file.start, file.start + file.length) + .build() + .getMetadataFilter + } + } + + def readFooter( + inputFile: HadoopInputFile, + filter: ParquetMetadataConverter.MetadataFilter): ParquetMetadata = { + val readOptions = HadoopReadOptions.builder(inputFile.getConfiguration, inputFile.getPath) + .withMetadataFilter(filter).build() + Utils.tryWithResource(ParquetFileReader.open(inputFile, readOptions)) { fileReader => + fileReader.getFooter + } + } + + /** + * Decoding Parquet files generally involves two steps: + * 1. read and resolve the metadata (footer), + * 2. read and decode the row groups/column chunks. + * + * It's possible to avoid opening the file twice by reusing the SeekableInputStream. + * When detachFileInputStream is true, the caller takes responsibility for closing the + * SeekableInputStream. Currently, this is only supported by the Parquet vectorized reader. + * + * @param hadoopConf hadoop configuration of file + * @param file a part (i.e. "block") of a single file that should be read + * @param detachFileInputStream when true, keep the SeekableInputStream of the file open + * @return if detachFileInputStream is true, return + * (Some(HadoopInputFile), Some(SeekableInputStream), ParquetMetadata), + * otherwise, return (None, None, ParquetMetadata). 
+ */ + def openFileAndReadFooter( + hadoopConf: Configuration, + file: PartitionedFile, + detachFileInputStream: Boolean): + (Option[HadoopInputFile], Option[SeekableInputStream], ParquetMetadata) = { + val readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath) + .withMetadataFilter(buildFilter(hadoopConf, file, !detachFileInputStream)) + .build() + val inputFile = HadoopInputFile.fromStatus(file.fileStatus, hadoopConf) + val inputStream = inputFile.newStream() + Utils.tryWithResource( + ParquetFileReader.open(inputFile, readOptions, inputStream)) { fileReader => + val footer = fileReader.getFooter + if (detachFileInputStream) { + fileReader.detachFileInputStream() + (Some(inputFile), Some(inputStream), footer) + } else { + (None, None, footer) + } + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 3c780e6a28bd..3a42f149f403 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -21,10 +21,8 @@ import java.time.ZoneId import org.apache.hadoop.mapred.FileSplit import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import org.apache.parquet.HadoopReadOptions import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} -import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop._ import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata} import org.apache.parquet.hadoop.util.HadoopInputFile @@ -96,39 +94,15 @@ case class ParquetPartitionReaderFactory( if (aggregation.isDefined) { // When there are aggregates to push down, we get max/min/count from footer statistics. val footer = ParquetFooterReader.readFooter( - hadoopConf, file, ParquetFooterReader.WITH_ROW_GROUPS) + HadoopInputFile.fromStatus(file.fileStatus, hadoopConf), + ParquetFooterReader.buildFilter(hadoopConf, file, ParquetFooterReader.WITH_ROW_GROUPS)) (None, None, footer) } else { // When there are vectorized reads, we can avoid // 1. opening the file twice by transfering the SeekableInputStream // 2. 
reading the footer twice by reading all row groups in advance and filter row groups // according to filters that require push down - val metadataFilter = if (enableVectorizedReader) { - HadoopReadOptions.builder(hadoopConf, file.toPath) - .withRange(file.start, file.start + file.length) - .build.getMetadataFilter - } else { - ParquetMetadataConverter.SKIP_ROW_GROUPS - } - val readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath) - .withMetadataFilter(metadataFilter).build - - val inputFile = HadoopInputFile.fromStatus(file.fileStatus, hadoopConf) - val inputStream = inputFile.newStream() - val fileFooter = Utils.tryWithResource( - ParquetFileReader.open(inputFile, readOptions, inputStream)) { fileReader => - val footer = fileReader.getFooter - if (enableVectorizedReader) { - // Keep the file input stream open so it can be reused later - fileReader.detachFileInputStream() - } - footer - } - if (enableVectorizedReader) { - (Some(inputFile), Some(inputStream), fileFooter) - } else { - (None, None, fileFooter) - } + ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, enableVectorizedReader) } } @@ -245,7 +219,7 @@ case class ParquetPartitionReaderFactory( val filePath = file.toPath val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - val (inputFile, inputStream, fileFooter) = openFileAndReadFooter(file) + val (inputFileOpt, inputStreamOpt, fileFooter) = openFileAndReadFooter(file) val footerFileMetaData = fileFooter.getFileMetaData val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData) // Try to push down filters when filter push-down is enabled. @@ -307,7 +281,7 @@ case class ParquetPartitionReaderFactory( reader match { case vectorizedReader: VectorizedParquetRecordReader => vectorizedReader.initialize( - split, hadoopAttemptContext, inputFile, inputStream, Some(fileFooter)) + split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) case _ => reader.initialize(split, hadoopAttemptContext) } From 45a6e7cae825669d7c7a87fe5b7521379d57d0de Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 11 Sep 2025 15:46:07 +0800 Subject: [PATCH 04/11] nit --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index fd36753d7ca3..5c2537e21c70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -287,7 +287,7 @@ class ParquetFileFormat val iter = new RecordReaderIterator(vectorizedReader) try { vectorizedReader.initialize( - split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Option(fileFooter)) + split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { From f2f0da97ccde599183cac661eba80cc306f6a676 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 11 Sep 2025 15:50:34 +0800 Subject: [PATCH 05/11] nit --- .../execution/datasources/parquet/ParquetFooterReader.scala | 3 --- .../datasources/v2/parquet/ParquetPartitionReaderFactory.scala | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala index c1420d784df9..0b18b69d7b14 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.scala @@ -30,9 +30,6 @@ import org.apache.spark.util.Utils object ParquetFooterReader { - val SKIP_ROW_GROUPS = true - val WITH_ROW_GROUPS = false - /** * Build a filter for reading footer of the input Parquet file 'split'. * If 'skipRowGroup' is true, this will skip reading the Parquet row group metadata. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 3a42f149f403..0153103cba64 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -95,7 +95,7 @@ case class ParquetPartitionReaderFactory( // When there are aggregates to push down, we get max/min/count from footer statistics. val footer = ParquetFooterReader.readFooter( HadoopInputFile.fromStatus(file.fileStatus, hadoopConf), - ParquetFooterReader.buildFilter(hadoopConf, file, ParquetFooterReader.WITH_ROW_GROUPS)) + ParquetFooterReader.buildFilter(hadoopConf, file, skipRowGroup = false)) (None, None, footer) } else { // When there are vectorized reads, we can avoid From a5646c95fd33c32bd95b6be45f4eab543705ca2a Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 11 Sep 2025 17:03:04 +0800 Subject: [PATCH 06/11] close inputstream --- .../parquet/ParquetFileFormat.scala | 245 +++++++++--------- .../ParquetPartitionReaderFactory.scala | 136 +++++----- 2 files changed, 203 insertions(+), 178 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 5c2537e21c70..6989a763ac60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -48,7 +48,7 @@ import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapC import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils} class ParquetFileFormat extends FileFormat @@ -213,132 +213,145 @@ class ParquetFileFormat val (inputFileOpt, inputStreamOpt, fileFooter) = ParquetFooterReader.openFileAndReadFooter(sharedConf, file, enableVectorizedReader) - val footerFileMetaData = fileFooter.getFileMetaData - val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - datetimeRebaseModeInRead) - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) - - // Try to push down filters when 
filter push-down is enabled. - val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringPredicate, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(parquetFilters.createFilter(_)) - .reduceOption(FilterApi.and) - } else { - None - } - - // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' - // *only* if the file was created by something other than "parquet-mr", so check the actual - // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. - // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. - def isCreatedByParquetMr: Boolean = - footerFileMetaData.getCreatedBy().startsWith("parquet-mr") - - val convertTz = - if (timestampConversion && !isCreatedByParquetMr) { - Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + // Before transform the ownership of InputStream to the VectorizedParquetRecordReader, + // we must to close the InputStream leak if something goes wrong to avoid resource. + var shouldCloseInputStream = inputStreamOpt.isDefined + try { + val footerFileMetaData = fileFooter.getFileMetaData + val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringPredicate, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates + // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why + // a `flatMap` is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) } else { None } + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 + // timestamps *only* if the file was created by something other than "parquet-mr", + // so check the actual writer here for this file. We have to do this per-file, + // as each file in the table may have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy.startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = - new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. 
- if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) - } - val taskContext = Option(TaskContext.get()) - if (enableVectorizedReader) { - val vectorizedReader = new VectorizedParquetRecordReader( - convertTz.orNull, - datetimeRebaseSpec.mode.toString, - datetimeRebaseSpec.timeZone, - int96RebaseSpec.mode.toString, - int96RebaseSpec.timeZone, - enableOffHeapColumnVector && taskContext.isDefined, - capacity) - // SPARK-37089: We cannot register a task completion listener to close this iterator here - // because downstream exec nodes have already registered their listeners. Since listeners - // are executed in reverse order of registration, a listener registered here would close the - // iterator while downstream exec nodes are still running. When off-heap column vectors are - // enabled, this can cause a use-after-free bug leading to a segfault. - // - // Instead, we use FileScanRDD's task completion listener to close this iterator. - val iter = new RecordReaderIterator(vectorizedReader) - try { - vectorizedReader.initialize( - split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) - logDebug(s"Appending $partitionSchema ${file.partitionValues}") - vectorizedReader.initBatch(partitionSchema, file.partitionValues) - if (returningBatch) { - vectorizedReader.enableReturningBatches() - } + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) - // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. - iter.asInstanceOf[Iterator[InternalRow]] - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) } - } else { - logDebug(s"Falling back to parquet-mr") - // ParquetRecordReader returns InternalRow - val readSupport = new ParquetReadSupport( - convertTz, - enableVectorizedReader = false, - datetimeRebaseSpec, - int96RebaseSpec) - val reader = if (pushed.isDefined && enableRecordFilter) { - val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + val taskContext = Option(TaskContext.get()) + if (enableVectorizedReader) { + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && taskContext.isDefined, + capacity) + // SPARK-37089: We cannot register a task completion listener to close this iterator + // here because downstream exec nodes have already registered their listeners. Since + // listeners are executed in reverse order of registration, a listener registered + // here would close the iterator while downstream exec nodes are still running. When + // off-heap column vectors are enabled, this can cause a use-after-free bug leading to + // a segfault. + // + // Instead, we use FileScanRDD's task completion listener to close this iterator. 
+ val iter = new RecordReaderIterator(vectorizedReader) + try { + shouldCloseInputStream = false + // We don't need to take care the closeness of inputStream because this transfers + // the ownership of inputStream to the vectorizedReader + vectorizedReader.initialize( + split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) + logDebug(s"Appending $partitionSchema ${file.partitionValues}") + vectorizedReader.initBatch(partitionSchema, file.partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator + // to avoid leaking resources. + iter.close() + throw e + } } else { - new ParquetRecordReader[InternalRow](readSupport) - } - val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, - requiredSchema) - val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) - try { - readerWithRowIndexes.initialize(split, hadoopAttemptContext) - - val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema) - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - if (partitionSchema.length == 0) { - // There is no partition columns - iter.map(unsafeProjection) + logDebug(s"Falling back to parquet-mr") + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) } else { - val joinedRow = new JoinedRow() - iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + new ParquetRecordReader[InternalRow](readSupport) + } + val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, + requiredSchema) + val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) + try { + readerWithRowIndexes.initialize(split, hadoopAttemptContext) + + val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema) + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) + } + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator + // to avoid leaking resources. + iter.close() + throw e } - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. 
- iter.close() - throw e + } + } finally { + if (shouldCloseInputStream) { + inputStreamOpt.foreach(Utils.closeQuietly) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 0153103cba64..6f80f976ba34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -217,75 +217,87 @@ case class ParquetPartitionReaderFactory( RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T] = { val conf = broadcastedConf.value.value - val filePath = file.toPath - val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) + val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String]) val (inputFileOpt, inputStreamOpt, fileFooter) = openFileAndReadFooter(file) - val footerFileMetaData = fileFooter.getFileMetaData - val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData) - // Try to push down filters when filter push-down is enabled. - val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringPredicate, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(parquetFilters.createFilter) - .reduceOption(FilterApi.and) - } else { - None - } - // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' - // *only* if the file was created by something other than "parquet-mr", so check the actual - // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. - // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. - def isCreatedByParquetMr: Boolean = - footerFileMetaData.getCreatedBy().startsWith("parquet-mr") - - val convertTz = - if (timestampConversion && !isCreatedByParquetMr) { - Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + // Before transform the ownership of InputStream to the VectorizedParquetRecordReader, + // we must to close the InputStream leak if something goes wrong to avoid resource. + var shouldCloseInputStream = inputStreamOpt.isDefined + try { + val footerFileMetaData = fileFooter.getFileMetaData + val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData) + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringPredicate, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. 
+ .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) } else { None } - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy().startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. - if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) - } - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - int96RebaseModeInRead) - Utils.createResourceUninterruptiblyIfInTaskThread { - Utils.tryInitializeResource( - buildReaderFunc( - file.partitionValues, - pushed, - convertTz, - datetimeRebaseSpec, - int96RebaseSpec) - ) { reader => - reader match { - case vectorizedReader: VectorizedParquetRecordReader => - vectorizedReader.initialize( - split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) - case _ => - reader.initialize(split, hadoopAttemptContext) + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. 
+ if (pushed.isDefined) { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) + } + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + int96RebaseModeInRead) + Utils.createResourceUninterruptiblyIfInTaskThread { + Utils.tryInitializeResource( + buildReaderFunc( + file.partitionValues, + pushed, + convertTz, + datetimeRebaseSpec, + int96RebaseSpec) + ) { reader => + reader match { + case vectorizedReader: VectorizedParquetRecordReader => + shouldCloseInputStream = false + // We don't need to take care the closeness of inputStream because this transfers + // the ownership of inputStream to the vectorizedReader + vectorizedReader.initialize( + split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) + case _ => + reader.initialize(split, hadoopAttemptContext) + } + reader } - reader + } + } finally { + if (shouldCloseInputStream) { + inputStreamOpt.foreach(Utils.closeQuietly) } } } From e712ca336e8bf04ea849b053b01274385c33f4c8 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 11 Sep 2025 17:10:32 +0800 Subject: [PATCH 07/11] nit --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +- .../datasources/v2/parquet/ParquetPartitionReaderFactory.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 6989a763ac60..be4f59626f35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -214,7 +214,7 @@ class ParquetFileFormat ParquetFooterReader.openFileAndReadFooter(sharedConf, file, enableVectorizedReader) // Before transform the ownership of InputStream to the VectorizedParquetRecordReader, - // we must to close the InputStream leak if something goes wrong to avoid resource. + // we must to close the InputStream leak if something goes wrong to avoid resource leak. var shouldCloseInputStream = inputStreamOpt.isDefined try { val footerFileMetaData = fileFooter.getFileMetaData diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 6f80f976ba34..1606dd9061b8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -220,7 +220,7 @@ case class ParquetPartitionReaderFactory( val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String]) val (inputFileOpt, inputStreamOpt, fileFooter) = openFileAndReadFooter(file) // Before transform the ownership of InputStream to the VectorizedParquetRecordReader, - // we must to close the InputStream leak if something goes wrong to avoid resource. + // we must to close the InputStream leak if something goes wrong to avoid resource leak. 
var shouldCloseInputStream = inputStreamOpt.isDefined try { val footerFileMetaData = fileFooter.getFileMetaData From 035742d7e9ce0c953e6184e6c32398f12c837f94 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 11 Sep 2025 17:12:56 +0800 Subject: [PATCH 08/11] nit --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +- .../datasources/v2/parquet/ParquetPartitionReaderFactory.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index be4f59626f35..63990e78c8b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -292,7 +292,7 @@ class ParquetFileFormat val iter = new RecordReaderIterator(vectorizedReader) try { shouldCloseInputStream = false - // We don't need to take care the closeness of inputStream because this transfers + // We don't need to take care of closing the inputStream because this transfers // the ownership of inputStream to the vectorizedReader vectorizedReader.initialize( split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 1606dd9061b8..5b87a696fc39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -285,7 +285,7 @@ case class ParquetPartitionReaderFactory( reader match { case vectorizedReader: VectorizedParquetRecordReader => shouldCloseInputStream = false - // We don't need to take care the closeness of inputStream because this transfers + // We don't need to take care of closing the inputStream because this transfers // the ownership of inputStream to the vectorizedReader vectorizedReader.initialize( split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) From 9d2b2dd50d701d70f97e3b64c2c36d8cb38b6475 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 11 Sep 2025 18:08:58 +0800 Subject: [PATCH 09/11] nit --- .../execution/datasources/parquet/ParquetFileFormat.scala | 5 +++-- .../v2/parquet/ParquetPartitionReaderFactory.scala | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 63990e78c8b0..cb51ca6889fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -213,8 +213,9 @@ class ParquetFileFormat val (inputFileOpt, inputStreamOpt, fileFooter) = ParquetFooterReader.openFileAndReadFooter(sharedConf, file, enableVectorizedReader) - // Before transform the ownership of InputStream to the VectorizedParquetRecordReader, - // we must to close the InputStream leak if something goes wrong to avoid resource leak. 
+ // Before transferring the ownership of inputStream to the vectorizedReader, + // we must take responsibility to close the inputStream if something goes wrong + // to avoid resource leak. var shouldCloseInputStream = inputStreamOpt.isDefined try { val footerFileMetaData = fileFooter.getFileMetaData diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 5b87a696fc39..881e7a764da5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -219,8 +219,9 @@ case class ParquetPartitionReaderFactory( val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String]) val (inputFileOpt, inputStreamOpt, fileFooter) = openFileAndReadFooter(file) - // Before transform the ownership of InputStream to the VectorizedParquetRecordReader, - // we must to close the InputStream leak if something goes wrong to avoid resource leak. + // Before transferring the ownership of inputStream to the vectorizedReader, + // we must take responsibility to close the inputStream if something goes wrong + // to avoid resource leak. var shouldCloseInputStream = inputStreamOpt.isDefined try { val footerFileMetaData = fileFooter.getFileMetaData From f043c69e1223f7a429c3b91edb1243f58476508d Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Thu, 11 Sep 2025 18:18:32 +0800 Subject: [PATCH 10/11] style --- .../org/apache/spark/sql/execution/PartitionedFileUtil.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala index 2dfe2a29b3cf..9ea23efbaec4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/PartitionedFileUtil.scala @@ -45,8 +45,7 @@ object PartitionedFileUtil { start: Long, length: Long): PartitionedFile = { val hosts = getBlockHosts(getBlockLocations(file.fileStatus), start, length) - PartitionedFile(partitionValues, start, length, - file.fileStatus, hosts, file.metadata) + PartitionedFile(partitionValues, start, length, file.fileStatus, hosts, file.metadata) } private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match { From 9f3bd92df6035deef18a36fb75b75d814c583162 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Fri, 12 Sep 2025 20:07:02 +0800 Subject: [PATCH 11/11] close --- .../sql/execution/datasources/parquet/ParquetFileFormat.scala | 2 +- .../datasources/v2/parquet/ParquetPartitionReaderFactory.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index cb51ca6889fe..134faf5a5ff9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -292,11 +292,11 @@ class ParquetFileFormat // Instead, we use FileScanRDD's task completion listener to 
close this iterator. val iter = new RecordReaderIterator(vectorizedReader) try { - shouldCloseInputStream = false // We don't need to take care of closing the inputStream because this transfers // the ownership of inputStream to the vectorizedReader vectorizedReader.initialize( split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) + shouldCloseInputStream = false logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 881e7a764da5..d6f1eb91c1d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -285,11 +285,11 @@ case class ParquetPartitionReaderFactory( ) { reader => reader match { case vectorizedReader: VectorizedParquetRecordReader => - shouldCloseInputStream = false // We don't need to take care of closing the inputStream because this transfers // the ownership of inputStream to the vectorizedReader vectorizedReader.initialize( split, hadoopAttemptContext, inputFileOpt, inputStreamOpt, Some(fileFooter)) + shouldCloseInputStream = false case _ => reader.initialize(split, hadoopAttemptContext) }
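For reference, the ownership hand-off that patches 06-11 converge on can be summarized in a short sketch. It is illustrative only: the wrapper object and method (FooterReuseExample.readVectorized) and their parameters are invented for the example, while openFileAndReadFooter, the five-argument initialize overload, and Utils.closeQuietly are the calls used in the patches above.

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.FileSplit
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.util.Utils

// Illustrative sketch, not part of the patch series.
object FooterReuseExample {
  def readVectorized(
      hadoopConf: Configuration,
      file: PartitionedFile,
      context: TaskAttemptContext,
      reader: VectorizedParquetRecordReader): Unit = {
    // Open the file once: the footer read and the later row group reads share the same
    // SeekableInputStream, avoiding a second file open and its NameNode RPC.
    val (inputFileOpt, inputStreamOpt, footer) =
      ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, detachFileInputStream = true)
    // Until initialize() succeeds, this method still owns the stream and must close it on failure.
    var ownsStream = inputStreamOpt.isDefined
    try {
      val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String])
      reader.initialize(split, context, inputFileOpt, inputStreamOpt, Some(footer))
      ownsStream = false // ownership has been transferred to the vectorized reader
    } finally {
      if (ownsStream) inputStreamOpt.foreach(Utils.closeQuietly)
    }
  }
}

The ownsStream flag plays the role of shouldCloseInputStream in the patches: it flips only after initialize() returns, so a failure during initialization still closes the stream exactly once, while the success path leaves it open for the reader to manage.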