Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions doc/api/v8.md
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,33 @@ added: REPLACEME

Stopping collecting the profile and the profile will be discarded.

## Class: `HeapProfileHandle`

<!-- YAML
added: REPLACEME
-->

### `heapProfileHandle.stop()`

<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

Stopping collecting the profile, then return a Promise that fulfills with an error or the
profile data.

### `heapProfileHandle[Symbol.asyncDispose]()`

<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

Stopping collecting the profile and the profile will be discarded.

## `v8.isStringOneByteRepresentation(content)`

<!-- YAML
Expand Down
43 changes: 43 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -2001,6 +2001,49 @@ w.on('online', async () => {
});
```

### `worker.startHeapProfile()`

<!-- YAML
added: REPLACEME
-->

* Returns: {Promise}

Starting a Heap profile then return a Promise that fulfills with an error
or an `HeapProfileHandle` object. This API supports `await using` syntax.

```cjs
const { Worker } = require('node:worker_threads');

const worker = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.on('message', () => {});
`, { eval: true });

worker.on('online', async () => {
const handle = await worker.startHeapProfile();
const profile = await handle.stop();
console.log(profile);
worker.terminate();
});
```

`await using` example.

```cjs
const { Worker } = require('node::worker_threads');

const w = new Worker(`
const { parentPort } = require('worker_threads');
parentPort.on('message', () => {});
`, { eval: true });

w.on('online', async () => {
// Stop profile automatically when return and profile will be discarded
await using handle = await w.startHeapProfile();
});
```

### `worker.stderr`

<!-- YAML
Expand Down
42 changes: 42 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,35 @@ class CPUProfileHandle {
}
}

class HeapProfileHandle {
#worker = null;
#promise = null;

constructor(worker) {
this.#worker = worker;
}

stop() {
if (this.#promise) {
return this.#promise;
}
const stopTaker = this.#worker[kHandle]?.stopHeapProfile();
return this.#promise = new Promise((resolve, reject) => {
if (!stopTaker) return reject(new ERR_WORKER_NOT_RUNNING());
stopTaker.ondone = (err, profile) => {
if (err) {
return reject(err);
}
resolve(profile);
};
});
};

async [SymbolAsyncDispose]() {
await this.stop();
}
}

class Worker extends EventEmitter {
constructor(filename, options = kEmptyObject) {
throwIfBuildingSnapshot('Creating workers');
Expand Down Expand Up @@ -551,6 +580,19 @@ class Worker extends EventEmitter {
};
});
}

startHeapProfile() {
const startTaker = this[kHandle]?.startHeapProfile();
return new Promise((resolve, reject) => {
if (!startTaker) return reject(new ERR_WORKER_NOT_RUNNING());
startTaker.ondone = (err) => {
if (err) {
return reject(err);
}
resolve(new HeapProfileHandle(this));
};
});
}
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ namespace node {
V(WORKER) \
V(WORKERCPUPROFILE) \
V(WORKERCPUUSAGE) \
V(WORKERHEAPPROFILE) \
V(WORKERHEAPSNAPSHOT) \
V(WORKERHEAPSTATISTICS) \
V(WRITEWRAP) \
Expand Down
1 change: 1 addition & 0 deletions src/env_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@
V(write_wrap_template, v8::ObjectTemplate) \
V(worker_cpu_profile_taker_template, v8::ObjectTemplate) \
V(worker_cpu_usage_taker_template, v8::ObjectTemplate) \
V(worker_heap_profile_taker_template, v8::ObjectTemplate) \
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
V(x509_constructor_template, v8::FunctionTemplate)
Expand Down
2 changes: 2 additions & 0 deletions src/node_errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ void OOMErrorHandler(const char* location, const v8::OOMDetails& details);
V(ERR_FS_CP_SOCKET, Error) \
V(ERR_FS_CP_FIFO_PIPE, Error) \
V(ERR_FS_CP_UNKNOWN, Error) \
V(ERR_HEAP_PROFILE_HAVE_BEEN_STARTED, Error) \
V(ERR_HEAP_PROFILE_NOT_STARTED, Error) \
V(ERR_ILLEGAL_CONSTRUCTOR, Error) \
V(ERR_INVALID_ADDRESS, Error) \
V(ERR_INVALID_ARG_VALUE, TypeError) \
Expand Down
183 changes: 183 additions & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

using node::kAllowedInEnvvar;
using node::kDisallowedInEnvvar;
using v8::AllocationProfile;
using v8::Array;
using v8::ArrayBuffer;
using v8::Boolean;
Expand All @@ -32,6 +33,7 @@ using v8::Float64Array;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::HeapProfiler;
using v8::HeapStatistics;
using v8::Integer;
using v8::Isolate;
Expand Down Expand Up @@ -1031,6 +1033,169 @@ void Worker::StopCpuProfile(const FunctionCallbackInfo<Value>& args) {
}
}

class WorkerHeapProfileTaker final : public AsyncWrap {
public:
WorkerHeapProfileTaker(Environment* env, Local<Object> obj)
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPPROFILE) {}

SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(WorkerHeapProfileTaker)
SET_SELF_SIZE(WorkerHeapProfileTaker)
};

void Worker::StartHeapProfile(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Environment* env = w->env();

AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_heap_profile_taker_template()
->NewInstance(env->context())
.ToLocal(&wrap)) {
return;
}

BaseObjectPtr<WorkerHeapProfileTaker> taker =
MakeDetachedBaseObject<WorkerHeapProfileTaker>(env, wrap);

bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
v8::HeapProfiler* profiler = worker_env->isolate()->GetHeapProfiler();
bool success = profiler->StartSamplingHeapProfiler();
env->SetImmediateThreadsafe(
[taker = std::move(taker),
success = success](Environment* env) mutable {
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
Local<Value> argv[] = {
Null(isolate), // error
};
if (!success) {
argv[0] = ERR_HEAP_PROFILE_HAVE_BEEN_STARTED(
isolate, "heap profiler have been started");
}
taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
},
CallbackFlags::kUnrefed);
});

if (scheduled) {
args.GetReturnValue().Set(wrap);
}
}

static void buildHeapProfileNode(Isolate* isolate,
const AllocationProfile::Node* node,
JSONWriter* writer) {
size_t selfSize = 0;
for (const auto& allocation : node->allocations)
selfSize += allocation.size * allocation.count;

writer->json_keyvalue("selfSize", selfSize);
writer->json_keyvalue("id", node->node_id);
writer->json_objectstart("callFrame");
writer->json_keyvalue("scriptId", node->script_id);
writer->json_keyvalue("lineNumber", node->line_number - 1);
writer->json_keyvalue("columnNumber", node->column_number - 1);
node::Utf8Value name(isolate, node->name);
node::Utf8Value script_name(isolate, node->script_name);
writer->json_keyvalue("functionName", *name);
writer->json_keyvalue("url", *script_name);
writer->json_objectend();

writer->json_arraystart("children");
for (const auto* child : node->children) {
writer->json_start();
buildHeapProfileNode(isolate, child, writer);
writer->json_end();
}
writer->json_arrayend();
}

static bool serializeProfile(Isolate* isolate, std::ostringstream& out_stream) {
HandleScope scope(isolate);
HeapProfiler* profiler = isolate->GetHeapProfiler();
std::unique_ptr<AllocationProfile> profile(profiler->GetAllocationProfile());
if (!profile) {
return false;
}
JSONWriter writer(out_stream, false);
writer.json_start();

writer.json_arraystart("samples");
for (const auto& sample : profile->GetSamples()) {
writer.json_start();
writer.json_keyvalue("size", sample.size * sample.count);
writer.json_keyvalue("nodeId", sample.node_id);
writer.json_keyvalue("ordinal", static_cast<double>(sample.sample_id));
writer.json_end();
}
writer.json_arrayend();

writer.json_objectstart("head");
buildHeapProfileNode(isolate, profile->GetRootNode(), &writer);
writer.json_objectend();

writer.json_end();
profiler->StopSamplingHeapProfiler();
return true;
}

void Worker::StopHeapProfile(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());

Environment* env = w->env();
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_heap_profile_taker_template()
->NewInstance(env->context())
.ToLocal(&wrap)) {
return;
}

BaseObjectPtr<WorkerHeapProfileTaker> taker =
MakeDetachedBaseObject<WorkerHeapProfileTaker>(env, wrap);

bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
std::ostringstream out_stream;
bool success = serializeProfile(worker_env->isolate(), out_stream);
env->SetImmediateThreadsafe(
[taker = std::move(taker),
out_stream = std::move(out_stream),
success = success](Environment* env) mutable {
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker.get());
Local<Value> argv[] = {
Null(isolate), // error
Undefined(isolate), // profile
};
if (success) {
Local<Value> result;
if (!ToV8Value(env->context(), out_stream.str(), isolate)
.ToLocal(&result)) {
return;
}
argv[1] = result;
} else {
argv[0] = ERR_HEAP_PROFILE_NOT_STARTED(isolate,
"heap profile not started");
}
taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
},
CallbackFlags::kUnrefed);
});

if (scheduled) {
args.GetReturnValue().Set(wrap);
}
}
class WorkerHeapStatisticsTaker : public AsyncWrap {
public:
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
Expand Down Expand Up @@ -1328,6 +1493,8 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
SetProtoMethod(isolate, w, "cpuUsage", Worker::CpuUsage);
SetProtoMethod(isolate, w, "startCpuProfile", Worker::StartCpuProfile);
SetProtoMethod(isolate, w, "stopCpuProfile", Worker::StopCpuProfile);
SetProtoMethod(isolate, w, "startHeapProfile", Worker::StartHeapProfile);
SetProtoMethod(isolate, w, "stopHeapProfile", Worker::StopHeapProfile);

SetConstructorFunction(isolate, target, "Worker", w);
}
Expand Down Expand Up @@ -1387,6 +1554,20 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
wst->InstanceTemplate());
}

{
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);

wst->InstanceTemplate()->SetInternalFieldCount(
WorkerHeapProfileTaker::kInternalFieldCount);
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));

Local<String> wst_string =
FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapProfileTaker");
wst->SetClassName(wst_string);
isolate_data->set_worker_heap_profile_taker_template(
wst->InstanceTemplate());
}

SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
}

Expand Down Expand Up @@ -1466,6 +1647,8 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
registry->Register(Worker::CpuUsage);
registry->Register(Worker::StartCpuProfile);
registry->Register(Worker::StopCpuProfile);
registry->Register(Worker::StartHeapProfile);
registry->Register(Worker::StopHeapProfile);
}

} // anonymous namespace
Expand Down
Loading
Loading