From 09de2e53e24521c3f63106d0d698a051202ba438 Mon Sep 17 00:00:00 2001 From: Santiago Gimeno Date: Thu, 3 Nov 2022 18:26:58 +0100 Subject: [PATCH] worker: allow retrieving elu from parent Usually, when extracting the ELU from a specific JS thread is better to do it from a different thread as the event loop we're observing might already be blocked. The `Worker.performance.eventLoopUtilization()` method allows us to do this for worker threads, but there's not a way to do this for the main thread. This new API, which allows us to retrieve the ELU of the parent thread from a specific worker, is going to enable this. For the moment, I have defined this new API in ``` require('worker_threads').parent.performance.eventLoopUtilization() ``` though I haven't added documentation yet as a) I want to know first whether this approach is acceptable, and in case it is, b) I'm not really sure whether that's the place the API should live in. Would love receiving feedback on this. --- lib/internal/main/worker_thread.js | 3 ++ lib/internal/worker.js | 48 ++++++++++++++++- lib/internal/worker/io.js | 3 +- lib/worker_threads.js | 8 ++- src/node_worker.cc | 11 ++++ src/node_worker.h | 3 ++ ...worker-parent-performance-eventlooputil.js | 54 +++++++++++++++++++ 7 files changed, 127 insertions(+), 3 deletions(-) create mode 100644 test/parallel/test-worker-parent-performance-eventlooputil.js diff --git a/lib/internal/main/worker_thread.js b/lib/internal/main/worker_thread.js index 9ae04e288fc70c..c256f10aef9573 100644 --- a/lib/internal/main/worker_thread.js +++ b/lib/internal/main/worker_thread.js @@ -29,6 +29,7 @@ const { messageTypes: { // Messages that may be received by workers LOAD_SCRIPT, + PARENT_LOOP_START, // Messages that may be posted from workers UP_AND_RUNNING, ERROR_MESSAGE, @@ -159,6 +160,8 @@ port.on('message', (message) => { const CJSLoader = require('internal/modules/cjs/loader'); CJSLoader.Module.runMain(filename); } + } else if (message.type === PARENT_LOOP_START) { + require('internal/worker').setParentEventLoopStartTime(message.value); } else if (message.type === STDIO_PAYLOAD) { const { stream, chunks } = message; ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => { diff --git a/lib/internal/worker.js b/lib/internal/worker.js index d88170ab9cd9cf..067233e0416217 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -27,6 +27,7 @@ const { const EventEmitter = require('events'); const assert = require('internal/assert'); const path = require('path'); +const { setImmediate } = require('timers'); const { internalEventLoopUtilization } = require('internal/perf/event_loop_utilization'); @@ -60,6 +61,13 @@ const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url'); const { kEmptyObject } = require('internal/util'); const { validateArray } = require('internal/validators'); +const { + constants: { + NODE_PERFORMANCE_MILESTONE_LOOP_START, + }, + milestones, +} = internalBinding('performance'); + const { ownsProcessState, isMainThread, @@ -70,7 +78,8 @@ const { kMaxOldGenerationSizeMb, kCodeRangeSizeMb, kStackSizeMb, - kTotalResourceLimitCount + kTotalResourceLimitCount, + parentLoopIdleTime, } = internalBinding('worker'); const kHandle = Symbol('kHandle'); @@ -83,6 +92,7 @@ const kOnErrorMessage = Symbol('kOnErrorMessage'); const kParentSideStdio = Symbol('kParentSideStdio'); const kLoopStartTime = Symbol('kLoopStartTime'); const kIsOnline = Symbol('kIsOnline'); +const kSendLoopStart = Symbol('kSendLoopStart'); const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV'); let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { @@ -265,6 +275,13 @@ class Worker extends EventEmitter { this[kHandle].startThread(); process.nextTick(() => process.emit('worker', this)); + // Send current thread loopStart to the worker. In case the loop has not yet + // started send it after one iteration. + if (milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] === -1) { + setImmediate(() => this[kSendLoopStart]()); + } else { + this[kSendLoopStart](); + } if (workerThreadsChannel.hasSubscribers) { workerThreadsChannel.publish({ worker: this, @@ -346,6 +363,15 @@ class Worker extends EventEmitter { } } + [kSendLoopStart]() { + if (this[kPort]) { + this[kPort].postMessage({ + type: messageTypes.PARENT_LOOP_START, + value: milestones[NODE_PERFORMANCE_MILESTONE_LOOP_START] / 1e6 + }); + } + } + postMessage(...args) { if (this[kPublicPort] === null) return; @@ -490,6 +516,24 @@ function eventLoopUtilization(util1, util2) { ); } +let parentEventLoopStartTime = -1; +function setParentEventLoopStartTime(time) { + parentEventLoopStartTime = time; +} + +function parentEventLoopUtilization(util1, util2) { + if (parentEventLoopStartTime === -1) { + return { idle: 0, active: 0, utilization: 0 }; + } + + return internalEventLoopUtilization( + parentEventLoopStartTime, + parentLoopIdleTime(), + util1, + util2 + ); +} + module.exports = { ownsProcessState, isMainThread, @@ -501,4 +545,6 @@ module.exports = { assignEnvironmentData, threadId, Worker, + setParentEventLoopStartTime, + parentEventLoopUtilization }; diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 61f9a5363716a8..8186f4d3ddd8e4 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -89,7 +89,8 @@ const messageTypes = { ERROR_MESSAGE: 'errorMessage', STDIO_PAYLOAD: 'stdioPayload', STDIO_WANTS_MORE_DATA: 'stdioWantsMoreData', - LOAD_SCRIPT: 'loadScript' + LOAD_SCRIPT: 'loadScript', + PARENT_LOOP_START: 'parentLoopStart' }; // We have to mess with the MessagePort prototype a bit, so that a) we can make diff --git a/lib/worker_threads.js b/lib/worker_threads.js index 9d702fa2883447..b45a309c28c0bc 100644 --- a/lib/worker_threads.js +++ b/lib/worker_threads.js @@ -7,7 +7,8 @@ const { setEnvironmentData, getEnvironmentData, threadId, - Worker + Worker, + parentEventLoopUtilization } = require('internal/worker'); const { @@ -38,4 +39,9 @@ module.exports = { BroadcastChannel, setEnvironmentData, getEnvironmentData, + parent: isMainThread ? null : { + performance: { + eventLoopUtilization: parentEventLoopUtilization + } + } }; diff --git a/src/node_worker.cc b/src/node_worker.cc index 571160a14ebb1e..96a99d82cd6888 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -56,6 +56,7 @@ Worker::Worker(Environment* env, per_isolate_opts_(per_isolate_opts), exec_argv_(exec_argv), platform_(env->isolate_data()->platform()), + parent_loop_(env->event_loop()), thread_id_(AllocateEnvironmentThreadId()), env_vars_(env_vars), snapshot_data_(snapshot_data) { @@ -865,6 +866,14 @@ void Worker::LoopStartTime(const FunctionCallbackInfo& args) { args.GetReturnValue().Set(loop_start_time / 1e6); } +void Worker::ParentLoopIdleTime(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + CHECK(!env->is_main_thread()); + Worker* w = env->worker_context(); + uint64_t idle_time = uv_metrics_idle_time(w->parent_loop_); + args.GetReturnValue().Set(1.0 * idle_time / 1e6); +} + namespace { // Return the MessagePort that is global for this Environment and communicates @@ -921,6 +930,7 @@ void InitWorker(Local target, } SetMethod(context, target, "getEnvMessagePort", GetEnvMessagePort); + SetMethod(context, target, "parentLoopIdleTime", Worker::ParentLoopIdleTime); target ->Set(env->context(), @@ -967,6 +977,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) { registry->Register(Worker::TakeHeapSnapshot); registry->Register(Worker::LoopIdleTime); registry->Register(Worker::LoopStartTime); + registry->Register(Worker::ParentLoopIdleTime); } } // anonymous namespace diff --git a/src/node_worker.h b/src/node_worker.h index dcb58d13e0e6f9..9a5defad41137f 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -75,6 +75,7 @@ class Worker : public AsyncWrap { static void TakeHeapSnapshot(const v8::FunctionCallbackInfo& args); static void LoopIdleTime(const v8::FunctionCallbackInfo& args); static void LoopStartTime(const v8::FunctionCallbackInfo& args); + static void ParentLoopIdleTime(const v8::FunctionCallbackInfo&); private: bool CreateEnvMessagePort(Environment* env); @@ -91,6 +92,8 @@ class Worker : public AsyncWrap { std::unique_ptr inspector_parent_handle_; + uv_loop_t* parent_loop_; + // This mutex protects access to all variables listed below it. mutable Mutex mutex_; diff --git a/test/parallel/test-worker-parent-performance-eventlooputil.js b/test/parallel/test-worker-parent-performance-eventlooputil.js new file mode 100644 index 00000000000000..69fe61550c5d3b --- /dev/null +++ b/test/parallel/test-worker-parent-performance-eventlooputil.js @@ -0,0 +1,54 @@ +'use strict'; + +const { mustCall } = require('../common'); + +const TIMEOUT = 10; +const SPIN_DUR = 50; + +const assert = require('assert'); +const { Worker, parent, workerData } = require('worker_threads'); + +// Do not use isMainThread directly, otherwise the test would time out in case +// it's started inside of another worker thread. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = '1'; + const i32arr = new Int32Array(new SharedArrayBuffer(4)); + const w = new Worker(__filename, { workerData: i32arr }); + w.on('online', mustCall(() => { + Atomics.wait(i32arr, 0, 0); + + const t = Date.now(); + while (Date.now() - t < SPIN_DUR); + + Atomics.store(i32arr, 0, 0); + Atomics.notify(i32arr, 0); + Atomics.wait(i32arr, 0, 0); + })); +} else { + setTimeout(() => { + const { eventLoopUtilization } = parent.performance; + const i32arr = workerData; + const elu1 = eventLoopUtilization(); + + Atomics.store(i32arr, 0, 1); + Atomics.notify(i32arr, 0); + Atomics.wait(i32arr, 0, 1); + + const elu2 = eventLoopUtilization(elu1); + const elu3 = eventLoopUtilization(); + const elu4 = eventLoopUtilization(elu3, elu1); + + assert.strictEqual(elu2.idle, 0); + assert.strictEqual(elu4.idle, 0); + assert.strictEqual(elu2.utilization, 1); + assert.strictEqual(elu4.utilization, 1); + assert.strictEqual(elu3.active - elu1.active, elu4.active); + assert.ok(elu2.active > SPIN_DUR - 10, `${elu2.active} <= ${SPIN_DUR - 10}`); + assert.ok(elu2.active < elu4.active, `${elu2.active} >= ${elu4.active}`); + assert.ok(elu3.active > elu2.active, `${elu3.active} <= ${elu2.active}`); + assert.ok(elu3.active > elu4.active, `${elu3.active} <= ${elu4.active}`); + + Atomics.store(i32arr, 0, 1); + Atomics.notify(i32arr, 0); + }, TIMEOUT); +}