From e6948b277b6e107ef505c95e639f0ebf9b28dad5 Mon Sep 17 00:00:00 2001
From: "K.J. Valencik"
Date: Wed, 2 Jun 2021 17:26:13 -0400
Subject: [PATCH 01/12] Trying out a pattern for sharing tsfn in channels

---
 crates/neon-runtime/src/napi/tsfn.rs |  26 ++++---
 src/context/mod.rs                   |  13 +++-
 src/event/event_queue.rs             | 112 +++++++++++++++++++++++++--
 src/lifecycle.rs                     |  21 ++++-
 4 files changed, 153 insertions(+), 19 deletions(-)

diff --git a/crates/neon-runtime/src/napi/tsfn.rs b/crates/neon-runtime/src/napi/tsfn.rs
index 313a7d032..9ac96a923 100644
--- a/crates/neon-runtime/src/napi/tsfn.rs
+++ b/crates/neon-runtime/src/napi/tsfn.rs
@@ -153,20 +153,26 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
 
     /// References a threadsafe function to prevent exiting the event loop until it has been dropped. (Default)
     /// Safety: `Env` must be valid for the current thread
-    pub unsafe fn reference(&mut self, env: Env) {
-        assert_eq!(
-            napi::ref_threadsafe_function(env, self.tsfn.0,),
-            napi::Status::Ok,
-        );
+    pub unsafe fn reference(&mut self, env: Env) -> Result<(), napi::Status> {
+        let status = napi::ref_threadsafe_function(env, self.tsfn.0);
+
+        if status == napi::Status::Ok {
+            Ok(())
+        } else {
+            Err(status)
+        }
     }
 
     /// Unreferences a threadsafe function to allow exiting the event loop before it has been dropped.
     /// Safety: `Env` must be valid for the current thread
-    pub unsafe fn unref(&mut self, env: Env) {
-        assert_eq!(
-            napi::unref_threadsafe_function(env, self.tsfn.0,),
-            napi::Status::Ok,
-        );
+    pub unsafe fn unref(&mut self, env: Env) -> Result<(), napi::Status> {
+        let status = napi::unref_threadsafe_function(env, self.tsfn.0);
+
+        if status == napi::Status::Ok {
+            Ok(())
+        } else {
+            Err(status)
+        }
     }
 
     // Provides a C ABI wrapper for a napi callback notifying us about tsfn
diff --git a/src/context/mod.rs b/src/context/mod.rs
index 325c005a4..ae27f252b 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -154,6 +154,8 @@ use crate::context::internal::Env;
 #[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
 use crate::event::Channel;
 use crate::handle::{Handle, Managed};
+#[cfg(feature = "napi-6")]
+use crate::lifecycle::InstanceData;
 #[cfg(feature = "legacy-runtime")]
 use crate::object::class::Class;
 use crate::object::{Object, This};
@@ -551,8 +553,17 @@ pub trait Context<'a>: ContextInternal<'a> {
 
     #[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
     /// Creates an unbounded channel for scheduling events to be executed on the JavaScript thread.
+    ///
+    /// When using N-API >= 6, the channel returned by this method is backed by a shared queue.
+    /// To create a channel backed by a _new_ queue see [`Channel`](crate::event::Channel).
     fn channel(&mut self) -> Channel {
-        Channel::new(self)
+        #[cfg(feature = "napi-6")]
+        let channel = InstanceData::channel(self);
+
+        #[cfg(not(feature = "napi-6"))]
+        let channel = Channel::new(self);
+
+        channel
     }
 
     #[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index 605346b4b..d803ac11c 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -1,3 +1,5 @@
+use std::sync::{Arc, RwLock};
+
 use neon_runtime::raw::Env;
 use neon_runtime::tsfn::ThreadsafeFunction;
 
@@ -8,6 +10,9 @@ type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
 
 /// Channel for scheduling Rust closures to execute on the JavaScript main thread.
 ///
+/// Cloning a `Channel` will create a new channel that shares a backing queue for
+/// events.
+///
 /// # Example
 ///
 /// The following example spawns a standard Rust thread to complete a computation
@@ -50,7 +55,7 @@
 /// ```
 
 pub struct Channel {
-    tsfn: ThreadsafeFunction<Callback>,
+    state: Arc<RwLock<ChannelState>>,
     has_ref: bool,
 }
 
@@ -65,9 +70,10 @@ impl Channel {
     /// main thread
     pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
         let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
+        let state = ChannelState { tsfn, ref_count: 1 };
 
         Self {
-            tsfn,
+            state: Arc::new(RwLock::new(state)),
             has_ref: true,
         }
     }
@@ -75,20 +81,28 @@ impl Channel {
     /// Allow the Node event loop to exit while this `Channel` exists.
     /// _Idempotent_
     pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
-        self.has_ref = false;
+        // Already unreferenced
+        if !self.has_ref {
+            return self;
+        }
 
-        unsafe { self.tsfn.unref(cx.env().to_raw()) }
+        ChannelState::unref(cx, &self.state);
 
+        self.has_ref = false;
         self
     }
 
     /// Prevent the Node event loop from exiting while this `Channel` exists. (Default)
     /// _Idempotent_
    pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
-        self.has_ref = true;
+        // Already referenced
+        if self.has_ref {
+            return self;
+        }
 
-        unsafe { self.tsfn.reference(cx.env().to_raw()) }
+        ChannelState::reference(cx, &self.state);
 
+        self.has_ref = true;
         self
     }
@@ -117,7 +131,12 @@ impl Channel {
             });
         });
 
-        self.tsfn.call(callback, None).map_err(|_| SendError)
+        self.state
+            .read()
+            .unwrap()
+            .tsfn
+            .call(callback, None)
+            .map_err(|_| SendError)
     }
 
     /// Returns a boolean indicating if this `Channel` will prevent the Node event
@@ -138,6 +157,45 @@ impl Channel {
     }
 }
 
+impl Clone for Channel {
+    fn clone(&self) -> Self {
+        // Not referenced, we can simply clone the fields
+        if !self.has_ref {
+            return Self {
+                state: Arc::clone(&self.state),
+                has_ref: false,
+            };
+        }
+
+        let mut state = self.state.write().unwrap();
+
+        // Only need to increase the ref count since the tsfn is already referenced
+        debug_assert!(state.ref_count > 0);
+        state.ref_count += 1;
+
+        Self {
+            state: Arc::clone(&self.state),
+            has_ref: true,
+        }
+    }
+}
+
+impl Drop for Channel {
+    fn drop(&mut self) {
+        // Not a referenced event queue
+        if !self.has_ref {
+            return;
+        }
+
+        let state = Arc::clone(&self.state);
+
+        self.send(move |mut cx| {
+            ChannelState::unref(&mut cx, &state);
+            Ok(())
+        });
+    }
+}
+
 /// Error indicating that a closure was unable to be scheduled to execute on the event loop.
 pub struct SendError;
 
@@ -154,3 +212,43 @@ impl std::fmt::Debug for SendError {
 }
 
 impl std::error::Error for SendError {}
+
+struct ChannelState {
+    tsfn: ThreadsafeFunction<Callback>,
+    ref_count: usize,
+}
+
+impl ChannelState {
+    fn reference<'a, C: Context<'a>>(cx: &mut C, state: &RwLock<ChannelState>) {
+        // Critical section, avoid panicking
+        {
+            let mut state = state.write().unwrap();
+
+            state.ref_count += 1;
+
+            if state.ref_count == 1 {
+                unsafe { state.tsfn.reference(cx.env().to_raw()) }
+            } else {
+                Ok(())
+            }
+        }
+        .unwrap();
+    }
+
+    fn unref<'a, C: Context<'a>>(cx: &mut C, state: &RwLock<ChannelState>) {
+        // Critical section, avoid panicking
+        {
+            let mut state = state.write().unwrap();
+
+            debug_assert!(state.ref_count >= 1);
+            state.ref_count -= 1;
+
+            if state.ref_count == 0 {
+                unsafe { state.tsfn.unref(cx.env().to_raw()) }
+            } else {
+                Ok(())
+            }
+        }
+        .unwrap();
+    }
+}
diff --git a/src/lifecycle.rs b/src/lifecycle.rs
index 1ccd55705..d3025cedf 100644
--- a/src/lifecycle.rs
+++ b/src/lifecycle.rs
@@ -16,6 +16,7 @@ use neon_runtime::reference;
 use neon_runtime::tsfn::ThreadsafeFunction;
 
 use crate::context::Context;
+use crate::event::Channel;
 use crate::handle::root::NapiRef;
 
 /// `InstanceData` holds Neon data associated with a particular instance of a
@@ -30,6 +31,9 @@ pub(crate) struct InstanceData {
     /// given the cost of FFI, this optimization is omitted until the cost of an
     /// `Arc` is demonstrated as significant.
     drop_queue: Arc<ThreadsafeFunction<NapiRef>>,
+
+    /// Shared `Channel` that is cloned to be returned by the `cx.channel()` method
+    shared_channel: Channel,
 }
 
 fn drop_napi_ref(env: Option<Env>, data: NapiRef) {
@@ -58,12 +62,19 @@ impl InstanceData {
 
         let drop_queue = unsafe {
             let mut queue = ThreadsafeFunction::new(env, drop_napi_ref);
-            queue.unref(env);
+            queue.unref(env).unwrap();
             queue
         };
 
+        let shared_channel = {
+            let mut channel = Channel::new(cx);
+            channel.unref(cx);
+            channel
+        };
+
         let data = InstanceData {
             drop_queue: Arc::new(drop_queue),
+            shared_channel,
         };
 
         unsafe { &mut *neon_runtime::lifecycle::set_instance_data(env, data) }
@@ -73,4 +84,12 @@ impl InstanceData {
     pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc<ThreadsafeFunction<NapiRef>> {
         Arc::clone(&InstanceData::get(cx).drop_queue)
     }
+
+    /// Clones the shared channel and references it since new channels should start
+    /// referenced, but the shared channel is unreferenced.
+    pub(crate) fn channel<'a, C: Context<'a>>(cx: &mut C) -> Channel {
+        let mut channel = InstanceData::get(cx).shared_channel.clone();
+        channel.reference(cx);
+        channel
+    }
 }
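[Editor's note] Patch 01 establishes the shape of the whole series: every `Channel` clone shares one `ChannelState` behind an `Arc<RwLock<_>>`, the underlying threadsafe function stays referenced only while at least one clone wants the event loop kept alive, and `cx.channel()` hands out referenced clones of a single, otherwise-unreferenced channel stored in `InstanceData`. Below is a minimal, self-contained model of that ref-counting pattern; `FakeTsfn` and the `Env`-free method signatures are hypothetical stand-ins for illustration, not Neon's actual API.

    use std::sync::{Arc, RwLock};

    // Stand-in for the N-API threadsafe function; the real methods call
    // napi_ref/unref_threadsafe_function and need a valid Env.
    struct FakeTsfn;

    impl FakeTsfn {
        fn reference(&mut self) { println!("tsfn referenced: event loop kept alive"); }
        fn unref(&mut self) { println!("tsfn unreferenced: event loop may exit"); }
    }

    struct State { tsfn: FakeTsfn, ref_count: usize }

    // Shared by all clones; the tsfn is referenced only while at least one
    // handle wants the event loop kept alive.
    struct SharedChannel { state: Arc<RwLock<State>>, has_ref: bool }

    impl SharedChannel {
        fn reference(&mut self) {
            if self.has_ref { return; } // idempotent, like Channel::reference
            let mut state = self.state.write().unwrap();
            state.ref_count += 1;
            if state.ref_count == 1 { state.tsfn.reference(); } // 0 -> 1 edge
            self.has_ref = true;
        }
        fn unref(&mut self) {
            if !self.has_ref { return; } // idempotent, like Channel::unref
            let mut state = self.state.write().unwrap();
            state.ref_count -= 1;
            if state.ref_count == 0 { state.tsfn.unref(); } // 1 -> 0 edge
            self.has_ref = false;
        }
    }

    fn main() {
        let state = Arc::new(RwLock::new(State { tsfn: FakeTsfn, ref_count: 0 }));
        let mut a = SharedChannel { state: Arc::clone(&state), has_ref: false };
        let mut b = SharedChannel { state, has_ref: false };
        a.reference(); // 0 -> 1: tsfn referenced
        b.reference(); // 1 -> 2: no-op on the tsfn
        a.unref();     // 2 -> 1: no-op
        b.unref();     // 1 -> 0: tsfn unreferenced
    }

Only the 0 -> 1 and 1 -> 0 edges touch the tsfn, which is why the shared channel in `InstanceData` can sit unreferenced without pinning the event loop.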
From 4780c3b31d92ffc280ba7b0cc9c87e8ba08972a0 Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Thu, 3 Jun 2021 15:45:33 -0700
Subject: [PATCH 02/12] optimize dropping of non-cloned channels

---
 src/context/mod.rs       |   4 +-
 src/event/event_queue.rs | 232 ++++++++++++++++++++++++---------------
 src/lifecycle.rs         |   5 +
 3 files changed, 152 insertions(+), 89 deletions(-)

diff --git a/src/context/mod.rs b/src/context/mod.rs
index ae27f252b..964f2dcb0 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -154,7 +154,7 @@ use crate::context::internal::Env;
 #[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
 use crate::event::Channel;
 use crate::handle::{Handle, Managed};
-#[cfg(feature = "napi-6")]
+#[cfg(all(feature = "napi-6", feature = "event-queue-api"))]
 use crate::lifecycle::InstanceData;
 #[cfg(feature = "legacy-runtime")]
 use crate::object::class::Class;
 use crate::object::{Object, This};
@@ -552,7 +552,7 @@ pub trait Context<'a>: ContextInternal<'a> {
     }
 
     #[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
-    /// Creates an unbounded channel for scheduling events to be executed on the JavaScript thread.
+    /// Clones a shared unbounded channel for scheduling events to be executed on the JavaScript thread.
     ///
     /// When using N-API >= 6, the channel returned by this method is backed by a shared queue.
     /// To create a channel backed by a _new_ queue see [`Channel`](crate::event::Channel).
diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index d803ac11c..1909d7af7 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -1,3 +1,4 @@
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 
 use neon_runtime::raw::Env;
@@ -55,8 +56,10 @@ type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
 /// ```
 
 pub struct Channel {
-    state: Arc<RwLock<ChannelState>>,
-    has_ref: bool,
+    // We hold an extra reference to `state` in `try_send` so that we could
+    // unref the tsfn during the same UV tick if the state is guaranteed to be
+    // dropped before the `try_send`'s closure invocation.
+    state: Arc<ChannelState>,
 }
 
 impl std::fmt::Debug for Channel {
@@ -69,40 +72,27 @@ impl Channel {
     /// Creates an unbounded channel for scheduling closures on the JavaScript
     /// main thread
     pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
-        let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
-        let state = ChannelState { tsfn, ref_count: 1 };
+        let state = ChannelState {
+            shared: Arc::new(ChannelSharedState::new(cx)),
+            has_ref: AtomicBool::new(true),
+        };
 
         Self {
-            state: Arc::new(RwLock::new(state)),
-            has_ref: true,
+            state: Arc::new(state),
         }
     }
 
     /// Allow the Node event loop to exit while this `Channel` exists.
     /// _Idempotent_
     pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
-        // Already unreferenced
-        if !self.has_ref {
-            return self;
-        }
-
-        ChannelState::unref(cx, &self.state);
-
-        self.has_ref = false;
+        self.state.unref(cx);
         self
     }
 
     /// Prevent the Node event loop from exiting while this `Channel` exists. (Default)
     /// _Idempotent_
     pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
-        // Already referenced
-        if self.has_ref {
-            return self;
-        }
-
-        ChannelState::reference(cx, &self.state);
-
-        self.has_ref = true;
+        self.state.reference(cx);
         self
     }
 
     pub fn try_send<F>(&self, f: F) -> Result<(), SendError>
     where
         F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
     {
+        let state = self.state.clone();
+
         let callback = Box::new(move |env| {
             let env = unsafe { std::mem::transmute(env) };
 
             // Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
             // N-API creates a `HandleScope` before calling the callback.
-            TaskContext::with_context(env, move |cx| {
+            TaskContext::with_context(env, move |mut cx| {
+                // No one else, but us holding the state alive.
+                if Arc::strong_count(&state) == 1 {
+                    state.unref(&mut cx);
+                }
+
                 let _ = f(cx);
             });
         });
 
         self.state
+            .shared
+            .tsfn
             .read()
             .unwrap()
-            .tsfn
             .call(callback, None)
             .map_err(|_| SendError)
     }
 
     /// Returns a boolean indicating if this `Channel` will prevent the Node event
     /// loop from exiting.
     pub fn has_ref(&self) -> bool {
-        self.has_ref
+        self.state.has_ref.load(Ordering::Relaxed)
+    }
+}
+
+impl Clone for Channel {
+    /// Returns a clone of the Channel instance that shares the internal
+    /// unbounded queue with the original channel. Scheduling callbacks on the
+    /// same queue is faster than using separate channels, but might lead to
+    /// starvation if one of the threads posts significantly more callbacks on
+    /// the channel than the other one.
+    fn clone(&self) -> Self {
+        return Self {
+            state: Arc::new((*self.state).clone()),
+        };
+    }
+}
 
-    // Monomorphized trampoline funciton for calling the user provided closure
-    fn callback(env: Option<Env>, callback: Callback) {
-        if let Some(env) = env {
-            callback(env);
-        } else {
-            crate::context::internal::IS_RUNNING.with(|v| {
-                *v.borrow_mut() = false;
-            });
-        }
-    }
-}
 
+/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
+pub struct SendError;
+
+impl std::fmt::Display for SendError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SendError")
+    }
+}
+
+impl std::fmt::Debug for SendError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        std::fmt::Display::fmt(self, f)
+    }
+}
+
+impl std::error::Error for SendError {}
+
+struct ChannelState {
+    shared: Arc<ChannelSharedState>,
+    has_ref: AtomicBool,
+}
+
+impl ChannelState {
+    fn reference<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
+        // Already referenced
+        if self.has_ref.swap(true, Ordering::Relaxed) {
+            return;
+        }
+
+        self.shared.reference(cx);
+    }
+
+    fn unref<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
+        // Already unreferenced
+        if !self.has_ref.swap(false, Ordering::Relaxed) {
+            return;
+        }
+
+        self.shared.unref(cx);
+    }
+}
+
-impl Clone for Channel {
+impl Clone for ChannelState {
     fn clone(&self) -> Self {
         // Not referenced, we can simply clone the fields
-        if !self.has_ref {
+        if !self.has_ref.load(Ordering::Relaxed) {
             return Self {
-                state: Arc::clone(&self.state),
-                has_ref: false,
+                shared: self.shared.clone(),
+                has_ref: AtomicBool::new(false),
             };
         }
 
-        let mut state = self.state.write().unwrap();
+        let shared = Arc::clone(&self.shared);
 
         // Only need to increase the ref count since the tsfn is already referenced
-        debug_assert!(state.ref_count > 0);
-        state.ref_count += 1;
+        shared.ref_count.fetch_add(1, Ordering::Relaxed);
 
         Self {
-            state: Arc::clone(&self.state),
-            has_ref: true,
+            shared,
+            has_ref: AtomicBool::new(true),
         }
     }
 }
 
-impl Drop for Channel {
+impl Drop for ChannelState {
     fn drop(&mut self) {
+        // It was only us who kept the `ChannelState` alive. No need to unref
+        // the `tsfn`, because it is going to be dropped once this function
+        // returns.
+        if Arc::strong_count(&self.shared) == 1 {
+            return;
+        }
+
         // Not a referenced event queue
-        if !self.has_ref {
+        if !self.has_ref.swap(false, Ordering::Relaxed) {
             return;
         }
 
-        let state = Arc::clone(&self.state);
+        // The ChannelState is dropped on a worker thread. We have to `unref`
+        // the tsfn on the UV thread after all pending closures.
+        let shared = Arc::clone(&self.shared);
 
-        self.send(move |mut cx| {
-            ChannelState::unref(&mut cx, &state);
-            Ok(())
-        });
+        let callback = Box::new(move |env| {
+            let env = unsafe { std::mem::transmute(env) };
+
+            // Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
+            // N-API creates a `HandleScope` before calling the callback.
+            TaskContext::with_context(env, move |mut cx| {
+                shared.unref(&mut cx);
+            });
+        });
+
+        self.shared
+            .tsfn
+            .read()
+            .unwrap()
+            .call(callback, None)
+            .map_err(|_| SendError)
+            .unwrap();
+    }
+}
+
+struct ChannelSharedState {
+    tsfn: RwLock<ThreadsafeFunction<Callback>>,
+    ref_count: AtomicUsize,
+}
+
+impl ChannelSharedState {
+    fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
+        let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
+
+        Self {
+            tsfn: RwLock::new(tsfn),
+            ref_count: AtomicUsize::new(1),
+        }
+    }
+
+    fn reference<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
+        if self.ref_count.fetch_add(1, Ordering::Relaxed) != 0 {
+            return;
+        }
+
+        // Critical section, avoid panicking
+        {
+            let mut tsfn = self.tsfn.write().unwrap();
+            unsafe { tsfn.reference(cx.env().to_raw()) }
+        }
+        .unwrap();
+    }
+
+    fn unref<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
+        if self.ref_count.fetch_sub(1, Ordering::Relaxed) != 1 {
+            return;
+        }
+
+        // Critical section, avoid panicking
+        {
+            let mut tsfn = self.tsfn.write().unwrap();
+            unsafe { tsfn.unref(cx.env().to_raw()) }
+        }
+        .unwrap();
+    }
+
+    // Monomorphized trampoline function for calling the user provided closure
+    fn callback(env: Option<Env>, callback: Callback) {
+        if let Some(env) = env {
+            callback(env);
+        } else {
+            crate::context::internal::IS_RUNNING.with(|v| {
+                *v.borrow_mut() = false;
+            });
+        }
+    }
+}
diff --git a/src/lifecycle.rs b/src/lifecycle.rs
index d3025cedf..01d995e2a 100644
--- a/src/lifecycle.rs
+++ b/src/lifecycle.rs
@@ -16,6 +16,7 @@ use neon_runtime::reference;
 use neon_runtime::tsfn::ThreadsafeFunction;
 
 use crate::context::Context;
+#[cfg(all(feature = "event-queue-api"))]
 use crate::event::Channel;
 use crate::handle::root::NapiRef;
 
@@ -33,6 +34,7 @@ pub(crate) struct InstanceData {
     drop_queue: Arc<ThreadsafeFunction<NapiRef>>,
 
     /// Shared `Channel` that is cloned to be returned by the `cx.channel()` method
+    #[cfg(all(feature = "event-queue-api"))]
     shared_channel: Channel,
 }
 
@@ -66,6 +68,7 @@ impl InstanceData {
             queue
         };
 
+        #[cfg(all(feature = "event-queue-api"))]
         let shared_channel = {
             let mut channel = Channel::new(cx);
             channel.unref(cx);
             channel
         };
 
         let data = InstanceData {
             drop_queue: Arc::new(drop_queue),
+            #[cfg(all(feature = "event-queue-api"))]
             shared_channel,
         };
 
@@ -87,6 +91,7 @@ impl InstanceData {
 
     /// Clones the shared channel and references it since new channels should start
     /// referenced, but the shared channel is unreferenced.
+    #[cfg(all(feature = "event-queue-api"))]
     pub(crate) fn channel<'a, C: Context<'a>>(cx: &mut C) -> Channel {
         let mut channel = InstanceData::get(cx).shared_channel.clone();
         channel.reference(cx);
         channel
     }
 }
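[Editor's note] Patch 02 splits the state in two: a per-clone `AtomicBool` (`has_ref`) plus a shared `AtomicUsize` ref count, and it uses `Arc::strong_count(&state) == 1` to detect "we are the last owner", so a non-cloned channel can be unreferenced during the same UV tick instead of scheduling extra event-loop work. The sketch below shows that trick in isolation; note that `Arc::strong_count` is racy in general when other threads may clone concurrently, which is exactly why the patch's struct comment spells out the condition under which the check is sound.

    use std::sync::Arc;

    fn drop_handle(handle: Arc<String>) {
        if Arc::strong_count(&handle) == 1 {
            // We are provably the last owner; the payload dies right here,
            // so no deferred cleanup needs to be scheduled.
            println!("last owner, cleaning up {:?} inline", handle);
        } else {
            println!("{} other owner(s) remain", Arc::strong_count(&handle) - 1);
        }
    }

    fn main() {
        let a = Arc::new(String::from("tsfn"));
        let b = Arc::clone(&a);
        drop_handle(b); // one other owner remains
        drop_handle(a); // last owner, cleaned up inline
    }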
From e1fbfb767781e30e4cc7f8b880150507026283e7 Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Tue, 8 Jun 2021 10:16:17 -0700
Subject: [PATCH 03/12] use idiomatic rust

---
 src/event/event_queue.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index 1909d7af7..a9bd4660b 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -180,7 +180,7 @@ struct ChannelState {
 }
 
 impl ChannelState {
-    fn reference<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
+    fn reference<'a, C: Context<'a>>(&self, cx: &mut C) {
         // Already referenced
         if self.has_ref.swap(true, Ordering::Relaxed) {
             return;
@@ -189,7 +189,7 @@ impl ChannelState {
         self.shared.reference(cx);
     }
 
-    fn unref<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
+    fn unref<'a, C: Context<'a>>(&self, cx: &mut C) {
         // Already unreferenced
         if !self.has_ref.swap(false, Ordering::Relaxed) {
             return;
@@ -273,7 +273,7 @@ impl ChannelSharedState {
         }
     }
 
-    fn reference<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
+    fn reference<'a, C: Context<'a>>(&self, cx: &mut C) {
         if self.ref_count.fetch_add(1, Ordering::Relaxed) != 0 {
             return;
         }
@@ -286,7 +286,7 @@ impl ChannelSharedState {
         }
     }
 
-    fn unref<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
+    fn unref<'a, C: Context<'a>>(&self, cx: &mut C) {
         if self.ref_count.fetch_sub(1, Ordering::Relaxed) != 1 {
             return;
         }
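[Editor's note] Patch 03 is purely cosmetic: `self: &Self` and `&self` declare exactly the same receiver, the latter is just the idiomatic shorthand. A two-method demonstration, for the avoidance of doubt:

    struct S;

    impl S {
        fn explicit(self: &Self) {} // what the code said before patch 03
        fn shorthand(&self) {}      // what it says after; identical meaning
    }

    fn main() {
        let s = S;
        s.explicit();
        s.shorthand();
    }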
From 532eec0d25fe94376884d430da35dfd359cdddcc Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Tue, 8 Jun 2021 11:07:33 -0700
Subject: [PATCH 04/12] revert complicated abstraction, no benefit

---
 src/event/event_queue.rs | 145 ++++++++++++++------------------------
 1 file changed, 50 insertions(+), 95 deletions(-)

diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index a9bd4660b..078487f33 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -1,4 +1,4 @@
-use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 
 use neon_runtime::raw::Env;
 use neon_runtime::tsfn::ThreadsafeFunction;
@@ -60,6 +60,7 @@ pub struct Channel {
     // We hold an extra reference to `state` in `try_send` so that we could
     // unref the tsfn during the same UV tick if the state is guaranteed to be
     // dropped before the `try_send`'s closure invocation.
     state: Arc<ChannelState>,
+    has_ref: bool,
 }
 
 impl std::fmt::Debug for Channel {
@@ -72,19 +73,21 @@ impl Channel {
     /// Creates an unbounded channel for scheduling closures on the JavaScript
     /// main thread
     pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
-        let state = ChannelState {
-            shared: Arc::new(ChannelSharedState::new(cx)),
-            has_ref: AtomicBool::new(true),
-        };
-
         Self {
-            state: Arc::new(state),
+            state: Arc::new(ChannelState::new(cx)),
+            has_ref: true,
         }
     }
 
     /// Allow the Node event loop to exit while this `Channel` exists.
     /// _Idempotent_
     pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
+        // Already unreferenced
+        if !self.has_ref {
+            return self;
+        }
+
+        self.has_ref = false;
         self.state.unref(cx);
         self
     }
 
     /// Prevent the Node event loop from exiting while this `Channel` exists. (Default)
     /// _Idempotent_
     pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
+        // Already referenced
+        if self.has_ref {
+            return self;
+        }
+
+        self.has_ref = true;
         self.state.reference(cx);
         self
     }
 
     pub fn try_send<F>(&self, f: F) -> Result<(), SendError>
     where
         F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
     {
-        let state = self.state.clone();
-
         let callback = Box::new(move |env| {
             let env = unsafe { std::mem::transmute(env) };
 
             // Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
             // N-API creates a `HandleScope` before calling the callback.
-            TaskContext::with_context(env, move |mut cx| {
-                // No one else, but us holding the state alive.
-                if Arc::strong_count(&state) == 1 {
-                    state.unref(&mut cx);
-                }
-
+            TaskContext::with_context(env, move |cx| {
                 let _ = f(cx);
             });
         });
 
         self.state
-            .shared
             .tsfn
             .read()
             .unwrap()
             .call(callback, None)
             .map_err(|_| SendError)
     }
 
     /// Returns a boolean indicating if this `Channel` will prevent the Node event
     /// loop from exiting.
     pub fn has_ref(&self) -> bool {
-        self.state.has_ref.load(Ordering::Relaxed)
+        self.has_ref
     }
 }
 
 impl Clone for Channel {
     /// Returns a clone of the Channel instance that shares the internal
     /// unbounded queue with the original channel. Scheduling callbacks on the
     /// same queue is faster than using separate channels, but might lead to
     /// starvation if one of the threads posts significantly more callbacks on
     /// the channel than the other one.
     fn clone(&self) -> Self {
-        return Self {
-            state: Arc::new((*self.state).clone()),
-        };
+        // Not referenced, we can simply clone the fields
+        if !self.has_ref {
+            return Self {
+                state: self.state.clone(),
+                has_ref: false,
+            };
+        }
+
+        let state = Arc::clone(&self.state);
+
+        // Only need to increase the ref count since the tsfn is already referenced
+        state.ref_count.fetch_add(1, Ordering::Relaxed);
+
+        Self {
+            state,
+            has_ref: true,
+        }
     }
 }
 
-impl Drop for ChannelState {
+impl Drop for Channel {
     fn drop(&mut self) {
         // It was only us who kept the `ChannelState` alive. No need to unref
         // the `tsfn`, because it is going to be dropped once this function
         // returns.
-        if Arc::strong_count(&self.shared) == 1 {
+        if Arc::strong_count(&self.state) == 1 {
             return;
         }
 
         // Not a referenced event queue
-        if !self.has_ref.swap(false, Ordering::Relaxed) {
+        if !self.has_ref {
             return;
         }
 
         // The ChannelState is dropped on a worker thread. We have to `unref`
         // the tsfn on the UV thread after all pending closures.
+        let state = Arc::clone(&self.state);
 
-        let shared = Arc::clone(&self.shared);
-
-        let callback = Box::new(move |env| {
-            let env = unsafe { std::mem::transmute(env) };
-
-            // Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
-            // N-API creates a `HandleScope` before calling the callback.
-            TaskContext::with_context(env, move |mut cx| {
-                shared.unref(&mut cx);
-            });
-        });
-
-        self.shared
-            .tsfn
-            .read()
-            .unwrap()
-            .call(callback, None)
-            .map_err(|_| SendError)
-            .unwrap();
+        self.send(move |mut cx| {
+            state.unref(&mut cx);
+            Ok(())
+        });
     }
 }
 
 /// Error indicating that a closure was unable to be scheduled to execute on the event loop.
 pub struct SendError;
 
 impl std::fmt::Display for SendError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "SendError")
     }
 }
 
 impl std::fmt::Debug for SendError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         std::fmt::Display::fmt(self, f)
     }
 }
 
 impl std::error::Error for SendError {}
 
-struct ChannelSharedState {
+struct ChannelState {
     tsfn: RwLock<ThreadsafeFunction<Callback>>,
     ref_count: AtomicUsize,
 }
 
-impl ChannelSharedState {
+impl ChannelState {
     fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
         let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
 
         Self {
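[Editor's note] Patch 04 folds the two-level `ChannelState`/`ChannelSharedState` split back into a single struct and returns the deferred cleanup to `Drop for Channel`, which ships an unref closure to the event-loop thread via `self.send`. The sketch below models that "defer the unref to the owning thread" shape with a plain `std::sync::mpsc` queue standing in for the threadsafe function and a worker thread standing in for the JavaScript main thread; every name here is illustrative, none of it is Neon API.

    use std::sync::mpsc;
    use std::sync::Arc;
    use std::thread;

    struct Handle {
        queue: mpsc::Sender<Box<dyn FnOnce() + Send>>,
        state: Arc<u32>, // stand-in for the shared ChannelState
        has_ref: bool,
    }

    impl Drop for Handle {
        fn drop(&mut self) {
            // Unreferenced handles never touch the queue on drop.
            if !self.has_ref {
                return;
            }
            // Last owner: the state dies with us, nothing to schedule.
            if Arc::strong_count(&self.state) == 1 {
                return;
            }
            // Otherwise defer the "unref" to the thread that owns the loop.
            let state = Arc::clone(&self.state);
            let _ = self.queue.send(Box::new(move || {
                println!("deferred cleanup for state {} on the loop thread", state);
            }));
        }
    }

    fn main() {
        let (tx, rx) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
        let loop_thread = thread::spawn(move || {
            for job in rx {
                job();
            }
        });
        let state = Arc::new(7);
        let a = Handle { queue: tx.clone(), state: Arc::clone(&state), has_ref: true };
        let b = Handle { queue: tx, state: Arc::clone(&state), has_ref: true };
        drop(a); // schedules deferred cleanup; other owners still exist
        drop(b); // likewise, since `state` in main keeps the Arc alive
        drop(state);
        loop_thread.join().unwrap(); // queue closes once all senders are gone
    }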
- let shared = Arc::clone(&self.shared); + let state = Arc::clone(&self.state); - let callback = Box::new(move |env| { - let env = unsafe { std::mem::transmute(env) }; - - // Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because - // N-API creates a `HandleScope` before calling the callback. - TaskContext::with_context(env, move |mut cx| { - shared.unref(&mut cx); - }); + self.send(move |mut cx| { + state.unref(&mut cx); + Ok(()) }); + } +} - self.shared - .tsfn - .read() - .unwrap() - .call(callback, None) - .map_err(|_| SendError) - .unwrap(); +/// Error indicating that a closure was unable to be scheduled to execute on the event loop. +pub struct SendError; + +impl std::fmt::Display for SendError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "SendError") } } -struct ChannelSharedState { +impl std::fmt::Debug for SendError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + std::fmt::Display::fmt(self, f) + } +} + +impl std::error::Error for SendError {} + +struct ChannelState { tsfn: RwLock>, ref_count: AtomicUsize, } -impl ChannelSharedState { +impl ChannelState { fn new<'a, C: Context<'a>>(cx: &mut C) -> Self { let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) }; Self { From 9b1f1253d7f1b0ee8de73338d4e7450052cb3fdf Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 8 Jun 2021 11:32:28 -0700 Subject: [PATCH 05/12] remove mutability requirements --- crates/neon-runtime/src/napi/tsfn.rs | 26 ++++++++++---------------- src/event/event_queue.rs | 27 ++++++++------------------- src/lifecycle.rs | 4 ++-- 3 files changed, 20 insertions(+), 37 deletions(-) diff --git a/crates/neon-runtime/src/napi/tsfn.rs b/crates/neon-runtime/src/napi/tsfn.rs index 9ac96a923..3e8df76e8 100644 --- a/crates/neon-runtime/src/napi/tsfn.rs +++ b/crates/neon-runtime/src/napi/tsfn.rs @@ -153,26 +153,20 @@ impl ThreadsafeFunction { /// References a threadsafe function to prevent exiting the event loop until it has been dropped. (Default) /// Safety: `Env` must be valid for the current thread - pub unsafe fn reference(&mut self, env: Env) -> Result<(), napi::Status> { - let status = napi::ref_threadsafe_function(env, self.tsfn.0); - - if status == napi::Status::Ok { - Ok(()) - } else { - Err(status) - } + pub unsafe fn reference(&self, env: Env) { + assert_eq!( + napi::ref_threadsafe_function(env, self.tsfn.0), + napi::Status::Ok + ); } /// Unreferences a threadsafe function to allow exiting the event loop before it has been dropped. 
     /// Safety: `Env` must be valid for the current thread
-    pub unsafe fn unref(&mut self, env: Env) -> Result<(), napi::Status> {
-        let status = napi::unref_threadsafe_function(env, self.tsfn.0);
-
-        if status == napi::Status::Ok {
-            Ok(())
-        } else {
-            Err(status)
-        }
+    pub unsafe fn unref(&self, env: Env) {
+        assert_eq!(
+            napi::unref_threadsafe_function(env, self.tsfn.0),
+            napi::Status::Ok
+        );
     }
 
     // Provides a C ABI wrapper for a napi callback notifying us about tsfn
diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index 078487f33..cf5c722dd 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -1,5 +1,5 @@
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
 
 use neon_runtime::raw::Env;
 use neon_runtime::tsfn::ThreadsafeFunction;
@@ -130,12 +130,7 @@ impl Channel {
             });
         });
 
-        self.state
-            .tsfn
-            .read()
-            .unwrap()
-            .call(callback, None)
-            .map_err(|_| SendError)
+        self.state.tsfn.call(callback, None).map_err(|_| SendError)
     }
 
     /// Returns a boolean indicating if this `Channel` will prevent the Node event
@@ -215,7 +210,7 @@ impl std::error::Error for SendError {}
 
 struct ChannelState {
-    tsfn: RwLock<ThreadsafeFunction<Callback>>,
+    tsfn: ThreadsafeFunction<Callback>,
     ref_count: AtomicUsize,
 }
 
 impl ChannelState {
     fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
         let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
 
         Self {
-            tsfn: RwLock::new(tsfn),
+            tsfn,
             ref_count: AtomicUsize::new(1),
         }
     }
 
     fn reference<'a, C: Context<'a>>(&self, cx: &mut C) {
@@ -233,12 +228,9 @@ impl ChannelState {
             return;
         }
 
-        // Critical section, avoid panicking
-        {
-            let mut tsfn = self.tsfn.write().unwrap();
-            unsafe { tsfn.reference(cx.env().to_raw()) }
+        unsafe {
+            self.tsfn.reference(cx.env().to_raw());
         }
-        .unwrap();
     }
 
     fn unref<'a, C: Context<'a>>(&self, cx: &mut C) {
@@ -246,12 +238,9 @@ impl ChannelState {
             return;
         }
 
-        // Critical section, avoid panicking
-        {
-            let mut tsfn = self.tsfn.write().unwrap();
-            unsafe { tsfn.unref(cx.env().to_raw()) }
+        unsafe {
+            self.tsfn.unref(cx.env().to_raw());
         }
-        .unwrap();
     }
 
     // Monomorphized trampoline function for calling the user provided closure
diff --git a/src/lifecycle.rs b/src/lifecycle.rs
index 01d995e2a..cae53f181 100644
--- a/src/lifecycle.rs
+++ b/src/lifecycle.rs
@@ -63,8 +63,8 @@ impl InstanceData {
         }
 
         let drop_queue = unsafe {
-            let mut queue = ThreadsafeFunction::new(env, drop_napi_ref);
-            queue.unref(env).unwrap();
+            let queue = ThreadsafeFunction::new(env, drop_napi_ref);
+            queue.unref(env);
             queue
         };
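[Editor's note] The observation behind patch 05: once the ref count is an `AtomicUsize` and the underlying N-API calls are internally synchronized, `reference`/`unref` can take `&self`, and both the outer `RwLock` around the tsfn and the `Result` plumbing from patch 01 become unnecessary (the asserts come back). A miniature of the same shift, using `AtomicUsize` as the internally-synchronized callee; a sketch for illustration, not the tsfn itself:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    struct Counter {
        // Internally synchronized, so methods can take &self.
        value: AtomicUsize,
    }

    impl Counter {
        fn bump(&self) {
            self.value.fetch_add(1, Ordering::Relaxed);
        }
    }

    fn main() {
        // No RwLock<Counter> needed: &self methods are callable straight
        // through the shared Arc from any thread.
        let counter = Arc::new(Counter { value: AtomicUsize::new(0) });
        let threads: Vec<_> = (0..4)
            .map(|_| {
                let counter = Arc::clone(&counter);
                thread::spawn(move || counter.bump())
            })
            .collect();
        for t in threads {
            t.join().unwrap();
        }
        assert_eq!(counter.value.load(Ordering::Relaxed), 4);
    }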
From 9b1f1253d7f1b0ee8de73338d4e7450052cb3fdf Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Tue, 8 Jun 2021 11:33:30 -0700
Subject: [PATCH 06/12] small stylistic fixes

---
 crates/neon-runtime/src/napi/tsfn.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/neon-runtime/src/napi/tsfn.rs b/crates/neon-runtime/src/napi/tsfn.rs
index 3e8df76e8..b3f4493e4 100644
--- a/crates/neon-runtime/src/napi/tsfn.rs
+++ b/crates/neon-runtime/src/napi/tsfn.rs
@@ -156,7 +156,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
     pub unsafe fn reference(&self, env: Env) {
         assert_eq!(
             napi::ref_threadsafe_function(env, self.tsfn.0),
-            napi::Status::Ok
+            napi::Status::Ok,
         );
     }
 
@@ -165,7 +165,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
     pub unsafe fn unref(&self, env: Env) {
         assert_eq!(
             napi::unref_threadsafe_function(env, self.tsfn.0),
-            napi::Status::Ok
+            napi::Status::Ok,
         );
     }

From 0dfffbb81c761c4ea42923897cfb92bd9266ae68 Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Tue, 8 Jun 2021 11:33:54 -0700
Subject: [PATCH 07/12] few more

---
 crates/neon-runtime/src/napi/tsfn.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/crates/neon-runtime/src/napi/tsfn.rs b/crates/neon-runtime/src/napi/tsfn.rs
index b3f4493e4..eea90f78c 100644
--- a/crates/neon-runtime/src/napi/tsfn.rs
+++ b/crates/neon-runtime/src/napi/tsfn.rs
@@ -155,7 +155,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
     /// Safety: `Env` must be valid for the current thread
     pub unsafe fn reference(&self, env: Env) {
         assert_eq!(
-            napi::ref_threadsafe_function(env, self.tsfn.0),
+            napi::ref_threadsafe_function(env, self.tsfn.0,),
             napi::Status::Ok,
         );
     }
@@ -164,7 +164,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
     /// Safety: `Env` must be valid for the current thread
     pub unsafe fn unref(&self, env: Env) {
         assert_eq!(
-            napi::unref_threadsafe_function(env, self.tsfn.0),
+            napi::unref_threadsafe_function(env, self.tsfn.0,),
             napi::Status::Ok,
         );
     }

From 740c4be53e9d1555dc2e1b7f044915c74d2bb6a9 Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Tue, 8 Jun 2021 11:34:33 -0700
Subject: [PATCH 08/12] remove stale comment

---
 src/event/event_queue.rs | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index cf5c722dd..9d5a38baf 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -56,9 +56,6 @@ type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
 /// ```
 
 pub struct Channel {
-    // We hold an extra reference to `state` in `try_send` so that we could
-    // unref the tsfn during the same UV tick if the state is guaranteed to be
-    // dropped before the `try_send`'s closure invocation.
     state: Arc<ChannelState>,
     has_ref: bool,
 }

From bdd8ce2de6ce5803765b9e3f9f320dc56de7ab6c Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Tue, 8 Jun 2021 15:01:24 -0700
Subject: [PATCH 09/12] add comments

---
 src/event/event_queue.rs | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index 9d5a38baf..ba6119bf4 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -143,6 +143,10 @@ impl Clone for Channel {
     /// same queue is faster than using separate channels, but might lead to
     /// starvation if one of the threads posts significantly more callbacks on
     /// the channel than the other one.
+    ///
+    /// Cloned and referenced `Channel` instances might trigger an additional
+    /// event-loop tick when dropped. A `Channel` can be wrapped in an `Arc` and
+    /// shared between different threads/callers to avoid this.
     fn clone(&self) -> Self {
         // Not referenced, we can simply clone the fields
         if !self.has_ref {
@@ -179,7 +183,14 @@ impl Drop for Channel {
         }
 
         // The ChannelState is dropped on a worker thread. We have to `unref`
-        // the tsfn on the UV thread after all pending closures.
+        // the tsfn on the UV thread after all pending closures. Note that in
+        // most scenarios the optimization in the N-API layer will coalesce
+        // `send()` with a user-supplied closure and the unref send here into a
+        // single UV tick.
+        //
+        // If this ever has to be optimized, a second `Arc` could be used to wrap
+        // the `state` and it could be cloned in `try_send` and unref'ed on the
+        // UV thread if the strong reference count goes to 0.
         let state = Arc::clone(&self.state);
 
         self.send(move |mut cx| {
             state.unref(&mut cx);
             Ok(())
         });
     }
 }
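[Editor's note] Patch 10 below justifies the `Ordering::Relaxed` choice: a relaxed read-modify-write is still fully atomic, it only waives the cross-thread happens-before guarantees of stronger orderings, and those are unneeded when, as the new comments state, `reference()`/`unref()` only ever run on the event-loop thread. A tiny demonstration of the relaxed read-modify-write contract:

    use std::sync::atomic::{AtomicUsize, Ordering};

    fn main() {
        let ref_count = AtomicUsize::new(0);
        // Single-threaded mutation: Relaxed is sufficient, and fetch_add /
        // fetch_sub still atomically return the previous value, which is how
        // the code detects the 0 -> 1 and 1 -> 0 edges.
        assert_eq!(ref_count.fetch_add(1, Ordering::Relaxed), 0); // 0 -> 1
        assert_eq!(ref_count.fetch_sub(1, Ordering::Relaxed), 1); // 1 -> 0
    }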
From 4e6e9ebcbf2bbb0a732c5a3d68bc2fd6d2924552 Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Tue, 8 Jun 2021 15:15:28 -0700
Subject: [PATCH 10/12] extra comment

---
 src/event/event_queue.rs | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index ba6119bf4..8d69f190a 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -232,6 +232,8 @@ impl ChannelState {
     }
 
     fn reference<'a, C: Context<'a>>(&self, cx: &mut C) {
+        // We can use relaxed ordering because `reference()` can only be called
+        // on the Event-Loop thread.
         if self.ref_count.fetch_add(1, Ordering::Relaxed) != 0 {
             return;
         }
@@ -244,6 +246,8 @@ impl ChannelState {
     }
 
     fn unref<'a, C: Context<'a>>(&self, cx: &mut C) {
+        // We can use relaxed ordering because `unref()` can only be called
+        // on the Event-Loop thread.
         if self.ref_count.fetch_sub(1, Ordering::Relaxed) != 1 {
             return;
         }

From 5c8704cfa49d32acac1dd7bea4705f2ed4ff8b4c Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Tue, 8 Jun 2021 15:15:52 -0700
Subject: [PATCH 11/12] change order

---
 src/event/event_queue.rs | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/event/event_queue.rs b/src/event/event_queue.rs
index 8d69f190a..6e71ba92f 100644
--- a/src/event/event_queue.rs
+++ b/src/event/event_queue.rs
@@ -170,6 +170,11 @@
 
 impl Drop for Channel {
     fn drop(&mut self) {
+        // Not a referenced event queue
+        if !self.has_ref {
+            return;
+        }
+
         // It was only us who kept the `ChannelState` alive. No need to unref
         // the `tsfn`, because it is going to be dropped once this function
         // returns.
@@ -177,11 +182,6 @@
             return;
         }
 
-        // Not a referenced event queue
-        if !self.has_ref {
-            return;
-        }
-
         // The ChannelState is dropped on a worker thread. We have to `unref`
         // the tsfn on the UV thread after all pending closures. Note that in
         // most scenarios the optimization in the N-API layer will coalesce

From 01f3bdcada9607a5ccf5ff77daf8522f698d1d6b Mon Sep 17 00:00:00 2001
From: Fedor Indutny
Date: Tue, 8 Jun 2021 15:16:46 -0700
Subject: [PATCH 12/12] fix comment

---
 src/context/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/context/mod.rs b/src/context/mod.rs
index 964f2dcb0..3e697fe90 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -552,7 +552,7 @@ pub trait Context<'a>: ContextInternal<'a> {
     }
 
     #[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
-    /// Clones a shared unbounded channel for scheduling events to be executed on the JavaScript thread.
+    /// Returns an unbounded channel for scheduling events to be executed on the JavaScript thread.
     ///
     /// When using N-API >= 6, the channel returned by this method is backed by a shared queue.
     /// To create a channel backed by a _new_ queue see [`Channel`](crate::event::Channel).
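[Editor's note] The net effect of the series for addon authors: with N-API >= 6, `cx.channel()` no longer creates a fresh threadsafe function per call but clones one shared, queue-backed channel held in `InstanceData`, while `Channel::new` remains available for a dedicated queue. A usage sketch against the API as it appears in these patches (`send` taking a `NeonResult` closure); module registration is elided and the function shape is a typical Neon setup rather than anything defined in the diffs above:

    use neon::prelude::*;

    fn async_add_one(mut cx: FunctionContext) -> JsResult<JsUndefined> {
        let n = cx.argument::<JsNumber>(0)?.value(&mut cx);
        let callback = cx.argument::<JsFunction>(1)?.root(&mut cx);

        // With N-API >= 6 this clones the instance's shared channel instead
        // of creating a brand-new threadsafe function for every call.
        let channel = cx.channel();

        std::thread::spawn(move || {
            let result = n + 1.0;
            // Closures from every clone funnel through one shared queue on
            // the JavaScript main thread.
            channel.send(move |mut cx| {
                let callback = callback.into_inner(&mut cx);
                let this = cx.undefined();
                let args = vec![cx.number(result).upcast::<JsValue>()];
                callback.call(&mut cx, this, args)?;
                Ok(())
            });
        });

        Ok(cx.undefined())
    }

As the Clone doc added in patch 09 notes, sharing one queue is faster than separate channels but can starve other callers if one thread posts far more closures, and callers who clone referenced channels frequently may prefer wrapping a single `Channel` in an `Arc`.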