Skip to content

Commit

Permalink
Add async resource with hasRef to threadsafe functions
Browse files Browse the repository at this point in the history
Resolves #948
  • Loading branch information
kjvalencik committed Nov 29, 2022
1 parent c346524 commit c8ede88
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 17 deletions.
18 changes: 14 additions & 4 deletions crates/neon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,21 @@ pub use neon_macros::*;
#[cfg(feature = "napi-6")]
mod lifecycle;

#[cfg(feature = "napi-8")]
#[repr(u64)]
// Note: `upper` must be non-zero or `napi_check_object_type_tag` will always return false
// https://github.com/nodejs/node/blob/5fad0b93667ffc6e4def52996b9529ac99b26319/src/js_native_api_v8.cc#L2455
pub(crate) enum UpperTypeTag {
Module = 1,
Tsfn = 2,
}

#[cfg(feature = "napi-8")]
static MODULE_TAG: once_cell::sync::Lazy<crate::sys::TypeTag> = once_cell::sync::Lazy::new(|| {
let mut lower = [0; std::mem::size_of::<u64>()];

// Generating a random module tag at runtime allows Neon builds to be reproducible. A few
// alternativeswere considered:
// alternatives were considered:
// * Generating a random value at build time; this reduces runtime dependencies but, breaks
// reproducible builds
// * A static random value; this solves the previous issues, but does not protect against ABI
Expand All @@ -123,7 +132,8 @@ static MODULE_TAG: once_cell::sync::Lazy<crate::sys::TypeTag> = once_cell::sync:
// expansion of implementation.
let lower = u64::from_ne_bytes(lower);

// Note: `upper` must be non-zero or `napi_check_object_type_tag` will always return false
// https://github.com/nodejs/node/blob/5fad0b93667ffc6e4def52996b9529ac99b26319/src/js_native_api_v8.cc#L2455
crate::sys::TypeTag { lower, upper: 1 }
crate::sys::TypeTag {
lower,
upper: UpperTypeTag::Module as u64,
}
});
11 changes: 11 additions & 0 deletions crates/neon/src/sys/bindings/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,17 @@ mod napi1 {
message: *const c_char,
message_len: usize,
);

fn wrap(
env: Env,
js_object: Value,
native_object: *mut c_void,
finalize_cb: Finalize,
finalize_hint: *mut c_void,
result: *mut Ref,
) -> Status;

fn unwrap(env: Env, js_object: Value, result: *mut *mut c_void) -> Status;
}
);
}
Expand Down
177 changes: 164 additions & 13 deletions crates/neon/src/sys/tsfn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ const BOUNDARY: FailureBoundary = FailureBoundary {
panic: "A panic occurred while executing a `neon::event::Channel::send` callback",
};

#[cfg(feature = "napi-8")]
// Identifies state stored in the async resource of a threadsafe function
static TSFN_TAG: once_cell::sync::Lazy<crate::sys::TypeTag> = once_cell::sync::Lazy::new(|| {
let mut tag = *crate::MODULE_TAG;
tag.upper = crate::UpperTypeTag::Tsfn as u64;
tag
});

#[derive(Debug)]
struct Tsfn(napi::ThreadsafeFunction);

Expand All @@ -27,10 +35,16 @@ unsafe impl Sync for Tsfn {}
/// function for scheduling tasks to execute on a JavaScript thread.
pub struct ThreadsafeFunction<T> {
tsfn: Tsfn,
is_finalized: Arc<Mutex<bool>>,
state: Arc<Mutex<State>>,
callback: fn(Option<Env>, T),
}

#[derive(Debug)]
struct State {
is_finalized: bool,
has_ref: bool,
}

#[derive(Debug)]
struct Callback<T> {
callback: fn(Option<Env>, T),
Expand All @@ -40,6 +54,136 @@ struct Callback<T> {
/// Error returned when scheduling a threadsafe function with some data
pub struct CallError;

unsafe extern "C" fn get_ref_callback(env: Env, info: napi::CallbackInfo) -> napi::Value {
let complete = |result| {
let mut out = MaybeUninit::uninit();
assert_eq!(
napi::get_boolean(env, result, out.as_mut_ptr()),
napi::Status::Ok
);
out.assume_init()
};

// If we hit _any_ failure condition, assume the threadsafe function is referenced
let bail = || complete(true);
let mut this = MaybeUninit::uninit();

if napi::get_cb_info(
env,
info,
ptr::null_mut(),
ptr::null_mut(),
this.as_mut_ptr(),
ptr::null_mut(),
) != napi::Status::Ok
{
return bail();
}

let this = this.assume_init();

#[cfg(feature = "napi-8")]
{
let mut result = false;

if napi::check_object_type_tag(env, this, &*TSFN_TAG as *const _, &mut result as *mut _)
!= napi::Status::Ok
|| !result
{
return bail();
}
}

let mut state = MaybeUninit::uninit();

if napi::unwrap(env, this, state.as_mut_ptr()) != napi::Status::Ok {
return bail();
}

let state = &*state.assume_init().cast::<Mutex<State>>();
let is_ref = state.lock().map(|state| state.has_ref).unwrap_or(true);

complete(is_ref)
}

unsafe extern "C" fn drop_state(_env: Env, data: *mut c_void, _hint: *mut c_void) {
drop(Arc::<Mutex<State>>::from_raw(data.cast()))
}

unsafe fn create_async_resource(env: Env, state: Arc<Mutex<State>>) -> napi::Value {
let get_ref_fn_name = "hasRef";

let get_ref_fn = {
let mut get_ref_fn = MaybeUninit::uninit();

assert_eq!(
napi::create_function(
env,
get_ref_fn_name.as_ptr().cast(),
get_ref_fn_name.len(),
Some(get_ref_callback),
ptr::null_mut(),
get_ref_fn.as_mut_ptr(),
),
napi::Status::Ok,
);

get_ref_fn.assume_init()
};

let resource = {
let mut resource = MaybeUninit::uninit();

assert_eq!(
napi::create_object(env, resource.as_mut_ptr()),
napi::Status::Ok
);

resource.assume_init()
};

let get_ref_key = {
let mut key = MaybeUninit::uninit();

assert_eq!(
napi::create_string_utf8(
env,
get_ref_fn_name.as_ptr().cast(),
get_ref_fn_name.len(),
key.as_mut_ptr()
),
napi::Status::Ok
);

key.assume_init()
};

assert_eq!(
napi::set_property(env, resource, get_ref_key, get_ref_fn),
napi::Status::Ok
);

assert_eq!(
napi::wrap(
env,
resource,
Arc::into_raw(state) as *mut _,
Some(drop_state),
ptr::null_mut(),
ptr::null_mut(),
),
napi::Status::Ok
);

#[cfg(feature = "napi-8")]
assert_eq!(
napi::type_tag_object(env, resource, &*TSFN_TAG),
napi::Status::Ok
);

resource
}

impl<T: Send + 'static> ThreadsafeFunction<T> {
/// Creates a new unbounded N-API Threadsafe Function
/// Safety: `Env` must be valid for the current thread
Expand All @@ -55,19 +199,22 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
callback: fn(Option<Env>, T),
) -> Self {
let mut result = MaybeUninit::uninit();
let is_finalized = Arc::new(Mutex::new(false));
let state = Arc::new(Mutex::new(State {
is_finalized: false,
has_ref: true,
}));

assert_eq!(
napi::create_threadsafe_function(
env,
std::ptr::null_mut(),
std::ptr::null_mut(),
ptr::null_mut(),
create_async_resource(env, state.clone()),
super::string(env, "neon threadsafe function"),
max_queue_size,
// Always set the reference count to 1. Prefer using
// Rust `Arc` to maintain the struct.
1,
Arc::into_raw(is_finalized.clone()) as *mut _,
Arc::into_raw(state.clone()) as *mut _,
Some(Self::finalize),
std::ptr::null_mut(),
Some(Self::callback),
Expand All @@ -78,7 +225,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

Self {
tsfn: Tsfn(result.assume_init()),
is_finalized,
state,
callback,
}
}
Expand All @@ -98,10 +245,10 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

// Hold the lock before entering `call_threadsafe_function` so that
// `finalize_cb` would never complete.
let mut is_finalized = self.is_finalized.lock().unwrap();
let mut state = self.state.lock().unwrap();

let status = {
if *is_finalized {
if state.is_finalized {
napi::Status::Closing
} else {
unsafe {
Expand All @@ -115,7 +262,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
} else {
// Prevent further calls to `call_threadsafe_function`
if status == napi::Status::Closing {
*is_finalized = true;
state.is_finalized = true;
}

// If the call failed, the callback won't execute
Expand All @@ -128,27 +275,31 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
/// References a threadsafe function to prevent exiting the event loop until it has been dropped. (Default)
/// Safety: `Env` must be valid for the current thread
pub unsafe fn reference(&self, env: Env) {
let mut state = self.state.lock().unwrap();
assert_eq!(
napi::ref_threadsafe_function(env, self.tsfn.0),
napi::Status::Ok,
);
state.has_ref = true;
}

/// Unreferences a threadsafe function to allow exiting the event loop before it has been dropped.
/// Safety: `Env` must be valid for the current thread
pub unsafe fn unref(&self, env: Env) {
let mut state = self.state.lock().unwrap();
assert_eq!(
napi::unref_threadsafe_function(env, self.tsfn.0),
napi::Status::Ok,
);
state.has_ref = false;
}

// Provides a C ABI wrapper for a napi callback notifying us about tsfn
// being finalized.
unsafe extern "C" fn finalize(_env: Env, data: *mut c_void, _hint: *mut c_void) {
let is_finalized = Arc::from_raw(data as *mut Mutex<bool>);
let state = Arc::from_raw(data as *mut Mutex<State>);

*is_finalized.lock().unwrap() = true;
state.lock().unwrap().is_finalized = true;
}

// Provides a C ABI wrapper for invoking the user supplied function pointer
Expand Down Expand Up @@ -179,10 +330,10 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

impl<T> Drop for ThreadsafeFunction<T> {
fn drop(&mut self) {
let is_finalized = self.is_finalized.lock().unwrap();
let state = self.state.lock().unwrap();

// tsfn was already finalized by `Environment::CleanupHandles()` in Node.js
if *is_finalized {
if state.is_finalized {
return;
}

Expand Down

0 comments on commit c8ede88

Please sign in to comment.