diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 55ed5ee050..2b0a862a06 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -96,6 +96,10 @@ public class ParquetFileReader implements Closeable {
 
   public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";
 
+  // URI schemes to blacklist for ByteBuffer read.
+  public static final String PARQUET_BYTEBUFFER_BLACKLIST = "parquet.bytebuffer.fs.blacklist";
+  public static final String[] PARQUET_BYTEBUFFER_BLACKLIST_DEFAULT = {"s3", "s3n", "s3a"};
+
   private static ParquetMetadataConverter converter = new ParquetMetadataConverter();
 
   /**
@@ -472,6 +476,7 @@ static ParquetFileReader open(Configuration conf, Path file) throws IOException
   private final FileMetaData fileMetaData;
   private final String createdBy;
   private final ByteBufferAllocator allocator;
+  private final boolean disableByteBufferRead;
 
   private int currentBlock = 0;
 
@@ -506,6 +511,10 @@ public ParquetFileReader(
     // the codec factory to get decompressors
     this.codecFactory = new CodecFactory(configuration, 0);
     this.allocator = new HeapByteBufferAllocator();
+
+    // Bypass ByteBuffer read path for S3 FileSystems. See PARQUET-400.
+    List<String> fsBlackList = Arrays.asList(configuration.getStrings(PARQUET_BYTEBUFFER_BLACKLIST, PARQUET_BYTEBUFFER_BLACKLIST_DEFAULT));
+    this.disableByteBufferRead = fsBlackList.contains(filePath.toUri().getScheme());
   }
 
   public void appendTo(ParquetFileWriter writer) throws IOException {
@@ -725,7 +734,7 @@ protected PageHeader readPageHeader() throws IOException {
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
         // if the last page is smaller than this, the page header itself is truncated in the buffer.
-        this.byteBuf.rewind(); // resetting the buffer to the position before we got the error
+        this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error
         LOG.info("completing the column chunk to read the page header");
         pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
       }
@@ -814,8 +823,20 @@ public void addChunk(ChunkDescriptor descriptor) {
     public List<Chunk> readAll(FSDataInputStream f) throws IOException {
       List<Chunk> result = new ArrayList<Chunk>(chunks.size());
       f.seek(offset);
-      ByteBuffer chunksByteBuffer = allocator.allocate(length);
-      CompatibilityUtil.getBuf(f, chunksByteBuffer, length);
+
+      // Allocate the ByteBuffer based on whether the FS can support it.
+      ByteBuffer chunksByteBuffer;
+      if (disableByteBufferRead) {
+        byte[] chunkBytes = new byte[length];
+        f.readFully(chunkBytes);
+        chunksByteBuffer = ByteBuffer.wrap(chunkBytes);
+      } else {
+        chunksByteBuffer = allocator.allocate(length);
+        while (chunksByteBuffer.hasRemaining()) {
+          CompatibilityUtil.getBuf(f, chunksByteBuffer);
+        }
+      }
+
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
       int currentChunkOffset = 0;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
index bacf222a24..4289e1a265 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
@@ -66,17 +66,7 @@ private V21FileAPI() throws ReflectiveOperationException {
     }
   }
 
-  private static Object invoke(Method method, String errorMsg, Object instance, Object... args) {
-    try {
-      return method.invoke(instance, args);
-    } catch (IllegalAccessException e) {
-      throw new IllegalArgumentException(errorMsg, e);
-    } catch (InvocationTargetException e) {
-      throw new IllegalArgumentException(errorMsg, e);
-    }
-  }
-
-  public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
+  public static int getBuf(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
     int res;
     if (useV21) {
       try {
@@ -88,7 +78,7 @@ public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) t
           // be a reasonable check to make to see if the interface is
           // present but not implemented and we should be falling back
          useV21 = false;
-          return getBuf(f, readBuf, maxSize);
+          return getBuf(f, readBuf);
         } else if (e.getCause() instanceof IOException) {
           throw (IOException) e.getCause();
         } else {
@@ -105,9 +95,15 @@ public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) t
         throw new ShouldNeverHappenException(e);
       }
     } else {
-      byte[] buf = new byte[maxSize];
-      res = f.read(buf);
-      readBuf.put(buf, 0, res);
+      if (readBuf.hasArray()) {
+        int initPos = readBuf.position();
+        res = f.read(readBuf.array(), readBuf.arrayOffset(), readBuf.remaining());
+        readBuf.position(initPos + res);
+      } else {
+        byte[] buf = new byte[readBuf.remaining()];
+        res = f.read(buf);
+        readBuf.put(buf, 0, res);
+      }
     }
     return res;
   }
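
Not part of the patch: a minimal sketch of how the new blacklist knob introduced above would be used, assuming a standard Hadoop Configuration is what feeds the reader. Only the key name and the s3/s3n/s3a defaults come from the change itself; the extra "wasb" scheme and the class name below are purely illustrative.

import org.apache.hadoop.conf.Configuration;

public class ByteBufferBlacklistExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Extend the default blacklist ("s3", "s3n", "s3a") with one more scheme.
    // Any file whose URI scheme appears in this list is read through the
    // byte[] + readFully path instead of CompatibilityUtil.getBuf.
    conf.setStrings("parquet.bytebuffer.fs.blacklist", "s3", "s3n", "s3a", "wasb");

    // Hand this conf to whatever constructs the ParquetFileReader (for example
    // the Hadoop job configuration); the reader checks the file's URI scheme
    // against the list once, in its constructor.
  }
}

With the default value left untouched, s3/s3n/s3a paths already take the fallback path, so most jobs should not need any configuration change.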