diff --git a/src/env-inl.h b/src/env-inl.h index 50d6ac6aef9a1c..1c83e76cda7fbd 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -894,8 +894,21 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) { sub_worker_contexts_.erase(context); } +inline void Environment::add_refs(int64_t diff) { + task_queues_async_refs_ += diff; + CHECK_GE(task_queues_async_refs_, 0); + if (task_queues_async_refs_ == 0) + uv_unref(reinterpret_cast(&task_queues_async_)); + else + uv_ref(reinterpret_cast(&task_queues_async_)); +} + inline bool Environment::is_stopping() const { - return thread_stopper_.is_stopped(); + return is_stopping_.load(); +} + +inline void Environment::set_stopping(bool value) { + is_stopping_.store(value); } inline std::list* Environment::extra_linked_bindings() { @@ -1205,14 +1218,6 @@ int64_t Environment::base_object_count() const { return base_object_count_; } -bool AsyncRequest::is_stopped() const { - return stopped_.load(); -} - -void AsyncRequest::set_stopped(bool flag) { - stopped_.store(flag); -} - #define VP(PropertyName, StringValue) V(v8::Private, PropertyName) #define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName) #define VS(PropertyName, StringValue) V(v8::String, PropertyName) diff --git a/src/env.cc b/src/env.cc index a07e1dafcfe2bf..35ac40a3d99824 100644 --- a/src/env.cc +++ b/src/env.cc @@ -474,14 +474,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { uv_unref(reinterpret_cast(&idle_check_handle_)); uv_unref(reinterpret_cast(&task_queues_async_)); - thread_stopper()->Install( - this, static_cast(this), [](uv_async_t* handle) { - Environment* env = static_cast(handle->data); - uv_stop(env->event_loop()); - }); - thread_stopper()->set_stopped(false); - uv_unref(reinterpret_cast(thread_stopper()->GetHandle())); - // Register clean-up cb to be called to clean up the handles // when the environment is freed, note that they are not cleaned in // the one environment per process setup, but will be called in @@ -499,8 +491,9 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) { void Environment::ExitEnv() { set_can_call_into_js(false); - thread_stopper()->Stop(); + set_stopping(true); isolate_->TerminateExecution(); + SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); }); } void Environment::RegisterHandleCleanups() { @@ -605,7 +598,6 @@ void Environment::RunCleanup() { started_cleanup_ = true; TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment), "RunCleanup", this); - thread_stopper()->Uninstall(); CleanupHandles(); while (!cleanup_hooks_.empty()) { @@ -1017,7 +1009,6 @@ inline size_t Environment::SelfSize() const { // TODO(joyeecheung): refactor the MemoryTracker interface so // this can be done for common types within the Track* calls automatically // if a certain scope is entered. - size -= sizeof(thread_stopper_); size -= sizeof(async_hooks_); size -= sizeof(tick_info_); size -= sizeof(immediate_info_); @@ -1039,7 +1030,6 @@ void Environment::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("fs_stats_field_array", fs_stats_field_array_); tracker->TrackField("fs_stats_field_bigint_array", fs_stats_field_bigint_array_); - tracker->TrackField("thread_stopper", thread_stopper_); tracker->TrackField("cleanup_hooks", cleanup_hooks_); tracker->TrackField("async_hooks", async_hooks_); tracker->TrackField("immediate_info", immediate_info_); @@ -1103,38 +1093,6 @@ void Environment::CleanupFinalizationGroups() { } } -void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) { - CHECK_NULL(async_); - env_ = env; - async_ = new uv_async_t; - async_->data = data; - CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0); -} - -void AsyncRequest::Uninstall() { - if (async_ != nullptr) { - env_->CloseHandle(async_, [](uv_async_t* async) { delete async; }); - async_ = nullptr; - } -} - -void AsyncRequest::Stop() { - set_stopped(true); - if (async_ != nullptr) uv_async_send(async_); -} - -uv_async_t* AsyncRequest::GetHandle() { - return async_; -} - -void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const { - if (async_ != nullptr) tracker->TrackField("async_request", *async_); -} - -AsyncRequest::~AsyncRequest() { - CHECK_NULL(async_); -} - // Not really any better place than env.cc at this moment. void BaseObject::DeleteMe(void* data) { BaseObject* self = static_cast(data); diff --git a/src/env.h b/src/env.h index cd1b8e9517059a..74ad13e76094a8 100644 --- a/src/env.h +++ b/src/env.h @@ -587,34 +587,6 @@ struct AllocatedBuffer { friend class Environment; }; -class AsyncRequest : public MemoryRetainer { - public: - AsyncRequest() = default; - ~AsyncRequest() override; - - AsyncRequest(const AsyncRequest&) = delete; - AsyncRequest& operator=(const AsyncRequest&) = delete; - AsyncRequest(AsyncRequest&&) = delete; - AsyncRequest& operator=(AsyncRequest&&) = delete; - - void Install(Environment* env, void* data, uv_async_cb target); - void Uninstall(); - void Stop(); - inline void set_stopped(bool flag); - inline bool is_stopped() const; - uv_async_t* GetHandle(); - void MemoryInfo(MemoryTracker* tracker) const override; - - - SET_MEMORY_INFO_NAME(AsyncRequest) - SET_SELF_SIZE(AsyncRequest) - - private: - Environment* env_; - uv_async_t* async_ = nullptr; - std::atomic_bool stopped_ {true}; -}; - class KVStore { public: KVStore() = default; @@ -1062,6 +1034,14 @@ class Environment : public MemoryRetainer { inline bool can_call_into_js() const; inline void set_can_call_into_js(bool can_call_into_js); + // Increase or decrease a counter that manages whether this Environment + // keeps the event loop alive on its own or not. The counter starts out at 0, + // meaning it does not, and any positive value will make it keep the event + // loop alive. + // This is used by Workers to manage their own .ref()/.unref() implementation, + // as Workers aren't directly associated with their own libuv handles. + inline void add_refs(int64_t diff); + inline bool has_run_bootstrapping_code() const; inline void set_has_run_bootstrapping_code(bool has_run_bootstrapping_code); @@ -1082,6 +1062,7 @@ class Environment : public MemoryRetainer { inline void remove_sub_worker_context(worker::Worker* context); void stop_sub_worker_contexts(); inline bool is_stopping() const; + inline void set_stopping(bool value); inline std::list* extra_linked_bindings(); inline node_module* extra_linked_bindings_head(); inline const Mutex& extra_linked_bindings_mutex() const; @@ -1223,8 +1204,6 @@ class Environment : public MemoryRetainer { inline std::shared_ptr options(); inline std::shared_ptr inspector_host_port(); - inline AsyncRequest* thread_stopper() { return &thread_stopper_; } - // The BaseObject count is a debugging helper that makes sure that there are // no memory leaks caused by BaseObjects staying alive longer than expected // (in particular, no circular BaseObjectPtr references). @@ -1285,6 +1264,7 @@ class Environment : public MemoryRetainer { uv_prepare_t idle_prepare_handle_; uv_check_t idle_check_handle_; uv_async_t task_queues_async_; + int64_t task_queues_async_refs_ = 0; bool profiler_idle_notifier_started_ = false; AsyncHooks async_hooks_; @@ -1342,7 +1322,7 @@ class Environment : public MemoryRetainer { bool has_run_bootstrapping_code_ = false; bool has_serialized_options_ = false; - bool can_call_into_js_ = true; + std::atomic_bool can_call_into_js_ { true }; Flags flags_; uint64_t thread_id_; std::unordered_set sub_worker_contexts_; @@ -1460,10 +1440,7 @@ class Environment : public MemoryRetainer { bool started_cleanup_ = false; int64_t base_object_count_ = 0; - - // A custom async abstraction (a pair of async handle and a state variable) - // Used by embedders to shutdown running Node instance. - AsyncRequest thread_stopper_; + std::atomic_bool is_stopping_ { false }; template void ForEachBaseObject(T&& iterator); diff --git a/src/node_worker.cc b/src/node_worker.cc index 26f7ddab5aa400..bdc2a8024c97af 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -268,7 +268,7 @@ void Worker::Run() { stopped_ = true; this->env_ = nullptr; } - env_->thread_stopper()->set_stopped(true); + env_->set_stopping(true); env_->stop_sub_worker_contexts(); env_->RunCleanup(); RunAtExit(env_.get()); @@ -412,7 +412,6 @@ void Worker::JoinThread() { thread_joined_ = true; env()->remove_sub_worker_context(this); - on_thread_finished_.Uninstall(); { HandleScope handle_scope(env()->isolate()); @@ -439,6 +438,8 @@ void Worker::JoinThread() { } Worker::~Worker() { + JoinThread(); + Mutex::ScopedLock lock(mutex_); CHECK(stopped_); @@ -574,18 +575,16 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->stopped_ = false; w->thread_joined_ = false; - w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) { - Worker* w_ = static_cast(handle->data); - CHECK(w_->is_stopped()); - w_->parent_port_ = nullptr; - w_->JoinThread(); - delete w_; - }); + if (w->has_ref_) + w->env()->add_refs(1); uv_thread_options_t thread_options; thread_options.flags = UV_THREAD_HAS_STACK_SIZE; thread_options.stack_size = kStackSize; CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) { + // XXX: This could become a std::unique_ptr, but that makes at least + // gcc 6.3 detect undefined behaviour when there shouldn't be any. + // gcc 7+ handles this well. Worker* w = static_cast(arg); const uintptr_t stack_top = reinterpret_cast(&arg); @@ -596,7 +595,12 @@ void Worker::StartThread(const FunctionCallbackInfo& args) { w->Run(); Mutex::ScopedLock lock(w->mutex_); - w->on_thread_finished_.Stop(); + w->env()->SetImmediateThreadsafe( + [w = std::unique_ptr(w)](Environment* env) { + if (w->has_ref_) + env->add_refs(-1); + // implicitly delete w + }); }, static_cast(w)), 0); } @@ -611,13 +615,19 @@ void Worker::StopThread(const FunctionCallbackInfo& args) { void Worker::Ref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - uv_ref(reinterpret_cast(w->on_thread_finished_.GetHandle())); + if (!w->has_ref_) { + w->has_ref_ = true; + w->env()->add_refs(1); + } } void Worker::Unref(const FunctionCallbackInfo& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - uv_unref(reinterpret_cast(w->on_thread_finished_.GetHandle())); + if (w->has_ref_) { + w->has_ref_ = false; + w->env()->add_refs(-1); + } } void Worker::GetResourceLimits(const FunctionCallbackInfo& args) { diff --git a/src/node_worker.h b/src/node_worker.h index 7b1311734a2a4a..632644202a53ba 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -41,7 +41,6 @@ class Worker : public AsyncWrap { void MemoryInfo(MemoryTracker* tracker) const override { tracker->TrackField("parent_port", parent_port_); - tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_"); } SET_MEMORY_INFO_NAME(Worker) @@ -107,14 +106,14 @@ class Worker : public AsyncWrap { // instance refers to it via its [kPort] property. MessagePort* parent_port_ = nullptr; - AsyncRequest on_thread_finished_; - // A raw flag that is used by creator and worker threads to // sync up on pre-mature termination of worker - while in the // warmup phase. Once the worker is fully warmed up, use the // async handle of the worker's Environment for the same purpose. bool stopped_ = true; + bool has_ref_ = true; + // The real Environment of the worker object. It has a lesser // lifespan than the worker object itself - comes to life // when the worker thread creates a new Environment, and gets