diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index ae2de87cbc..2ef39f7804 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -567,8 +567,23 @@ public static final ParquetMetadata readFooter(InputFile file, MetadataFilter fi public static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException { + return readFooter(file, options, f, /*closeStreamOnFailure*/ false); + } + + private static final ParquetMetadata readFooter( + InputFile file, ParquetReadOptions options, SeekableInputStream f, boolean closeStreamOnFailure) + throws IOException { ParquetMetadataConverter converter = new ParquetMetadataConverter(options); - return readFooter(file, options, f, converter); + try { + return readFooter(file, options, f, converter); + } catch (Exception e) { + // In case that readFooter throws an exception in the constructor, the new stream + // should be closed. Otherwise, there's no way to close this outside. + if (closeStreamOnFailure) { + f.close(); + } + throw e; + } } private static final ParquetMetadata readFooter( @@ -729,6 +744,22 @@ public static ParquetFileReader open(InputFile file, ParquetReadOptions options, return new ParquetFileReader(file, options, f); } + /** + * Open a {@link InputFile file} with {@link ParquetMetadata footer} and {@link ParquetReadOptions options}. + * + * @param file an input file + * @param footer a {@link ParquetMetadata} footer already read from the file + * @param options parquet read options + * @param f the input stream for the file + * @return an open ParquetFileReader + * @throws IOException if there is an error while opening the file + */ + public static ParquetFileReader open( + InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f) + throws IOException { + return new ParquetFileReader(file, footer, options, f); + } + protected SeekableInputStream f; private final InputFile file; private final ParquetReadOptions options; @@ -930,19 +961,31 @@ public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOEx this(file, options, file.newStream()); } + /** + * @param file Path to a parquet file + * @param options {@link ParquetReadOptions} + * @param f a {@link SeekableInputStream} for the parquet file + * @throws IOException if the file can not be opened + */ public ParquetFileReader(InputFile file, ParquetReadOptions options, SeekableInputStream f) throws IOException { + this(file, readFooter(file, options, f, /*closeStreamOnFailure*/ true), options, f); + } + + /** + * @param file Path to a parquet file + * @param footer a {@link ParquetMetadata} footer already read from the file + * @param options {@link ParquetReadOptions} + * @param f a {@link SeekableInputStream} for the parquet file + * @throws IOException if the file can not be opened + */ + public ParquetFileReader(InputFile file, ParquetMetadata footer, ParquetReadOptions options, SeekableInputStream f) + throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = file; this.f = f; this.options = options; - try { - this.footer = readFooter(file, options, f, converter); - } catch (Exception e) { - // In case that reading footer throws an exception in the constructor, the new stream - // should be closed. Otherwise, there's no way to close this outside. - f.close(); - throw e; - } + this.footer = footer; + this.fileMetaData = footer.getFileMetaData(); this.fileDecryptor = fileMetaData.getFileDecryptor(); // must be called before filterRowGroups! if (null != fileDecryptor && fileDecryptor.plaintextFile()) { @@ -1054,13 +1097,17 @@ public List getRowGroups() { return blocks; } - public void setRequestedSchema(MessageType projection) { + public void setRequestedSchema(List columns) { paths.clear(); - for (ColumnDescriptor col : projection.getColumns()) { + for (ColumnDescriptor col : columns) { paths.put(ColumnPath.get(col.getPath()), col); } } + public void setRequestedSchema(MessageType projection) { + setRequestedSchema(projection.getColumns()); + } + public void appendTo(ParquetFileWriter writer) throws IOException { writer.appendRowGroups(f, blocks, true); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java index 79a81a5e95..013498c2b4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageChecksums.java @@ -36,6 +36,8 @@ import java.util.zip.CRC32; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.HeapByteBufferAllocator; import org.apache.parquet.bytes.TrackingByteBufferAllocator; @@ -688,8 +690,13 @@ private int getDataOffset(Page page) { */ private ParquetFileReader getParquetFileReader(Path path, Configuration conf, List columns) throws IOException { - ParquetMetadata footer = ParquetFileReader.readFooter(conf, path); - return new ParquetFileReader(conf, footer.getFileMetaData(), path, footer.getBlocks(), columns); + HadoopInputFile inputFile = HadoopInputFile.fromPath(path, conf); + SeekableInputStream inputStream = inputFile.newStream(); + ParquetReadOptions readOptions = HadoopReadOptions.builder(conf).build(); + ParquetMetadata footer = ParquetFileReader.readFooter(inputFile, readOptions, inputStream); + ParquetFileReader reader = ParquetFileReader.open(inputFile, footer, readOptions, inputStream); + reader.setRequestedSchema(columns); + return reader; } /**