ParquetFileReader.java
@@ -96,6 +96,10 @@ public class ParquetFileReader implements Closeable {

public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";

+//URI Schemes to blacklist for bytebuffer read.
+public static final String PARQUET_BYTEBUFFER_BLACKLIST = "parquet.bytebuffer.fs.blacklist";
+public static final String[] PARQUET_BYTEBUFFER_BLACKLIST_DEFAULT = {"s3", "s3n", "s3a"};

If we're going with the blacklist approach, we should also handle pure HDFS file systems as well, right?

We found the underlying bug, so I don't think the plan is to blacklist filesystems anymore.

Ah right saw your other comment :-)
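
As a side note, a minimal sketch (not part of this patch) of how the parquet.bytebuffer.fs.blacklist property introduced above could be overridden through the Hadoop Configuration; the bucket path and class name below are made up for illustration:

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class BlacklistOverrideSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Replace the default {"s3", "s3n", "s3a"} blacklist with a shorter one.
    conf.setStrings("parquet.bytebuffer.fs.blacklist", "s3", "s3n");

    // The reader checks the file's URI scheme against the configured list.
    Path file = new Path("s3a://some-bucket/part-00000.parquet"); // hypothetical path
    String scheme = file.toUri().getScheme(); // "s3a"
    boolean disableByteBufferRead =
        Arrays.asList(conf.getStrings("parquet.bytebuffer.fs.blacklist")).contains(scheme);
    System.out.println(disableByteBufferRead); // false, since "s3a" was dropped above
  }
}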


private static ParquetMetadataConverter converter = new ParquetMetadataConverter();

/**
@@ -472,6 +476,7 @@ static ParquetFileReader open(Configuration conf, Path file) throws IOException
private final FileMetaData fileMetaData;
private final String createdBy;
private final ByteBufferAllocator allocator;
+private final boolean disableByteBufferRead;

private int currentBlock = 0;

@@ -506,6 +511,10 @@ public ParquetFileReader(
// the codec factory to get decompressors
this.codecFactory = new CodecFactory(configuration, 0);
this.allocator = new HeapByteBufferAllocator();

+//Bypass ByteBuffer read path for S3 FileSystems. See PARQUET-400.
+List<String> fsBlackList = Arrays.asList(configuration.getStrings(PARQUET_BYTEBUFFER_BLACKLIST, PARQUET_BYTEBUFFER_BLACKLIST_DEFAULT));
+this.disableByteBufferRead = fsBlackList.contains(filePath.toUri().getScheme());
}

public void appendTo(ParquetFileWriter writer) throws IOException {
@@ -725,7 +734,7 @@ protected PageHeader readPageHeader() throws IOException {
// to allow reading older files (using dictionary) we need this.
// usually 13 to 19 bytes are missing
// if the last page is smaller than this, the page header itself is truncated in the buffer.
-this.byteBuf.rewind(); // resetting the buffer to the position before we got the error
+this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error
LOG.info("completing the column chunk to read the page header");
pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
}
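
For readers unfamiliar with the ByteBuffer API, a small standalone illustration (not part of the patch) of why position(initialPos) is the right reset here while rewind() is not: rewind() always jumps back to position 0, re-exposing bytes that were already consumed earlier, whereas position(initialPos) returns exactly to where the failed read started. The buffer contents below are made up:

import java.nio.ByteBuffer;

public class RewindVsPosition {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5});
    buf.get();                      // earlier, unrelated read; position is now 1
    int initialPos = buf.position();

    buf.get();                      // a read we want to undo
    buf.position(initialPos);       // back to 1, where the failed read began
    System.out.println(buf.position()); // prints 1

    buf.get();
    buf.rewind();                   // jumps all the way back to 0
    System.out.println(buf.position()); // prints 0
  }
}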
@@ -814,8 +823,20 @@ public void addChunk(ChunkDescriptor descriptor) {
public List<Chunk> readAll(FSDataInputStream f) throws IOException {
List<Chunk> result = new ArrayList<Chunk>(chunks.size());
f.seek(offset);
-ByteBuffer chunksByteBuffer = allocator.allocate(length);
-CompatibilityUtil.getBuf(f, chunksByteBuffer, length);
+
+//Allocate the bytebuffer based on whether the FS can support it.
+ByteBuffer chunksByteBuffer;
+if(disableByteBufferRead) {
+byte[] chunkBytes = new byte[length];
+f.readFully(chunkBytes);
+chunksByteBuffer = ByteBuffer.wrap(chunkBytes);
+} else {
+chunksByteBuffer = allocator.allocate(length);
+while (chunksByteBuffer.hasRemaining()) {
+CompatibilityUtil.getBuf(f, chunksByteBuffer);
+}
+}

// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
int currentChunkOffset = 0;
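
For reference, a minimal standalone sketch (not part of the patch) of the short-read handling the new while loop relies on: a single read may return fewer bytes than requested, so the caller keeps reading until the buffer has no space left. The helper below is written against a plain ReadableByteChannel rather than FSDataInputStream, and the class name is made up:

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

public class FillLoopSketch {
  // Keep reading until the buffer is completely filled or the stream ends.
  static void readFully(ReadableByteChannel in, ByteBuffer buf) throws IOException {
    while (buf.hasRemaining()) {
      if (in.read(buf) < 0) {
        throw new EOFException("stream ended before the buffer was filled");
      }
    }
  }
}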
CompatibilityUtil.java
@@ -66,17 +66,7 @@ private V21FileAPI() throws ReflectiveOperationException {
}
}

-private static Object invoke(Method method, String errorMsg, Object instance, Object... args) {
-try {
-return method.invoke(instance, args);
-} catch (IllegalAccessException e) {
-throw new IllegalArgumentException(errorMsg, e);
-} catch (InvocationTargetException e) {
-throw new IllegalArgumentException(errorMsg, e);
-}
-}
-
-public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
+public static int getBuf(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
int res;
if (useV21) {
try {
@@ -88,7 +78,7 @@ public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
// be a reasonable check to make to see if the interface is
// present but not implemented and we should be falling back
useV21 = false;
-return getBuf(f, readBuf, maxSize);
+return getBuf(f, readBuf);
} else if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
@@ -105,9 +95,15 @@ public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
throw new ShouldNeverHappenException(e);
}
} else {
-byte[] buf = new byte[maxSize];
-res = f.read(buf);
-readBuf.put(buf, 0, res);
+if (readBuf.hasArray()) {
+int initPos = readBuf.position();
+res = f.read(readBuf.array(), readBuf.arrayOffset(), readBuf.remaining());
+readBuf.position(initPos + res);
+} else {
+byte[] buf = new byte[readBuf.remaining()];
+res = f.read(buf);
+readBuf.put(buf, 0, res);
+}
}
return res;
}
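
A standalone sketch (not the committed code) in the spirit of the heap-buffer fast path added above: when a ByteBuffer is array-backed, the stream can read straight into the backing array and the position is then advanced by hand; otherwise the data is staged through a temporary byte[]. The class and method names below are made up, and a plain InputStream stands in for FSDataInputStream:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class HasArraySketch {
  static int readInto(InputStream in, ByteBuffer readBuf) throws IOException {
    int res;
    if (readBuf.hasArray()) {
      int initPos = readBuf.position();
      // Read directly into the backing array, starting at the buffer's current position.
      res = in.read(readBuf.array(), readBuf.arrayOffset() + initPos, readBuf.remaining());
      if (res > 0) {
        readBuf.position(initPos + res);
      }
    } else {
      // Direct buffers expose no backing array, so copy through a temporary one.
      byte[] buf = new byte[readBuf.remaining()];
      res = in.read(buf);
      if (res > 0) {
        readBuf.put(buf, 0, res);
      }
    }
    return res;
  }

  public static void main(String[] args) throws IOException {
    ByteBuffer target = ByteBuffer.allocate(8); // heap buffer, so hasArray() is true
    InputStream in = new ByteArrayInputStream(new byte[]{10, 20, 30});
    System.out.println(readInto(in, target)); // 3
    System.out.println(target.position());    // 3
  }
}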