Skip to content

Commit

Permalink
feat(bindings/java): explicit async runtime (#4376)
Browse files Browse the repository at this point in the history
* feat: explicit async runtime

Signed-off-by: tison <wander4096@gmail.com>

* add executor param everywhere

Signed-off-by: tison <wander4096@gmail.com>

* pipe

Signed-off-by: tison <wander4096@gmail.com>

* fixup

Signed-off-by: tison <wander4096@gmail.com>

* add test

Signed-off-by: tison <wander4096@gmail.com>

* license header

Signed-off-by: tison <wander4096@gmail.com>

* tidy

Signed-off-by: tison <wander4096@gmail.com>

* docs and errors

Signed-off-by: tison <wander4096@gmail.com>

---------

Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Mar 18, 2024
1 parent b2146b8 commit 4c1752b
Show file tree
Hide file tree
Showing 9 changed files with 430 additions and 168 deletions.
2 changes: 2 additions & 0 deletions bindings/java/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
.mvn/wrapper/maven-wrapper.jar
Cargo.lock

*.log
22 changes: 11 additions & 11 deletions bindings/java/src/blocking_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::Result;

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_disposeInternal(
_: JNIEnv,
Expand All @@ -46,7 +46,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_disposeIn

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_duplicate(
_: JNIEnv,
Expand All @@ -59,7 +59,7 @@ pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_duplicate

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_read(
mut env: JNIEnv,
Expand All @@ -82,7 +82,7 @@ fn intern_read(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) -> Re

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_write(
mut env: JNIEnv,
Expand All @@ -109,7 +109,7 @@ fn intern_write(

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_stat(
mut env: JNIEnv,
Expand All @@ -131,7 +131,7 @@ fn intern_stat(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) -> Re

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_delete(
mut env: JNIEnv,
Expand All @@ -151,7 +151,7 @@ fn intern_delete(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString) ->

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_createDir(
mut env: JNIEnv,
Expand All @@ -171,7 +171,7 @@ fn intern_create_dir(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString)

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_copy(
mut env: JNIEnv,
Expand Down Expand Up @@ -199,7 +199,7 @@ fn intern_copy(

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_rename(
mut env: JNIEnv,
Expand Down Expand Up @@ -227,7 +227,7 @@ fn intern_rename(

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_removeAll(
mut env: JNIEnv,
Expand All @@ -248,7 +248,7 @@ fn intern_remove_all(env: &mut JNIEnv, op: &mut BlockingOperator, path: JString)

/// # Safety
///
/// This function should not be called before the Operator are ready.
/// This function should not be called before the Operator is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_BlockingOperator_list(
mut env: JNIEnv,
Expand Down
152 changes: 152 additions & 0 deletions bindings/java/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::cell::RefCell;
use std::ffi::c_void;
use std::future::Future;

use jni::objects::{JClass, JObject};
use jni::sys::jlong;
use jni::{JNIEnv, JavaVM};
use once_cell::sync::OnceCell;
use tokio::task::JoinHandle;

use crate::Result;

static mut RUNTIME: OnceCell<Executor> = OnceCell::new();
thread_local! {
static ENV: RefCell<Option<*mut jni::sys::JNIEnv>> = RefCell::new(None);
}

/// # Safety
///
/// This function could be only called by java vm when unload this lib.
#[no_mangle]
pub unsafe extern "system" fn JNI_OnUnload(_: JavaVM, _: *mut c_void) {
let _ = RUNTIME.take();
}

/// # Safety
///
/// This function could be only called when the lib is loaded and within an executor thread.
pub(crate) unsafe fn get_current_env<'local>() -> JNIEnv<'local> {
let env = ENV
.with(|cell| *cell.borrow_mut())
.expect("env must be available");
JNIEnv::from_raw(env).expect("env must be valid")
}

pub enum Executor {
Tokio(tokio::runtime::Runtime),
}

impl Executor {
pub fn enter_with<F, R>(&self, f: F) -> R
where
F: FnOnce() -> R,
{
match self {
Executor::Tokio(e) => {
let _guard = e.enter();
f()
}
}
}

pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Executor::Tokio(e) => e.spawn(future),
}
}
}

#[no_mangle]
pub extern "system" fn Java_org_apache_opendal_AsyncExecutor_makeTokioExecutor(
mut env: JNIEnv,
_: JClass,
cores: usize,
) -> jlong {
make_tokio_executor(&mut env, cores)
.map(|executor| Box::into_raw(Box::new(executor)) as jlong)
.unwrap_or_else(|e| {
e.throw(&mut env);
0
})
}

/// # Safety
///
/// This function should not be called before the AsyncExecutor is ready.
#[no_mangle]
pub unsafe extern "system" fn Java_org_apache_opendal_AsyncExecutor_disposeInternal(
_: JNIEnv,
_: JObject,
executor: *mut Executor,
) {
drop(Box::from_raw(executor));
}

pub(crate) fn make_tokio_executor(env: &mut JNIEnv, cores: usize) -> Result<Executor> {
let vm = env.get_java_vm().expect("JavaVM must be available");
let executor = tokio::runtime::Builder::new_multi_thread()
.worker_threads(cores)
.on_thread_start(move || {
ENV.with(|cell| {
let env = vm
.attach_current_thread_as_daemon()
.expect("attach thread must succeed");
*cell.borrow_mut() = Some(env.get_raw());
})
})
.enable_all()
.build()
.map_err(|e| {
opendal::Error::new(
opendal::ErrorKind::Unexpected,
"Failed to create tokio runtime.",
)
.set_source(e)
})?;
Ok(Executor::Tokio(executor))
}

/// # Safety
///
/// This function could be only when the lib is loaded.
pub(crate) unsafe fn executor_or_default<'a>(
env: &mut JNIEnv<'a>,
executor: *const Executor,
) -> &'a Executor {
if executor.is_null() {
default_executor(env)
} else {
&*executor
}
}

/// # Safety
///
/// This function could be only when the lib is loaded.
unsafe fn default_executor<'a>(env: &mut JNIEnv<'a>) -> &'a Executor {
RUNTIME
.get_or_try_init(|| make_tokio_executor(env, num_cpus::get()))
.expect("default executor must be able to initialize")
}
63 changes: 2 additions & 61 deletions bindings/java/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,92 +15,33 @@
// specific language governing permissions and limitations
// under the License.

use std::cell::RefCell;
use std::collections::HashMap;
use std::ffi::c_void;

use jni::objects::JObject;
use jni::objects::JValue;
use jni::sys::jboolean;
use jni::sys::jint;
use jni::sys::jlong;
use jni::sys::JNI_VERSION_1_8;
use jni::JNIEnv;
use jni::JavaVM;
use once_cell::sync::OnceCell;

use opendal::raw::PresignedRequest;
use opendal::Capability;
use opendal::Entry;
use opendal::EntryMode;
use opendal::Metadata;
use opendal::Metakey;
use opendal::OperatorInfo;
use tokio::runtime::Builder;
use tokio::runtime::Runtime;

mod blocking_operator;
mod convert;
mod error;
mod executor;
mod layer;
mod operator;
mod utility;

pub(crate) type Result<T> = std::result::Result<T, error::Error>;

static mut RUNTIME: OnceCell<Runtime> = OnceCell::new();
thread_local! {
static ENV: RefCell<Option<*mut jni::sys::JNIEnv>> = RefCell::new(None);
}

/// # Safety
///
/// This function could be only called by java vm when load this lib.
#[no_mangle]
pub unsafe extern "system" fn JNI_OnLoad(vm: JavaVM, _: *mut c_void) -> jint {
RUNTIME
.set(
Builder::new_multi_thread()
.worker_threads(num_cpus::get())
.on_thread_start(move || {
ENV.with(|cell| {
let env = vm.attach_current_thread_as_daemon().unwrap();
*cell.borrow_mut() = Some(env.get_raw());
})
})
.enable_all()
.build()
.unwrap(),
)
.unwrap();

JNI_VERSION_1_8
}

/// # Safety
///
/// This function could be only called by java vm when unload this lib.
#[no_mangle]
pub unsafe extern "system" fn JNI_OnUnload(_: JavaVM, _: *mut c_void) {
if let Some(r) = RUNTIME.take() {
r.shutdown_background()
}
}

/// # Safety
///
/// This function could be only when the lib is loaded and within a RUNTIME-spawned thread.
unsafe fn get_current_env<'local>() -> JNIEnv<'local> {
let env = ENV.with(|cell| *cell.borrow_mut()).unwrap();
JNIEnv::from_raw(env).unwrap()
}

/// # Safety
///
/// This function could be only when the lib is loaded.
unsafe fn get_global_runtime<'local>() -> &'local Runtime {
RUNTIME.get_unchecked()
}

fn make_presigned_request<'a>(env: &mut JNIEnv<'a>, req: PresignedRequest) -> Result<JObject<'a>> {
let method = env.new_string(req.method().as_str())?;
let uri = env.new_string(req.uri().to_string())?;
Expand Down
40 changes: 40 additions & 0 deletions bindings/java/src/main/java/org/apache/opendal/AsyncExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.opendal;

/**
* AsyncExecutor represents an underneath OpenDAL executor that runs async tasks spawned in the Rust world.
*
* <p>If the executor is passed to construct operators, the executor must outlive the operators.</p>
*/
public class AsyncExecutor extends NativeObject {
public static AsyncExecutor createTokioExecutor(int cores) {
return new AsyncExecutor(makeTokioExecutor(cores));
}

private AsyncExecutor(long nativeHandle) {
super(nativeHandle);
}

@Override
protected native void disposeInternal(long handle);

private static native long makeTokioExecutor(int cores);
}
Loading

0 comments on commit 4c1752b

Please sign in to comment.