-
Notifications
You must be signed in to change notification settings - Fork 9.2k
HADOOP-19303. VectorIO API: support pass-down of a release() operator #7418
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HADOOP-19303. VectorIO API: support pass-down of a release() operator #7418
Conversation
Adds a new method with a release method too
readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate,
Consumer<ByteBuffer> release)
This is return buffers to pools even in failures.
The default implementation hands back to readVectored/2,
so that existing custom implementations of that will get
invoked.
There is a weakness there: it means implementations which
don't implement readVectored/2 will not have buffer release
working.
Checksum, Buffer and RawLocal all do this.
S3A, ABFS and HDFS don't do this.
Change-Id: I26fa7214014482e39af6a9212d2e30dd0fc6d429
|
💔 -1 overall
This message was automatically generated. |
|
@cnauroth @anujmodi2021 have either of you two implemented the vector read API yet? I ask as this PR currently maps the readVectored/3 call to the readVectored/2 call unless overridden, so the default implementation will leak buffers on failure, even if a release function is passed in. If I change it to passing the release call down, then any input stream which implemented readVectored/2 will not have the readVectored/3 call invoking it, unless they override that explicitly too. In this PR, everything in hadoop common does, and I will in S3AInputStream. I'm just trying to work out the best design for other streams. IF all the implementation are in the hadoop source tree, I can do the overrides there and have a default which does release buffers everywhere else. |
| channel.read(buffers[i], range.getOffset(), i, asyncHandler); | ||
| } | ||
| } catch (IOException ioe) { | ||
| LOG.debug("Exception occurred during vectored read ", ioe); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems like this was unnecessary. Any failure will automatically be caught in failed() method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is no IOE to be raised any more
| * part of their error handling. | ||
| * @param ranges the byte ranges to read | ||
| * @param allocate the function to allocate ByteBuffer | ||
| * @param release the function to release a ByteBuffer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove "the"
| * Implementations SHOULD override this method if they can release buffers as | ||
| * part of their error handling. | ||
| * @param ranges the byte ranges to read | ||
| * @param allocate the function to allocate ByteBuffer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove "the"
| */ | ||
| public static final Consumer<ByteBuffer> LOG_BYTE_BUFFER_RELEASED = | ||
| (buffer) -> { | ||
| LOG.debug("release buffer {}", buffer.toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we parameterize and add. releasing buffer for range[x-y] ?
| * No matter what kind of buffer is requested, the allocation function | ||
| * is invoked; that is: the direct flag is ignored. | ||
| */ | ||
| public final class VectorIOBufferPool implements ByteBufferPool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this when we have WeakReferencedElasticByteBufferPool ? An actual implementation of release function should call putBuffer() of the WeakReferencedElasticByteBufferPool no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this as it's a handy way to combine the allocate and release methods into a tuple of oerations; my extended s3a recovery PR used/uses it.
Used this in the RawLocalFS to show how it can be used to pass around the combined allocate/release operations in one parameter
Hi @steveloughran, I am working on the vectored read API feature from the ABFS driver team. We are still working on the design part of the feature and will pick up the implementation soon. |
Hello @steveloughran ! GCS has an implementation of vectored read, overriding readVectored/2 here: Implementation details here: This is on the master branch and 3.0 release line, which is not yet in mainstream Dataproc use. We don't have vectored read in version 2.2 or earlier. It sounds like once this change is in a Hadoop release, GCS should plan on picking this up and overriding readVectored/3. Do I have it right? CC: @arunkumarchacko |
| } | ||
|
|
||
| @Override | ||
| public void readVectored(final List<? extends FileRange> ranges, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will do
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| * @param allocate the function to allocate ByteBuffer | ||
| * @param release the function to release a ByteBuffer. | ||
| * @throws IOException any IOE. | ||
| * @throws IllegalArgumentException if the any of ranges are invalid, or they overlap. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: seems like some words are out of order. Maybe "if any of the ranges..."?
| // Set up all of the futures, so that we can use them if things fail | ||
| for(FileRange range: sortedRanges) { | ||
| // Set up all of the futures, so that the caller can await on | ||
| // their competion. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: "completion"
* review comments * S3AInputStream implements overrides the new method API, but not any buffer releases. Change-Id: Ic01d10499924d2efc945436ea2a508a18d361e4e
cnauroth
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, pending CI.
Thank you, @steveloughran and @mukund-thakur .
|
🎊 +1 overall
This message was automatically generated. |
…#7418) The PositionedReadable vector IO API has a new readVectored() method which takes a release operator as its third argument. readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate, Consumer<ByteBuffer> release) This is return buffers to pools even in failures. The default implementation hands back to readVectored/2, so that existing custom implementations of that will get invoked. Contributed by Steve Loughran
…apache#7418) The PositionedReadable vector IO API has a new readVectored() method which takes a release operator as its third argument. readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate, Consumer<ByteBuffer> release) This is return buffers to pools even in failures. The default implementation hands back to readVectored/2, so that existing custom implementations of that will get invoked. Contributed by Steve Loughran
HADOOP-19303.
Adds a new method with a release method to
readVectored(List<? extends FileRange> ranges,
IntFunction allocate,
Consumer release)
This is return buffers to pools even in failures.
The default implementation hands back to readVectored/2, so that existing custom implementations of that will get invoked.
S3A, ABFS and HDFS don't do this.
This is the hadoop-common side of #7105, which I'm going to retire to
focus on analytics vector read performance.
Getting the new API into hadoop releases before that means that it is
ready for applications to use, if built against it.
How was this patch tested?
For code changes:
LICENSE,LICENSE-binary,NOTICE-binaryfiles?