Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse ThreadsafeFunction in EventQueue #739

Merged
merged 12 commits into from
Jun 29, 2021
26 changes: 16 additions & 10 deletions crates/neon-runtime/src/napi/tsfn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,26 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

/// References a threadsafe function to prevent exiting the event loop until it has been dropped. (Default)
/// Safety: `Env` must be valid for the current thread
pub unsafe fn reference(&mut self, env: Env) {
assert_eq!(
napi::ref_threadsafe_function(env, self.tsfn.0,),
napi::Status::Ok,
);
pub unsafe fn reference(&mut self, env: Env) -> Result<(), napi::Status> {
let status = napi::ref_threadsafe_function(env, self.tsfn.0);

if status == napi::Status::Ok {
Ok(())
} else {
Err(status)
}
}

/// Unreferences a threadsafe function to allow exiting the event loop before it has been dropped.
/// Safety: `Env` must be valid for the current thread
pub unsafe fn unref(&mut self, env: Env) {
assert_eq!(
napi::unref_threadsafe_function(env, self.tsfn.0,),
napi::Status::Ok,
);
pub unsafe fn unref(&mut self, env: Env) -> Result<(), napi::Status> {
let status = napi::unref_threadsafe_function(env, self.tsfn.0);

if status == napi::Status::Ok {
Ok(())
} else {
Err(status)
}
}

// Provides a C ABI wrapper for a napi callback notifying us about tsfn
Expand Down
15 changes: 13 additions & 2 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ use crate::context::internal::Env;
#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
use crate::event::Channel;
use crate::handle::{Handle, Managed};
#[cfg(all(feature = "napi-6", feature = "event-queue-api"))]
use crate::lifecycle::InstanceData;
#[cfg(feature = "legacy-runtime")]
use crate::object::class::Class;
use crate::object::{Object, This};
Expand Down Expand Up @@ -550,9 +552,18 @@ pub trait Context<'a>: ContextInternal<'a> {
}

#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
/// Creates an unbounded channel for scheduling events to be executed on the JavaScript thread.
/// Clones a shared unbounded channel for scheduling events to be executed on the JavaScript thread.
kjvalencik marked this conversation as resolved.
Show resolved Hide resolved
///
/// When using N-API >= 6,the channel returned by this method is backed by a shared queue.
/// To create a channel backed by a _new_ queue see [`Channel`](crate::event::Channel).
fn channel(&mut self) -> Channel {
Channel::new(self)
#[cfg(feature = "napi-6")]
let channel = InstanceData::channel(self);

#[cfg(not(feature = "napi-6"))]
let channel = Channel::new(self);

channel
}

#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
Expand Down
206 changes: 181 additions & 25 deletions src/event/event_queue.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

use neon_runtime::raw::Env;
use neon_runtime::tsfn::ThreadsafeFunction;

Expand All @@ -8,6 +11,9 @@ type Callback = Box<dyn FnOnce(Env) + Send + 'static>;

/// Channel for scheduling Rust closures to execute on the JavaScript main thread.
///
/// Cloning a `Channel` will create a new channel that shares a backing queue for
/// events.
///
/// # Example
///
/// The following example spawns a standard Rust thread to complete a computation
Expand Down Expand Up @@ -50,8 +56,10 @@ type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
/// ```

pub struct Channel {
tsfn: ThreadsafeFunction<Callback>,
has_ref: bool,
// We hold an extra reference to `state` in `try_send` so that we could
// unref the tsfn during the same UV tick if the state is guaranteed to be
// dropped before the `try_send`'s closure invocation.
state: Arc<ChannelState>,
}

impl std::fmt::Debug for Channel {
Expand All @@ -64,31 +72,27 @@ impl Channel {
/// Creates an unbounded channel for scheduling closures on the JavaScript
/// main thread
pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
let state = ChannelState {
shared: Arc::new(ChannelSharedState::new(cx)),
has_ref: AtomicBool::new(true),
};

Self {
tsfn,
has_ref: true,
state: Arc::new(state),
}
}

/// Allow the Node event loop to exit while this `Channel` exists.
/// _Idempotent_
pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
kjvalencik marked this conversation as resolved.
Show resolved Hide resolved
self.has_ref = false;

unsafe { self.tsfn.unref(cx.env().to_raw()) }

self.state.unref(cx);
self
}

/// Prevent the Node event loop from exiting while this `Channel` exists. (Default)
/// _Idempotent_
pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
self.has_ref = true;

unsafe { self.tsfn.reference(cx.env().to_raw()) }

self.state.reference(cx);
self
}

Expand All @@ -107,34 +111,49 @@ impl Channel {
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
{
let state = self.state.clone();

let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };

// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
TaskContext::with_context(env, move |mut cx| {
// No one else, but us holding the state alive.
if Arc::strong_count(&state) == 1 {
state.unref(&mut cx);
}

let _ = f(cx);
});
});

self.tsfn.call(callback, None).map_err(|_| SendError)
self.state
.shared
.tsfn
.read()
.unwrap()
.call(callback, None)
.map_err(|_| SendError)
}

/// Returns a boolean indicating if this `Channel` will prevent the Node event
/// loop from exiting.
pub fn has_ref(&self) -> bool {
self.has_ref
self.state.has_ref.load(Ordering::Relaxed)
}
}

// Monomorphized trampoline funciton for calling the user provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
}
impl Clone for Channel {
/// Returns a clone of the Channel instance that shares the internal
/// unbounded queue with the original channel. Scheduling callbacks on the
/// same queue is faster than using separate channels, but might lead to
/// starvation if one of the threads posts significantly more callbacks on
/// the channel than the other one.
fn clone(&self) -> Self {
return Self {
state: Arc::new((*self.state).clone()),
};
}
}

Expand All @@ -154,3 +173,140 @@ impl std::fmt::Debug for SendError {
}

impl std::error::Error for SendError {}

struct ChannelState {
shared: Arc<ChannelSharedState>,
has_ref: AtomicBool,
}

impl ChannelState {
fn reference<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: self: &Self is idiomatically written &self.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack!

// Already referenced
if self.has_ref.swap(true, Ordering::Relaxed) {
return;
}

self.shared.reference(cx);
}

fn unref<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
// Already unreferenced
if !self.has_ref.swap(false, Ordering::Relaxed) {
return;
}

self.shared.unref(cx);
}
}

impl Clone for ChannelState {
fn clone(&self) -> Self {
// Not referenced, we can simply clone the fields
if !self.has_ref.load(Ordering::Relaxed) {
return Self {
shared: self.shared.clone(),
has_ref: AtomicBool::new(false),
};
}

let shared = Arc::clone(&self.shared);

// Only need to increase the ref count since the tsfn is already referenced
shared.ref_count.fetch_add(1, Ordering::Relaxed);

Self {
shared,
has_ref: AtomicBool::new(true),
}
}
}

impl Drop for ChannelState {
fn drop(&mut self) {
// It was only us who kept the `ChannelState` alive. No need to unref
// the `tsfn`, because it is going to be dropped once this function
// returns.
if Arc::strong_count(&self.shared) == 1 {
return;
}

// Not a referenced event queue
if !self.has_ref.swap(false, Ordering::Relaxed) {
return;
}

// The ChannelState is dropped on a worker thread. We have to `unref`
// the tsfn on the UV thread after all pending closures.
let shared = Arc::clone(&self.shared);

let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };

// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |mut cx| {
shared.unref(&mut cx);
});
});

self.shared
.tsfn
.read()
.unwrap()
.call(callback, None)
.map_err(|_| SendError)
.unwrap();
}
}

struct ChannelSharedState {
tsfn: RwLock<ThreadsafeFunction<Callback>>,
ref_count: AtomicUsize,
}

impl ChannelSharedState {
fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
Self {
tsfn: RwLock::new(tsfn),
ref_count: AtomicUsize::new(1),
}
}

fn reference<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
if self.ref_count.fetch_add(1, Ordering::Relaxed) != 0 {
kjvalencik marked this conversation as resolved.
Show resolved Hide resolved
return;
}

// Critical section, avoid panicking
{
let mut tsfn = self.tsfn.write().unwrap();
unsafe { tsfn.reference(cx.env().to_raw()) }
}
.unwrap();
}

fn unref<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
if self.ref_count.fetch_sub(1, Ordering::Relaxed) != 1 {
kjvalencik marked this conversation as resolved.
Show resolved Hide resolved
return;
}

// Critical section, avoid panicking
{
let mut tsfn = self.tsfn.write().unwrap();
unsafe { tsfn.unref(cx.env().to_raw()) }
}
.unwrap();
}

// Monomorphized trampoline funciton for calling the user provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
}
}
}
26 changes: 25 additions & 1 deletion src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use neon_runtime::reference;
use neon_runtime::tsfn::ThreadsafeFunction;

use crate::context::Context;
#[cfg(all(feature = "event-queue-api"))]
use crate::event::Channel;
use crate::handle::root::NapiRef;

/// `InstanceData` holds Neon data associated with a particular instance of a
Expand All @@ -30,6 +32,10 @@ pub(crate) struct InstanceData {
/// given the cost of FFI, this optimization is omitted until the cost of an
/// `Arc` is demonstrated as significant.
drop_queue: Arc<ThreadsafeFunction<NapiRef>>,

/// Shared `Channel` that is cloned to be returned by the `cx.channel()` method
#[cfg(all(feature = "event-queue-api"))]
shared_channel: Channel,
}

fn drop_napi_ref(env: Option<Env>, data: NapiRef) {
Expand Down Expand Up @@ -58,12 +64,21 @@ impl InstanceData {

let drop_queue = unsafe {
let mut queue = ThreadsafeFunction::new(env, drop_napi_ref);
queue.unref(env);
queue.unref(env).unwrap();
queue
};

#[cfg(all(feature = "event-queue-api"))]
let shared_channel = {
let mut channel = Channel::new(cx);
channel.unref(cx);
channel
};

let data = InstanceData {
drop_queue: Arc::new(drop_queue),
#[cfg(all(feature = "event-queue-api"))]
shared_channel,
};

unsafe { &mut *neon_runtime::lifecycle::set_instance_data(env, data) }
Expand All @@ -73,4 +88,13 @@ impl InstanceData {
pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc<ThreadsafeFunction<NapiRef>> {
Arc::clone(&InstanceData::get(cx).drop_queue)
}

/// Clones the shared channel and references it since new channels should start
/// referenced, but the shared channel is unreferenced.
#[cfg(all(feature = "event-queue-api"))]
pub(crate) fn channel<'a, C: Context<'a>>(cx: &mut C) -> Channel {
let mut channel = InstanceData::get(cx).shared_channel.clone();
channel.reference(cx);
channel
}
}