Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -567,8 +567,23 @@ public static final ParquetMetadata readFooter(InputFile file, MetadataFilter fi

public static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f)
throws IOException {
return readFooter(file, options, f, /*closeStreamOnFailure*/ false);
}

private static final ParquetMetadata readFooter(
InputFile file, ParquetReadOptions options, SeekableInputStream f, boolean closeStreamOnFailure)
throws IOException {
ParquetMetadataConverter converter = new ParquetMetadataConverter(options);
return readFooter(file, options, f, converter);
try {
return readFooter(file, options, f, converter);
} catch (Exception e) {
// In case that readFooter throws an exception in the constructor, the new stream
// should be closed. Otherwise, there's no way to close this outside.
if (closeStreamOnFailure) {
f.close();
}
throw e;
}
}

private static final ParquetMetadata readFooter(
Expand Down Expand Up @@ -729,6 +744,22 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options,
return new ParquetFileReader(file, options, f);
}

/**
* Open a {@link InputFile file} with {@link ParquetMetadata footer} and {@link ParquetReadOptions options}.
*
* @param file an input file
* @param footer a {@link ParquetMetadata} footer already read from the file
* @param options parquet read options
* @param f the input stream for the file
* @return an open ParquetFileReader
* @throws IOException if there is an error while opening the file
*/
public static ParquetFileReader open(
InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f)
throws IOException {
return new ParquetFileReader(file, footer, options, f);
}

protected SeekableInputStream f;
private final InputFile file;
private final ParquetReadOptions options;
Expand Down Expand Up @@ -930,19 +961,31 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx
this(file, options, file.newStream());
}

/**
* @param file Path to a parquet file
* @param options {@link ParquetReadOptions}
* @param f a {@link SeekableInputStream} for the parquet file
* @throws IOException if the file can not be opened
*/
public ParquetFileReader(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException {
this(file, readFooter(file, options, f, /*closeStreamOnFailure*/ true), options, f);
}

/**
* @param file Path to a parquet file
* @param footer a {@link ParquetMetadata} footer already read from the file
* @param options {@link ParquetReadOptions}
* @param f a {@link SeekableInputStream} for the parquet file
* @throws IOException if the file can not be opened
*/
public ParquetFileReader(InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f)
throws IOException {
this.converter = new ParquetMetadataConverter(options);
this.file = file;
this.f = f;
this.options = options;
try {
this.footer = readFooter(file, options, f, converter);
} catch (Exception e) {
// In case that reading footer throws an exception in the constructor, the new stream
// should be closed. Otherwise, there's no way to close this outside.
f.close();
throw e;
}
this.footer = footer;

this.fileMetaData = footer.getFileMetaData();
this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups!
if (null != fileDecryptor && fileDecryptor.plaintextFile()) {
Expand Down Expand Up @@ -1054,13 +1097,17 @@ public List<BlockMetaData> getRowGroups() {
return blocks;
}

public void setRequestedSchema(MessageType projection) {
public void setRequestedSchema(List<ColumnDescriptor> columns) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this change? Does it exist in the original PR?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the diff here is not smart, this actually adds a new method

public void setRequestedSchema(List<ColumnDescriptor> columns)

there are many ctors of ParquetFileReader were marked as deprecated but without providing the equivalent replacement, for example,

  @Deprecated
  public ParquetFileReader(
      Configuration configuration,
      FileMetaData fileMetaData,
      Path filePath,
      List<BlockMetaData> blocks,
      List<ColumnDescriptor> columns)

this API is proposed to restore the ability to set read columns via List<ColumnDescriptor>, instead of MessageType

paths.clear();
for (ColumnDescriptor col : projection.getColumns()) {
for (ColumnDescriptor col : columns) {
paths.put(ColumnPath.get(col.getPath()), col);
}
}

public void setRequestedSchema(MessageType projection) {
setRequestedSchema(projection.getColumns());
}

public void appendTo(ParquetFileWriter writer) throws IOException {
writer.appendRowGroups(f, blocks, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import java.util.zip.CRC32;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
Expand Down Expand Up @@ -688,8 +690,13 @@ private int getDataOffset(Page page) {
*/
private ParquetFileReader getParquetFileReader(Path path, Configuration conf, List<ColumnDescriptor> columns)
throws IOException {
ParquetMetadata footer = ParquetFileReader.readFooter(conf, path);
return new ParquetFileReader(conf, footer.getFileMetaData(), path, footer.getBlocks(), columns);
HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf);
SeekableInputStream inputStream = inputFile.newStream();
ParquetReadOptions readOptions = HadoopReadOptions.builder(conf).build();
ParquetMetadata footer = ParquetFileReader.readFooter(inputFile, readOptions, inputStream);
ParquetFileReader reader = ParquetFileReader.open(inputFile, footer, readOptions, inputStream);
reader.setRequestedSchema(columns);
return reader;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to explicitly add a new test case rather than silently changing old ones.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that, but it seems there is no dedicated test suite for ParquetFileReader constructor. The replaced API is deprecated, and the replacement does not reduce the test coverage because the deprecated API is used many times in other test suites.

}

/**
Expand Down