
Commit b59be86

PARQUET-674: Add InputFile abstraction for openable files.

Author: Ryan Blue <blue@apache.org>

Closes #368 from rdblue/PARQUET-674-add-data-source and squashes the following commits:

  8c689e9 [Ryan Blue] PARQUET-674: Implement review comments.
  4a7c327 [Ryan Blue] PARQUET-674: Add DataSource abstraction for openable files.

1 parent e54ca61

3 files changed: +130 -11 lines
org/apache/parquet/io/InputFile.java (new file)

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.io;
+
+import java.io.IOException;
+
+/**
+ * {@code InputFile} is an interface with the methods needed by Parquet to read
+ * data files using {@link SeekableInputStream} instances.
+ */
+public interface InputFile {
+
+  /**
+   * Returns the total length of the file, in bytes.
+   * @throws IOException if the length cannot be determined
+   */
+  long getLength() throws IOException;
+
+  /**
+   * Opens a new {@link SeekableInputStream} for the underlying
+   * data file.
+   * @throws IOException if the stream cannot be opened.
+   */
+  SeekableInputStream newStream() throws IOException;
+
+}
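
A minimal usage sketch (illustrative only, not part of this commit) of how a caller could consume the interface above. The wrapper class InputFileTailReader and the readTail helper are hypothetical names; the sketch assumes SeekableInputStream exposes seek(long) and readFully(byte[], int, int), and relies only on the two methods InputFile declares.

// Hypothetical example, not part of commit b59be86.
package org.apache.parquet.io.example;

import java.io.IOException;

import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;

public class InputFileTailReader {

  /** Reads the last {@code n} bytes of any {@link InputFile}. */
  public static byte[] readTail(InputFile file, int n) throws IOException {
    long length = file.getLength();        // total size reported by the InputFile
    byte[] tail = new byte[n];
    try (SeekableInputStream in = file.newStream()) {
      in.seek(length - n);                 // position at the start of the tail
      in.readFully(tail, 0, n);            // fill the buffer completely or fail
    }
    return tail;
  }
}
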

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 21 additions & 11 deletions

@@ -88,11 +88,13 @@
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.hadoop.util.HiddenFileFilter;
 import org.apache.parquet.hadoop.util.HadoopStreams;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.hadoop.util.counters.BenchmarkCounter;
 import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.InputFile;

 /**
  * Internal implementation of the Parquet file reader as a block container

@@ -410,8 +412,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, Path
    * @throws IOException if an error occurs while reading the file
    */
   public static ParquetMetadata readFooter(Configuration configuration, Path file, MetadataFilter filter) throws IOException {
-    FileSystem fileSystem = file.getFileSystem(configuration);
-    return readFooter(configuration, fileSystem.getFileStatus(file), filter);
+    return readFooter(HadoopInputFile.fromPath(file, configuration), filter);
   }

   /**

@@ -431,12 +432,21 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
    * @throws IOException if an error occurs while reading the file
    */
   public static final ParquetMetadata readFooter(Configuration configuration, FileStatus file, MetadataFilter filter) throws IOException {
-    FileSystem fileSystem = file.getPath().getFileSystem(configuration);
-    SeekableInputStream in = HadoopStreams.wrap(fileSystem.open(file.getPath()));
-    try {
-      return readFooter(file.getLen(), file.getPath().toString(), in, filter);
-    } finally {
-      in.close();
+    return readFooter(HadoopInputFile.fromStatus(file, configuration), filter);
+  }
+
+  /**
+   * Reads the meta data block in the footer of the file using provided input stream
+   * @param file a {@link InputFile} to read
+   * @param filter the filter to apply to row groups
+   * @return the metadata blocks in the footer
+   * @throws IOException if an error occurs while reading the file
+   */
+  public static final ParquetMetadata readFooter(
+      InputFile file, MetadataFilter filter) throws IOException {
+    try (SeekableInputStream in = file.newStream()) {
+      return readFooter(converter, file.getLength(), file.toString(),
+          in, filter);
     }
   }

@@ -449,7 +459,7 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
    * @return the metadata blocks in the footer
    * @throws IOException if an error occurs while reading the file
    */
-  public static final ParquetMetadata readFooter(long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
+  private static final ParquetMetadata readFooter(ParquetMetadataConverter converter, long fileLen, String filePath, SeekableInputStream f, MetadataFilter filter) throws IOException {
     if (Log.DEBUG) {
       LOG.debug("File length " + fileLen);
     }

@@ -563,7 +573,7 @@ public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) t
     FileSystem fs = file.getFileSystem(conf);
     this.fileStatus = fs.getFileStatus(file);
     this.f = HadoopStreams.wrap(fs.open(file));
-    this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
+    this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, filter);
     this.fileMetaData = footer.getFileMetaData();
     this.blocks = footer.getBlocks();
     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {

@@ -602,7 +612,7 @@ public ParquetMetadata getFooter() {
     if (footer == null) {
       try {
         // don't read the row groups because this.blocks is always set
-        this.footer = readFooter(fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
+        this.footer = readFooter(converter, fileStatus.getLen(), fileStatus.getPath().toString(), f, SKIP_ROW_GROUPS);
       } catch (IOException e) {
         throw new ParquetDecodingException("Unable to read file footer", e);
       }
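
A usage sketch (illustrative only, not part of this diff): with the new overload, footer metadata can be read from anything that implements InputFile. The example assumes the HadoopInputFile factories added by this commit (shown in the next file) and ParquetMetadataConverter.NO_FILTER from the existing converter API; the class name ReadFooterExample is hypothetical.

// Hypothetical example, not part of commit b59be86.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

public class ReadFooterExample {
  public static void main(String[] args) throws Exception {
    // Wrap a Hadoop path in the new InputFile abstraction.
    InputFile file = HadoopInputFile.fromPath(new Path(args[0]), new Configuration());

    // Read the footer through the new InputFile-based readFooter overload.
    ParquetMetadata footer =
        ParquetFileReader.readFooter(file, ParquetMetadataConverter.NO_FILTER);

    System.out.println(footer.getFileMetaData().getSchema());
  }
}
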
org/apache/parquet/hadoop/util/HadoopInputFile.java (new file)

Lines changed: 66 additions & 0 deletions

@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import java.io.IOException;
+
+public class HadoopInputFile implements InputFile {
+
+  private final FileSystem fs;
+  private final FileStatus stat;
+
+  public static HadoopInputFile fromPath(Path path, Configuration conf)
+      throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    return new HadoopInputFile(fs, fs.getFileStatus(path));
+  }
+
+  public static HadoopInputFile fromStatus(FileStatus stat, Configuration conf)
+      throws IOException {
+    FileSystem fs = stat.getPath().getFileSystem(conf);
+    return new HadoopInputFile(fs, stat);
+  }
+
+  private HadoopInputFile(FileSystem fs, FileStatus stat) {
+    this.fs = fs;
+    this.stat = stat;
+  }
+
+  @Override
+  public long getLength() {
+    return stat.getLen();
+  }
+
+  @Override
+  public SeekableInputStream newStream() throws IOException {
+    return HadoopStreams.wrap(fs.open(stat.getPath()));
+  }
+
+  @Override
+  public String toString() {
+    return stat.getPath().toString();
+  }
+}
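
A final usage sketch (illustrative only, not part of this commit): fromStatus is useful when a directory listing has already produced FileStatus objects, since HadoopInputFile keeps the status and getLength() then needs no extra getFileStatus round trip. The example assumes Hadoop's FileSystem.listStatus, HiddenFileFilter.INSTANCE (already imported by ParquetFileReader above), and ParquetMetadataConverter.SKIP_ROW_GROUPS; ListFootersExample is a hypothetical class name.

// Hypothetical example, not part of commit b59be86.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.hadoop.util.HiddenFileFilter;

public class ListFootersExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path dir = new Path(args[0]);
    FileSystem fs = dir.getFileSystem(conf);

    // Reuse each FileStatus from the listing; fromStatus avoids a second
    // getFileStatus call per file when reading the footer.
    for (FileStatus status : fs.listStatus(dir, HiddenFileFilter.INSTANCE)) {
      ParquetMetadata footer = ParquetFileReader.readFooter(
          HadoopInputFile.fromStatus(status, conf),
          ParquetMetadataConverter.SKIP_ROW_GROUPS);
      System.out.println(status.getPath() + ": "
          + footer.getFileMetaData().getCreatedBy());
    }
  }
}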
