Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle early finalization of TSFN by Node.js #744

Merged
merged 1 commit into from
Jun 1, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions crates/neon-runtime/src/napi/tsfn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::ffi::c_void;
use std::mem::MaybeUninit;
use std::sync::{Arc, Mutex};

use crate::napi::bindings as napi;
use crate::raw::{Env, Local};
Expand Down Expand Up @@ -34,6 +35,7 @@ unsafe impl Sync for Tsfn {}
/// function for scheduling tasks to execute on a JavaScript thread.
pub struct ThreadsafeFunction<T> {
tsfn: Tsfn,
is_finalized: Arc<Mutex<bool>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be an AtomicBool instead of a Mutex<bool>? I'm also not clear why it needs to be an Arc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arc provides a nice way of passing the pointer to the C function, because unlike Box you can clone it before turning into a raw pointer, and then decrement the reference count when turning it back from the raw pointer to the Arc instance and dropping it.

I think it could be AtomicBool. Let me try that in a moment.

Copy link
Contributor

@indutny-signal indutny-signal May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved to AtomicBool. @jrose-signal must be really happy to see this happen 😉

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it has to be something indirect because ThreadsafeFunction is passed around by value. Box is probably safe because the finalizer callback is guaranteed to be invoked before Drop::drop completes (either Node invokes it on shutdown or we invoke it in drop), but it's a lot easier to mess that up somehow.

callback: fn(Option<Env>, T),
}

Expand Down Expand Up @@ -76,6 +78,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
callback: fn(Option<Env>, T),
) -> Self {
let mut result = MaybeUninit::uninit();
let is_finalized = Arc::new(Mutex::new(false));

assert_eq!(
napi::create_threadsafe_function(
Expand All @@ -87,8 +90,8 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
// Always set the reference count to 1. Prefer using
// Rust `Arc` to maintain the struct.
1,
std::ptr::null_mut(),
None,
Arc::into_raw(is_finalized.clone()) as *mut _,
Some(Self::finalize),
std::ptr::null_mut(),
Some(Self::callback),
result.as_mut_ptr(),
Expand All @@ -98,6 +101,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

Self {
tsfn: Tsfn(result.assume_init()),
is_finalized: is_finalized,
callback,
}
}
Expand All @@ -115,12 +119,28 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
data,
}));

let status =
unsafe { napi::call_threadsafe_function(self.tsfn.0, callback as *mut _, is_blocking) };
// Hold the lock before entering `call_threadsafe_function` so that
// `finalize_cb` would never complete.
let mut is_finalized = self.is_finalized.lock().unwrap();

let status = {
if *is_finalized {
napi::Status::Closing
} else {
unsafe {
napi::call_threadsafe_function(self.tsfn.0, callback as *mut _, is_blocking)
}
}
};

if status == napi::Status::Ok {
Ok(())
} else {
// Prevent further calls to `call_threadsafe_function`
if status == napi::Status::Closing {
*is_finalized = true;
}

// If the call failed, the callback won't execute
let callback = unsafe { Box::from_raw(callback) };

Expand Down Expand Up @@ -149,6 +169,14 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
);
}

// Provides a C ABI wrapper for a napi callback notifying us about tsfn
// being finalized.
unsafe extern "C" fn finalize(_env: Env, data: *mut c_void, _hint: *mut c_void) {
let is_finalized = Arc::from_raw(data as *mut Mutex<bool>);

*is_finalized.lock().unwrap() = true;
}

// Provides a C ABI wrapper for invoking the user supplied function pointer
unsafe extern "C" fn callback(
env: Env,
Expand All @@ -167,6 +195,14 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

impl<T> Drop for ThreadsafeFunction<T> {
fn drop(&mut self) {
let is_finalized = self.is_finalized.lock().unwrap();

// tsfn was already finalized by `Environment::CleanupHandles()` in
// Node.js
if *is_finalized {
return;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you need to drop the lock before calling napi::release_threadsafe_function?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, actually I don't the idea here is that we want finalize_cb to block until release_threadsafe_function completes, because otherwise finalize_cb might finish first and delete the tsfn in C++.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mutex isn't re-entrant, though, so in the non-early-exit case you'll lock, call release_threadsafe_function, and fail when it's time to lock again.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

release_threadsafe_function doesn't call finalize_cb directly. It schedules an event on the loop thread and executes finalize_cb in another tick.


unsafe {
napi::release_threadsafe_function(
self.tsfn.0,
Expand Down