Skip to content

Commit c93a22d

Browse files
author
Robert Kruszewski
committed
allow reading footers from provided file listing and streams
1 parent 60b6d5a commit c93a22d

File tree

1 file changed

+49
-8
lines changed

1 file changed

+49
-8
lines changed

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 49 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,36 @@ public static List<Footer> readAllFootersInParallel(final Configuration configur
236236
return readAllFootersInParallel(configuration, partFiles, false);
237237
}
238238

239+
/**
240+
* read all the footers of the files provided with input streams
241+
* (not using summary files)
242+
* @param configuration the conf to access the File System
243+
* @param partFiles the files to read with equivalent input streams
244+
* @param skipRowGroups to skip the rowGroup info
245+
* @return the footers
246+
* @throws IOException
247+
*/
248+
public static List<Footer> readAllFootersInParallel(final Configuration configuration, Map<FileStatus, FSDataInputStream> partFiles, final boolean skipRowGroups) throws IOException {
249+
List<Callable<Footer>> footers = new ArrayList<Callable<Footer>>();
250+
for (final Map.Entry<FileStatus, FSDataInputStream> currentFile : partFiles.entrySet()) {
251+
footers.add(new Callable<Footer>() {
252+
@Override
253+
public Footer call() throws Exception {
254+
try {
255+
return new Footer(currentFile.getKey().getPath(), readFooter(currentFile.getValue(), currentFile.getKey(), filter(skipRowGroups)));
256+
} catch (IOException e) {
257+
throw new IOException("Could not read footer for file " + currentFile, e);
258+
}
259+
}
260+
});
261+
}
262+
try {
263+
return runAllInParallel(configuration.getInt(PARQUET_READ_PARALLELISM, 5), footers);
264+
} catch (ExecutionException e) {
265+
throw new IOException("Could not read footer: " + e.getMessage(), e.getCause());
266+
}
267+
}
268+
239269
/**
240270
* read all the footers of the files provided
241271
* (not using summary files)
@@ -434,22 +464,33 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
434464
FileSystem fileSystem = file.getPath().getFileSystem(configuration);
435465
FSDataInputStream in = fileSystem.open(file.getPath());
436466
try {
437-
return readFooter(file, in, filter);
467+
return readFooter(in, file, filter);
438468
} finally {
439469
in.close();
440470
}
441471
}
442472

443-
private static final ParquetMetadata readFooter(FileStatus file, FSDataInputStream f, MetadataFilter filter) throws IOException {
444-
long l = file.getLen();
473+
/**
474+
* Reads the meta data block in the footer of the file using provided input stream
475+
* @param f input stream for the file
476+
* @param file the parquet File
477+
* @param filter the filter to apply to row groups
478+
* @return the metadata blocks in the footer
479+
* @throws IOException if an error occurs while reading the file
480+
*/
481+
public static final ParquetMetadata readFooter(FSDataInputStream f, FileStatus file, MetadataFilter filter) throws IOException {
482+
return readFooter(f, file.getLen(), file.getPath().toString(), filter);
483+
}
484+
485+
private static final ParquetMetadata readFooter(FSDataInputStream f, long fileLen, String filePath, MetadataFilter filter) throws IOException {
445486
if (Log.DEBUG) {
446-
LOG.debug("File length " + l);
487+
LOG.debug("File length " + fileLen);
447488
}
448489
int FOOTER_LENGTH_SIZE = 4;
449-
if (l < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
450-
throw new RuntimeException(file.getPath() + " is not a Parquet file (too small)");
490+
if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { // MAGIC + data + footer + footerIndex + MAGIC
491+
throw new RuntimeException(filePath + " is not a Parquet file (too small)");
451492
}
452-
long footerLengthIndex = l - FOOTER_LENGTH_SIZE - MAGIC.length;
493+
long footerLengthIndex = fileLen - FOOTER_LENGTH_SIZE - MAGIC.length;
453494
if (Log.DEBUG) {
454495
LOG.debug("reading footer index at " + footerLengthIndex);
455496
}
@@ -459,7 +500,7 @@ private static final ParquetMetadata readFooter(FileStatus file, FSDataInputStre
459500
byte[] magic = new byte[MAGIC.length];
460501
f.readFully(magic);
461502
if (!Arrays.equals(MAGIC, magic)) {
462-
throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
503+
throw new RuntimeException(filePath + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
463504
}
464505
long footerIndex = footerLengthIndex - footerLength;
465506
if (Log.DEBUG) {

0 commit comments

Comments
 (0)