-
Notifications
You must be signed in to change notification settings - Fork 453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: implement OperatorInputStream and OperatorOutputStream #4626
Conversation
15dbd41
to
b8f9d99
Compare
bindings/java/src/test/java/org/apache/opendal/test/OperatorInputStreamTest.java
Outdated
Show resolved
Hide resolved
@@ -39,11 +41,23 @@ pub struct StdBytesIterator { | |||
impl StdBytesIterator { | |||
/// NOTE: don't allow users to create StdIterator directly. | |||
#[inline] | |||
pub(crate) fn new(r: oio::BlockingReader, range: std::ops::Range<u64>) -> Self { | |||
pub(crate) fn new(r: oio::BlockingReader, range: impl RangeBounds<u64>) -> Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to wait for #4594.
I'm still working on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it blocked? I don't see the dependency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some services may behavior wrong while handling limit = u64::MAX
, it's a bug need to fix. Can we revert changes to core
first in this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get the point. But change range
to support ..
is significant to this PR. So I'd instead wait for your change and later rebase on those changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it and thanks a lot.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or if you can rebase on my patch? I don't think the bug you mention would conflict with this patch but it's instead another issue to fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makese sense, let me adapt those logic here.
bindings/java/src/test/java/org/apache/opendal/test/OperatorInputStreamTest.java
Outdated
Show resolved
Hide resolved
Signed-off-by: tison <wander4096@gmail.com>
b8f9d99
to
f8d2a61
Compare
<dependency> | ||
<groupId>commons-io</groupId> | ||
<artifactId>commons-io</artifactId> | ||
<scope>test</scope> | ||
</dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add to test scope for some handy debugging, like:
System.out.println(FileUtils.readFileToString(tempDir.resolve(path).toFile(), StandardCharsets.UTF_8));
This doesn't affect the formal release.
Signed-off-by: tison <wander4096@gmail.com>
f9a4e69
to
6a25c47
Compare
} | ||
} | ||
|
||
private static final int MAX_BYTES = 16384; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later we can make it configuable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please configure by op.writer_with(path).chunk(size)
, and remove internal bytes
in OperatorOutputStream
.
We can implement this in coming PRs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in #4626 (comment), they are differnt concept of buffering. I agree that we can improve the configurability here, though.
if (offset > MAX_BYTES) { | ||
throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES); | ||
} else if (offset < MAX_BYTES) { | ||
final byte[] bytes = Arrays.copyOf(this.bytes, offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is normally called on close and thus only copy once. But users may like to flush to the underneath writer manually.
I'm not sure if we can avoid this copy with some JNI regioned array methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is normally called on close and thus only copy once. But users may like to flush to the underneath writer manually.
OpenDAL doesn't provide flush semantics and only guarantees data persistence after closing. Therefore, flush
should be a no-op, and this behavior needs to be clearly documented in our public API.
public void write(int b) throws IOException { | ||
bytes[offset++] = (byte) b; | ||
if (offset >= MAX_BYTES) { | ||
flush(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this, opendal core handling buffer on it own.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need it.
This sends the Java buffer to the Rust. Otherwise the Rust operator never receive the buffer. We should buffer it a bit in the Java side because if we send every byte, it would introduce expensive JNI call total cost.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is, they are different concept of buffering.
if (offset > MAX_BYTES) { | ||
throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES); | ||
} else if (offset < MAX_BYTES) { | ||
final byte[] bytes = Arrays.copyOf(this.bytes, offset); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is normally called on close and thus only copy once. But users may like to flush to the underneath writer manually.
OpenDAL doesn't provide flush semantics and only guarantees data persistence after closing. Therefore, flush
should be a no-op, and this behavior needs to be clearly documented in our public API.
} | ||
} | ||
|
||
private static final int MAX_BYTES = 16384; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please configure by op.writer_with(path).chunk(size)
, and remove internal bytes
in OperatorOutputStream
.
We can implement this in coming PRs.
…4626) Signed-off-by: tison <wander4096@gmail.com>
This closes #4616.