diff --git a/crates/neon-runtime/src/napi/tsfn.rs b/crates/neon-runtime/src/napi/tsfn.rs
index 313a7d032..eea90f78c 100644
--- a/crates/neon-runtime/src/napi/tsfn.rs
+++ b/crates/neon-runtime/src/napi/tsfn.rs
@@ -153,7 +153,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

     /// References a threadsafe function to prevent exiting the event loop until it has been dropped. (Default)
     /// Safety: `Env` must be valid for the current thread
-    pub unsafe fn reference(&mut self, env: Env) {
+    pub unsafe fn reference(&self, env: Env) {
         assert_eq!(
             napi::ref_threadsafe_function(env, self.tsfn.0,),
             napi::Status::Ok,
@@ -162,7 +162,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

     /// Unreferences a threadsafe function to allow exiting the event loop before it has been dropped.
     /// Safety: `Env` must be valid for the current thread
-    pub unsafe fn unref(&mut self, env: Env) {
+    pub unsafe fn unref(&self, env: Env) {
         assert_eq!(
             napi::unref_threadsafe_function(env, self.tsfn.0,),
             napi::Status::Ok,
diff --git a/src/context/mod.rs b/src/context/mod.rs
index 325c005a4..3e697fe90 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -154,6 +154,8 @@ use crate::context::internal::Env;
 #[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
 use crate::event::Channel;
 use crate::handle::{Handle, Managed};
+#[cfg(all(feature = "napi-6", feature = "event-queue-api"))]
+use crate::lifecycle::InstanceData;
 #[cfg(feature = "legacy-runtime")]
 use crate::object::class::Class;
 use crate::object::{Object, This};
@@ -550,9 +552,18 @@ pub trait Context<'a>: ContextInternal<'a> {
     }

     #[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
-    /// Creates an unbounded channel for scheduling events to be executed on the JavaScript thread.
+    /// Returns an unbounded channel for scheduling events to be executed on the JavaScript thread.
+    ///
+    /// When using N-API >= 6, the channel returned by this method is backed by a shared queue.
+    /// To create a channel backed by a _new_ queue, see [`Channel`](crate::event::Channel).
     fn channel(&mut self) -> Channel {
-        Channel::new(self)
+        #[cfg(feature = "napi-6")]
+        let channel = InstanceData::channel(self);
+
+        #[cfg(not(feature = "napi-6"))]
+        let channel = Channel::new(self);
+
+        channel
     }

     #[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index 605346b4b..6e71ba92f 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -1,3 +1,6 @@
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
 use neon_runtime::raw::Env;
 use neon_runtime::tsfn::ThreadsafeFunction;

@@ -8,6 +11,9 @@ type Callback = Box<dyn FnOnce(Option<Env>) + Send + 'static>;

 /// Channel for scheduling Rust closures to execute on the JavaScript main thread.
 ///
+/// Cloning a `Channel` will create a new channel that shares a backing queue for
+/// events.
+///
 /// # Example
 ///
 /// The following example spawns a standard Rust thread to complete a computation
@@ -50,7 +56,7 @@
 /// ```

 pub struct Channel {
-    tsfn: ThreadsafeFunction<Callback>,
+    state: Arc<ChannelState>,
     has_ref: bool,
 }
@@ -64,10 +70,8 @@ impl Channel {
     /// Creates an unbounded channel for scheduling closures on the JavaScript
     /// main thread
     pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
-        let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
-
         Self {
-            tsfn,
+            state: Arc::new(ChannelState::new(cx)),
             has_ref: true,
         }
     }
@@ -75,20 +79,26 @@ impl Channel {
     /// Allow the Node event loop to exit while this `Channel` exists.
     /// _Idempotent_
     pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
-        self.has_ref = false;
-
-        unsafe { self.tsfn.unref(cx.env().to_raw()) }
+        // Already unreferenced
+        if !self.has_ref {
+            return self;
+        }
+        self.has_ref = false;
+        self.state.unref(cx);

         self
     }

     /// Prevent the Node event loop from exiting while this `Channel` exists. (Default)
     /// _Idempotent_
     pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
-        self.has_ref = true;
-
-        unsafe { self.tsfn.reference(cx.env().to_raw()) }
+        // Already referenced
+        if self.has_ref {
+            return self;
+        }
+        self.has_ref = true;
+        self.state.reference(cx);

         self
     }
@@ -117,7 +127,7 @@ impl Channel {
             });
         });

-        self.tsfn.call(callback, None).map_err(|_| SendError)
+        self.state.tsfn.call(callback, None).map_err(|_| SendError)
     }

     /// Returns a boolean indicating if this `Channel` will prevent the Node event
@@ -125,19 +135,71 @@
     pub fn has_ref(&self) -> bool {
         self.has_ref
     }
+}

-    // Monomorphized trampoline funciton for calling the user provided closure
-    fn callback(env: Option<Env>, callback: Callback) {
-        if let Some(env) = env {
-            callback(env);
-        } else {
-            crate::context::internal::IS_RUNNING.with(|v| {
-                *v.borrow_mut() = false;
-            });
+impl Clone for Channel {
+    /// Returns a clone of the Channel instance that shares the internal
+    /// unbounded queue with the original channel. Scheduling callbacks on the
+    /// same queue is faster than using separate channels, but might lead to
+    /// starvation if one of the threads posts significantly more callbacks on
+    /// the channel than the other one.
+    ///
+    /// Cloned and referenced Channel instances might trigger an additional
+    /// event-loop tick when dropped. A Channel can be wrapped in an Arc and
+    /// shared between different threads/callers to avoid this.
+    fn clone(&self) -> Self {
+        // Not referenced, we can simply clone the fields
+        if !self.has_ref {
+            return Self {
+                state: self.state.clone(),
+                has_ref: false,
+            };
+        }
+
+        let state = Arc::clone(&self.state);
+
+        // Only need to increase the ref count since the tsfn is already referenced
+        state.ref_count.fetch_add(1, Ordering::Relaxed);
+
+        Self {
+            state,
+            has_ref: true,
         }
     }
 }

+impl Drop for Channel {
+    fn drop(&mut self) {
+        // Not a referenced event queue
+        if !self.has_ref {
+            return;
+        }
+
+        // It was only us who kept the `ChannelState` alive. No need to unref
+        // the `tsfn`, because it is going to be dropped once this function
+        // returns.
+        if Arc::strong_count(&self.state) == 1 {
+            return;
+        }
+
+        // The ChannelState is dropped on a worker thread. We have to `unref`
+        // the tsfn on the UV thread after all pending closures. Note that in
+        // most scenarios the optimization in the N-API layer would coalesce a
+        // `send()` with a user-supplied closure and the unref sent here into a
+        // single UV tick.
+        //
+        // If this ever has to be optimized, a second `Arc` could be used to wrap
+        // the `state`; it could be cloned in `try_send` and unref'ed on the
+        // UV thread if the strong reference count goes to 0.
+        let state = Arc::clone(&self.state);
+
+        self.send(move |mut cx| {
+            state.unref(&mut cx);
+            Ok(())
+        });
+    }
+}
+
 /// Error indicating that a closure was unable to be scheduled to execute on the event loop.
 pub struct SendError;

@@ -154,3 +216,53 @@ impl std::fmt::Debug for SendError {
 }

 impl std::error::Error for SendError {}
+
+struct ChannelState {
+    tsfn: ThreadsafeFunction<Callback>,
+    ref_count: AtomicUsize,
+}
+
+impl ChannelState {
+    fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
+        let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
+        Self {
+            tsfn,
+            ref_count: AtomicUsize::new(1),
+        }
+    }
+
+    fn reference<'a, C: Context<'a>>(&self, cx: &mut C) {
+        // We can use relaxed ordering because `reference()` can only be called
+        // on the event-loop thread.
+        if self.ref_count.fetch_add(1, Ordering::Relaxed) != 0 {
+            return;
+        }
+
+        unsafe {
+            self.tsfn.reference(cx.env().to_raw());
+        }
+    }
+
+    fn unref<'a, C: Context<'a>>(&self, cx: &mut C) {
+        // We can use relaxed ordering because `unref()` can only be called
+        // on the event-loop thread.
+        if self.ref_count.fetch_sub(1, Ordering::Relaxed) != 1 {
+            return;
+        }
+
+        unsafe {
+            self.tsfn.unref(cx.env().to_raw());
+        }
+    }
+
+    // Monomorphized trampoline function for calling the user-provided closure
+    fn callback(env: Option<Env>, callback: Callback) {
+        if let Some(env) = env {
+            callback(env);
+        } else {
+            crate::context::internal::IS_RUNNING.with(|v| {
+                *v.borrow_mut() = false;
+            });
+        }
+    }
+}
diff --git a/src/lifecycle.rs b/src/lifecycle.rs
index 1ccd55705..cae53f181 100644
--- a/src/lifecycle.rs
+++ b/src/lifecycle.rs
@@ -16,6 +16,8 @@ use neon_runtime::reference;
 use neon_runtime::tsfn::ThreadsafeFunction;

 use crate::context::Context;
+#[cfg(all(feature = "event-queue-api"))]
+use crate::event::Channel;
 use crate::handle::root::NapiRef;

 /// `InstanceData` holds Neon data associated with a particular instance of a
@@ -30,6 +32,10 @@ pub(crate) struct InstanceData {
     /// given the cost of FFI, this optimization is omitted until the cost of an
     /// `Arc` is demonstrated as significant.
     drop_queue: Arc<ThreadsafeFunction<NapiRef>>,
+
+    /// Shared `Channel` that is cloned to be returned by the `cx.channel()` method
+    #[cfg(all(feature = "event-queue-api"))]
+    shared_channel: Channel,
 }

 fn drop_napi_ref(env: Option<Env>, data: NapiRef) {
@@ -57,13 +63,22 @@ impl InstanceData {
         }

         let drop_queue = unsafe {
-            let mut queue = ThreadsafeFunction::new(env, drop_napi_ref);
+            let queue = ThreadsafeFunction::new(env, drop_napi_ref);
             queue.unref(env);
             queue
         };

+        #[cfg(all(feature = "event-queue-api"))]
+        let shared_channel = {
+            let mut channel = Channel::new(cx);
+            channel.unref(cx);
+            channel
+        };
+
         let data = InstanceData {
             drop_queue: Arc::new(drop_queue),
+            #[cfg(all(feature = "event-queue-api"))]
+            shared_channel,
         };

         unsafe { &mut *neon_runtime::lifecycle::set_instance_data(env, data) }
@@ -73,4 +88,13 @@ impl InstanceData {
     pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc<ThreadsafeFunction<NapiRef>> {
         Arc::clone(&InstanceData::get(cx).drop_queue)
     }
+
+    /// Clones the shared channel and references it since new channels should start
+    /// referenced, but the shared channel is unreferenced.
+    #[cfg(all(feature = "event-queue-api"))]
+    pub(crate) fn channel<'a, C: Context<'a>>(cx: &mut C) -> Channel {
+        let mut channel = InstanceData::get(cx).shared_channel.clone();
+        channel.reference(cx);
+        channel
+    }
 }
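Not part of the patch: a minimal usage sketch of the behavior this change enables, assuming an addon built with the `napi-6` and `event-queue-api` features. `cx.channel()`, `Channel::clone`, and `Channel::send` are the entry points touched above; the surrounding items (`neon::prelude`, `FunctionContext`, `JsResult`, `JsUndefined`, `cx.string`) are the existing public Neon API and are not modified here, and the function name is made up for illustration.

```rust
use neon::prelude::*;

// Spawns two worker threads that schedule closures back onto the same
// instance-wide queue returned by `cx.channel()`.
fn emit_from_threads(mut cx: FunctionContext) -> JsResult<JsUndefined> {
    // With napi-6 this clones the shared channel held in `InstanceData`
    // instead of allocating a fresh threadsafe function per call.
    let channel = cx.channel();

    for i in 0..2i32 {
        // Cloning shares the backing queue; only the reference count is bumped.
        let channel = channel.clone();

        std::thread::spawn(move || {
            // The closure runs later on the JavaScript main thread.
            channel.send(move |mut cx| {
                let _msg = cx.string(format!("hello from worker {}", i));
                Ok(())
            });

            // Dropping the referenced clone here schedules the matching
            // `unref` back on the event loop via the new `Drop` impl.
        });
    }

    Ok(cx.undefined())
}
```

If many short-lived clones become a concern, the `Drop` comment above suggests wrapping the `Channel` in an `Arc` so that only the last owner performs the unref.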