From 600e96ec049a8985d17d2b0318e15f63db0c036b Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Wed, 15 Jan 2020 19:48:37 +0100 Subject: [PATCH] src: add a threadsafe variant of SetImmediate() Add a variant of `SetImmediate()` that can be called from any thread. This allows removing the `AsyncRequest` abstraction and replaces it with a more generic mechanism. PR-URL: https://github.com/nodejs/node/pull/31386 Refs: https://github.com/openjs-foundation/summit/pull/240 Reviewed-By: Gireesh Punathil Reviewed-By: James M Snell Reviewed-By: Colin Ihrig Reviewed-By: Rich Trott --- src/env-inl.h | 19 ++++++++++++++++++- src/env.cc | 20 +++++++++++++++----- src/env.h | 11 ++++++++++- 3 files changed, 43 insertions(+), 7 deletions(-) diff --git a/src/env-inl.h b/src/env-inl.h index e4fdbf580efbb4..50d6ac6aef9a1c 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -745,6 +745,7 @@ Environment::NativeImmediateQueue::Shift() { if (!head_) tail_ = nullptr; // The queue is now empty. } + size_--; return ret; } @@ -752,6 +753,7 @@ void Environment::NativeImmediateQueue::Push( std::unique_ptr cb) { NativeImmediateCallback* prev_tail = tail_; + size_++; tail_ = cb.get(); if (prev_tail != nullptr) prev_tail->set_next(std::move(cb)); @@ -771,6 +773,10 @@ void Environment::NativeImmediateQueue::ConcatMove( other.size_ = 0; } +size_t Environment::NativeImmediateQueue::size() const { + return size_.load(); +} + template void Environment::CreateImmediate(Fn&& cb, bool ref) { auto callback = std::make_unique>( @@ -792,6 +798,17 @@ void Environment::SetUnrefImmediate(Fn&& cb) { CreateImmediate(std::move(cb), false); } +template +void Environment::SetImmediateThreadsafe(Fn&& cb) { + auto callback = std::make_unique>( + std::move(cb), false); + { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_threadsafe_.Push(std::move(callback)); + } + uv_async_send(&task_queues_async_); +} + Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed) : refed_(refed) {} @@ -1151,7 +1168,7 @@ void Environment::RemoveCleanupHook(void (*fn)(void*), void* arg) { inline void Environment::RegisterFinalizationGroupForCleanup( v8::Local group) { cleanup_finalization_groups_.emplace_back(isolate(), group); - uv_async_send(&cleanup_finalization_groups_async_); + uv_async_send(&task_queues_async_); } size_t CleanupHookCallback::Hash::operator()( diff --git a/src/env.cc b/src/env.cc index b77d4f294d06e2..a07e1dafcfe2bf 100644 --- a/src/env.cc +++ b/src/env.cc @@ -463,15 +463,16 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { uv_check_init(event_loop(), &idle_check_handle_); uv_async_init( event_loop(), - &cleanup_finalization_groups_async_, + &task_queues_async_, [](uv_async_t* async) { Environment* env = ContainerOf( - &Environment::cleanup_finalization_groups_async_, async); + &Environment::task_queues_async_, async); env->CleanupFinalizationGroups(); + env->RunAndClearNativeImmediates(); }); uv_unref(reinterpret_cast(&idle_prepare_handle_)); uv_unref(reinterpret_cast(&idle_check_handle_)); - uv_unref(reinterpret_cast(&cleanup_finalization_groups_async_)); + uv_unref(reinterpret_cast(&task_queues_async_)); thread_stopper()->Install( this, static_cast(this), [](uv_async_t* handle) { @@ -535,7 +536,7 @@ void Environment::RegisterHandleCleanups() { close_and_finish, nullptr); RegisterHandleCleanup( - reinterpret_cast(&cleanup_finalization_groups_async_), + reinterpret_cast(&task_queues_async_), close_and_finish, nullptr); } @@ -666,6 +667,15 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) { "RunAndClearNativeImmediates", this); size_t ref_count = 0; + // It is safe to check .size() first, because there is a causal relationship + // between pushes to the threadsafe and this function being called. + // For the common case, it's worth checking the size first before establishing + // a mutex lock. + if (native_immediates_threadsafe_.size() > 0) { + Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_); + native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_)); + } + NativeImmediateQueue queue; queue.ConcatMove(std::move(native_immediates_)); @@ -1087,7 +1097,7 @@ void Environment::CleanupFinalizationGroups() { if (try_catch.HasCaught() && !try_catch.HasTerminated()) errors::TriggerUncaughtException(isolate(), try_catch); // Re-schedule the execution of the remainder of the queue. - uv_async_send(&cleanup_finalization_groups_async_); + uv_async_send(&task_queues_async_); return; } } diff --git a/src/env.h b/src/env.h index ed8e955ec25f93..cd1b8e9517059a 100644 --- a/src/env.h +++ b/src/env.h @@ -1196,6 +1196,9 @@ class Environment : public MemoryRetainer { inline void SetImmediate(Fn&& cb); template inline void SetUnrefImmediate(Fn&& cb); + template + // This behaves like SetImmediate() but can be called from any thread. + inline void SetImmediateThreadsafe(Fn&& cb); // This needs to be available for the JS-land setImmediate(). void ToggleImmediateRef(bool ref); @@ -1281,7 +1284,7 @@ class Environment : public MemoryRetainer { uv_idle_t immediate_idle_handle_; uv_prepare_t idle_prepare_handle_; uv_check_t idle_check_handle_; - uv_async_t cleanup_finalization_groups_async_; + uv_async_t task_queues_async_; bool profiler_idle_notifier_started_ = false; AsyncHooks async_hooks_; @@ -1433,12 +1436,18 @@ class Environment : public MemoryRetainer { // 'other' afterwards. inline void ConcatMove(NativeImmediateQueue&& other); + // size() is atomic and may be called from any thread. + inline size_t size() const; + private: + std::atomic size_ {0}; std::unique_ptr head_; NativeImmediateCallback* tail_ = nullptr; }; NativeImmediateQueue native_immediates_; + Mutex native_immediates_threadsafe_mutex_; + NativeImmediateQueue native_immediates_threadsafe_; void RunAndClearNativeImmediates(bool only_refed = false); static void CheckImmediate(uv_check_t* handle);