Skip to content

Commit

Permalink
worker: add public method for marking objects as untransferable
Browse files Browse the repository at this point in the history
We currently mark a number of `ArrayBuffer`s as not transferable,
including the `Buffer` pool and ones with finalizers provided
by C++ addons.

There is no good reason to assume that userland code might not
encounter similar problems, for example when doing `ArrayBuffer`
pooling similar to ours. Therefore, provide an API that lets
userland code also mark objects as not transferable.

PR-URL: #33979
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Gus Caplan <me@gus.host>
  • Loading branch information
addaleax authored and codebytere committed Jun 30, 2020
1 parent ced68bf commit 4875a51
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 21 deletions.
40 changes: 40 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,42 @@ if (isMainThread) {
}
```

## `worker.markAsUntransferable(object)`
<!-- YAML
added: REPLACEME
-->

Mark an object as not transferable. If `object` occurs in the transfer list of
a [`port.postMessage()`][] call, it will be ignored.

In particular, this makes sense for objects that can be cloned, rather than
transferred, and which are used by other objects on the sending side.
For example, Node.js marks the `ArrayBuffer`s it uses for its
[`Buffer` pool][`Buffer.allocUnsafe()`] with this.

This operation cannot be undone.

```js
const { MessageChannel, markAsUntransferable } = require('worker_threads');

const pooledBuffer = new ArrayBuffer(8);
const typedArray1 = new Uint8Array(pooledBuffer);
const typedArray2 = new Float64Array(pooledBuffer);

markAsUntransferable(pooledBuffer);

const { port1 } = new MessageChannel();
port1.postMessage(typedArray1, [ typedArray1.buffer ]);

// The following line prints the contents of typedArray1 -- it still owns its
// memory and has been cloned, not transfered. Without `markAsUntransferable()`,
// this would print an empty Uint8Array. typedArray2 is intact as well.
console.log(typedArray1);
console.log(typedArray2);
```

There is no equivalent to this API in browsers.

## `worker.moveMessagePortToContext(port, contextifiedSandbox)`
<!-- YAML
added: v11.13.0
Expand Down Expand Up @@ -442,6 +478,9 @@ For `Buffer` instances, specifically, whether the underlying
`ArrayBuffer` can be transferred or cloned depends entirely on how
instances were created, which often cannot be reliably determined.

An `ArrayBuffer` can be marked with [`markAsUntransferable()`][] to indicate
that it should always be cloned and never transferred.

Depending on how a `Buffer` instance was created, it may or may
not own its underlying `ArrayBuffer`. An `ArrayBuffer` must not
be transferred unless it is known that the `Buffer` instance
Expand Down Expand Up @@ -853,6 +892,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`WebAssembly.Module`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/WebAssembly/Module
[`Worker`]: #worker_threads_class_worker
[`cluster` module]: cluster.html
[`markAsUntransferable()`]: #worker_threads_worker_markasuntransferable_object
[`port.on('message')`]: #worker_threads_event_message
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
[`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist
Expand Down
6 changes: 2 additions & 4 deletions lib/buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,11 @@ const {
zeroFill: bindingZeroFill
} = internalBinding('buffer');
const {
arraybuffer_untransferable_private_symbol,
getOwnNonIndexProperties,
propertyFilter: {
ALL_PROPERTIES,
ONLY_ENUMERABLE
},
setHiddenValue,
} = internalBinding('util');
const {
customInspectSymbol,
Expand All @@ -83,7 +81,6 @@ const {
} = require('internal/util/inspect');
const { encodings } = internalBinding('string_decoder');


const {
codes: {
ERR_BUFFER_OUT_OF_BOUNDS,
Expand All @@ -104,6 +101,7 @@ const {

const {
FastBuffer,
markAsUntransferable,
addBufferPrototypeMethods
} = require('internal/buffer');

Expand Down Expand Up @@ -156,7 +154,7 @@ function createUnsafeBuffer(size) {
function createPool() {
poolSize = Buffer.poolSize;
allocPool = createUnsafeBuffer(poolSize).buffer;
setHiddenValue(allocPool, arraybuffer_untransferable_private_symbol, true);
markAsUntransferable(allocPool);
poolOffset = 0;
}
createPool();
Expand Down
15 changes: 14 additions & 1 deletion lib/internal/buffer.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ const {
ucs2Write,
utf8Write
} = internalBinding('buffer');
const {
untransferable_object_private_symbol,
setHiddenValue,
} = internalBinding('util');

// Temporary buffers to convert numbers.
const float32Array = new Float32Array(1);
Expand Down Expand Up @@ -1007,7 +1011,16 @@ function addBufferPrototypeMethods(proto) {
proto.utf8Write = utf8Write;
}

// This would better be placed in internal/worker/io.js, but that doesn't work
// because Buffer needs this and that would introduce a cyclic dependency.
function markAsUntransferable(obj) {
if ((typeof obj !== 'object' && typeof obj !== 'function') || obj === null)
return; // This object is a primitive and therefore already untransferable.
setHiddenValue(obj, untransferable_object_private_symbol, true);
}

module.exports = {
FastBuffer,
addBufferPrototypeMethods
addBufferPrototypeMethods,
markAsUntransferable,
};
7 changes: 6 additions & 1 deletion lib/worker_threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,18 @@ const {
MessagePort,
MessageChannel,
moveMessagePortToContext,
receiveMessageOnPort
receiveMessageOnPort,
} = require('internal/worker/io');

const {
markAsUntransferable,
} = require('internal/buffer');

module.exports = {
isMainThread,
MessagePort,
MessageChannel,
markAsUntransferable,
moveMessagePortToContext,
receiveMessageOnPort,
resourceLimits,
Expand Down
2 changes: 1 addition & 1 deletion src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ constexpr size_t kFsStatsBufferLength =
// "node:" prefix to avoid name clashes with third-party code.
#define PER_ISOLATE_PRIVATE_SYMBOL_PROPERTIES(V) \
V(alpn_buffer_private_symbol, "node:alpnBuffer") \
V(arraybuffer_untransferable_private_symbol, "node:untransferableBuffer") \
V(arrow_message_private_symbol, "node:arrowMessage") \
V(contextify_context_private_symbol, "node:contextify:context") \
V(contextify_global_private_symbol, "node:contextify:global") \
V(decorated_private_symbol, "node:decorated") \
V(napi_wrapper, "node:napi:wrapper") \
V(untransferable_object_private_symbol, "node:untransferableObject") \

// Symbols are per-isolate primitives but Environment proxies them
// for the sake of convenience.
Expand Down
2 changes: 1 addition & 1 deletion src/node_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct node_napi_env__ : public napi_env__ {
v8::Local<v8::ArrayBuffer> ab) const override {
return ab->SetPrivate(
context(),
node_env()->arraybuffer_untransferable_private_symbol(),
node_env()->untransferable_object_private_symbol(),
v8::True(isolate));
}
};
Expand Down
4 changes: 2 additions & 2 deletions src/node_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ MaybeLocal<Object> New(Environment* env,
Local<ArrayBuffer> ab =
CallbackInfo::CreateTrackedArrayBuffer(env, data, length, callback, hint);
if (ab->SetPrivate(env->context(),
env->arraybuffer_untransferable_private_symbol(),
env->untransferable_object_private_symbol(),
True(env->isolate())).IsNothing()) {
return Local<Object>();
}
Expand Down Expand Up @@ -1179,7 +1179,7 @@ void Initialize(Local<Object> target,
ArrayBuffer::New(env->isolate(), std::move(backing));
array_buffer->SetPrivate(
env->context(),
env->arraybuffer_untransferable_private_symbol(),
env->untransferable_object_private_symbol(),
True(env->isolate())).Check();
CHECK(target
->Set(env->context(),
Expand Down
26 changes: 15 additions & 11 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,21 @@ Maybe<bool> Message::Serialize(Environment* env,
std::vector<Local<ArrayBuffer>> array_buffers;
for (uint32_t i = 0; i < transfer_list_v.length(); ++i) {
Local<Value> entry = transfer_list_v[i];
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsObject()) {
// See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
// for details.
bool untransferable;
if (!entry.As<Object>()->HasPrivate(
context,
env->untransferable_object_private_symbol())
.To(&untransferable)) {
return Nothing<bool>();
}
if (untransferable) continue;
}

// Currently, we support ArrayBuffers and BaseObjects for which
// GetTransferMode() does not return kUntransferable.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
// If we cannot render the ArrayBuffer unusable in this Isolate,
Expand All @@ -411,16 +425,6 @@ Maybe<bool> Message::Serialize(Environment* env,
// is always going to outlive any Workers it creates, and so will its
// allocator along with it.
if (!ab->IsDetachable()) continue;
// See https://github.com/nodejs/node/pull/30339#issuecomment-552225353
// for details.
bool untransferrable;
if (!ab->HasPrivate(
context,
env->arraybuffer_untransferable_private_symbol())
.To(&untransferrable)) {
return Nothing<bool>();
}
if (untransferrable) continue;
if (std::find(array_buffers.begin(), array_buffers.end(), ab) !=
array_buffers.end()) {
ThrowDataCloneException(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { MessageChannel, markAsUntransferable } = require('worker_threads');

{
const ab = new ArrayBuffer(8);

markAsUntransferable(ab);
assert.strictEqual(ab.byteLength, 8);

const { port1, port2 } = new MessageChannel();
port1.postMessage(ab, [ ab ]);

assert.strictEqual(ab.byteLength, 8); // The AB is not detached.
port2.once('message', common.mustCall());
}

{
const channel1 = new MessageChannel();
const channel2 = new MessageChannel();

markAsUntransferable(channel2.port1);

assert.throws(() => {
channel1.port1.postMessage(channel2.port1, [ channel2.port1 ]);
}, /was found in message but not listed in transferList/);

channel2.port1.postMessage('still works, not closed/transferred');
channel2.port2.once('message', common.mustCall());
}

{
for (const value of [0, null, false, true, undefined, [], {}]) {
markAsUntransferable(value); // Has no visible effect.
}
}

0 comments on commit 4875a51

Please sign in to comment.