Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -1613,6 +1613,19 @@ added: v10.5.0
The `'online'` event is emitted when the worker thread has started executing
JavaScript code.

### `worker.cpuUsage([prev])`

<!-- YAML
added:
- REPLACEME
-->

* Returns: {Promise}

This method returns a `Promise` that will resolve to an object identical to [`process.threadCpuUsage()`][],
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
This methods allows the statistics to be observed from outside the actual thread.

### `worker.getHeapSnapshot([options])`

<!-- YAML
Expand Down Expand Up @@ -1975,6 +1988,7 @@ thread spawned will spawn another until the application crashes.
[`process.stderr`]: process.md#processstderr
[`process.stdin`]: process.md#processstdin
[`process.stdout`]: process.md#processstdout
[`process.threadCpuUsage()`]: process.md#processthreadcpuusagepreviousvalue
[`process.title`]: process.md#processtitle
[`require('node:worker_threads').isMainThread`]: #workerismainthread
[`require('node:worker_threads').parentPort.on('message')`]: #event-message
Expand Down
35 changes: 34 additions & 1 deletion lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const {
Float64Array,
FunctionPrototypeBind,
MathMax,
NumberMAX_SAFE_INTEGER,
ObjectEntries,
Promise,
PromiseResolve,
Expand Down Expand Up @@ -41,6 +42,7 @@ const {
ERR_WORKER_INVALID_EXEC_ARGV,
ERR_INVALID_ARG_TYPE,
ERR_INVALID_ARG_VALUE,
ERR_OPERATION_FAILED,
} = errorCodes;

const workerIo = require('internal/worker/io');
Expand All @@ -61,7 +63,7 @@ const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker
const { deserializeError } = require('internal/error_serdes');
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
const { kEmptyObject } = require('internal/util');
const { validateArray, validateString } = require('internal/validators');
const { validateArray, validateString, validateObject, validateNumber } = require('internal/validators');
const {
throwIfBuildingSnapshot,
} = require('internal/v8/startup_snapshot');
Expand Down Expand Up @@ -466,6 +468,37 @@ class Worker extends EventEmitter {
};
});
}

cpuUsage(prev) {
if (prev) {
validateObject(prev, 'prev');
validateNumber(prev.user, 'prev.user', 0, NumberMAX_SAFE_INTEGER);
validateNumber(prev.system, 'prev.system', 0, NumberMAX_SAFE_INTEGER);
}
if (process.platform === 'sunos') {
throw new ERR_OPERATION_FAILED('worker.cpuUsage() is not available on SunOS');
}
const taker = this[kHandle]?.cpuUsage();
return new Promise((resolve, reject) => {
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
taker.ondone = (err, current) => {
if (err !== null) {
return reject(err);
}
if (prev) {
resolve({
user: current.user - prev.user,
system: current.system - prev.system,
});
} else {
resolve({
user: current.user,
system: current.system,
});
}
};
});
}
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ namespace node {
V(UDPWRAP) \
V(SIGINTWATCHDOG) \
V(WORKER) \
V(WORKERCPUUSAGE) \
V(WORKERHEAPSNAPSHOT) \
V(WORKERHEAPSTATISTICS) \
V(WRITEWRAP) \
Expand Down
1 change: 1 addition & 0 deletions src/env_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,7 @@
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tty_constructor_template, v8::FunctionTemplate) \
V(write_wrap_template, v8::ObjectTemplate) \
V(worker_cpu_usage_taker_template, v8::ObjectTemplate) \
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
V(x509_constructor_template, v8::FunctionTemplate)
Expand Down
91 changes: 91 additions & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ using v8::Isolate;
using v8::Local;
using v8::Locker;
using v8::Maybe;
using v8::Name;
using v8::Null;
using v8::Number;
using v8::Object;
Expand Down Expand Up @@ -810,6 +811,81 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
}
}

class WorkerCpuUsageTaker : public AsyncWrap {
public:
WorkerCpuUsageTaker(Environment* env, Local<Object> obj)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERCPUUSAGE) {}

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(WorkerCpuUsageTaker)
SET_SELF_SIZE(WorkerCpuUsageTaker)
};

void Worker::CpuUsage(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());

Environment* env = w->env();
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_cpu_usage_taker_template()
->NewInstance(env->context())
.ToLocal(&wrap)) {
return;
}

BaseObjectPtr<WorkerCpuUsageTaker> taker =
MakeDetachedBaseObject<WorkerCpuUsageTaker>(env, wrap);

bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
auto cpu_usage_stats = std::make_unique<uv_rusage_t>();
int err = uv_getrusage_thread(cpu_usage_stats.get());

env->SetImmediateThreadsafe(
[taker = std::move(taker),
cpu_usage_stats = std::move(cpu_usage_stats),
err = err](Environment* env) mutable {
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());

Local<Value> argv[] = {
Null(isolate),
Undefined(isolate),
};

if (err) {
argv[0] = UVException(
isolate, err, "uv_getrusage_thread", nullptr, nullptr, nullptr);
} else {
Local<Name> names[] = {
FIXED_ONE_BYTE_STRING(isolate, "user"),
FIXED_ONE_BYTE_STRING(isolate, "system"),
};
Local<Value> values[] = {
Number::New(isolate,
1e6 * cpu_usage_stats->ru_utime.tv_sec +
cpu_usage_stats->ru_utime.tv_usec),
Number::New(isolate,
1e6 * cpu_usage_stats->ru_stime.tv_sec +
cpu_usage_stats->ru_stime.tv_usec),
};
argv[1] = Object::New(
isolate, Null(isolate), names, values, arraysize(names));
}

taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
},
CallbackFlags::kUnrefed);
});

if (scheduled) {
args.GetReturnValue().Set(wrap);
}
}

class WorkerHeapStatisticsTaker : public AsyncWrap {
public:
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
Expand Down Expand Up @@ -1101,6 +1177,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);
SetProtoMethod(isolate, w, "cpuUsage", Worker::CpuUsage);

SetConstructorFunction(isolate, target, "Worker", w);
}
Expand Down Expand Up @@ -1133,6 +1210,19 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
wst->InstanceTemplate());
}

{
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);

wst->InstanceTemplate()->SetInternalFieldCount(
WorkerCpuUsageTaker::kInternalFieldCount);
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));

Local<String> wst_string =
FIXED_ONE_BYTE_STRING(isolate, "WorkerCpuUsageTaker");
wst->SetClassName(wst_string);
isolate_data->set_worker_cpu_usage_taker_template(wst->InstanceTemplate());
}

SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
}

Expand Down Expand Up @@ -1199,6 +1289,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Worker::LoopIdleTime);
registry->Register(Worker::LoopStartTime);
registry->Register(Worker::GetHeapStatistics);
registry->Register(Worker::CpuUsage);
}

} // anonymous namespace
Expand Down
1 change: 1 addition & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class Worker : public AsyncWrap {
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetHeapStatistics(
const v8::FunctionCallbackInfo<v8::Value>& args);
static void CpuUsage(const v8::FunctionCallbackInfo<v8::Value>& args);

private:
bool CreateEnvMessagePort(Environment* env);
Expand Down
81 changes: 81 additions & 0 deletions test/parallel/test-worker-cpu-usage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
'use strict';
const common = require('../common');
const { isSunOS } = require('../common');
const assert = require('assert');
const {
Worker,
} = require('worker_threads');

function validate(result) {
assert.ok(typeof result == 'object' && result !== null);
assert.ok(result.user >= 0);
assert.ok(result.system >= 0);
assert.ok(Number.isFinite(result.user));
assert.ok(Number.isFinite(result.system));
}

function check(worker) {
[
-1,
1.1,
NaN,
undefined,
{},
[],
null,
function() {},
Symbol(),
true,
Infinity,
{ user: -1, system: 1 },
{ user: 1, system: -1 },
].forEach((value) => {
try {
worker.cpuUsage(value);
} catch (e) {
assert.ok(/ERR_OUT_OF_RANGE|ERR_INVALID_ARG_TYPE/i.test(e.code));
}
});
}

const worker = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.on('message', () => {});
`, { eval: true });

// See test-process-threadCpuUsage-main-thread.js
if (isSunOS) {
assert.throws(
() => worker.cpuUsage(),
{
code: 'ERR_OPERATION_FAILED',
name: 'Error',
message: 'Operation failed: worker.cpuUsage() is not available on SunOS'
}
);
worker.terminate();
} else {
worker.on('online', common.mustCall(async () => {
check(worker);

const prev = await worker.cpuUsage();
validate(prev);

const curr = await worker.cpuUsage();
validate(curr);

assert.ok(curr.user >= prev.user);
assert.ok(curr.system >= prev.system);

const delta = await worker.cpuUsage(curr);
validate(delta);

worker.terminate();
}));

worker.once('exit', common.mustCall(async () => {
await assert.rejects(worker.cpuUsage(), {
code: 'ERR_WORKER_NOT_RUNNING'
});
}));
}
1 change: 1 addition & 0 deletions test/sequential/test-async-wrap-getasyncid.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const { getSystemErrorName } = require('util');
delete providers.SIGINTWATCHDOG;
delete providers.WORKERHEAPSNAPSHOT;
delete providers.WORKERHEAPSTATISTICS;
delete providers.WORKERCPUUSAGE;
delete providers.BLOBREADER;
delete providers.RANDOMPRIMEREQUEST;
delete providers.CHECKPRIMEREQUEST;
Expand Down
1 change: 1 addition & 0 deletions typings/internalBinding/worker.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ declare namespace InternalWorkerBinding {
getResourceLimits(): Float64Array;
takeHeapSnapshot(): object;
getHeapStatistics(): Promise<object>;
cpuUsage(): Promise<object>;
loopIdleTime(): number;
loopStartTime(): number;
}
Expand Down
Loading