Skip to content

Commit

Permalink
feat: implement OperatorInputStream and OperatorOutputStream (#4626)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun authored May 19, 2024
1 parent 87c558a commit ac69437
Show file tree
Hide file tree
Showing 12 changed files with 461 additions and 18 deletions.
12 changes: 12 additions & 0 deletions bindings/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

<!-- library dependencies -->
<assertj.version>3.23.1</assertj.version>
<commons-io.version>2.16.1</commons-io.version>
<dotenv.version>2.3.2</dotenv.version>
<lombok.version>1.18.30</lombok.version>
<slf4j.version>2.0.7</slf4j.version>
Expand Down Expand Up @@ -110,6 +111,12 @@
<artifactId>dotenv-java</artifactId>
<version>${dotenv.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
Expand Down Expand Up @@ -151,6 +158,11 @@
<artifactId>dotenv-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
Expand Down
1 change: 1 addition & 0 deletions bindings/java/src/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use jni::sys::jobject;
use jni::sys::jobjectArray;
use jni::sys::jsize;
use jni::JNIEnv;

use opendal::BlockingOperator;

use crate::convert::jstring_to_string;
Expand Down
2 changes: 2 additions & 0 deletions bindings/java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ mod error;
mod executor;
mod layer;
mod operator;
mod operator_input_stream;
mod operator_output_stream;
mod utility;

pub(crate) type Result<T> = std::result::Result<T, error::Error>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,18 @@ public void write(String path, byte[] content) {
write(nativeHandle, path, content);
}

public OperatorOutputStream createOutputStream(String path) {
return new OperatorOutputStream(this, path);
}

public byte[] read(String path) {
return read(nativeHandle, path);
}

public OperatorInputStream createInputStream(String path) {
return new OperatorInputStream(this, path);
}

public void delete(String path) {
delete(nativeHandle, path);
}
Expand Down Expand Up @@ -104,23 +112,23 @@ public List<Entry> list(String path) {
@Override
protected native void disposeInternal(long handle);

private static native long duplicate(long nativeHandle);
private static native long duplicate(long op);

private static native void write(long nativeHandle, String path, byte[] content);
private static native void write(long op, String path, byte[] content);

private static native byte[] read(long nativeHandle, String path);
private static native byte[] read(long op, String path);

private static native void delete(long nativeHandle, String path);
private static native void delete(long op, String path);

private static native Metadata stat(long nativeHandle, String path);
private static native Metadata stat(long op, String path);

private static native long createDir(long nativeHandle, String path);
private static native long createDir(long op, String path);

private static native long copy(long nativeHandle, String sourcePath, String targetPath);
private static native long copy(long op, String sourcePath, String targetPath);

private static native long rename(long nativeHandle, String sourcePath, String targetPath);
private static native long rename(long op, String sourcePath, String targetPath);

private static native void removeAll(long nativeHandle, String path);
private static native void removeAll(long op, String path);

private static native Entry[] list(long nativeHandle, String path);
private static native Entry[] list(long op, String path);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.opendal;

import java.io.IOException;
import java.io.InputStream;

public class OperatorInputStream extends InputStream {
private static class Reader extends NativeObject {
private Reader(long nativeHandle) {
super(nativeHandle);
}

@Override
protected void disposeInternal(long handle) {
disposeReader(handle);
}
}

private final Reader reader;

private int offset = 0;
private byte[] bytes = new byte[0];

public OperatorInputStream(BlockingOperator operator, String path) {
final long op = operator.nativeHandle;
this.reader = new Reader(constructReader(op, path));
}

@Override
public int read() throws IOException {
if (bytes != null && offset >= bytes.length) {
bytes = readNextBytes(reader.nativeHandle);
offset = 0;
}

if (bytes != null) {
return bytes[offset++] & 0xFF;
}

return -1;
}

@Override
public void close() throws IOException {
reader.close();
}

private static native long constructReader(long op, String path);

private static native long disposeReader(long reader);

private static native byte[] readNextBytes(long reader);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.opendal;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

public class OperatorOutputStream extends OutputStream {
private static class Writer extends NativeObject {
private Writer(long nativeHandle) {
super(nativeHandle);
}

@Override
protected void disposeInternal(long handle) {
disposeWriter(handle);
}
}

private static final int MAX_BYTES = 16384;

private final Writer writer;
private final byte[] bytes = new byte[MAX_BYTES];

private int offset = 0;

public OperatorOutputStream(BlockingOperator operator, String path) {
final long op = operator.nativeHandle;
this.writer = new Writer(constructWriter(op, path));
}

@Override
public void write(int b) throws IOException {
bytes[offset++] = (byte) b;
if (offset >= MAX_BYTES) {
flush();
}
}

@Override
public void flush() throws IOException {
if (offset > MAX_BYTES) {
throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES);
} else if (offset < MAX_BYTES) {
final byte[] bytes = Arrays.copyOf(this.bytes, offset);
writeBytes(writer.nativeHandle, bytes);
} else {
writeBytes(writer.nativeHandle, bytes);
}
offset = 0;
}

@Override
public void close() throws IOException {
flush();
writer.close();
}

private static native long constructWriter(long op, String path);

private static native long disposeWriter(long writer);

private static native byte[] writeBytes(long writer, byte[] bytes);
}
92 changes: 92 additions & 0 deletions bindings/java/src/operator_input_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::convert::jstring_to_string;
use jni::objects::{JByteArray, JClass, JObject, JString};
use jni::sys::{jbyteArray, jlong};
use jni::JNIEnv;
use opendal::{BlockingOperator, StdBytesIterator};

/// # Safety
///
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_OperatorInputStream_constructReader(
mut env: JNIEnv,
_: JClass,
op: *mut BlockingOperator,
path: JString,
) -> jlong {
intern_construct_reader(&mut env, &mut *op, path).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_construct_reader(
env: &mut JNIEnv,
op: &mut BlockingOperator,
path: JString,
) -> crate::Result<jlong> {
let path = jstring_to_string(env, &path)?;
let reader = op.reader(&path)?.into_bytes_iterator(..);
Ok(Box::into_raw(Box::new(reader)) as jlong)
}

/// # Safety
///
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_OperatorInputStream_disposeReader(
_: JNIEnv,
_: JClass,
reader: *mut StdBytesIterator,
) {
drop(Box::from_raw(reader));
}

/// # Safety
///
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_OperatorInputStream_readNextBytes(
mut env: JNIEnv,
_: JClass,
reader: *mut StdBytesIterator,
) -> jbyteArray {
intern_read_next_bytes(&mut env, &mut *reader).unwrap_or_else(|e| {
e.throw(&mut env);
JByteArray::default().into_raw()
})
}

fn intern_read_next_bytes(
env: &mut JNIEnv,
reader: &mut StdBytesIterator,
) -> crate::Result<jbyteArray> {
match reader
.next()
.transpose()
.map_err(|err| opendal::Error::new(opendal::ErrorKind::Unexpected, &err.to_string()))?
{
None => Ok(JObject::null().into_raw()),
Some(content) => {
let result = env.byte_array_from_slice(&content)?;
Ok(result.into_raw())
}
}
}
Loading

0 comments on commit ac69437

Please sign in to comment.