
Reuse ThreadsafeFunction in EventQueue #739

Merged (12 commits) Jun 29, 2021

Conversation

indutny
Contributor

@indutny indutny commented May 18, 2021

Node.js optimizes subsequent ThreadsafeFunction invocations to happen
during the same event loop tick, but only if the same instance of
ThreadsafeFunction is used. The performance improvement is most
noticeable when used in Electron, because scheduling a new UV tick in
Electron is very costly.

With this change, EventQueue will use an existing instance of ThreadsafeTrampoline (a wrapper around ThreadsafeFunction) when compiled with the napi-6 feature, or fall back to creating a new ThreadsafeFunction per EventQueue instance.

Fix: #727

@indutny indutny marked this pull request as draft May 18, 2021 23:32
@indutny
Contributor Author

indutny commented May 18, 2021

Converted to draft because of the reference counting issues. Will keep you posted.

Update: it is fixed now!

@indutny-signal indutny-signal force-pushed the feature/shared-event-queue branch 2 times, most recently from 4caa935 to b9864a4 Compare May 18, 2021 23:49
@indutny indutny marked this pull request as ready for review May 18, 2021 23:49
@indutny indutny force-pushed the feature/shared-event-queue branch from b9864a4 to 76853a9 Compare May 19, 2021 01:11
@indutny-signal
Contributor

(Force pushed to fix lint errors)

@kjvalencik
Member

I left a comment here mentioning a couple of my thoughts: #727 (comment)

@indutny-signal
Contributor

Replied.

Contributor

@jrose-signal jrose-signal left a comment

This is pretty much the implementation I was envisioning, yep. :-) I like the compromise of having cx.queue() return a shared queue but still allowing EventQueue::new to create a new one.

I'll leave organizational and documentation feedback to the Neon maintainers. (In particular, I wonder if trampoline.rs should be a submodule of event_queue, since it's not going to be reused for anything else.)

Comment on lines 63 to 65
if self.ref_count == 0 {
return;
}
Contributor

This should probably panic, or however else Neon detects internal errors.

src/lifecycle.rs Outdated
pub(crate) fn threadsafe_trampoline<'a, C: Context<'a>>(
cx: &mut C,
) -> Arc<RwLock<ThreadsafeTrampoline>> {
Arc::clone(&InstanceData::get(cx).threadsafe_trampoline)
Contributor

Nitpick: it might be nice to lazily initialize this for programs that don't use EventQueues.

Contributor

It sounds like in order to do that I'll have to store it in an Arc<RwLock<Option<Arc<RwLock<ThreadsafeTrampoline>>>>>, which doesn't look too nice to me. Is there any better way to do this?

Contributor

You have an exclusive (&mut) reference to InstanceData, so you don't need extra locking. Option<Arc<_>> is sufficient.
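The lazy-initialization pattern suggested here can be sketched in plain Rust. The types below are hypothetical stand-ins for Neon's InstanceData and the trampoline (no Neon dependency); the point is that an exclusive `&mut` borrow makes `Option<Arc<_>>` plus `get_or_insert_with` sufficient, with no extra lock:

```rust
use std::sync::Arc;

// Hypothetical stand-in for the trampoline type.
struct Trampoline {
    id: u32,
}

// Hypothetical stand-in for Neon's per-instance data.
struct InstanceData {
    // `None` until the first EventQueue is created, so programs that
    // never use EventQueues pay no initialization cost.
    trampoline: Option<Arc<Trampoline>>,
}

impl InstanceData {
    fn new() -> Self {
        InstanceData { trampoline: None }
    }

    // The caller holds `&mut self` (mirroring the exclusive `&mut cx`
    // borrow in Neon), so no lock is needed around the Option itself.
    fn trampoline(&mut self) -> Arc<Trampoline> {
        Arc::clone(
            self.trampoline
                .get_or_insert_with(|| Arc::new(Trampoline { id: 1 })),
        )
    }
}
```

Each call after the first clones the same `Arc`, so all queues share one trampoline.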


pub(crate) struct ThreadsafeTrampoline {
tsfn: ThreadsafeFunction<Callback>,
ref_count: u32,
Contributor

Nitpick: if this were AtomicU32, you could avoid the lock. (You'd be able to use Relaxed operations safely because you're only checking the refcount, not relying on any other data to have been synchronized.)

Contributor

Sadly, the lock is needed because .reference()/.unref() calls need mutable access to the ThreadsafeFunction.

Member

Since they require &mut Context, it implies the operations are serialized. We could get away with a Cell instead.

But, that might be an optimization better left for when it's demonstrated to be a performance bottleneck.

Contributor Author

Yeah, I don't imagine this being a bottleneck, and doing it with a mutex is far simpler. I'm open to changing it, though.
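For reference, the lock-free alternative discussed in this thread can be sketched with `AtomicU32` and `Relaxed` ordering (hypothetical names; as noted above, Neon kept the lock because reference()/unref() need mutable access to the ThreadsafeFunction):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

// A refcount that guards no other shared data, so Relaxed ordering
// is sufficient: we only ever inspect the counter itself.
struct RefCount {
    count: AtomicU32,
}

impl RefCount {
    fn new() -> Self {
        RefCount { count: AtomicU32::new(1) }
    }

    // Returns the new count after incrementing.
    fn increment(&self) -> u32 {
        self.count.fetch_add(1, Ordering::Relaxed) + 1
    }

    // Returns the new count after decrementing; panics on underflow,
    // matching the "this should probably panic" suggestion above.
    fn decrement(&self) -> u32 {
        let prev = self.count.fetch_sub(1, Ordering::Relaxed);
        assert!(prev > 0, "refcount underflow");
        prev - 1
    }
}
```

Note that real `Arc`-style cleanup on the final decrement would need stronger ordering; this sketch covers only the counting itself.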

Comment on lines 48 to 49
/// _Not idempotent_
pub(crate) fn reference(&mut self, env: Env) {
Contributor

Nitpick: Maybe we should make this more obviously distinct from the idempotent nature of ref by calling it increment_reference_count or something.

}
}

// Monomorphized trampoline funciton for calling the user provided closure
Contributor

Typo: function

tsfn,
has_ref: true,
}
Self::with_trampoline(cx, trampoline)
Contributor

new returns a referenced trampoline, but with_trampoline expects an unreferenced one. I think the best answer here is to have with_trampoline take an already-referenced trampoline, which is consistent with fresh ones and not hard to implement for the shared one. Having it take a not-already-referenced one and immediately referencing it would also work, but would mean extra calls into Node for the fresh case.

Contributor

In that case it should probably be called with_referenced_trampoline or something so people don't forget.

@indutny-signal
Contributor

@jrose-signal I believe I've addressed your feedback except for one suggestion. PTAL

Contributor

@jrose-signal jrose-signal left a comment

👍

Comment on lines 567 to 577
let shared_trampoline = {
let shared_trampoline = ThreadsafeTrampoline::new(self.env());
shared_trampoline.unref(self.env().to_raw());
shared_trampoline
};
Contributor

This is still a little less efficient but it's harder to get wrong, so maybe that's worth it.

src/trampoline.rs (outdated; review thread resolved)
@indutny-signal
Contributor

The latest commit on this PR actually fixes a bug, but it doesn't look like it could be reproduced without the proposed changes, so I'm not sure if we should split this commit into a separate PR or handle it here as well. Any suggestions are welcome!

@kjvalencik
Member

That last bug looks interesting! It seems like it might be a bug in Node/N-API. I'll see if I can make a minimal non-Rust example and open an issue upstream.

@indutny
Contributor Author

indutny commented May 20, 2021

Opened a separate PR to track it: #744

@@ -167,6 +179,12 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {

impl<T> Drop for ThreadsafeFunction<T> {
fn drop(&mut self) {
// tsfn was already finalized by `Environment::CleanupHandles()` in
// Node.js
if *self.is_finalized.lock().unwrap() {
Contributor

I'm a bit worried about the cases where we might end up running release concurrently with the finalize callback. Maybe this has to be fixed in Node.js instead...

@indutny indutny force-pushed the feature/shared-event-queue branch from c22f932 to 74a21cb Compare June 1, 2021 21:19
@kjvalencik
Member

I like the way this API is headed. One thing that I'm still not sure about is the asymmetry between an EventQueue you get from EventQueue::new and one you get from cx.queue(). The latter shares a backing queue and the former doesn't, but that's not visible from the type signature and isn't super clear from usage.

Any ideas of how to make this more clear? I have a couple thoughts:

  • EventQueue could implement Clone in a way that shares the backing queue. This could be a way for users to leverage sharing outside of the global queue and maybe make things feel more symmetrical.
  • Introduce a new type for sharing. This is what tokio does, but increasing the API surface area might be confusing.
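The first option, Clone sharing the backing queue, can be sketched with std types only (the names below are illustrative, not Neon's API): cloning hands out another handle to the same internal queue, much like `std::sync::mpsc::Sender`.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Hypothetical sketch: all clones share one backing queue via Arc.
#[derive(Clone)]
struct EventQueue {
    queue: Arc<Mutex<VecDeque<String>>>,
}

impl EventQueue {
    fn new() -> Self {
        EventQueue {
            queue: Arc::new(Mutex::new(VecDeque::new())),
        }
    }

    // Pushing through any clone lands in the same shared queue.
    fn send(&self, event: &str) {
        self.queue.lock().unwrap().push_back(event.to_string());
    }

    fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }
}
```

Events sent through either handle are visible through both, which is the symmetry argument being made here.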

@indutny-signal
Contributor

@kjvalencik I like the Clone idea! Let me see if I can concoct something later today.

@kjvalencik
Member

@indutny Don't feel the need to jump on an implementation. Right now I'm trying to get a discussion going. I don't have a clear picture of what the API should be.

But, I also don't want to bikeshed too long because this optimization is very cool! 🚲 🏠

@indutny-signal
Contributor

To confirm: nothing is actionable for me right now? If so, what's the plan for this PR?

@kjvalencik
Member

Correct, nothing actionable. I'm going to think about the API a bit more and talk with @dherman while working on getting the other changes released.

Thanks!

@dherman
Collaborator

dherman commented Jun 2, 2021

I like the idea of using Clone as a way to connect sharing to a user-visible operation, which then makes it a little easier to explain that the cx method always returns a clone of the same shared queue.

In #734 we experimented with renaming EventQueue to Channel, which maybe also helps here. EventQueue is a little mysterious since the queue is internal and the user of the API isn't actually directly interacting with it. I like the idea that you can create new channels or you can clone an existing one to share its internal queue, and cx.channel() gives you a cloned channel so in the common case you get the sharing by default.

@dherman
Collaborator

dherman commented Jun 2, 2021

One more thought: I think just the one Channel type is sufficient even with cloning (as opposed to one type for the data structure that owns the queue and another type for the shared references). It's reminiscent of Rc and Arc, where you have a single type with interesting connections under the hood between the original and its clones, but the user doesn't generally need to worry about the difference; they all act as the same type.

@kjvalencik
Member

kjvalencik commented Jun 3, 2021

@indutny Thanks again so much for this PR. I spent a good amount of time thinking about this and also discussing it with @dherman.

For both of us, what makes this all fit is impl Clone for EventQueue. This makes it semantically identical to things in Rust std that share a backing store when cloned (e.g., std::sync::Arc, std::sync::mpsc::Sender). I think it also makes cx.queue() easy to understand: it's a clone of the globally cached queue.

There are a few changes I would like to make to this PR:

  • We renamed EventQueue to Channel (with deprecated aliases, to minimize the user impact). This (hopefully) more clearly illustrates what it is. This PR needs to be updated to use Channel everywhere.
  • Implement Clone for Channel. This can't be derived, but should be fairly straightforward. Clone the fields and increment the reference count (if and only if the channel being cloned is referenced).
  • Replace the cx.channel() implementation with a clone() + reference(&mut cx) of the InstanceData queue.
  • Remove the ThreadsafeTrampoline abstraction. In my opinion, this makes things more complicated than necessary. It's a little easier to understand with the functionality inlined into Channel. Possibly with a ChannelState helper struct to store the data inside the RwLock.
  • Add some additional documentation explaining the new behavior
  • Document that if needed for performance the RwLock may be replaced with an AtomicUsize and a small amount of unsafe since &mut cx already enforces the necessary invariants.
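The Clone item in the list above can be sketched as follows (a ChannelState behind an RwLock, as suggested; the names mirror the plan but are illustrative, not Neon's actual code). The shared count is bumped only when the handle being cloned is itself referenced:

```rust
use std::sync::{Arc, RwLock};

// Hypothetical data stored behind the lock, per the plan above.
struct ChannelState {
    ref_count: u32,
}

// Hypothetical Channel handle; `has_ref` tracks whether this handle
// holds a reference that keeps the event loop alive.
struct Channel {
    state: Arc<RwLock<ChannelState>>,
    has_ref: bool,
}

impl Clone for Channel {
    fn clone(&self) -> Self {
        // Increment the shared count if and only if this handle
        // is referenced, as described in the bullet list above.
        if self.has_ref {
            self.state.write().unwrap().ref_count += 1;
        }
        Channel {
            state: Arc::clone(&self.state),
            has_ref: self.has_ref,
        }
    }
}
```

Cloning an unreferenced channel then leaves the count untouched, which keeps referenced and unreferenced handles consistent.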

I know this is asking a lot and you have already been incredibly generous with your time, so I am happy to take over this PR if you would like me to, but I can also help guide it through if you prefer.

Cheers!

@indutny-signal
Contributor

@kjvalencik I'll happily look into it. Will let you know how it goes!

@indutny-signal indutny-signal force-pushed the feature/shared-event-queue branch from 74a21cb to 3b8ec20 Compare June 3, 2021 22:46
@kjvalencik
Member

kjvalencik commented Jun 7, 2021

I performed some benchmarks with this and unfortunately it performs worse in most scenarios and only marginally better in the ideal situation. (I didn't test in Electron, but I suspect it's still a large improvement there because of the very high cost of synchronizing event loops at each tick.)

Poking around a bit, I think the primary issue is acquiring a lock to perform Channel::try_send contending with the lock in Drop. The lock in Channel::try_send shouldn't be necessary because we are guaranteed the channel won't be dropped (we're holding a reference to it) and sending is Sync.

Unfortunately, I think the best fix is a more complicated solution that uses an atomic for reference counting instead of a RwLock. It will share a very similar design to the internals of Arc. It would be nice if we could simply use an Arc over a sentinel with a clever Drop implementation, but I haven't been able to think of a way to do that.
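The "Arc over a sentinel with a clever Drop implementation" idea mentioned above can at least be sketched in isolation (hypothetical types; the hard part in Neon is that the real release call needs an Env, which isn't available inside Drop):

```rust
use std::sync::Arc;

// Hypothetical sentinel: the last clone to drop runs the release hook.
struct Sentinel {
    on_release: Box<dyn Fn() + Send + Sync>,
}

impl Drop for Sentinel {
    fn drop(&mut self) {
        // Runs exactly once, when the final Arc clone is dropped.
        (self.on_release)();
    }
}

// Hypothetical Channel handle: cloning clones the Arc, and the
// sentinel's Drop fires only after every handle is gone.
#[derive(Clone)]
struct Channel {
    state: Arc<Sentinel>,
}
```

This gets the once-only cleanup for free from Arc, which is why it would be attractive if the Env problem could be solved.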

@indutny-signal indutny-signal force-pushed the feature/shared-event-queue branch from 0a662b0 to 4780c3b Compare June 8, 2021 16:55
}

impl ChannelState {
fn reference<'a, C: Context<'a>>(self: &Self, cx: &mut C) {
Contributor

Nitpick: self: &Self is idiomatically written &self.

Contributor

Ack!

src/context/mod.rs (outdated; review thread resolved)
src/event/event_queue.rs (outdated; review thread resolved)
src/event/event_queue.rs (three more review threads resolved)
Successfully merging this pull request may close these issues.

Optimization: share a single napi_threadsafe_function for all EventQueues
5 participants