Skip to content

Commit

Permalink
src: add a threadsafe variant of SetImmediate()
Browse files Browse the repository at this point in the history
Add a variant of `SetImmediate()` that can be called from any thread.
This allows removing the `AsyncRequest` abstraction and replaces it
with a more generic mechanism.

PR-URL: nodejs#31386
Refs: openjs-foundation/summit#240
Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
addaleax authored and MylesBorins committed Apr 1, 2020
1 parent 5e1bcae commit f1aeed5
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 7 deletions.
19 changes: 18 additions & 1 deletion src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -732,13 +732,15 @@ Environment::NativeImmediateQueue::Shift() {
if (!head_)
tail_ = nullptr; // The queue is now empty.
}
size_--;
return ret;
}

void Environment::NativeImmediateQueue::Push(
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
NativeImmediateCallback* prev_tail = tail_;

size_++;
tail_ = cb.get();
if (prev_tail != nullptr)
prev_tail->set_next(std::move(cb));
Expand All @@ -758,6 +760,10 @@ void Environment::NativeImmediateQueue::ConcatMove(
other.size_ = 0;
}

size_t Environment::NativeImmediateQueue::size() const {
return size_.load();
}

template <typename Fn>
void Environment::CreateImmediate(Fn&& cb, bool ref) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
Expand All @@ -779,6 +785,17 @@ void Environment::SetUnrefImmediate(Fn&& cb) {
CreateImmediate(std::move(cb), false);
}

template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
}
uv_async_send(&task_queues_async_);
}

Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
: refed_(refed) {}

Expand Down Expand Up @@ -1138,7 +1155,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) {
inline void Environment::RegisterFinalizationGroupForCleanup(
v8::Local<v8::FinalizationGroup> group) {
cleanup_finalization_groups_.emplace_back(isolate(), group);
uv_async_send(&cleanup_finalization_groups_async_);
uv_async_send(&task_queues_async_);
}

size_t CleanupHookCallback::Hash::operator()(
Expand Down
20 changes: 15 additions & 5 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -462,15 +462,16 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_check_init(event_loop(), &idle_check_handle_);
uv_async_init(
event_loop(),
&cleanup_finalization_groups_async_,
&task_queues_async_,
[](uv_async_t* async) {
Environment* env = ContainerOf(
&Environment::cleanup_finalization_groups_async_, async);
&Environment::task_queues_async_, async);
env->CleanupFinalizationGroups();
env->RunAndClearNativeImmediates();
});
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&cleanup_finalization_groups_async_));
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));

thread_stopper()->Install(
this, static_cast<void*>(this), [](uv_async_t* handle) {
Expand Down Expand Up @@ -534,7 +535,7 @@ void Environment::RegisterHandleCleanups() {
close_and_finish,
nullptr);
RegisterHandleCleanup(
reinterpret_cast<uv_handle_t*>(&cleanup_finalization_groups_async_),
reinterpret_cast<uv_handle_t*>(&task_queues_async_),
close_and_finish,
nullptr);
}
Expand Down Expand Up @@ -665,6 +666,15 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
"RunAndClearNativeImmediates", this);
size_t ref_count = 0;

// It is safe to check .size() first, because there is a causal relationship
// between pushes to the threadsafe and this function being called.
// For the common case, it's worth checking the size first before establishing
// a mutex lock.
if (native_immediates_threadsafe_.size() > 0) {
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_));
}

NativeImmediateQueue queue;
queue.ConcatMove(std::move(native_immediates_));

Expand Down Expand Up @@ -1078,7 +1088,7 @@ void Environment::CleanupFinalizationGroups() {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
errors::TriggerUncaughtException(isolate(), try_catch);
// Re-schedule the execution of the remainder of the queue.
uv_async_send(&cleanup_finalization_groups_async_);
uv_async_send(&task_queues_async_);
return;
}
}
Expand Down
11 changes: 10 additions & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,9 @@ class Environment : public MemoryRetainer {
inline void SetImmediate(Fn&& cb);
template <typename Fn>
inline void SetUnrefImmediate(Fn&& cb);
template <typename Fn>
// This behaves like SetImmediate() but can be called from any thread.
inline void SetImmediateThreadsafe(Fn&& cb);
// This needs to be available for the JS-land setImmediate().
void ToggleImmediateRef(bool ref);

Expand Down Expand Up @@ -1281,7 +1284,7 @@ class Environment : public MemoryRetainer {
uv_idle_t immediate_idle_handle_;
uv_prepare_t idle_prepare_handle_;
uv_check_t idle_check_handle_;
uv_async_t cleanup_finalization_groups_async_;
uv_async_t task_queues_async_;
bool profiler_idle_notifier_started_ = false;

AsyncHooks async_hooks_;
Expand Down Expand Up @@ -1431,12 +1434,18 @@ class Environment : public MemoryRetainer {
// 'other' afterwards.
inline void ConcatMove(NativeImmediateQueue&& other);

// size() is atomic and may be called from any thread.
inline size_t size() const;

private:
std::atomic<size_t> size_ {0};
std::unique_ptr<NativeImmediateCallback> head_;
NativeImmediateCallback* tail_ = nullptr;
};

NativeImmediateQueue native_immediates_;
Mutex native_immediates_threadsafe_mutex_;
NativeImmediateQueue native_immediates_threadsafe_;

void RunAndClearNativeImmediates(bool only_refed = false);
static void CheckImmediate(uv_check_t* handle);
Expand Down

0 comments on commit f1aeed5

Please sign in to comment.