PARQUET-400: Fix for ByteBuffer incomplete read issue #346
Conversation
…ng Hadoop 2.x

The problem was not handling the case where a read request returns fewer than the requested number of bytes. FSDataInputStream lacks an equivalent of readFully for ByteBuffers; readFully is what solved this problem when reading into byte arrays. This has been fixed by adding a loop that keeps requesting the remaining bytes until everything has been read.
Conflicts: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompatibilityUtil.java
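In sketch form, the fix described in the commit message amounts to a loop like the following, assuming the Hadoop 2.x read(ByteBuffer) API (names here are illustrative, not the exact patch):

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

class ByteBufferReads {
  // A single read(ByteBuffer) may return fewer bytes than readBuf.remaining(),
  // so keep issuing reads until the buffer is full or the stream ends.
  static void readFully(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
    while (readBuf.hasRemaining()) {
      int bytesRead = f.read(readBuf);
      if (bytesRead < 0) {
        throw new EOFException("Stream ended with " + readBuf.remaining() + " bytes left to read");
      }
    }
  }
}
```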
@rdblue / @danielcweeks / @jaltekruse - please take a look

Ping @rdblue, can you take a look please?
```java
public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";

// configure if we want to use Hadoop's V2 read(bytebuffer). If true, we try to read using the
// new Hadoop read(ByteBuffer) api. Else, we skip.
```
can you put some more comments here about what this v2 read is / how it works / why you would want this on or off?
added some comments. Let me know if I can clarify more.
Invoking the read method via reflection could itself be slow, right? I don't have an intuition for how slow that can be, but for such a low-level thing as this it seems surprising that we'd do it. Instead of having this Compatibility helper class that has v1/v2 switches all over the place, can we just make an interface (or abstract class) for these operations with two implementations? Then we just pick an implementation at startup and don't need any if-v1/if-v2 logic. Also, IIRC you get some decent performance optimizations if you only ever class-load a single implementation of a particular interface / abstract class, so it should be better than calling…
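As a rough sketch of that suggestion (all names hypothetical): hide the two read paths behind one interface, probe for the Hadoop 2.x API a single time at class-load, and dispatch through the chosen implementation with no per-call reflection.

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

interface StreamReader {
  int read(FSDataInputStream f, ByteBuffer buf) throws IOException;
}

// Hadoop 1.x fallback: read into a byte[] and copy into the ByteBuffer.
class V1Reader implements StreamReader {
  public int read(FSDataInputStream f, ByteBuffer buf) throws IOException {
    byte[] tmp = new byte[buf.remaining()];
    int n = f.read(tmp, 0, tmp.length);
    if (n > 0) {
      buf.put(tmp, 0, n);
    }
    return n;
  }
}

// Hadoop 2.x path: call the ByteBuffer-based read directly.
class V2Reader implements StreamReader {
  public int read(FSDataInputStream f, ByteBuffer buf) throws IOException {
    return f.read(buf);
  }
}

class Readers {
  // Reflection runs exactly once, at class-load; every later call is direct.
  static final StreamReader READER = chooseReader();

  private static StreamReader chooseReader() {
    try {
      FSDataInputStream.class.getMethod("read", ByteBuffer.class);
      return new V2Reader();
    } catch (NoSuchMethodException e) {
      return new V1Reader();
    }
  }
}
```

One caveat with this shape: V2Reader only compiles against a Hadoop 2.x dependency, which is what the parquet-hadoop2 module idea mentioned later in this thread would address.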
```java
private int readWithByteBuffer(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
  int remaining = readBuf.remaining();
  try {
    while (readBuf.hasRemaining()) {
```
Is there no readFully() method we can call that handles this for us? That seems surprising.
Yeah, not that I'm aware of: https://hadoop.apache.org/docs/r2.7.1/api/org/apache/hadoop/fs/FSDataInputStream.html#read(java.nio.ByteBuffer)

> Reads up to buf.remaining() bytes into buf. Callers should use buf.limit(..) to control the size of the desired read.
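In other words, the only lever the API gives the caller is the buffer's limit. A quick illustration of capping a single read that way (names are mine, not from the patch):

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

class CappedRead {
  // Cap a single read at 4 KB by shrinking the buffer's limit, then restore it.
  static int readAtMost4K(FSDataInputStream f, ByteBuffer buf) throws IOException {
    int oldLimit = buf.limit();
    buf.limit(Math.min(oldLimit, buf.position() + 4096));
    int n = f.read(buf); // reads up to buf.remaining(), i.e. at most 4096 bytes
    buf.limit(oldLimit); // restore so later reads can fill the rest
    return n;
  }
}
```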
@isnotinvain - I like your idea of skipping reflection. Will look into creating a parquet-hadoop2 module in the project that depends on Hadoop 2.x so that we have the…
@isnotinvain - updated the implementation to skip reflection on the individual read calls. Just using reflection at the start to figure out which of the two interfaces to use. Tested this out with v1 reads & v2 reads with a Hadoop job - seems to work ok.
```java
 * @throws EOFException if readBuf.remaining() is greater than the number of bytes available to
 *         read on the FSDataInputStream f.
 */
int readBuf(FSDataInputStream f, ByteBuffer readBuf) throws IOException;
```
should this be called readFully?
Yeah, can rename it to readFully.
Do we need the configuration property for whether to use v1 or v2? It seems like we can auto-detect this, so any reason for the added config?
The rationale for the config property was to avoid performing the reflection-based check for every file Parquet attempts to read in some scenarios. If you know you're running Parquet in a v1-based setup, you can specify that you don't want the ByteBuffer-based read and go directly to the v1 APIs. We could do the check once and store the result in a static member variable, but that constrains you to that value for the entire JVM runtime. I believe @rdblue brought that up as a concern on the previous version of this PR: #306.
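A sketch of that shape of logic, with a hypothetical property name since the real key isn't shown in this excerpt:

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;

class ReadModeChooser {
  // Hypothetical property name, for illustration only.
  static final String KEY = "parquet.hadoop.use-bytebuffer-read";

  // When the property is set explicitly, trust it and skip the reflection probe;
  // otherwise auto-detect whether the Hadoop 2.x read(ByteBuffer) API exists.
  static boolean useV2Read(Configuration conf) {
    String explicit = conf.get(KEY);
    if (explicit != null) {
      return Boolean.parseBoolean(explicit);
    }
    try {
      FSDataInputStream.class.getMethod("read", ByteBuffer.class);
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }
}
```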
```java
List<Chunk> result = new ArrayList<Chunk>(chunks.size());
f.seek(offset);

//Allocate the bytebuffer based on whether the FS can support it.
```
Nit: There should be a space before "Allocate".
fixed
Closing in favor of: #349
Spinning up a new PR as I don't have write permissions to update @jaltekruse's existing PR. Fixed a few of the comments that were outstanding on that PR:

- Use readFully for the fallback and check if byteBuffer.hasArray() is true.
- Use getBuf to read all the remaining bytes into the byte buffer. While testing this, I noticed that on our Hadoop (2.x) cluster we end up returning fewer bytes than byteBuffer.remaining(), so I've added a loop to ensure we get all the remaining bytes. This also seems to line up with the javadoc for FSDataInputStream.read(byteBuffer).