Skip to content

Commit

Permalink
feat: implement OperatorOutputStream
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed May 18, 2024
1 parent f8d2a61 commit f9a4e69
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 10 deletions.
12 changes: 12 additions & 0 deletions bindings/java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

<!-- library dependencies -->
<assertj.version>3.23.1</assertj.version>
<commons-io.version>2.16.1</commons-io.version>
<dotenv.version>2.3.2</dotenv.version>
<lombok.version>1.18.30</lombok.version>
<slf4j.version>2.0.7</slf4j.version>
Expand Down Expand Up @@ -110,6 +111,12 @@
<artifactId>dotenv-java</artifactId>
<version>${dotenv.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
Expand Down Expand Up @@ -151,6 +158,11 @@
<artifactId>dotenv-java</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
Expand Down
1 change: 1 addition & 0 deletions bindings/java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod executor;
mod layer;
mod operator;
mod operator_input_stream;
mod operator_output_stream;
mod utility;

pub(crate) type Result<T> = std::result::Result<T, error::Error>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public void write(String path, byte[] content) {
write(nativeHandle, path, content);
}

public OperatorOutputStream createOutputStream(String path) {
return new OperatorOutputStream(this, path);
}

public byte[] read(String path) {
return read(nativeHandle, path);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.opendal;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

public class OperatorOutputStream extends OutputStream {
private static class Writer extends NativeObject {
private Writer(long nativeHandle) {
super(nativeHandle);
}

@Override
protected void disposeInternal(long handle) {
disposeWriter(handle);
}
}

private static final int MAX_BYTES = 16384;

private final Writer writer;
private final byte[] bytes = new byte[MAX_BYTES];

private int offset = 0;

public OperatorOutputStream(BlockingOperator operator, String path) {
final long op = operator.nativeHandle;
this.writer = new Writer(constructWriter(op, path));
}

@Override
public void write(int b) throws IOException {
bytes[offset++] = (byte) b;
if (offset >= MAX_BYTES) {
flush();
}
}

@Override
public void flush() throws IOException {
if (offset > MAX_BYTES) {
throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES);
} else if (offset < MAX_BYTES) {
final byte[] bytes = Arrays.copyOf(this.bytes, offset);
writeBytes(writer.nativeHandle, bytes);
} else {
writeBytes(writer.nativeHandle, bytes);
}
offset = 0;
}

@Override
public void close() throws IOException {
flush();
writer.close();
}

private static native long constructWriter(long op, String path);

private static native long disposeWriter(long writer);

private static native byte[] writeBytes(long writer, byte[] bytes);
}
4 changes: 2 additions & 2 deletions bindings/java/src/operator_input_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ pub unsafe extern "system" fn Java_org_apache_opendal_OperatorInputStream_readNe
_: JClass,
reader: *mut StdBytesIterator,
) -> jbyteArray {
intern_reader_next_bytes(&mut env, &mut *reader).unwrap_or_else(|e| {
intern_read_next_bytes(&mut env, &mut *reader).unwrap_or_else(|e| {
e.throw(&mut env);
JByteArray::default().into_raw()
})
}

fn intern_reader_next_bytes(
fn intern_read_next_bytes(
env: &mut JNIEnv,
reader: &mut StdBytesIterator,
) -> crate::Result<jbyteArray> {
Expand Down
95 changes: 95 additions & 0 deletions bindings/java/src/operator_output_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use jni::objects::{JByteArray, JClass, JString};
use jni::sys::jlong;
use jni::JNIEnv;

use opendal::{BlockingOperator, BlockingWriter};

use crate::convert::jstring_to_string;

/// # Safety
///
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_OperatorOutputStream_constructWriter(
mut env: JNIEnv,
_: JClass,
op: *mut BlockingOperator,
path: JString,
) -> jlong {
intern_construct_write(&mut env, &mut *op, path).unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

fn intern_construct_write(
env: &mut JNIEnv,
op: &mut BlockingOperator,
path: JString,
) -> crate::Result<jlong> {
let path = jstring_to_string(env, &path)?;
let writer = op.writer(&path)?;
Ok(Box::into_raw(Box::new(writer)) as jlong)
}

/// # Safety
///
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_OperatorOutputStream_disposeWriter(
mut env: JNIEnv,
_: JClass,
writer: *mut BlockingWriter,
) {
let mut writer = Box::from_raw(writer);
intern_dispose_write(&mut writer).unwrap_or_else(|e| {
e.throw(&mut env);
})
}

fn intern_dispose_write(writer: &mut BlockingWriter) -> crate::Result<()> {
writer.close()?;
Ok(())
}

/// # Safety
///
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_OperatorOutputStream_writeBytes(
mut env: JNIEnv,
_: JClass,
writer: *mut BlockingWriter,
content: JByteArray,
) {
intern_write_bytes(&mut env, &mut *writer, content).unwrap_or_else(|e| {
e.throw(&mut env);
})
}

fn intern_write_bytes(
env: &mut JNIEnv,
writer: &mut BlockingWriter,
content: JByteArray,
) -> crate::Result<()> {
let content = env.convert_byte_array(content)?;
writer.write(content)?;
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,27 +29,28 @@
import java.util.stream.Stream;
import org.apache.opendal.BlockingOperator;
import org.apache.opendal.OperatorInputStream;
import org.apache.opendal.OperatorOutputStream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public class OperatorInputStreamTest {
public class OperatorInputOutputStreamTest {
@TempDir
private static Path tempDir;

@Test
void testReadWithInputStream() throws Exception {
void testReadWriteWithStream() throws Exception {
final Map<String, String> conf = new HashMap<>();
conf.put("root", tempDir.toString());

try (final BlockingOperator op = BlockingOperator.of("fs", conf)) {
final String path = "OperatorInputStreamTest.txt";
final StringBuilder content = new StringBuilder();
final String path = "OperatorInputOutputStreamTest.txt";
final long multi = 1024 * 1024;

final long multi = 1024;
for (long i = 0; i < multi; i++) {
content.append("[content] OperatorInputStreamTest\n");
try (final OperatorOutputStream os = op.createOutputStream(path)) {
for (long i = 0; i < multi; i++) {
os.write("[content] OperatorInputStreamTest\n".getBytes());
}
}
op.write(path, content.toString().getBytes());

try (final OperatorInputStream is = op.createInputStream(path)) {
final Stream<String> lines = new BufferedReader(new InputStreamReader(is)).lines();
Expand Down

0 comments on commit f9a4e69

Please sign in to comment.