PARQUET-400: Replace CompatibilityUtil with SeekableInputStream. #349
Conversation
@piyushnarang, this is the update to your PR that I think may be a bit cleaner.

```java
// Visible for testing
static int readDirectBuffer(FSDataInputStream f, ByteBuffer buf, byte[] temp) throws IOException {
```
To cut down on allocation, this uses an 8k buffer that's tied to the instance. We can change this and maybe even make it configurable to find out where a good trade-off is.
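The copy-through-a-heap-buffer technique being discussed can be illustrated with a minimal sketch. This is not the PR's exact code: it uses a plain `InputStream` instead of Hadoop's `FSDataInputStream` so it runs without Hadoop on the classpath, and it loops until the target buffer is full, whereas the real method may return after a single read. The `temp` array stands in for the instance-level 8k buffer mentioned above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Sketch: fill a (possibly direct) ByteBuffer from a stream by copying
// through a reusable heap buffer, which avoids per-read allocation.
public class DirectBufferRead {
  static int readDirectBuffer(InputStream in, ByteBuffer buf, byte[] temp) throws IOException {
    int total = 0;
    while (buf.hasRemaining()) {
      // never read more than the temp buffer or the target buffer can hold
      int n = in.read(temp, 0, Math.min(temp.length, buf.remaining()));
      if (n < 0) {
        return total == 0 ? -1 : total; // EOF semantics like InputStream.read
      }
      buf.put(temp, 0, n); // copy the chunk into the (direct) buffer
      total += n;
    }
    return total;
  }
}
```

Reusing one `temp` array per reader instance is what keeps allocation off the per-call path; making its size configurable is the trade-off question raised above.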
Think for the most part this is similar to what we had in the other PR. Guess I'm not entirely clear on how this enables non-Hadoop file systems in the future, though; you're still depending on […]. Taking a step back, does it make sense to break out the ParquetFileReader code to create a ParquetFileReader interface / abstract class that has a Hadoop-based implementation and other implementations? Feels like we're trying to retrofit this support into ParquetFileReader because it currently has the logic inline. If we pulled it out, it might make things easier. Think we should try to eliminate the reflection overhead (or benchmark it to confirm it's low) if possible. With this implementation we call the reflection-based constructor of the V2 reader every time we create a ParquetFileReader.

@piyushnarang, it's definitely similar; I just made a few changes to your PR. […] I think we do eventually want to make a reader API that is independent of Hadoop, but I don't think we need to do it in this PR. I'm just trying to make this get us further along toward the goal of not needing the Hadoop API to use Parquet. On reflection overhead: this has the same overhead that the previous implementations had, instantiating a class to handle Hadoop 2 streams.

Yeah, I personally prefer keeping such changes small and iterative. It would be nice if we just tackled fixing the immediate concern (reads are broken) first and then followed up with a change to make Parquet more friendly to non-Hadoop setups. That said, if you're keen on adding that as part of this fix, let's go ahead; I can close my prior PR in favor of this. Makes sense to move […]. Let me know when you're happy with the implementation and I can take a more detailed look.

@piyushnarang, I'd prefer to go with this approach since it solves the same problem and sets us up for separating the APIs later. I've added a big file of tests based on your MockInputStream, so I think this is ready to go.
```java
import java.io.InputStream;
import java.nio.ByteBuffer;

public abstract class SeekableInputStream extends InputStream {
```
Let's add some javadoc on this abstract class describing its purpose.
Wondering if SeekableInputStream is the right name? Maybe ParquetInputStream?
This API is not really Parquet specific; it allows seeking and reading blocks of the file. Being in a parquet package is enough "parquet" in the name, I think.
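The abstract class under discussion can be sketched as follows. The method set (getPos, seek, and the readFully variants) follows this thread's description of the API; the exact signatures in parquet-mr may differ, so treat this as illustrative.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Sketch of a seekable stream abstraction that does not depend on Hadoop.
// Concrete subclasses would wrap FSDataInputStream (hadoop-1 or hadoop-2)
// or any other random-access source.
public abstract class SeekableInputStream extends InputStream {
  /** Returns the current position in the stream. */
  public abstract long getPos() throws IOException;

  /** Seeks to a new position in the stream. */
  public abstract void seek(long newPos) throws IOException;

  /** Reads len bytes into b starting at start, throwing if EOF is reached first. */
  public abstract void readFully(byte[] b, int start, int len) throws IOException;

  /** Reads buf.remaining() bytes into buf, throwing if EOF is reached first. */
  public abstract void readFully(ByteBuffer buf) throws IOException;
}
```

Keeping this class free of Hadoop imports is what would let non-Hadoop file systems plug in later, which is the stated goal of the PR.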
I made some comments. Thanks @piyushnarang and @rdblue for taking care of this.
(force-pushed 80d5889 to c6ff434)
@piyushnarang, @julienledem, I've addressed the review items. I tested the Hadoop 2 readFully method for ByteBuffer using the same tests I wrote for the Hadoop 1 implementation, but the test won't compile in hadoop-1, so we would have to create another module for hadoop-2 tests and exclude it from the hadoop-1 test run. I don't think testing the readFully method is worth the trouble because it is so simple and unlikely to change from its current working and tested state. The tests are available in 4f273e4 if you'd like to have a look, but they were removed in the next commit to get tests passing in Jenkins.
```java
public static SeekableInputStream wrap(FSDataInputStream stream) {
  if (byteBufferReadableClass != null && h2SeekableConstructor != null &&
      byteBufferReadableClass.isInstance(stream.getWrappedStream())) {
```
I'm wondering if this check is a bit fragile? It will work if the stream's immediate inner stream is an instance of ByteBufferReadable that has a concrete implementation of read(ByteBuffer buf). If that stream too ends up delegating to its inner stream, then we might have a problem, right?
Yes, if the wrapped class implemented ByteBufferReadable in order to delegate to an implementation that may not be, then it would be a problem; that case would be caught by the previous version but not by this method. But I think it's fine to assume that when a stream implements ByteBufferReadable, it actually does what the interface says. That's what ByteBufferReadable is intended to signal, and it's weird that we have to implement a special case for FSDataInputStream at all.

Also, the previous implementation calls a method that isn't present in hadoop-1 and relies on when methods are linked to get away with that. I think the trade-off is worth this cleaner way of deciding whether to use the ByteBuffer interface, but I'm happy to change it if others agree with you that we should handle more levels of wrapper classes.
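The capability-detection pattern behind `wrap` can be demonstrated without Hadoop: look up a class reflectively at load time, and only take the fast path when it is present and the stream is an instance of it. Here `java.nio.channels.ReadableByteChannel` stands in for Hadoop 2's `ByteBufferReadable` so the example runs on a bare JDK; the field and method names mirror the snippet above but are otherwise illustrative.

```java
// Sketch of the load-time capability check used to decide between the
// ByteBuffer read path (hadoop-2) and the byte[] fallback (hadoop-1).
public class CapabilityCheck {
  // resolved once; null when the optional class is not on the classpath
  private static final Class<?> byteBufferReadableClass = getReadableClass();

  static Class<?> getReadableClass() {
    try {
      // stand-in for "org.apache.hadoop.fs.ByteBufferReadable"
      return Class.forName("java.nio.channels.ReadableByteChannel");
    } catch (ClassNotFoundException e) {
      return null; // class unavailable: always use the fallback path
    }
  }

  static String wrapKind(Object wrappedStream) {
    if (byteBufferReadableClass != null && byteBufferReadableClass.isInstance(wrappedStream)) {
      return "bytebuffer"; // stream advertises the ByteBuffer read capability
    }
    return "bytearray"; // fall back to read(byte[])
  }
}
```

The check runs per-wrap but the `Class.forName` lookup happens only once, which is why the reflection overhead discussed earlier stays off the hot path.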
@rdblue, given that we expect the bulk of people to use the Hadoop 2 version of the code (correct me if I'm wrong), we should have the tests running. Even if we have to create a new module for the tests, I think it is probably worth it, as this functionality is pretty important to ensuring Parquet reads work. It will give future developers something to use in case they need to change the Hadoop 2 read code. I might have misunderstood, so let me know if you had something else in mind.
```java
public static void readFully(Reader stream, ByteBuffer buf) throws IOException {
```
Rename `stream` to `reader`.
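A readFully helper of this shape typically loops until the buffer is full and fails on premature EOF. The sketch below uses a plain `InputStream` in place of the PR's `Reader` stand-in wrapper so it compiles without the patch; the `temp` parameter mirrors the reusable heap buffer used elsewhere in this change.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Sketch: read exactly buf.remaining() bytes, throwing EOFException if the
// stream ends early (the usual contract of a readFully method).
public class ReadFullyHelper {
  static void readFully(InputStream reader, ByteBuffer buf, byte[] temp) throws IOException {
    while (buf.hasRemaining()) {
      int n = reader.read(temp, 0, Math.min(temp.length, buf.remaining()));
      if (n < 0) {
        throw new EOFException(
            "Reached end of stream with " + buf.remaining() + " bytes left to read");
      }
      buf.put(temp, 0, n);
    }
  }
}
```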
Thanks Ryan. Couple of minor comments, but it looks good to me.

I fixed the nits that @piyushnarang pointed out. Anything else? @julienledem or @isnotinvain?

👍 Thanks @piyushnarang and @rdblue for tackling this! +1
```java
 * {@code SeekableInputStream} is an interface with the methods needed by
 * Parquet to read data from a file or Hadoop data stream.
 */
public abstract class SeekableInputStream extends InputStream {
```
Let's add getLength() as well. That's the only extra information we need to read the footer. It's missing from FSDataInputStream, and adding it would simplify some code where we have to pass the FileStatus object along. It is always available, since we get the stream with FileStatus.open().
A seekable stream is a fairly normal construct, while a stream that knows its own length is irregular. I think we are probably better off having a slightly higher-level concept of a stream provider that knows the length of streams it opens. That would basically encapsulate FileStatus and FileSystem so you can pass a single object that can open parallel streams for a single Parquet file.
How about doing this as a follow up? This issue is a blocker for 1.9.0 so I'd like to get it in. We can discuss the right way to pass around the length but also work toward getting 1.9.0 out. I'll open an issue for this.
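The stream-provider concept described above could look something like the following interface. All names here (`InputFileProvider`, `newStream`, `fromBytes`) are hypothetical illustrations of the idea, not the API that was eventually built; the point is one object that both knows the file length and can open independent parallel streams.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch of a provider that encapsulates what FileStatus and
// FileSystem supply today: the file's length plus the ability to open
// multiple independently positioned streams over the same file.
public interface InputFileProvider {
  long getLength() throws IOException;        // e.g. cached from a FileStatus
  InputStream newStream() throws IOException; // a fresh stream starting at offset 0

  // Tiny in-memory implementation, for demonstration only.
  static InputFileProvider fromBytes(byte[] data) {
    return new InputFileProvider() {
      public long getLength() { return data.length; }
      public InputStream newStream() { return new ByteArrayInputStream(data); }
    };
  }
}
```

Each call to `newStream()` yields a stream positioned at the start, which is what allows parallel column-chunk reads over a single Parquet file.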
Opened PARQUET-674.
fair enough
+1

Looks like 76a2ac8 conflicts with this PR.
This commit:

* Makes SeekableInputStream implementations package private
* Adds tests for the H2 stream wrapper
* Adds javadoc to SeekableInputStream
* Simplifies the wrap method by checking the underlying stream
These tests fail in Hadoop-1 and would require a new module.
This adds a wrapper so that the static method used for readFully can be passed a stand-in for FSDataInputStream.
(force-pushed 7009b6b to 1bcb8a8)
@isnotinvain, I rebased and fixed the conflicts, which were small. I'll commit this later today.

Sounds good to me.

Great, thanks!

Thanks, glad this is now out :-)
This fixes PARQUET-400 by replacing `CompatibilityUtil` with `SeekableInputStream`, which is implemented for hadoop-1 and hadoop-2. The benefit of this approach is that `SeekableInputStream` can be used for non-Hadoop file systems in the future.

This also changes the default Hadoop version to Hadoop 2. The library is still compatible with Hadoop 1.x, but this makes building Hadoop-2 classes, like `H2SeekableInputStream`, much easier and removes the need for multiple Hadoop versions during compilation.

Author: Ryan Blue <blue@apache.org>

Closes apache#349 from rdblue/PARQUET-400-byte-buffers and squashes the following commits:

* 1bcb8a8 [Ryan Blue] PARQUET-400: Fix review nits.
* 823ca00 [Ryan Blue] PARQUET-400: Add tests for Hadoop 2 readFully.
* 02d3709 [Ryan Blue] PARQUET-400: Remove unused property.
* b543013 [Ryan Blue] PARQUET-400: Fix logger for HadoopStreams.
* 2cb6934 [Ryan Blue] PARQUET-400: Remove H2SeekableInputStream tests.
* abaa695 [Ryan Blue] PARQUET-400: Fix review items.
* 5dc50a5 [Ryan Blue] PARQUET-400: Add tests for H1SeekableInputStream methods.
* 730a9e2 [Ryan Blue] PARQUET-400: Move SeekableInputStream to io package.
* 506a556 [Ryan Blue] PARQUET-400: Remove Hadoop dependencies from SeekableInputStream.
* c80580c [Ryan Blue] PARQUET-400: Handle UnsupportedOperationException from read(ByteBuffer).
* ba08b3f [Ryan Blue] PARQUET-400: Replace CompatibilityUtil with SeekableInputStream.

Conflicts:

* parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
* pom.xml

Resolution: Fixed minor conflicts from using byte[] instead of ByteBuffer. Updated pom changes for current Pig version (PPD not backported).