@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.parquet;

import java.util.Optional;

import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;

public record OpenedParquetFooter(
ParquetMetadata footer,
HadoopInputFile inputFile,
Optional<SeekableInputStream> inputStreamOpt) {

public SeekableInputStream inputStream() {
return inputStreamOpt.get();
}
}
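
For reviewers skimming the new type, a minimal caller-side sketch of the record's contract (the helper class name is hypothetical, not part of this PR): inputStream() simply unwraps the optional, so it is only safe when the footer was opened with the stream kept open, and in that case the caller owns closing it.

import java.io.IOException;

import org.apache.spark.sql.execution.datasources.parquet.OpenedParquetFooter;

// Hypothetical helper illustrating who owns the stream carried by OpenedParquetFooter.
final class OpenedFooterContractSketch {
  static String createdBy(OpenedParquetFooter opened) throws IOException {
    try {
      // The footer is always present and can be inspected without touching the stream.
      return opened.footer().getFileMetaData().getCreatedBy();
    } finally {
      // Close the stream only if one was kept open; calling opened.inputStream()
      // on an empty optional would throw NoSuchElementException.
      if (opened.inputStreamOpt().isPresent()) {
        opened.inputStream().close();
      }
    }
  }
}
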
@@ -18,10 +18,9 @@
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
@@ -37,53 +36,77 @@
*/
public class ParquetFooterReader {

public static final boolean SKIP_ROW_GROUPS = true;
public static final boolean WITH_ROW_GROUPS = false;

/**
* Reads footer for the input Parquet file 'split'. If 'skipRowGroup' is true,
* this will skip reading the Parquet row group metadata.
* Build a filter for reading footer of the input Parquet file 'split'.
Member: nit: I think the doc is outdated - there is no 'split'
Member Author: nothing has changed; 'split' still refers to the PartitionedFile here

* If 'skipRowGroup' is true, this will skip reading the Parquet row group metadata.
*
* @param file a part (i.e. "block") of a single file that should be read
* @param configuration hadoop configuration of file
* @param hadoopConf hadoop configuration of file
* @param skipRowGroup If true, skip reading row groups;
* if false, read row groups according to the file split range
*/
public static ParquetMetadata readFooter(
Configuration configuration,
PartitionedFile file,
boolean skipRowGroup) throws IOException {
long fileStart = file.start();
ParquetMetadataConverter.MetadataFilter filter;
public static ParquetMetadataConverter.MetadataFilter buildFilter(
Configuration hadoopConf, PartitionedFile file, boolean skipRowGroup) {
if (skipRowGroup) {
filter = ParquetMetadataConverter.SKIP_ROW_GROUPS;
return ParquetMetadataConverter.SKIP_ROW_GROUPS;
} else {
filter = HadoopReadOptions.builder(configuration, file.toPath())
long fileStart = file.start();
return HadoopReadOptions.builder(hadoopConf, file.toPath())
.withRange(fileStart, fileStart + file.length())
.build()
.getMetadataFilter();
}
return readFooter(configuration, file.toPath(), filter);
}

public static ParquetMetadata readFooter(Configuration configuration,
Path file, ParquetMetadataConverter.MetadataFilter filter) throws IOException {
return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
}

public static ParquetMetadata readFooter(Configuration configuration,
FileStatus fileStatus, ParquetMetadataConverter.MetadataFilter filter) throws IOException {
return readFooter(HadoopInputFile.fromStatus(fileStatus, configuration), filter);
}

private static ParquetMetadata readFooter(HadoopInputFile inputFile,
public static ParquetMetadata readFooter(
HadoopInputFile inputFile,
ParquetMetadataConverter.MetadataFilter filter) throws IOException {
ParquetReadOptions readOptions =
HadoopReadOptions.builder(inputFile.getConfiguration(), inputFile.getPath())
ParquetReadOptions readOptions = HadoopReadOptions
.builder(inputFile.getConfiguration(), inputFile.getPath())
.withMetadataFilter(filter).build();
// Use try-with-resources to ensure fd is closed.
try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) {
try (var fileReader = ParquetFileReader.open(inputFile, readOptions)) {
return fileReader.getFooter();
}
}
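
As a quick illustration of the two filter modes now exposed by buildFilter, a hedged sketch (not code from this PR): the Configuration and PartitionedFile are assumed to come from the caller, and the class and method names are made up.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader;

// Hypothetical examples of combining buildFilter with readFooter.
final class FooterFilterSketch {
  // Planning only needs file-level metadata, so row-group metadata is skipped.
  static ParquetMetadata planningFooter(Configuration hadoopConf, PartitionedFile file)
      throws IOException {
    var filter = ParquetFooterReader.buildFilter(
        hadoopConf, file, ParquetFooterReader.SKIP_ROW_GROUPS);
    return ParquetFooterReader.readFooter(
        HadoopInputFile.fromPath(file.toPath(), hadoopConf), filter);
  }

  // Scanning keeps only the row groups overlapping this split's [start, start + length) range.
  static ParquetMetadata scanFooter(Configuration hadoopConf, PartitionedFile file)
      throws IOException {
    var filter = ParquetFooterReader.buildFilter(
        hadoopConf, file, ParquetFooterReader.WITH_ROW_GROUPS);
    return ParquetFooterReader.readFooter(
        HadoopInputFile.fromPath(file.toPath(), hadoopConf), filter);
  }
}
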

/**
* Decoding Parquet files generally involves two steps:
* 1. read and resolve the metadata (footer),
* 2. read and decode the row groups/column chunks.
* <p>
* It's possible to avoid opening the file twice by reusing the SeekableInputStream.
* When keepInputStreamOpen is true, the caller takes responsibility for closing the
* SeekableInputStream. Currently, this is only supported by the Parquet vectorized reader.
*
* @param hadoopConf hadoop configuration of file
* @param file a part (i.e. "block") of a single file that should be read
* @param keepInputStreamOpen when true, keep the SeekableInputStream of the file open
* @return if keepInputStreamOpen is true, the returned OpenedParquetFooter carries
* a present Optional wrapping the SeekableInputStream, otherwise an empty Optional.
*/
public static OpenedParquetFooter openFileAndReadFooter(
Configuration hadoopConf,
PartitionedFile file,
boolean keepInputStreamOpen) throws IOException {
var readOptions = HadoopReadOptions.builder(hadoopConf, file.toPath())
// `keepInputStreamOpen` is true only when the Parquet vectorized reader is used
// on the caller side; in that case, the footer will be reused later when reading
// row groups, so the row group metadata must be read ahead of time here.
// When false, the caller uses parquet-mr to read the file; only file metadata
// is required during the planning phase, and parquet-mr will read the footer
// again when reading row groups.
.withMetadataFilter(buildFilter(hadoopConf, file, !keepInputStreamOpen))
Member: maybe worth adding some comments here to explain why we choose to skip row groups when keepInputStreamOpen is false
Member Author: added a comment

.build();
var inputFile = HadoopInputFile.fromPath(file.toPath(), hadoopConf);
var inputStream = inputFile.newStream();
try (var fileReader = ParquetFileReader.open(inputFile, readOptions, inputStream)) {
var footer = fileReader.getFooter();
if (keepInputStreamOpen) {
fileReader.detachFileInputStream();
return new OpenedParquetFooter(footer, inputFile, Optional.of(inputStream));
} else {
return new OpenedParquetFooter(footer, inputFile, Optional.empty());
}
}
}
}
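
To make the ownership rules concrete, a hedged sketch of the two call patterns of openFileAndReadFooter (the helper class and method names are illustrative, not part of the PR):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.OpenedParquetFooter;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader;

// Hypothetical examples of the two openFileAndReadFooter call patterns.
final class OpenFooterSketch {
  // parquet-mr path: only file-level metadata is needed now; no stream is handed back,
  // so there is nothing for the caller to close.
  static ParquetMetadata footerForParquetMr(Configuration conf, PartitionedFile file)
      throws IOException {
    OpenedParquetFooter opened = ParquetFooterReader.openFileAndReadFooter(conf, file, false);
    return opened.footer();
  }

  // Vectorized path: the footer already includes row-group metadata, and the open stream
  // is returned so the file is not opened twice; the caller must eventually close
  // opened.inputStream().
  static OpenedParquetFooter footerForVectorizedReader(Configuration conf, PartitionedFile file)
      throws IOException {
    return ParquetFooterReader.openFileAndReadFooter(conf, file, true);
  }
}
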
@@ -49,6 +49,7 @@
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

@@ -89,24 +90,27 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
initialize(inputSplit, taskAttemptContext, Option.empty());
initialize(inputSplit, taskAttemptContext, Option.empty(), Option.empty(), Option.empty());
}

public void initialize(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext,
Option<HadoopInputFile> inputFile,
Option<SeekableInputStream> inputStream,
Option<ParquetMetadata> fileFooter) throws IOException, InterruptedException {
Configuration configuration = taskAttemptContext.getConfiguration();
FileSplit split = (FileSplit) inputSplit;
this.file = split.getPath();
ParquetReadOptions options = HadoopReadOptions
.builder(configuration, file)
.withRange(split.getStart(), split.getStart() + split.getLength())
.build();
ParquetFileReader fileReader;
if (fileFooter.isDefined()) {
fileReader = new ParquetFileReader(configuration, file, fileFooter.get());
if (inputFile.isDefined() && fileFooter.isDefined() && inputStream.isDefined()) {
fileReader = new ParquetFileReader(
inputFile.get(), fileFooter.get(), options, inputStream.get());
} else {
ParquetReadOptions options = HadoopReadOptions
.builder(configuration, file)
.withRange(split.getStart(), split.getStart() + split.getLength())
.build();
fileReader = new ParquetFileReader(
HadoopInputFile.fromPath(file, configuration), options);
}
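
For reference, a hedged sketch of the fallback path through the widened initialize signature: when no pre-opened file, stream, or footer is supplied, the reader still opens the file itself with a range-scoped ParquetReadOptions, exactly as before. The helper class is hypothetical; the split and task context are assumed to come from the caller.

import java.io.IOException;

import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase;

import scala.Option;

// Hypothetical example of the no-reuse path: all three Options are empty.
final class ReaderInitSketch {
  static void initializeWithoutReuse(
      SpecificParquetRecordReaderBase<?> reader,
      FileSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    // The reader falls back to HadoopInputFile.fromPath and opens its own stream.
    reader.initialize(split, context, Option.empty(), Option.empty(), Option.empty());
  }
}
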
@@ -24,8 +24,6 @@
import java.util.List;
import java.util.Set;

import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
import scala.Option;
import scala.jdk.javaapi.CollectionConverters;

@@ -35,11 +33,15 @@
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector;
@@ -190,9 +192,11 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
public void initialize(
InputSplit inputSplit,
TaskAttemptContext taskAttemptContext,
Option<HadoopInputFile> inputFile,
Option<SeekableInputStream> inputStream,
Option<ParquetMetadata> fileFooter)
throws IOException, InterruptedException, UnsupportedOperationException {
super.initialize(inputSplit, taskAttemptContext, fileFooter);
super.initialize(inputSplit, taskAttemptContext, inputFile, inputStream, fileFooter);
initializeInternal();
}
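
Putting the pieces together, a hedged end-to-end sketch of the vectorized path this PR enables (the reader, configuration, and task context are assumed to be constructed elsewhere; the class and method names here are illustrative). Per the javadoc on openFileAndReadFooter, the caller side remains responsible for eventually closing the reused stream.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.spark.sql.execution.datasources.PartitionedFile;
import org.apache.spark.sql.execution.datasources.parquet.OpenedParquetFooter;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader;
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader;

import scala.Option;

// Hypothetical wiring: open the file once, then hand footer, file, and stream to the reader.
final class VectorizedFooterReuseSketch {
  static void initializeWithReuse(
      VectorizedParquetRecordReader reader,
      Configuration conf,
      PartitionedFile file,
      TaskAttemptContext context) throws IOException, InterruptedException {
    // One open: footer (including row-group metadata) plus the still-open input stream.
    OpenedParquetFooter opened = ParquetFooterReader.openFileAndReadFooter(conf, file, true);
    FileSplit split = new FileSplit(file.toPath(), file.start(), file.length(), new String[0]);
    // All three pieces are passed so the reader reuses them instead of reopening the file.
    reader.initialize(
        split, context,
        Option.apply(opened.inputFile()),
        Option.apply(opened.inputStream()),
        Option.apply(opened.footer()));
  }
}
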
