Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: add public method for marking objects as untransferable #33979

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,42 @@ if (isMainThread) {
}
```

## `worker.markAsUntransferable(object)`
<!-- YAML
added: REPLACEME
-->

Mark an object as not transferable. If `object` occurs in the transfer list of
a [`port.postMessage()`][] call, it will be ignored.

In particular, this makes sense for objects that can be cloned, rather than
transferred, and which are used by other objects on the sending side.
For example, Node.js marks the `ArrayBuffer`s it uses for its
[`Buffer` pool][`Buffer.allocUnsafe()`] with this.

This operation cannot be undone.

```js
const { MessageChannel, markAsUntransferable } = require('worker_threads');

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
port1.postMessage(typedArray1, [ typedArray1.buffer ]);

// The following line prints the contents of typedArray1 -- it still owns its
// memory and has been cloned, not transfered. Without `markAsUntransferable()`,
// this would print an empty Uint8Array. typedArray2 is intact as well.
console.log(typedArray1);
console.log(typedArray2);
```

There is no equivalent to this API in browsers.

## `worker.moveMessagePortToContext(port, contextifiedSandbox)`
<!-- YAML
added: v11.13.0
Expand Down Expand Up @@ -439,6 +475,9 @@ For `Buffer` instances, specifically, whether the underlying
`ArrayBuffer` can be transferred or cloned depends entirely on how
instances were created, which often cannot be reliably determined.

An `ArrayBuffer` can be marked with [`markAsUntransferable()`][] to indicate
that it should always be cloned and never transferred.

Depending on how a `Buffer` instance was created, it may or may
not own its underlying `ArrayBuffer`. An `ArrayBuffer` must not
be transferred unless it is known that the `Buffer` instance
Expand Down Expand Up @@ -855,6 +894,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`WebAssembly.Module`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WebAssembly/Module
[`Worker`]: #worker_threads_class_worker
[`cluster` module]: cluster.html
[`markAsUntransferable()`]: #worker_threads_worker_markasuntransferable_object
[`port.on('message')`]: #worker_threads_event_message
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
[`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist
Expand Down
6 changes: 2 additions & 4 deletions lib/buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@ const {
zeroFill: bindingZeroFill
} = internalBinding('buffer');
const {
arraybuffer_untransferable_private_symbol,
getOwnNonIndexProperties,
propertyFilter: {
ALL_PROPERTIES,
ONLY_ENUMERABLE
},
setHiddenValue,
} = internalBinding('util');
const {
customInspectSymbol,
Expand All @@ -83,7 +81,6 @@ const {
} = require('internal/util/inspect');
const { encodings } = internalBinding('string_decoder');


const {
codes: {
ERR_BUFFER_OUT_OF_BOUNDS,
Expand All @@ -104,6 +101,7 @@ const {

const {
FastBuffer,
markAsUntransferable,
addBufferPrototypeMethods
} = require('internal/buffer');

Expand Down Expand Up @@ -156,7 +154,7 @@ function createUnsafeBuffer(size) {
function createPool() {
poolSize = Buffer.poolSize;
allocPool = createUnsafeBuffer(poolSize).buffer;
setHiddenValue(allocPool, arraybuffer_untransferable_private_symbol, true);
markAsUntransferable(allocPool);
poolOffset = 0;
}
createPool();
Expand Down
15 changes: 14 additions & 1 deletion lib/internal/buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const {
ucs2Write,
utf8Write
} = internalBinding('buffer');
const {
untransferable_object_private_symbol,
setHiddenValue,
} = internalBinding('util');

// Temporary buffers to convert numbers.
const float32Array = new Float32Array(1);
Expand Down Expand Up @@ -1007,7 +1011,16 @@ function addBufferPrototypeMethods(proto) {
proto.utf8Write = utf8Write;
}

// This would better be placed in internal/worker/io.js, but that doesn't work
// because Buffer needs this and that would introduce a cyclic dependency.
function markAsUntransferable(obj) {
if ((typeof obj !== 'object' && typeof obj !== 'function') || obj === null)
return; // This object is a primitive and therefore already untransferable.
setHiddenValue(obj, untransferable_object_private_symbol, true);
}

module.exports = {
FastBuffer,
addBufferPrototypeMethods
addBufferPrototypeMethods,
markAsUntransferable,
};
7 changes: 6 additions & 1 deletion lib/worker_threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ const {
MessagePort,
MessageChannel,
moveMessagePortToContext,
receiveMessageOnPort
receiveMessageOnPort,
} = require('internal/worker/io');

const {
markAsUntransferable,
} = require('internal/buffer');

module.exports = {
isMainThread,
MessagePort,
MessageChannel,
markAsUntransferable,
moveMessagePortToContext,
receiveMessageOnPort,
resourceLimits,
Expand Down
2 changes: 1 addition & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ constexpr size_t kFsStatsBufferLength =
// "node:" prefix to avoid name clashes with third-party code.
#define PER_ISOLATE_PRIVATE_SYMBOL_PROPERTIES(V) \
V(alpn_buffer_private_symbol, "node:alpnBuffer") \
V(arraybuffer_untransferable_private_symbol, "node:untransferableBuffer") \
V(arrow_message_private_symbol, "node:arrowMessage") \
V(contextify_context_private_symbol, "node:contextify:context") \
V(contextify_global_private_symbol, "node:contextify:global") \
V(decorated_private_symbol, "node:decorated") \
V(napi_wrapper, "node:napi:wrapper") \
V(untransferable_object_private_symbol, "node:untransferableObject") \

// Symbols are per-isolate primitives but Environment proxies them
// for the sake of convenience.
Expand Down
2 changes: 1 addition & 1 deletion src/node_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct node_napi_env__ : public napi_env__ {
v8::Local<v8::ArrayBuffer> ab) const override {
return ab->SetPrivate(
context(),
node_env()->arraybuffer_untransferable_private_symbol(),
node_env()->untransferable_object_private_symbol(),
v8::True(isolate));
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/node_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ MaybeLocal<Object> New(Environment* env,
Local<ArrayBuffer> ab =
CallbackInfo::CreateTrackedArrayBuffer(env, data, length, callback, hint);
if (ab->SetPrivate(env->context(),
env->arraybuffer_untransferable_private_symbol(),
env->untransferable_object_private_symbol(),
True(env->isolate())).IsNothing()) {
return Local<Object>();
}
Expand Down Expand Up @@ -1179,7 +1179,7 @@ void Initialize(Local<Object> target,
ArrayBuffer::New(env->isolate(), std::move(backing));
array_buffer->SetPrivate(
env->context(),
env->arraybuffer_untransferable_private_symbol(),
env->untransferable_object_private_symbol(),
True(env->isolate())).Check();
CHECK(target
->Set(env->context(),
Expand Down
26 changes: 15 additions & 11 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,21 @@ Maybe<bool> Message::Serialize(Environment* env,
std::vector<Local<ArrayBuffer>> array_buffers;
for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
Local<Value> entry = transfer_list_v[i];
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsObject()) {
// See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
// for details.
bool untransferable;
if (!entry.As<Object>()->HasPrivate(
context,
env->untransferable_object_private_symbol())
.To(&untransferable)) {
return Nothing<bool>();
}
if (untransferable) continue;
}

// Currently, we support ArrayBuffers and BaseObjects for which
// GetTransferMode() does not return kUntransferable.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate,
Expand All @@ -411,16 +425,6 @@ Maybe<bool> Message::Serialize(Environment* env,
// is always going to outlive any Workers it creates, and so will its
// allocator along with it.
if (!ab->IsDetachable()) continue;
// See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
// for details.
bool untransferrable;
if (!ab->HasPrivate(
context,
env->arraybuffer_untransferable_private_symbol())
.To(&untransferrable)) {
return Nothing<bool>();
}
if (untransferrable) continue;
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
array_buffers.end()) {
ThrowDataCloneException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { MessageChannel, markAsUntransferable } = require('worker_threads');

{
const ab = new ArrayBuffer(8);

markAsUntransferable(ab);
assert.strictEqual(ab.byteLength, 8);

const { port1, port2 } = new MessageChannel();
port1.postMessage(ab, [ ab ]);

assert.strictEqual(ab.byteLength, 8); // The AB is not detached.
port2.once('message', common.mustCall());
}

{
const channel1 = new MessageChannel();
const channel2 = new MessageChannel();

markAsUntransferable(channel2.port1);

assert.throws(() => {
channel1.port1.postMessage(channel2.port1, [ channel2.port1 ]);
}, /was found in message but not listed in transferList/);

channel2.port1.postMessage('still works, not closed/transferred');
channel2.port2.once('message', common.mustCall());
}

{
for (const value of [0, null, false, true, undefined, [], {}]) {
markAsUntransferable(value); // Has no visible effect.
}
}