diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OpenedParquetFooter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OpenedParquetFooter.java new file mode 100644 index 000000000000..5893609d2e9d --- /dev/null +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/OpenedParquetFooter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet; + +import java.util.Optional; + +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.SeekableInputStream; + +public record OpenedParquetFooter( + ParquetMetadata footer, + HadoopInputFile inputFile, + Optional inputStreamOpt) { + + public SeekableInputStream inputStream() { + return inputStreamOpt.get(); + } +} diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java index 438fffa11784..3edb328451fe 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetFooterReader.java @@ -18,10 +18,9 @@ package org.apache.spark.sql.execution.datasources.parquet; import java.io.IOException; +import java.util.Optional; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.format.converter.ParquetMetadataConverter; @@ -37,53 +36,77 @@ */ public class ParquetFooterReader { - public static final boolean SKIP_ROW_GROUPS = true; - public static final boolean WITH_ROW_GROUPS = false; - /** - * Reads footer for the input Parquet file 'split'. If 'skipRowGroup' is true, - * this will skip reading the Parquet row group metadata. + * Build a filter for reading footer of the input Parquet file 'split'. + * If 'skipRowGroup' is true, this will skip reading the Parquet row group metadata. * * @param file a part (i.e. 
"block") of a single file that should be read - * @param configuration hadoop configuration of file + * @param hadoopConf hadoop configuration of file * @param skipRowGroup If true, skip reading row groups; * if false, read row groups according to the file split range */ - public static ParquetMetadata readFooter( - Configuration configuration, - PartitionedFile file, - boolean skipRowGroup) throws IOException { - long fileStart = file.start(); - ParquetMetadataConverter.MetadataFilter filter; + public static ParquetMetadataConverter.MetadataFilter buildFilter( + Configuration hadoopConf, PartitionedFile file, boolean skipRowGroup) { if (skipRowGroup) { - filter = ParquetMetadataConverter.SKIP_ROW_GROUPS; + return ParquetMetadataConverter.SKIP_ROW_GROUPS; } else { - filter = HadoopReadOptions.builder(configuration, file.toPath()) + long fileStart = file.start(); + return HadoopReadOptions.builder(hadoopConf, file.toPath()) .withRange(fileStart, fileStart + file.length()) .build() .getMetadataFilter(); } - return readFooter(configuration, file.toPath(), filter); - } - - public static ParquetMetadata readFooter(Configuration configuration, - Path file, ParquetMetadataConverter.MetadataFilter filter) throws IOException { - return readFooter(HadoopInputFile.fromPath(file, configuration), filter); - } - - public static ParquetMetadata readFooter(Configuration configuration, - FileStatus fileStatus, ParquetMetadataConverter.MetadataFilter filter) throws IOException { - return readFooter(HadoopInputFile.fromStatus(fileStatus, configuration), filter); } - private static ParquetMetadata readFooter(HadoopInputFile inputFile, + public static ParquetMetadata readFooter( + HadoopInputFile inputFile, ParquetMetadataConverter.MetadataFilter filter) throws IOException { - ParquetReadOptions readOptions = - HadoopReadOptions.builder(inputFile.getConfiguration(), inputFile.getPath()) + ParquetReadOptions readOptions = HadoopReadOptions + .builder(inputFile.getConfiguration(), inputFile.getPath()) .withMetadataFilter(filter).build(); - // Use try-with-resources to ensure fd is closed. - try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) { + try (var fileReader = ParquetFileReader.open(inputFile, readOptions)) { return fileReader.getFooter(); } } + + /** + * Decoding Parquet files generally involves two steps: + * 1. read and resolve the metadata (footer), + * 2. read and decode the row groups/column chunks. + *

+ * It's possible to avoid opening the file twice by reusing the SeekableInputStream. + * When keepInputStreamOpen is true, the caller is responsible for closing the + * SeekableInputStream. Currently, this is only supported by the Parquet vectorized reader. + * + * @param hadoopConf hadoop configuration of file + * @param file a part (i.e. "block") of a single file that should be read + * @param keepInputStreamOpen when true, keep the file's SeekableInputStream open + * @return if keepInputStreamOpen is true, the returned OpenedParquetFooter carries + * the open SeekableInputStream in a present Optional, otherwise an empty Optional. + */ + public static OpenedParquetFooter openFileAndReadFooter( + Configuration hadoopConf, + PartitionedFile file, + boolean keepInputStreamOpen) throws IOException { + var readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath()) + // `keepInputStreamOpen` is true only when the Parquet vectorized reader is used + // on the caller side; in that case, the footer will be reused later when reading + // row groups, so the row group metadata must be read ahead here. + // When false, the caller uses parquet-mr to read the file; only the file metadata + // is required during the planning phase, and parquet-mr will read the footer again + // when reading row groups. + .withMetadataFilter(buildFilter(hadoopConf, file, !keepInputStreamOpen)) + .build(); + var inputFile = HadoopInputFile.fromPath(file.toPath(), hadoopConf); + var inputStream = inputFile.newStream(); + try (var fileReader = ParquetFileReader.open(inputFile, readOptions, inputStream)) { + var footer = fileReader.getFooter(); + if (keepInputStreamOpen) { + fileReader.detachFileInputStream(); + return new OpenedParquetFooter(footer, inputFile, Optional.of(inputStream)); + } else { + return new OpenedParquetFooter(footer, inputFile, Optional.empty()); + } + } + } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java index d3716ef18447..038112086e47 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java @@ -49,6 +49,7 @@ import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.hadoop.util.ConfigurationUtil; import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Types; @@ -89,24 +90,27 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Void, T> public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext, + Option<HadoopInputFile> inputFile, + Option<SeekableInputStream> inputStream, Option<ParquetMetadata> fileFooter) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); FileSplit split = (FileSplit) inputSplit; this.file = split.getPath(); + ParquetReadOptions options = HadoopReadOptions + .builder(configuration, file) + .withRange(split.getStart(), split.getStart() + split.getLength()) + .build(); ParquetFileReader fileReader; - if (fileFooter.isDefined()) { - fileReader = new ParquetFileReader(configuration, file, fileFooter.get()); + if (inputFile.isDefined() && fileFooter.isDefined() && inputStream.isDefined()) { + fileReader = new ParquetFileReader( + inputFile.get(), fileFooter.get(), options, inputStream.get()); } else { - ParquetReadOptions options = HadoopReadOptions -
.builder(configuration, file) - .withRange(split.getStart(), split.getStart() + split.getLength()) - .build(); fileReader = new ParquetFileReader( HadoopInputFile.fromPath(file, configuration), options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 9010c6e30be0..b15f79df527e 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -24,8 +24,6 @@ import java.util.List; import java.util.Set; -import org.apache.spark.SparkUnsupportedOperationException; -import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; import scala.Option; import scala.jdk.javaapi.CollectionConverters; @@ -35,11 +33,15 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; +import org.apache.spark.SparkUnsupportedOperationException; import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; import org.apache.spark.sql.execution.vectorized.ConstantColumnVector; @@ -190,9 +192,11 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont public void initialize( InputSplit inputSplit, TaskAttemptContext taskAttemptContext, + Option inputFile, + Option inputStream, Option fileFooter) throws IOException, InterruptedException, UnsupportedOperationException { - super.initialize(inputSplit, taskAttemptContext, fileFooter); + super.initialize(inputSplit, taskAttemptContext, inputFile, inputStream, fileFooter); initializeInternal(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index be6e5d188667..4cc3fe61d22b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -17,6 +17,10 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.io.Closeable +import java.time.ZoneId +import java.util.concurrent.atomic.AtomicBoolean + import scala.collection.mutable import scala.jdk.CollectionConverters._ import scala.util.{Failure, Try} @@ -27,9 +31,10 @@ import org.apache.hadoop.mapred.FileSplit import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.parquet.filter2.compat.FilterCompat -import org.apache.parquet.filter2.predicate.FilterApi +import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop._ +import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.spark.TaskContext import 
org.apache.spark.internal.Logging @@ -40,14 +45,15 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.{DateTimeUtils, RebaseDateTime} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, OffHeapColumnVector, OnHeapColumnVector} import org.apache.spark.sql.internal.{SessionStateHelper, SQLConf} +import org.apache.spark.sql.internal.SQLConf._ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils} class ParquetFileFormat extends FileFormat @@ -108,28 +114,11 @@ class ParquetFileFormat true } - /** - * Build the reader. - * - * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether - * the reader should return row or columnar output. - * If the caller can handle both, pass - * FileFormat.OPTION_RETURNING_BATCH -> - * supportBatch(sparkSession, - * StructType(requiredSchema.fields ++ partitionSchema.fields)) - * as the option. - * It should be set to "true" only if this reader can support it. - */ - override def buildReaderWithPartitionValues( - sparkSession: SparkSession, - dataSchema: StructType, - partitionSchema: StructType, - requiredSchema: StructType, - filters: Seq[Filter], - options: Map[String, String], - hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = { - val sqlConf = getSqlConf(sparkSession) - hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName) + private def setupHadoopConf( + hadoopConf: Configuration, sqlConf: SQLConf, requiredSchema: StructType): Unit = { + hadoopConf.set( + ParquetInputFormat.READ_SUPPORT_CLASS, + classOf[ParquetReadSupport].getName) hadoopConf.set( ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requiredSchema.json) @@ -159,7 +148,30 @@ class ParquetFileFormat hadoopConf.setBoolean( SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, sqlConf.legacyParquetNanosAsLong) + } + /** + * Build the reader. + * + * @note It is required to pass FileFormat.OPTION_RETURNING_BATCH in options, to indicate whether + * the reader should return row or columnar output. + * If the caller can handle both, pass + * FileFormat.OPTION_RETURNING_BATCH -> + * supportBatch(sparkSession, + * StructType(requiredSchema.fields ++ partitionSchema.fields)) + * as the option. + * It should be set to "true" only if this reader can support it. 
+ */ + override def buildReaderWithPartitionValues( + sparkSession: SparkSession, + dataSchema: StructType, + partitionSchema: StructType, + requiredSchema: StructType, + filters: Seq[Filter], + options: Map[String, String], + hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + val sqlConf = getSqlConf(sparkSession) + setupHadoopConf(hadoopConf, sqlConf, requiredSchema) val broadcastedHadoopConf = SerializableConfiguration.broadcast(sparkSession.sparkContext, hadoopConf) @@ -191,7 +203,7 @@ class ParquetFileFormat options.getOrElse(FileFormat.OPTION_RETURNING_BATCH, throw new IllegalArgumentException( "OPTION_RETURNING_BATCH should always be set for ParquetFileFormat. " + - "To workaround this issue, set spark.sql.parquet.enableVectorizedReader=false.")) + s"To workaround this issue, set ${PARQUET_VECTORIZED_READER_ENABLED.key}=false.")) .equals("true") if (returningBatch) { // If the passed option said that we are to return batches, we need to also be able to @@ -202,150 +214,199 @@ class ParquetFileFormat (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) - val filePath = file.toPath - val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - + val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String]) val sharedConf = broadcastedHadoopConf.value.value - val fileFooter = if (enableVectorizedReader) { - // When there are vectorized reads, we can avoid reading the footer twice by reading - // all row groups in advance and filter row groups according to filters that require - // push down (no need to read the footer metadata again). - ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS) - } else { - ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS) - } - - val footerFileMetaData = fileFooter.getFileMetaData - val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - datetimeRebaseModeInRead) - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) - - // Try to push down filters when filter push-down is enabled. - val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringPredicate, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. - .flatMap(parquetFilters.createFilter(_)) - .reduceOption(FilterApi.and) - } else { - None - } - - // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' - // *only* if the file was created by something other than "parquet-mr", so check the actual - // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. - // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. 
- def isCreatedByParquetMr: Boolean = - footerFileMetaData.getCreatedBy().startsWith("parquet-mr") - - val convertTz = - if (timestampConversion && !isCreatedByParquetMr) { - Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + // When there are vectorized reads, we can avoid + // 1. opening the file twice by transfering the SeekableInputStream + // 2. reading the footer twice by reading all row groups in advance and filter row groups + // according to filters that require push down + val openedFooter = + ParquetFooterReader.openFileAndReadFooter(sharedConf, file, enableVectorizedReader) + assert(openedFooter.inputStreamOpt.isPresent == enableVectorizedReader) + + // Before transferring the ownership of inputStream to the vectorizedReader, + // we must take responsibility to close the inputStream if something goes wrong + // to avoid resource leak. + val shouldCloseInputStream = new AtomicBoolean(openedFooter.inputStreamOpt.isPresent) + try { + val footerFileMetaData = openedFooter.footer.getFileMetaData + val datetimeRebaseSpec = DataSourceUtils.datetimeRebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + datetimeRebaseModeInRead) + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, int96RebaseModeInRead) + + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringPredicate, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates + // can be converted (`ParquetFilters.createFilter` returns an `Option`). That's why + // a `flatMap` is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) } else { None } - - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = - new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) - - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. - if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) - } - val taskContext = Option(TaskContext.get()) - if (enableVectorizedReader) { - val vectorizedReader = new VectorizedParquetRecordReader( - convertTz.orNull, - datetimeRebaseSpec.mode.toString, - datetimeRebaseSpec.timeZone, - int96RebaseSpec.mode.toString, - int96RebaseSpec.timeZone, - enableOffHeapColumnVector && taskContext.isDefined, - capacity) - // SPARK-37089: We cannot register a task completion listener to close this iterator here - // because downstream exec nodes have already registered their listeners. Since listeners - // are executed in reverse order of registration, a listener registered here would close the - // iterator while downstream exec nodes are still running. When off-heap column vectors are - // enabled, this can cause a use-after-free bug leading to a segfault. - // - // Instead, we use FileScanRDD's task completion listener to close this iterator. 
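To make the hand-off described in the comments above concrete, here is a minimal Java sketch of the intended call pattern around `ParquetFooterReader.openFileAndReadFooter` and `OpenedParquetFooter`; the `hadoopConf`/`file` arguments and the commented-out reader call are assumed stand-ins for the caller's context, not code from this patch:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.OpenedParquetFooter;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader;

final class FooterHandOffSketch {
  static void planAndRead(Configuration hadoopConf, PartitionedFile file) throws IOException {
    // Open the file once: the footer is read here and the stream is kept open.
    OpenedParquetFooter opened =
        ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, /* keepInputStreamOpen */ true);
    boolean ownStream = opened.inputStreamOpt().isPresent();
    try {
      // ... derive pushed filters / rebase specs from opened.footer() ...
      // Hand the footer, input file and still-open stream to the vectorized reader; from
      // that point on the reader owns the stream, e.g.:
      // reader.initialize(split, context, Option.apply(opened.inputFile()),
      //     Option.apply(opened.inputStream()), Option.apply(opened.footer()));
      ownStream = false;
    } finally {
      if (ownStream) {
        // Initialization failed before the hand-off: close the stream ourselves.
        opened.inputStream().close();
      }
    }
  }
}
```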
- val iter = new RecordReaderIterator(vectorizedReader) - try { - vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter)) - logDebug(s"Appending $partitionSchema ${file.partitionValues}") - vectorizedReader.initBatch(partitionSchema, file.partitionValues) - if (returningBatch) { - vectorizedReader.enableReturningBatches() + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 + // timestamps *only* if the file was created by something other than "parquet-mr", + // so check the actual writer here for this file. We have to do this per-file, + // as each file in the table may have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy.startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None } - // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. - iter.asInstanceOf[Iterator[InternalRow]] - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. - iter.close() - throw e + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = + new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + pushed.foreach { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _) } - } else { - logDebug(s"Falling back to parquet-mr") - // ParquetRecordReader returns InternalRow - val readSupport = new ParquetReadSupport( - convertTz, - enableVectorizedReader = false, - datetimeRebaseSpec, - int96RebaseSpec) - val reader = if (pushed.isDefined && enableRecordFilter) { - val parquetFilter = FilterCompat.get(pushed.get, null) - new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + if (enableVectorizedReader) { + buildVectorizedIterator( + hadoopAttemptContext, split, file.partitionValues, partitionSchema, convertTz, + datetimeRebaseSpec, int96RebaseSpec, enableOffHeapColumnVector, returningBatch, + capacity, openedFooter, shouldCloseInputStream) } else { - new ParquetRecordReader[InternalRow](readSupport) + logDebug(s"Falling back to parquet-mr") + buildRowBasedIterator( + hadoopAttemptContext, split, file.partitionValues, partitionSchema, convertTz, + datetimeRebaseSpec, int96RebaseSpec, requiredSchema, pushed, enableRecordFilter) } - val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, - requiredSchema) - val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) - try { - readerWithRowIndexes.initialize(split, hadoopAttemptContext) - - val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema) - val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - - if (partitionSchema.length == 0) { - // There is no partition columns - iter.map(unsafeProjection) - } else { - val joinedRow = new JoinedRow() - iter.map(d => unsafeProjection(joinedRow(d, file.partitionValues))) - } - } catch { - case e: Throwable => - // SPARK-23457: In case there is an exception in initialization, close the iterator to - // avoid leaking resources. 
- iter.close() - throw e + } finally { + if (shouldCloseInputStream.get) { + openedFooter.inputStreamOpt.ifPresent(Utils.closeQuietly) } } } } + // scalastyle:off argcount + private def buildVectorizedIterator( + hadoopAttemptContext: TaskAttemptContextImpl, + split: FileSplit, + partitionValues: InternalRow, + partitionSchema: StructType, + convertTz: Option[ZoneId], + datetimeRebaseSpec: RebaseDateTime.RebaseSpec, + int96RebaseSpec: RebaseDateTime.RebaseSpec, + enableOffHeapColumnVector: Boolean, + returningBatch: Boolean, + batchSize: Int, + openedFooter: OpenedParquetFooter, + shouldCloseInputStream: AtomicBoolean): Iterator[InternalRow] = { + // scalastyle:on argcount + assert(openedFooter.inputStreamOpt.isPresent) + val vectorizedReader = new VectorizedParquetRecordReader( + convertTz.orNull, + datetimeRebaseSpec.mode.toString, + datetimeRebaseSpec.timeZone, + int96RebaseSpec.mode.toString, + int96RebaseSpec.timeZone, + enableOffHeapColumnVector && TaskContext.get() != null, + batchSize) + // SPARK-37089: We cannot register a task completion listener to close this iterator here + // because downstream exec nodes have already registered their listeners. Since listeners + // are executed in reverse order of registration, a listener registered here would close the + // iterator while downstream exec nodes are still running. When off-heap column vectors are + // enabled, this can cause a use-after-free bug leading to a segfault. + // + // Instead, we use FileScanRDD's task completion listener to close this iterator. + val iter = new RecordReaderIterator(vectorizedReader) + try { + vectorizedReader.initialize( + split, hadoopAttemptContext, Some(openedFooter.inputFile), + Some(openedFooter.inputStream), Some(openedFooter.footer)) + // The caller don't need to take care of the close of inputStream after calling + // `initialize` because the ownership of inputStream has been transferred to the + // vectorizedReader + shouldCloseInputStream.set(false) + logDebug(s"Appending $partitionSchema $partitionValues") + vectorizedReader.initBatch(partitionSchema, partitionValues) + if (returningBatch) { + vectorizedReader.enableReturningBatches() + } + + // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. + iter.asInstanceOf[Iterator[InternalRow]] + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. 
+ iter.close() + throw e + } + } + + private def buildRowBasedIterator( + hadoopAttemptContext: TaskAttemptContextImpl, + split: FileSplit, + partitionValues: InternalRow, + partitionSchema: StructType, + convertTz: Option[ZoneId], + datetimeRebaseSpec: RebaseDateTime.RebaseSpec, + int96RebaseSpec: RebaseDateTime.RebaseSpec, + requiredSchema: StructType, + pushed: Option[FilterPredicate], + enableRecordFilter: Boolean): Iterator[InternalRow] with Closeable = { + // ParquetRecordReader returns InternalRow + val readSupport = new ParquetReadSupport( + convertTz, + enableVectorizedReader = false, + datetimeRebaseSpec, + int96RebaseSpec) + val reader = if (pushed.isDefined && enableRecordFilter) { + val parquetFilter = FilterCompat.get(pushed.get, null) + new ParquetRecordReader[InternalRow](readSupport, parquetFilter) + } else { + new ParquetRecordReader[InternalRow](readSupport) + } + val readerWithRowIndexes = ParquetRowIndexUtil.addRowIndexToRecordReaderIfNeeded(reader, + requiredSchema) + val iter = new RecordReaderIterator[InternalRow](readerWithRowIndexes) + try { + readerWithRowIndexes.initialize(split, hadoopAttemptContext) + + val fullSchema = toAttributes(requiredSchema) ++ toAttributes(partitionSchema) + val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) + + if (partitionSchema.length == 0) { + // There is no partition columns + iter.map(unsafeProjection) + } else { + val joinedRow = new JoinedRow() + iter.map(d => unsafeProjection(joinedRow(d, partitionValues))) + } + } catch { + case e: Throwable => + // SPARK-23457: In case there is an exception in initialization, close the iterator to + // avoid leaking resources. + iter.close() + throw e + } + } + override def supportDataType(dataType: DataType): Boolean = dataType match { case _: AtomicType => true @@ -447,9 +508,9 @@ object ParquetFileFormat extends Logging { // Skips row group information since we only need the schema. // ParquetFileReader.readFooter throws RuntimeException, instead of IOException, // when it can't read the footer. 
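As a reference for the comment above about `readFooter` throwing `RuntimeException` for unreadable footers, a small illustrative Java sketch of the schema-only read (footer without row group metadata); the `status`/`conf` arguments are assumptions:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader;

final class SchemaOnlyFooterSketch {
  // Read only the file-level metadata (no row groups). Unreadable footers surface as
  // RuntimeException rather than IOException, so a caller wanting ignoreCorruptFiles
  // semantics catches that and moves on, as the surrounding code does.
  static ParquetMetadata readSchemaFooter(FileStatus status, Configuration conf)
      throws IOException {
    try {
      return ParquetFooterReader.readFooter(
          HadoopInputFile.fromStatus(status, conf),
          ParquetMetadataConverter.SKIP_ROW_GROUPS);
    } catch (RuntimeException e) {
      return null; // corrupt or non-Parquet file; the caller may log and skip it
    }
  }
}
```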
- Some(new Footer(currentFile.getPath(), + Some(new Footer(currentFile.getPath, ParquetFooterReader.readFooter( - conf, currentFile, SKIP_ROW_GROUPS))) + HadoopInputFile.fromStatus(currentFile, conf), SKIP_ROW_GROUPS))) } catch { case e: RuntimeException => if (ignoreCorruptFiles) { logWarning(log"Skipped the footer in the corrupted file: ${MDC(PATH, currentFile)}", e) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala index 70ae8068a03a..1f03fea25687 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala @@ -17,14 +17,16 @@ package org.apache.spark.sql.execution.datasources.v2.parquet import java.time.ZoneId +import java.util.Optional import org.apache.hadoop.mapred.FileSplit import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.parquet.filter2.compat.FilterCompat import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate} -import org.apache.parquet.hadoop.{ParquetInputFormat, ParquetRecordReader} -import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop._ +import org.apache.parquet.hadoop.metadata.FileMetaData +import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.spark.TaskContext import org.apache.spark.broadcast.Broadcast @@ -86,17 +88,21 @@ case class ParquetPartitionReaderFactory( private val parquetReaderCallback = new ParquetReaderCallback() - private def getFooter(file: PartitionedFile): ParquetMetadata = { - val conf = broadcastedConf.value.value - if (aggregation.isDefined || enableVectorizedReader) { - // There are two purposes for reading footer with row groups: - // 1. When there are aggregates to push down, we get max/min/count from footer statistics. - // 2. When there are vectorized reads, we can avoid reading the footer twice by reading - // all row groups in advance and filter row groups according to filters that require - // push down (no need to read the footer metadata again). - ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.WITH_ROW_GROUPS) + private def openFileAndReadFooter(file: PartitionedFile): OpenedParquetFooter = { + val hadoopConf = broadcastedConf.value.value + if (aggregation.isDefined) { + val inputFile = HadoopInputFile.fromPath(file.toPath, hadoopConf) + // When there are aggregates to push down, we get max/min/count from footer statistics. + val footer = ParquetFooterReader.readFooter( + inputFile, + ParquetFooterReader.buildFilter(hadoopConf, file, false)) + new OpenedParquetFooter(footer, inputFile, Optional.empty) } else { - ParquetFooterReader.readFooter(conf, file, ParquetFooterReader.SKIP_ROW_GROUPS) + // When there are vectorized reads, we can avoid + // 1. opening the file twice by transfering the SeekableInputStream + // 2. 
reading the footer twice by reading all row groups in advance and filter row groups + // according to filters that require push down + ParquetFooterReader.openFileAndReadFooter(hadoopConf, file, enableVectorizedReader) } } @@ -130,13 +136,14 @@ case class ParquetPartitionReaderFactory( new PartitionReader[InternalRow] { private var hasNext = true private lazy val row: InternalRow = { - val footer = getFooter(file) - - if (footer != null && footer.getBlocks.size > 0) { - ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, - dataSchema, - partitionSchema, aggregation.get, readDataSchema, file.partitionValues, - getDatetimeRebaseSpec(footer.getFileMetaData)) + val openedFooter = openFileAndReadFooter(file) + assert(openedFooter.inputStreamOpt.isEmpty) + + if (openedFooter.footer != null && openedFooter.footer.getBlocks.size > 0) { + ParquetUtils.createAggInternalRowFromFooter(openedFooter.footer, + file.urlEncodedPath, dataSchema, partitionSchema, aggregation.get, + readDataSchema, file.partitionValues, + getDatetimeRebaseSpec(openedFooter.footer.getFileMetaData)) } else { null } @@ -175,11 +182,14 @@ case class ParquetPartitionReaderFactory( new PartitionReader[ColumnarBatch] { private var hasNext = true private val batch: ColumnarBatch = { - val footer = getFooter(file) - if (footer != null && footer.getBlocks.size > 0) { - val row = ParquetUtils.createAggInternalRowFromFooter(footer, file.urlEncodedPath, - dataSchema, partitionSchema, aggregation.get, readDataSchema, file.partitionValues, - getDatetimeRebaseSpec(footer.getFileMetaData)) + val openedFooter = openFileAndReadFooter(file) + assert(openedFooter.inputStreamOpt.isEmpty) + + if (openedFooter.footer != null && openedFooter.footer.getBlocks.size > 0) { + val row = ParquetUtils.createAggInternalRowFromFooter(openedFooter.footer, + file.urlEncodedPath, dataSchema, partitionSchema, aggregation.get, + readDataSchema, file.partitionValues, + getDatetimeRebaseSpec(openedFooter.footer.getFileMetaData)) AggregatePushDownUtils.convertAggregatesRowToBatch( row, readDataSchema, enableOffHeapColumnVector && Option(TaskContext.get()).isDefined) } else { @@ -211,74 +221,93 @@ case class ParquetPartitionReaderFactory( RebaseSpec) => RecordReader[Void, T]): RecordReader[Void, T] = { val conf = broadcastedConf.value.value - val filePath = file.toPath - val split = new FileSplit(filePath, file.start, file.length, Array.empty[String]) - val fileFooter = getFooter(file) - val footerFileMetaData = fileFooter.getFileMetaData - val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData) - // Try to push down filters when filter push-down is enabled. - val pushed = if (enableParquetFilterPushDown) { - val parquetSchema = footerFileMetaData.getSchema - val parquetFilters = new ParquetFilters( - parquetSchema, - pushDownDate, - pushDownTimestamp, - pushDownDecimal, - pushDownStringPredicate, - pushDownInFilterThreshold, - isCaseSensitive, - datetimeRebaseSpec) - filters - // Collects all converted Parquet filter predicates. Notice that not all predicates can be - // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` - // is used here. 
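The `flatMap`/`reduceOption` idiom mentioned in the comment above boils down to: keep the filters that could be converted, then AND them together. A rough Java equivalent, for illustration only:

```java
import java.util.List;
import java.util.Optional;

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

final class PushdownCombineSketch {
  // Equivalent of `filters.flatMap(createFilter).reduceOption(FilterApi.and)`:
  // drop the non-convertible filters, then AND whatever remains (empty => no pushdown).
  static Optional<FilterPredicate> combine(List<Optional<FilterPredicate>> converted) {
    return converted.stream()
        .flatMap(Optional::stream)
        .reduce(FilterApi::and);
  }
}
```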
- .flatMap(parquetFilters.createFilter) - .reduceOption(FilterApi.and) - } else { - None + val split = new FileSplit(file.toPath, file.start, file.length, Array.empty[String]) + val openedFooter = openFileAndReadFooter(file) + assert { + openedFooter.inputStreamOpt.isPresent == (aggregation.isEmpty && enableVectorizedReader) } - // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' - // *only* if the file was created by something other than "parquet-mr", so check the actual - // writer here for this file. We have to do this per-file, as each file in the table may - // have different writers. - // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. - def isCreatedByParquetMr: Boolean = - footerFileMetaData.getCreatedBy().startsWith("parquet-mr") - - val convertTz = - if (timestampConversion && !isCreatedByParquetMr) { - Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + + // Before transferring the ownership of inputStream to the vectorizedReader, + // we must take responsibility to close the inputStream if something goes wrong + // to avoid resource leak. + var shouldCloseInputStream = openedFooter.inputStreamOpt.isPresent + try { + val footerFileMetaData = openedFooter.footer.getFileMetaData + val datetimeRebaseSpec = getDatetimeRebaseSpec(footerFileMetaData) + // Try to push down filters when filter push-down is enabled. + val pushed = if (enableParquetFilterPushDown) { + val parquetSchema = footerFileMetaData.getSchema + val parquetFilters = new ParquetFilters( + parquetSchema, + pushDownDate, + pushDownTimestamp, + pushDownDecimal, + pushDownStringPredicate, + pushDownInFilterThreshold, + isCaseSensitive, + datetimeRebaseSpec) + filters + // Collects all converted Parquet filter predicates. Notice that not all predicates can be + // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap` + // is used here. + .flatMap(parquetFilters.createFilter) + .reduceOption(FilterApi.and) } else { None } - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + // PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps' + // *only* if the file was created by something other than "parquet-mr", so check the actual + // writer here for this file. We have to do this per-file, as each file in the table may + // have different writers. + // Define isCreatedByParquetMr as function to avoid unnecessary parquet footer reads. + def isCreatedByParquetMr: Boolean = + footerFileMetaData.getCreatedBy.startsWith("parquet-mr") + + val convertTz = + if (timestampConversion && !isCreatedByParquetMr) { + Some(DateTimeUtils.getZoneId(conf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))) + } else { + None + } - // Try to push down filters when filter push-down is enabled. - // Notice: This push-down is RowGroups level, not individual records. 
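To illustrate the row-group-level remark above, a hedged Java sketch of the two separate filter hook-ups involved (assuming a `FilterPredicate` has already been built):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetInputFormat;

final class PushdownWiringSketch {
  // Row-group level: parquet-mr checks the predicate against row-group statistics and
  // skips whole row groups; individual records are not filtered by this setting.
  static void enableRowGroupPushdown(Configuration hadoopConf, FilterPredicate pushed) {
    ParquetInputFormat.setFilterPredicate(hadoopConf, pushed);
  }

  // Record level (only when record filtering is enabled): the same predicate is wrapped
  // and passed to ParquetRecordReader so surviving row groups are filtered row by row.
  static FilterCompat.Filter recordLevelFilter(FilterPredicate pushed) {
    return FilterCompat.get(pushed, null);
  }
}
```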
- if (pushed.isDefined) { - ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get) - } - val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( - footerFileMetaData.getKeyValueMetaData.get, - int96RebaseModeInRead) - Utils.createResourceUninterruptiblyIfInTaskThread { - Utils.tryInitializeResource( - buildReaderFunc( - file.partitionValues, - pushed, - convertTz, - datetimeRebaseSpec, - int96RebaseSpec) - ) { reader => - reader match { - case vectorizedReader: VectorizedParquetRecordReader => - vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter)) - case _ => - reader.initialize(split, hadoopAttemptContext) + val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) + + // Try to push down filters when filter push-down is enabled. + // Notice: This push-down is RowGroups level, not individual records. + pushed.foreach { + ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, _) + } + val int96RebaseSpec = DataSourceUtils.int96RebaseSpec( + footerFileMetaData.getKeyValueMetaData.get, + int96RebaseModeInRead) + Utils.createResourceUninterruptiblyIfInTaskThread { + Utils.tryInitializeResource( + buildReaderFunc( + file.partitionValues, + pushed, + convertTz, + datetimeRebaseSpec, + int96RebaseSpec) + ) { reader => + reader match { + case vectorizedReader: VectorizedParquetRecordReader => + vectorizedReader.initialize( + split, hadoopAttemptContext, Some(openedFooter.inputFile), + Some(openedFooter.inputStream), Some(openedFooter.footer)) + // We don't need to take care of the close of inputStream after calling `initialize` + // because the ownership of inputStream has been transferred to the vectorizedReader + shouldCloseInputStream = false + case _ => + reader.initialize(split, hadoopAttemptContext) + } + reader } - reader + } + } finally { + if (shouldCloseInputStream) { + openedFooter.inputStreamOpt.ifPresent(Utils.closeQuietly) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index 257a89754f4e..044f6ce202d8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -22,6 +22,7 @@ import java.time.ZoneOffset import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER +import org.apache.parquet.hadoop.util.HadoopInputFile import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.spark.sql.Row @@ -229,8 +230,8 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS // sure the test is configured correctly. 
assert(parts.size == 2) parts.foreach { part => - val oneFooter = - ParquetFooterReader.readFooter(hadoopConf, part.getPath, NO_FILTER) + val oneFooter = ParquetFooterReader.readFooter( + HadoopInputFile.fromStatus(part, hadoopConf), NO_FILTER) assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 1) val typeName = oneFooter .getFileMetaData.getSchema.getColumns.get(0).getPrimitiveType.getPrimitiveTypeName diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala index b7b082e32965..12daef65eabc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala @@ -144,8 +144,8 @@ private[sql] trait ParquetTest extends FileBasedDataSourceTest { protected def readFooter(path: Path, configuration: Configuration): ParquetMetadata = { ParquetFooterReader.readFooter( - configuration, - new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE), + HadoopInputFile.fromPath( + new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE), configuration), ParquetMetadataConverter.NO_FILTER) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index d3f625542d96..769b633a9c52 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -26,6 +26,7 @@ import scala.jdk.CollectionConverters._ import org.apache.hadoop.fs.Path import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER +import org.apache.parquet.hadoop.util.HadoopInputFile import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{spy, times, verify} import org.scalatest.BeforeAndAfterEach @@ -2710,8 +2711,9 @@ class HiveDDLSuite OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name case "parquet" => + val hadoopConf = sparkContext.hadoopConfiguration val footer = ParquetFooterReader.readFooter( - sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath), NO_FILTER) + HadoopInputFile.fromPath(new Path(maybeFile.get.getPath), hadoopConf), NO_FILTER) footer.getBlocks.get(0).getColumns.get(0).getCodec.toString }
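For completeness, since the `Path`/`FileStatus` overloads of `ParquetFooterReader.readFooter` were removed, test callers now wrap the file in a `HadoopInputFile` themselves; a minimal sketch with assumed `file`/`conf` values:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader;

final class TestFooterReadSketch {
  // Wrap the path in a HadoopInputFile before asking for the footer; NO_FILTER keeps
  // the full metadata, matching the updated test call sites above.
  static ParquetMetadata footerOf(Path file, Configuration conf) throws IOException {
    return ParquetFooterReader.readFooter(
        HadoopInputFile.fromPath(file, conf), ParquetMetadataConverter.NO_FILTER);
  }
}
```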