Reuse ThreadsafeFunction in EventQueue #739

Merged · 12 commits · Jun 29, 2021
4 changes: 2 additions & 2 deletions crates/neon-runtime/src/napi/tsfn.rs
@@ -153,7 +153,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

/// References a threadsafe function to prevent exiting the event loop until it has been dropped. (Default)
/// Safety: `Env` must be valid for the current thread
pub unsafe fn reference(&mut self, env: Env) {
pub unsafe fn reference(&self, env: Env) {
assert_eq!(
napi::ref_threadsafe_function(env, self.tsfn.0,),
napi::Status::Ok,
@@ -162,7 +162,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

/// Unreferences a threadsafe function to allow exiting the event loop before it has been dropped.
/// Safety: `Env` must be valid for the current thread
pub unsafe fn unref(&mut self, env: Env) {
pub unsafe fn unref(&self, env: Env) {
assert_eq!(
napi::unref_threadsafe_function(env, self.tsfn.0,),
napi::Status::Ok,
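The switch from `&mut self` to `&self` on `reference`/`unref` is what allows a `ThreadsafeFunction` to be driven through shared references, e.g. from behind an `Arc`; the shared `ChannelState` later in this PR relies on it, and it is also why the `mut` binding disappears from `drop_queue` in `src/lifecycle.rs`. A minimal sketch of the pattern, assuming the crate-internal `neon-runtime` types shown above (`State` and `keep_alive` are hypothetical names, not part of this diff):

```rust
use std::sync::Arc;

use neon_runtime::raw::Env;
use neon_runtime::tsfn::ThreadsafeFunction;

// Mirrors the `Callback` alias used by `Channel` further down in this PR.
type Callback = Box<dyn FnOnce(Env) + Send + 'static>;

// Hypothetical shared state: many owners hold an `Arc` to the same value.
struct State {
    tsfn: ThreadsafeFunction<Callback>,
}

/// Safety: `env` must be valid for the current thread.
unsafe fn keep_alive(state: &Arc<State>, env: Env) {
    // An `Arc` only hands out shared references, so this call is only
    // possible now that `reference` takes `&self` instead of `&mut self`.
    state.tsfn.reference(env);
}
```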
15 changes: 13 additions & 2 deletions src/context/mod.rs
@@ -154,6 +154,8 @@ use crate::context::internal::Env;
#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
use crate::event::Channel;
use crate::handle::{Handle, Managed};
#[cfg(all(feature = "napi-6", feature = "event-queue-api"))]
use crate::lifecycle::InstanceData;
#[cfg(feature = "legacy-runtime")]
use crate::object::class::Class;
use crate::object::{Object, This};
@@ -550,9 +552,18 @@ pub trait Context<'a>: ContextInternal<'a> {
}

#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
/// Creates an unbounded channel for scheduling events to be executed on the JavaScript thread.
/// Returns an unbounded channel for scheduling events to be executed on the JavaScript thread.
///
/// When using N-API >= 6, the channel returned by this method is backed by a shared queue.
/// To create a channel backed by a _new_ queue, see [`Channel`](crate::event::Channel).
fn channel(&mut self) -> Channel {
Channel::new(self)
#[cfg(feature = "napi-6")]
let channel = InstanceData::channel(self);

#[cfg(not(feature = "napi-6"))]
let channel = Channel::new(self);

channel
}

#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
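From the caller's perspective nothing changes: `cx.channel()` is used the same way, only the backing queue is shared on N-API >= 6. A hedged usage sketch (not part of this diff; `async_add` and its arguments are illustrative, and exact signatures depend on the Neon version and enabled features):

```rust
use neon::prelude::*;

fn async_add(mut cx: FunctionContext) -> JsResult<JsUndefined> {
    let callback = cx.argument::<JsFunction>(0)?.root(&mut cx);
    // On N-API >= 6 this clones the instance-wide shared channel instead of
    // creating a brand-new ThreadsafeFunction.
    let channel = cx.channel();

    std::thread::spawn(move || {
        let result = 1.0 + 2.0; // some work off the JavaScript thread

        channel.send(move |mut cx| {
            let callback = callback.into_inner(&mut cx);
            let this = cx.undefined();
            let args = vec![cx.number(result).upcast::<JsValue>()];

            callback.call(&mut cx, this, args)?;
            Ok(())
        });
    });

    Ok(cx.undefined())
}
```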
150 changes: 131 additions & 19 deletions src/event/event_queue.rs
@@ -1,3 +1,6 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use neon_runtime::raw::Env;
use neon_runtime::tsfn::ThreadsafeFunction;

@@ -8,6 +11,9 @@ type Callback = Box<dyn FnOnce(Env) + Send + 'static>;

/// Channel for scheduling Rust closures to execute on the JavaScript main thread.
///
/// Cloning a `Channel` will create a new channel that shares a backing queue for
/// events.
///
/// # Example
///
/// The following example spawns a standard Rust thread to complete a computation
@@ -50,7 +56,7 @@ type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
/// ```

pub struct Channel {
tsfn: ThreadsafeFunction<Callback>,
state: Arc<ChannelState>,
has_ref: bool,
}

@@ -64,31 +70,35 @@ impl Channel {
/// Creates an unbounded channel for scheduling closures on the JavaScript
/// main thread
pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };

Self {
tsfn,
state: Arc::new(ChannelState::new(cx)),
has_ref: true,
}
}

/// Allow the Node event loop to exit while this `Channel` exists.
/// _Idempotent_
pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
self.has_ref = false;

unsafe { self.tsfn.unref(cx.env().to_raw()) }
// Already unreferenced
if !self.has_ref {
return self;
}

self.has_ref = false;
self.state.unref(cx);
self
}

/// Prevent the Node event loop from exiting while this `Channel` exists. (Default)
/// _Idempotent_
pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
self.has_ref = true;

unsafe { self.tsfn.reference(cx.env().to_raw()) }
// Already referenced
if self.has_ref {
return self;
}

self.has_ref = true;
self.state.reference(cx);
self
}
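A short sketch of the idempotent `unref`/`reference` behavior implemented above (not part of this diff; the exported function is hypothetical):

```rust
use neon::prelude::*;

fn make_background_channel(mut cx: FunctionContext) -> JsResult<JsUndefined> {
    let mut channel = cx.channel();

    // Allow the process to exit even while the channel is kept alive somewhere.
    channel.unref(&mut cx);
    // Calling it again is a no-op thanks to the `has_ref` check.
    channel.unref(&mut cx);
    // Opt back in to keeping the event loop alive.
    channel.reference(&mut cx);

    Ok(cx.undefined())
}
```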

@@ -117,27 +127,79 @@ impl Channel {
});
});

self.tsfn.call(callback, None).map_err(|_| SendError)
self.state.tsfn.call(callback, None).map_err(|_| SendError)
}

/// Returns a boolean indicating if this `Channel` will prevent the Node event
/// loop from exiting.
pub fn has_ref(&self) -> bool {
self.has_ref
}
}

// Monomorphized trampoline function for calling the user-provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
impl Clone for Channel {
/// Returns a clone of the `Channel` instance that shares the internal
/// unbounded queue with the original channel. Scheduling callbacks on the
/// same queue is faster than using separate channels, but it might lead to
/// starvation if one of the threads posts significantly more callbacks on
/// the channel than the others.
///
/// Cloned and referenced `Channel` instances might trigger an additional
/// event-loop tick when dropped. To avoid this, a `Channel` can be wrapped
/// in an `Arc` and shared between different threads/callers.
fn clone(&self) -> Self {
// Not referenced, we can simply clone the fields
if !self.has_ref {
return Self {
state: self.state.clone(),
has_ref: false,
};
}

let state = Arc::clone(&self.state);

// Only need to increase the ref count since the tsfn is already referenced
state.ref_count.fetch_add(1, Ordering::Relaxed);

Self {
state,
has_ref: true,
}
}
}
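To illustrate the trade-off described in the docs above, a hedged sketch (not part of this diff): clones feed one shared queue, while wrapping a single `Channel` in an `Arc` sidesteps the extra unref tick on drop. `spawn_workers` is an illustrative name:

```rust
use std::sync::Arc;

use neon::prelude::*;

fn spawn_workers(mut cx: FunctionContext) -> JsResult<JsUndefined> {
    // One referenced channel, shared by refcount instead of by cloning.
    let channel = Arc::new(cx.channel());

    for i in 0..4 {
        let channel = Arc::clone(&channel);
        std::thread::spawn(move || {
            channel.send(move |mut cx| {
                // Runs on the JavaScript main thread; a real callback would
                // build and use JS values through `cx` here.
                let _message = cx.string(format!("worker {} done", i));
                Ok(())
            });
        });
    }

    Ok(cx.undefined())
}
```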

impl Drop for Channel {
fn drop(&mut self) {
// Not a referenced event queue
if !self.has_ref {
return;
}

// It was only us who kept the `ChannelState` alive. No need to unref
// the `tsfn`, because it is going to be dropped once this function
// returns.
if Arc::strong_count(&self.state) == 1 {
return;
}

// The `ChannelState` may be dropped on a worker thread, so we have to
// `unref` the tsfn on the UV thread after all pending closures have run.
// Note that in most scenarios the optimization in the N-API layer will
// coalesce the `send()` of a user-supplied closure and the unref sent
// here into a single UV tick.
//
// If this ever has to be optimized, a second `Arc` could be used to wrap
// the `state`; it could be cloned in `try_send` and unref'ed on the
// UV thread when the strong reference count goes to 0.
let state = Arc::clone(&self.state);

self.send(move |mut cx| {
state.unref(&mut cx);
Ok(())
});
}
}

/// Error indicating that a closure could not be scheduled to execute on the event loop.
pub struct SendError;

@@ -154,3 +216,53 @@ impl std::fmt::Debug for SendError {
}

impl std::error::Error for SendError {}

struct ChannelState {
tsfn: ThreadsafeFunction<Callback>,
ref_count: AtomicUsize,
}

impl ChannelState {
fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
Self {
tsfn,
ref_count: AtomicUsize::new(1),
}
}

fn reference<'a, C: Context<'a>>(&self, cx: &mut C) {
// We can use relaxed ordering because `reference()` can only be called
// on the Event-Loop thread.
if self.ref_count.fetch_add(1, Ordering::Relaxed) != 0 {
return;
}

unsafe {
self.tsfn.reference(cx.env().to_raw());
}
}

fn unref<'a, C: Context<'a>>(&self, cx: &mut C) {
// We can use relaxed ordering because `unref()` can only be called
// on the Event-Loop thread.
if self.ref_count.fetch_sub(1, Ordering::Relaxed) != 1 {
return;
}

unsafe {
self.tsfn.unref(cx.env().to_raw());
}
}

// Monomorphized trampoline function for calling the user-provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
}
}
}
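The counting in `ChannelState` only touches the underlying N-API reference on the 0 -> 1 and 1 -> 0 transitions, so repeated `reference`/`unref` calls from clones stay cheap. A standalone sketch of that pattern in plain Rust, with the N-API calls replaced by comments (names are illustrative):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

struct RefCounted {
    count: AtomicUsize,
}

impl RefCounted {
    fn new() -> Self {
        // Starts at 1, matching `ChannelState::new`.
        Self { count: AtomicUsize::new(1) }
    }

    fn reference(&self) {
        // Relaxed ordering suffices when all calls happen on the event-loop thread.
        if self.count.fetch_add(1, Ordering::Relaxed) == 0 {
            // 0 -> 1 transition: this is where `tsfn.reference(env)` runs.
        }
    }

    fn unref(&self) {
        if self.count.fetch_sub(1, Ordering::Relaxed) == 1 {
            // 1 -> 0 transition: this is where `tsfn.unref(env)` runs.
        }
    }
}
```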
26 changes: 25 additions & 1 deletion src/lifecycle.rs
@@ -16,6 +16,8 @@ use neon_runtime::reference;
use neon_runtime::tsfn::ThreadsafeFunction;

use crate::context::Context;
#[cfg(all(feature = "event-queue-api"))]
use crate::event::Channel;
use crate::handle::root::NapiRef;

/// `InstanceData` holds Neon data associated with a particular instance of a
@@ -30,6 +32,10 @@ pub(crate) struct InstanceData {
/// given the cost of FFI, this optimization is omitted until the cost of an
/// `Arc` is demonstrated as significant.
drop_queue: Arc<ThreadsafeFunction<NapiRef>>,

/// Shared `Channel` that is cloned to be returned by the `cx.channel()` method
#[cfg(all(feature = "event-queue-api"))]
shared_channel: Channel,
}

fn drop_napi_ref(env: Option<Env>, data: NapiRef) {
@@ -57,13 +63,22 @@ impl InstanceData {
}

let drop_queue = unsafe {
let mut queue = ThreadsafeFunction::new(env, drop_napi_ref);
let queue = ThreadsafeFunction::new(env, drop_napi_ref);
queue.unref(env);
queue
};

#[cfg(all(feature = "event-queue-api"))]
let shared_channel = {
let mut channel = Channel::new(cx);
channel.unref(cx);
channel
};

let data = InstanceData {
drop_queue: Arc::new(drop_queue),
#[cfg(all(feature = "event-queue-api"))]
shared_channel,
};

unsafe { &mut *neon_runtime::lifecycle::set_instance_data(env, data) }
@@ -73,4 +88,13 @@ impl InstanceData {
pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc<ThreadsafeFunction<NapiRef>> {
Arc::clone(&InstanceData::get(cx).drop_queue)
}

/// Clones the shared channel and references it since new channels should start
/// referenced, but the shared channel is unreferenced.
#[cfg(all(feature = "event-queue-api"))]
pub(crate) fn channel<'a, C: Context<'a>>(cx: &mut C) -> Channel {
let mut channel = InstanceData::get(cx).shared_channel.clone();
channel.reference(cx);
channel
}
}
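The net effect on N-API >= 6, sketched below (not part of this diff; the native function is hypothetical): every `cx.channel()` call returns a referenced clone of the same unreferenced shared channel, so all of them feed one `ThreadsafeFunction` while each clone still keeps the event loop alive until it is dropped.

```rust
use neon::prelude::*;

fn channels_share_a_queue(mut cx: FunctionContext) -> JsResult<JsUndefined> {
    // Both values are clones of the instance-wide shared channel and push
    // closures into the same underlying ThreadsafeFunction queue.
    let first = cx.channel();
    let second = cx.channel();

    // Each clone starts referenced, so the event loop stays alive while
    // either one exists.
    assert!(first.has_ref());
    assert!(second.has_ref());

    Ok(cx.undefined())
}
```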