Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: refactor thread id management #25796

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,10 @@ const { pathToFileURL } = require('url');

const {
Worker: WorkerImpl,
threadId
threadId,
isMainThread
} = internalBinding('worker');

const isMainThread = threadId === 0;

const kHandle = Symbol('kHandle');
const kPublicPort = Symbol('kPublicPort');
const kDispose = Symbol('kDispose');
Expand Down
4 changes: 3 additions & 1 deletion src/api/environment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,9 @@ Environment* CreateEnvironment(IsolateData* isolate_data,
// options than the global parse call.
std::vector<std::string> args(argv, argv + argc);
std::vector<std::string> exec_args(exec_argv, exec_argv + exec_argc);
Environment* env = new Environment(isolate_data, context);
// TODO(addaleax): Provide more sensible flags, in an embedder-accessible way.
Environment* env =
new Environment(isolate_data, context, Environment::kIsMainThread);
env->Start(per_process::v8_is_profiling);
env->ProcessCliArgs(args, exec_args);
return env;
Expand Down
6 changes: 1 addition & 5 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -647,17 +647,13 @@ inline void Environment::set_has_run_bootstrapping_code(bool value) {
}

inline bool Environment::is_main_thread() const {
return thread_id_ == 0;
return flags_ & kIsMainThread;
}

inline uint64_t Environment::thread_id() const {
return thread_id_;
}

inline void Environment::set_thread_id(uint64_t id) {
thread_id_ = id;
}

inline worker::Worker* Environment::worker_context() const {
return worker_context_;
}
Expand Down
8 changes: 7 additions & 1 deletion src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <stdio.h>
#include <algorithm>
#include <atomic>

namespace node {

Expand Down Expand Up @@ -166,8 +167,11 @@ void Environment::TrackingTraceStateObserver::UpdateTraceCategoryState() {
0, nullptr).ToLocalChecked();
}

static std::atomic<uint64_t> next_thread_id{0};

Environment::Environment(IsolateData* isolate_data,
Local<Context> context)
Local<Context> context,
Flags flags)
: isolate_(context->GetIsolate()),
isolate_data_(isolate_data),
immediate_info_(context->GetIsolate()),
Expand All @@ -176,6 +180,8 @@ Environment::Environment(IsolateData* isolate_data,
should_abort_on_uncaught_toggle_(isolate_, 1),
trace_category_state_(isolate_, kTraceCategoryCount),
stream_base_state_(isolate_, StreamBase::kNumStreamBaseStateFields),
flags_(flags),
thread_id_(next_thread_id++),
fs_stats_field_array_(isolate_, kFsStatsBufferLength),
fs_stats_field_bigint_array_(isolate_, kFsStatsBufferLength),
context_(context->GetIsolate(), context) {
Expand Down
12 changes: 9 additions & 3 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ class Environment {
DISALLOW_COPY_AND_ASSIGN(TickInfo);
};

enum Flags {
joyeecheung marked this conversation as resolved.
Show resolved Hide resolved
kNoFlags = 0,
kIsMainThread = 1
};

static inline Environment* GetCurrent(v8::Isolate* isolate);
static inline Environment* GetCurrent(v8::Local<v8::Context> context);
static inline Environment* GetCurrent(
Expand All @@ -608,7 +613,8 @@ class Environment {
static inline Environment* GetThreadLocalEnv();

Environment(IsolateData* isolate_data,
v8::Local<v8::Context> context);
v8::Local<v8::Context> context,
Flags flags = Flags());
~Environment();

void Start(bool start_profiler_idle_notifier);
Expand Down Expand Up @@ -761,7 +767,6 @@ class Environment {

inline bool is_main_thread() const;
inline uint64_t thread_id() const;
inline void set_thread_id(uint64_t id);
inline worker::Worker* worker_context() const;
inline void set_worker_context(worker::Worker* context);
inline void add_sub_worker_context(worker::Worker* context);
Expand Down Expand Up @@ -1005,7 +1010,8 @@ class Environment {

bool has_run_bootstrapping_code_ = false;
bool can_call_into_js_ = true;
uint64_t thread_id_ = 0;
Flags flags_;
uint64_t thread_id_;
std::unordered_set<worker::Worker*> sub_worker_contexts_;

static void* const kNodeContextTagPtr;
Expand Down
2 changes: 1 addition & 1 deletion src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data,
HandleScope handle_scope(isolate);
Local<Context> context = NewContext(isolate);
Context::Scope context_scope(context);
Environment env(isolate_data, context);
Environment env(isolate_data, context, Environment::kIsMainThread);
env.Start(per_process::v8_is_profiling);
env.ProcessCliArgs(args, exec_args);

Expand Down
44 changes: 23 additions & 21 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

using node::options_parser::kDisallowedInEnvironment;
using v8::ArrayBuffer;
using v8::Boolean;
using v8::Context;
using v8::Function;
using v8::FunctionCallbackInfo;
Expand All @@ -33,9 +34,6 @@ namespace worker {

namespace {

uint64_t next_thread_id = 1;
Mutex next_thread_id_mutex;

#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
void StartWorkerInspector(Environment* child, const std::string& url) {
child->inspector_agent()->Start(url,
Expand Down Expand Up @@ -74,17 +72,7 @@ Worker::Worker(Environment* env,
const std::string& url,
std::shared_ptr<PerIsolateOptions> per_isolate_opts)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER), url_(url) {
// Generate a new thread id.
{
Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
thread_id_ = next_thread_id++;
}

Debug(this, "Creating worker with id %llu", thread_id_);
wrap->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(),
static_cast<double>(thread_id_))).FromJust();
Debug(this, "Creating new worker instance at %p", static_cast<void*>(this));

// Set up everything that needs to be set up in the parent environment.
parent_port_ = MessagePort::New(env, env->context());
Expand Down Expand Up @@ -130,7 +118,7 @@ Worker::Worker(Environment* env,
CHECK_NE(env_, nullptr);
env_->set_abort_on_uncaught_exception(false);
env_->set_worker_context(this);
env_->set_thread_id(thread_id_);
thread_id_ = env_->thread_id();

env_->Start(env->profiler_idle_notifier_started());
env_->ProcessCliArgs(std::vector<std::string>{},
Expand All @@ -142,7 +130,15 @@ Worker::Worker(Environment* env,
// The new isolate won't be bothered on this thread again.
isolate_->DiscardThreadSpecificMetadata();

Debug(this, "Set up worker with id %llu", thread_id_);
wrap->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(), static_cast<double>(thread_id_)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now come to think of it, maybe this should be a BigInt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nodejs/workers Thoughts? BigInt seems like it might be a bit much, but since it’s really just an identifier, that might be okay?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds OK to me

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to be relevant that'd require spawning more than 9,007,199,254,740,991 threads.

If we can spawn 1000 threads every seconds (a lot) and our program does nothing but spawn threads and we disregard any other limit or cleanup - it would take us ~285,616 years to exhaust that number of threads and overflow.

9007199254740991 / 1000 (threads per second) / 60 (seconds per minute) / 60 (minutes per hour) / 24 (hours per day) / 365 (days per year) = ~285616

If our machine is super fast and we spawn 100,000 threads per second instead, it would still take us over 2000 years. I certainly hope no one is going to run a Node.js server for that long.

I think a double is fine 😅

.FromJust();

Debug(this,
"Set up worker at %p with id %llu",
static_cast<void*>(this),
thread_id_);
}

bool Worker::is_stopped() const {
Expand Down Expand Up @@ -562,11 +558,17 @@ void InitWorker(Local<Object> target,

env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);

auto thread_id_string = FIXED_ONE_BYTE_STRING(env->isolate(), "threadId");
target->Set(env->context(),
thread_id_string,
Number::New(env->isolate(),
static_cast<double>(env->thread_id()))).FromJust();
target
->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(), static_cast<double>(env->thread_id())))
.FromJust();

target
->Set(env->context(),
FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
Boolean::New(env->isolate(), env->is_main_thread()))
.FromJust();
}

} // anonymous namespace
Expand Down