Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: refactor thread stopping mechanism #26757

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
}

inline bool Environment::is_stopping() const {
return thread_stopper_.IsStopped();
return thread_stopper_.is_stopped();
}

inline performance::performance_state* Environment::performance_state() {
Expand Down Expand Up @@ -979,6 +979,14 @@ void Environment::ForEachBaseObject(T&& iterator) {
}
}

bool AsyncRequest::is_stopped() const {
return stopped_.load();
}

void AsyncRequest::set_stopped(bool flag) {
stopped_.store(flag);
}

#define VP(PropertyName, StringValue) V(v8::Private, PropertyName)
#define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName)
#define VS(PropertyName, StringValue) V(v8::String, PropertyName)
Expand Down
29 changes: 7 additions & 22 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,13 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));

GetAsyncRequest()->Install(
thread_stopper()->Install(
this, static_cast<void*>(this), [](uv_async_t* handle) {
Environment* env = static_cast<Environment*>(handle->data);
uv_stop(env->event_loop());
});
GetAsyncRequest()->SetStopped(false);
uv_unref(reinterpret_cast<uv_handle_t*>(GetAsyncRequest()->GetHandle()));
thread_stopper()->set_stopped(false);
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper()->GetHandle()));

// Register clean-up cb to be called to clean up the handles
// when the environment is freed, note that they are not cleaned in
Expand All @@ -365,7 +365,7 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {

void Environment::ExitEnv() {
set_can_call_into_js(false);
GetAsyncRequest()->Stop();
thread_stopper()->Stop();
isolate_->TerminateExecution();
}

Expand Down Expand Up @@ -533,7 +533,7 @@ void Environment::RunCleanup() {
started_cleanup_ = true;
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunCleanup", this);
GetAsyncRequest()->Uninstall();
thread_stopper()->Uninstall();
CleanupHandles();

while (!cleanup_hooks_.empty()) {
Expand Down Expand Up @@ -948,49 +948,34 @@ char* Environment::Reallocate(char* data, size_t old_size, size_t size) {
}

void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
Mutex::ScopedLock lock(mutex_);
CHECK_NULL(async_);
env_ = env;
async_ = new uv_async_t;
async_->data = data;
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
}

void AsyncRequest::Uninstall() {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) {
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
async_ = nullptr;
}
}

void AsyncRequest::Stop() {
Mutex::ScopedLock lock(mutex_);
stop_ = true;
set_stopped(true);
if (async_ != nullptr) uv_async_send(async_);
}

void AsyncRequest::SetStopped(bool flag) {
Mutex::ScopedLock lock(mutex_);
stop_ = flag;
}

bool AsyncRequest::IsStopped() const {
Mutex::ScopedLock lock(mutex_);
return stop_;
}

uv_async_t* AsyncRequest::GetHandle() {
Mutex::ScopedLock lock(mutex_);
return async_;
}

void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
}

AsyncRequest::~AsyncRequest() {
Mutex::ScopedLock lock(mutex_);
CHECK_NULL(async_);
}

Expand Down
12 changes: 7 additions & 5 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "uv.h"
#include "v8.h"

#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
Expand Down Expand Up @@ -518,18 +519,19 @@ class AsyncRequest : public MemoryRetainer {
void Install(Environment* env, void* data, uv_async_cb target);
void Uninstall();
void Stop();
void SetStopped(bool flag);
bool IsStopped() const;
inline void set_stopped(bool flag);
inline bool is_stopped() const;
uv_async_t* GetHandle();
void MemoryInfo(MemoryTracker* tracker) const override;


SET_MEMORY_INFO_NAME(AsyncRequest)
SET_SELF_SIZE(AsyncRequest)

private:
Environment* env_;
uv_async_t* async_ = nullptr;
mutable Mutex mutex_;
bool stop_ = true;
std::atomic_bool stopped_ {true};
};

class Environment {
Expand Down Expand Up @@ -1048,7 +1050,7 @@ class Environment {
inline ExecutionMode execution_mode() { return execution_mode_; }

inline void set_execution_mode(ExecutionMode mode) { execution_mode_ = mode; }
inline AsyncRequest* GetAsyncRequest() { return &thread_stopper_; }
inline AsyncRequest* thread_stopper() { return &thread_stopper_; }

private:
inline void CreateImmediate(native_immediate_callback cb,
Expand Down
4 changes: 2 additions & 2 deletions src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -832,14 +832,14 @@ inline int StartNodeWithIsolate(Isolate* isolate,
per_process::v8_platform.DrainVMTasks(isolate);

more = uv_loop_alive(env.event_loop());
if (more && !env.GetAsyncRequest()->IsStopped()) continue;
if (more && !env.is_stopping()) continue;

RunBeforeExit(&env);

// Emit `beforeExit` if the loop became alive either after emitting
// event, or after running some callbacks.
more = uv_loop_alive(env.event_loop());
} while (more == true && !env.GetAsyncRequest()->IsStopped());
} while (more == true && !env.is_stopping());
env.performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
}
Expand Down
7 changes: 4 additions & 3 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Worker::Worker(Environment* env,
bool Worker::is_stopped() const {
Mutex::ScopedLock lock(mutex_);
if (env_ != nullptr)
return env_->GetAsyncRequest()->IsStopped();
return env_->is_stopping();
return stopped_;
}

Expand Down Expand Up @@ -222,7 +222,7 @@ void Worker::Run() {
stopped_ = true;
this->env_ = nullptr;
}
env_->GetAsyncRequest()->SetStopped(true);
env_->thread_stopper()->set_stopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
Expand Down Expand Up @@ -381,7 +381,8 @@ void Worker::OnThreadStopped() {
Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);

CHECK(stopped_ || env_ == nullptr || env_->GetAsyncRequest()->IsStopped());
CHECK(stopped_);
CHECK_NULL(env_);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, there was a control flow that takes to Worker destructor without nullifying env_ , not able to figure that out now; do you know?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gireeshpunathil I think that would be a bug – the child thread is not allowed to exist at this point (and the next CHECK verifies that the thread has been joined), and the child thread in turn owns the Environment.

CHECK(thread_joined_);

Debug(this, "Worker %llu destroyed", thread_id_);
Expand Down