Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: add ability to take heap snapshot from parent thread #31569

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -2053,6 +2053,11 @@ The WASI instance has already started.
The `execArgv` option passed to the `Worker` constructor contains
invalid flags.

<a id="ERR_WORKER_NOT_RUNNING"></a>
### `ERR_WORKER_NOT_RUNNING`

An operation failed because the `Worker` instance is not currently running.

<a id="ERR_WORKER_OUT_OF_MEMORY"></a>
### `ERR_WORKER_OUT_OF_MEMORY`

Expand Down
17 changes: 17 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,21 @@ inside the worker thread. If `stdout: true` was not passed to the
[`Worker`][] constructor, then data will be piped to the parent thread's
[`process.stdout`][] stream.

### `worker.takeHeapSnapshot()`
<!-- YAML
added: REPLACEME
-->

* Returns: {Promise} A promise for a Readable Stream containing
a V8 heap snapshot

Returns a readable stream for a V8 snapshot of the current state of the Worker.
See [`v8.getHeapSnapshot()`][] for more details.

If the Worker thread is no longer running, which may occur before the
[`'exit'` event][] is emitted, the returned `Promise` will be rejected
immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.

### `worker.terminate()`
<!-- YAML
added: v10.5.0
Expand Down Expand Up @@ -716,6 +731,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`'exit'` event]: #worker_threads_event_exit
[`AsyncResource`]: async_hooks.html#async_hooks_class_asyncresource
[`Buffer`]: buffer.html
[`ERR_WORKER_NOT_RUNNING`]: errors.html#ERR_WORKER_NOT_RUNNING
[`EventEmitter`]: events.html
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
[`MessagePort`]: #worker_threads_class_messageport
Expand Down Expand Up @@ -743,6 +759,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`require('worker_threads').threadId`]: #worker_threads_worker_threadid
[`require('worker_threads').workerData`]: #worker_threads_worker_workerdata
[`trace_events`]: tracing.html
[`v8.getHeapSnapshot()`]: v8.html#v8_v8_getheapsnapshot
[`vm`]: vm.html
[`worker.on('message')`]: #worker_threads_event_message_1
[`worker.postMessage()`]: #worker_threads_worker_postmessage_value_transferlist
Expand Down
1 change: 1 addition & 0 deletions lib/internal/errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,7 @@ E('ERR_WASI_ALREADY_STARTED', 'WASI instance has already started', Error);
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors) =>
`Initiated Worker with invalid execArgv flags: ${errors.join(', ')}`,
Error);
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
addaleax marked this conversation as resolved.
Show resolved Hide resolved
E('ERR_WORKER_OUT_OF_MEMORY', 'Worker terminated due to reaching memory limit',
Error);
E('ERR_WORKER_PATH',
Expand Down
41 changes: 41 additions & 0 deletions lib/internal/heap_utils.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict';
const {
Symbol
} = primordials;
const {
kUpdateTimer,
onStreamRead,
} = require('internal/stream_base_commons');
const { owner_symbol } = require('internal/async_hooks').symbols;
const { Readable } = require('stream');

const kHandle = Symbol('kHandle');

class HeapSnapshotStream extends Readable {
constructor(handle) {
super({ autoDestroy: true });
addaleax marked this conversation as resolved.
Show resolved Hide resolved
this[kHandle] = handle;
handle[owner_symbol] = this;
handle.onread = onStreamRead;
}

_read() {
if (this[kHandle])
this[kHandle].readStart();
}

_destroy() {
// Release the references on the handle so that
// it can be garbage collected.
this[kHandle][owner_symbol] = undefined;
this[kHandle] = undefined;
}

[kUpdateTimer]() {
// Does nothing
}
}

module.exports = {
HeapSnapshotStream
};
12 changes: 12 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const path = require('path');

const errorCodes = require('internal/errors').codes;
const {
ERR_WORKER_NOT_RUNNING,
ERR_WORKER_PATH,
ERR_WORKER_UNSERIALIZABLE_ERROR,
ERR_WORKER_UNSUPPORTED_EXTENSION,
Expand Down Expand Up @@ -314,6 +315,17 @@ class Worker extends EventEmitter {

return makeResourceLimits(this[kHandle].getResourceLimits());
}

getHeapSnapshot() {
const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot();
return new Promise((resolve, reject) => {
if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING());
heapSnapshotTaker.ondone = (handle) => {
const { HeapSnapshotStream } = require('internal/heap_utils');
resolve(new HeapSnapshotStream(handle));
};
});
}
}

function pipeWithoutWarning(source, dest) {
Expand Down
37 changes: 2 additions & 35 deletions lib/v8.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ const {
Int8Array,
Map,
ObjectPrototypeToString,
Symbol,
Uint16Array,
Uint32Array,
Uint8Array,
Expand All @@ -48,14 +47,7 @@ const {
createHeapSnapshotStream,
triggerHeapSnapshot
} = internalBinding('heap_utils');
const { Readable } = require('stream');
const { owner_symbol } = require('internal/async_hooks').symbols;
const {
kUpdateTimer,
onStreamRead,
} = require('internal/stream_base_commons');
const kHandle = Symbol('kHandle');

const { HeapSnapshotStream } = require('internal/heap_utils');

function writeHeapSnapshot(filename) {
if (filename !== undefined) {
Expand All @@ -65,31 +57,6 @@ function writeHeapSnapshot(filename) {
return triggerHeapSnapshot(filename);
}

class HeapSnapshotStream extends Readable {
constructor(handle) {
super({ autoDestroy: true });
this[kHandle] = handle;
handle[owner_symbol] = this;
handle.onread = onStreamRead;
}

_read() {
if (this[kHandle])
this[kHandle].readStart();
}

_destroy() {
// Release the references on the handle so that
// it can be garbage collected.
this[kHandle][owner_symbol] = undefined;
this[kHandle] = undefined;
}

[kUpdateTimer]() {
// Does nothing
}
}

function getHeapSnapshot() {
const handle = createHeapSnapshotStream();
assert(handle);
Expand Down Expand Up @@ -321,5 +288,5 @@ module.exports = {
DefaultDeserializer,
deserialize,
serialize,
writeHeapSnapshot
writeHeapSnapshot,
};
1 change: 1 addition & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
'lib/internal/fs/utils.js',
'lib/internal/fs/watchers.js',
'lib/internal/http.js',
'lib/internal/heap_utils.js',
'lib/internal/idna.js',
'lib/internal/inspector_async_hook.js',
'lib/internal/js_stream_socket.js',
Expand Down
1 change: 1 addition & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ namespace node {
V(UDPWRAP) \
V(SIGINTWATCHDOG) \
V(WORKER) \
V(WORKERHEAPSNAPSHOT) \
V(WRITEWRAP) \
V(ZLIB)

Expand Down
3 changes: 2 additions & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,8 @@ constexpr size_t kFsStatsBufferLength =
V(streambaseoutputstream_constructor_template, v8::ObjectTemplate) \
V(tcp_constructor_template, v8::FunctionTemplate) \
V(tty_constructor_template, v8::FunctionTemplate) \
V(write_wrap_template, v8::ObjectTemplate)
V(write_wrap_template, v8::ObjectTemplate) \
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate)

#define ENVIRONMENT_STRONG_PERSISTENT_VALUES(V) \
V(as_callback_data, v8::Object) \
Expand Down
74 changes: 39 additions & 35 deletions src/heap_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,26 +236,24 @@ class HeapSnapshotStream : public AsyncWrap,
public:
HeapSnapshotStream(
Environment* env,
const HeapSnapshot* snapshot,
HeapSnapshotPointer&& snapshot,
v8::Local<v8::Object> obj) :
AsyncWrap(env, obj, AsyncWrap::PROVIDER_HEAPSNAPSHOT),
StreamBase(env),
snapshot_(snapshot) {
snapshot_(std::move(snapshot)) {
MakeWeak();
StreamBase::AttachToObject(GetObject());
}

~HeapSnapshotStream() override {
Cleanup();
}
~HeapSnapshotStream() override {}

int GetChunkSize() override {
return 65536; // big chunks == faster
}

void EndOfStream() override {
EmitRead(UV_EOF);
Cleanup();
snapshot_.reset();
}

WriteResult WriteAsciiChunk(char* data, int size) override {
Expand Down Expand Up @@ -309,22 +307,13 @@ class HeapSnapshotStream : public AsyncWrap,
SET_SELF_SIZE(HeapSnapshotStream)

private:
void Cleanup() {
if (snapshot_ != nullptr) {
const_cast<HeapSnapshot*>(snapshot_)->Delete();
snapshot_ = nullptr;
}
}


const HeapSnapshot* snapshot_;
HeapSnapshotPointer snapshot_;
};

inline void TakeSnapshot(Isolate* isolate, v8::OutputStream* out) {
const HeapSnapshot* const snapshot =
isolate->GetHeapProfiler()->TakeHeapSnapshot();
HeapSnapshotPointer snapshot {
isolate->GetHeapProfiler()->TakeHeapSnapshot() };
snapshot->Serialize(out, HeapSnapshot::kJSON);
const_cast<HeapSnapshot*>(snapshot)->Delete();
}

inline bool WriteSnapshot(Isolate* isolate, const char* filename) {
Expand All @@ -339,20 +328,44 @@ inline bool WriteSnapshot(Isolate* isolate, const char* filename) {

} // namespace

void CreateHeapSnapshotStream(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
void DeleteHeapSnapshot(const v8::HeapSnapshot* snapshot) {
const_cast<HeapSnapshot*>(snapshot)->Delete();
}

BaseObjectPtr<AsyncWrap> CreateHeapSnapshotStream(
Environment* env, HeapSnapshotPointer&& snapshot) {
HandleScope scope(env->isolate());
const HeapSnapshot* const snapshot =
env->isolate()->GetHeapProfiler()->TakeHeapSnapshot();
CHECK_NOT_NULL(snapshot);

if (env->streambaseoutputstream_constructor_template().IsEmpty()) {
// Create FunctionTemplate for HeapSnapshotStream
Local<FunctionTemplate> os = FunctionTemplate::New(env->isolate());
os->Inherit(AsyncWrap::GetConstructorTemplate(env));
Local<ObjectTemplate> ost = os->InstanceTemplate();
ost->SetInternalFieldCount(StreamBase::kStreamBaseFieldCount);
os->SetClassName(
FIXED_ONE_BYTE_STRING(env->isolate(), "HeapSnapshotStream"));
StreamBase::AddMethods(env, os);
env->set_streambaseoutputstream_constructor_template(ost);
}

Local<Object> obj;
if (!env->streambaseoutputstream_constructor_template()
->NewInstance(env->context())
.ToLocal(&obj)) {
return;
return {};
}
HeapSnapshotStream* out = new HeapSnapshotStream(env, snapshot, obj);
args.GetReturnValue().Set(out->object());
return MakeBaseObject<HeapSnapshotStream>(env, std::move(snapshot), obj);
}

void CreateHeapSnapshotStream(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
HeapSnapshotPointer snapshot {
env->isolate()->GetHeapProfiler()->TakeHeapSnapshot() };
CHECK(snapshot);
BaseObjectPtr<AsyncWrap> stream =
CreateHeapSnapshotStream(env, std::move(snapshot));
if (stream)
args.GetReturnValue().Set(stream->object());
}

void TriggerHeapSnapshot(const FunctionCallbackInfo<Value>& args) {
Expand Down Expand Up @@ -388,15 +401,6 @@ void Initialize(Local<Object> target,
env->SetMethod(target, "buildEmbedderGraph", BuildEmbedderGraph);
env->SetMethod(target, "triggerHeapSnapshot", TriggerHeapSnapshot);
env->SetMethod(target, "createHeapSnapshotStream", CreateHeapSnapshotStream);

// Create FunctionTemplate for HeapSnapshotStream
Local<FunctionTemplate> os = FunctionTemplate::New(env->isolate());
os->Inherit(AsyncWrap::GetConstructorTemplate(env));
Local<ObjectTemplate> ost = os->InstanceTemplate();
ost->SetInternalFieldCount(StreamBase::kStreamBaseFieldCount);
os->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(), "HeapSnapshotStream"));
StreamBase::AddMethods(env, os);
env->set_streambaseoutputstream_constructor_template(ost);
}

} // namespace heap
Expand Down
10 changes: 10 additions & 0 deletions src/node_internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,16 @@ class TraceEventScope {
void* id_;
};

namespace heap {

void DeleteHeapSnapshot(const v8::HeapSnapshot* snapshot);
using HeapSnapshotPointer =
DeleteFnPtr<const v8::HeapSnapshot, DeleteHeapSnapshot>;

BaseObjectPtr<AsyncWrap> CreateHeapSnapshotStream(
Environment* env, HeapSnapshotPointer&& snapshot);
} // namespace heap

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
Expand Down
Loading