Skip to content

Commit

Permalink
worker: implement worker.moveMessagePortToContext()
Browse files Browse the repository at this point in the history
This enables using `MessagePort`s in different `vm.Context`s,
aiding with the isolation that the `vm` module seeks to provide.

Refs: ayojs/ayo#111

PR-URL: #26497
Reviewed-By: James M Snell <jasnell@gmail.com>
  • Loading branch information
addaleax committed Mar 15, 2019
1 parent b0de48e commit fe8972a
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 4 deletions.
27 changes: 27 additions & 0 deletions doc/api/worker_threads.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,30 @@ if (isMainThread) {
}
```

## worker.moveMessagePortToContext(port, contextifiedSandbox)
<!-- YAML
added: REPLACEME
-->

* `port` {MessagePort} The message port which will be transferred.
* `contextifiedSandbox` {Object} A [contextified][] object as returned by the
`vm.createContext()` method.

* Returns: {MessagePort}

Transfer a `MessagePort` to a different [`vm`][] Context. The original `port`
object will be rendered unusable, and the returned `MessagePort` instance will
take its place.

The returned `MessagePort` will be an object in the target context, and will
inherit from its global `Object` class. Objects passed to the
[`port.onmessage()`][] listener will also be created in the target context
and inherit from its global `Object` class.

However, the created `MessagePort` will no longer inherit from
[`EventEmitter`][], and only [`port.onmessage()`][] can be used to receive
events using it.

## worker.parentPort
<!-- YAML
added: v10.5.0
Expand Down Expand Up @@ -583,6 +607,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`Worker`]: #worker_threads_class_worker
[`cluster` module]: cluster.html
[`port.on('message')`]: #worker_threads_event_message
[`port.onmessage()`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort/onmessage
[`port.postMessage()`]: #worker_threads_port_postmessage_value_transferlist
[`process.abort()`]: process.html#process_process_abort
[`process.chdir()`]: process.html#process_process_chdir_directory
Expand All @@ -600,6 +625,7 @@ active handle in the event system. If the worker is already `unref()`ed calling
[`require('worker_threads').threadId`]: #worker_threads_worker_threadid
[`require('worker_threads').workerData`]: #worker_threads_worker_workerdata
[`trace_events`]: tracing.html
[`vm`]: vm.html
[`worker.on('message')`]: #worker_threads_event_message_1
[`worker.postMessage()`]: #worker_threads_worker_postmessage_value_transferlist
[`worker.terminate()`]: #worker_threads_worker_terminate_callback
Expand All @@ -610,4 +636,5 @@ active handle in the event system. If the worker is already `unref()`ed calling
[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[child processes]: child_process.html
[contextified]: vm.html#vm_what_does_it_mean_to_contextify_an_object
[v8.serdes]: v8.html#v8_serialization_api
2 changes: 2 additions & 0 deletions lib/internal/worker/io.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const {
MessagePort,
MessageChannel,
drainMessagePort,
moveMessagePortToContext,
stopMessagePort
} = internalBinding('messaging');
const { threadId } = internalBinding('worker');
Expand Down Expand Up @@ -233,6 +234,7 @@ module.exports = {
kIncrementsPortRef,
kWaitingStreams,
kStdioWantsMoreDataCallback,
moveMessagePortToContext,
MessagePort,
MessageChannel,
setupPortReferencing,
Expand Down
4 changes: 3 additions & 1 deletion lib/worker_threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ const {

const {
MessagePort,
MessageChannel
MessageChannel,
moveMessagePortToContext,
} = require('internal/worker/io');

module.exports = {
isMainThread,
MessagePort,
MessageChannel,
moveMessagePortToContext,
threadId,
Worker,
parentPort: null,
Expand Down
39 changes: 36 additions & 3 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

#include "async_wrap-inl.h"
#include "debug_utils.h"
#include "node_contextify.h"
#include "node_buffer.h"
#include "node_errors.h"
#include "node_process.h"
#include "util.h"

using node::contextify::ContextifyContext;
using v8::Array;
using v8::ArrayBuffer;
using v8::ArrayBufferCreationMode;
Expand Down Expand Up @@ -760,6 +762,35 @@ void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
port->OnMessage();
}

void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
if (!args[0]->IsObject() ||
!env->message_port_constructor_template()->HasInstance(args[0])) {
return THROW_ERR_INVALID_ARG_TYPE(env,
"First argument needs to be a MessagePort instance");
}
MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
CHECK_NOT_NULL(port);

Local<Value> context_arg = args[1];
ContextifyContext* context_wrapper;
if (!context_arg->IsObject() ||
(context_wrapper = ContextifyContext::ContextFromContextifiedSandbox(
env, context_arg.As<Object>())) == nullptr) {
return THROW_ERR_INVALID_ARG_TYPE(env, "Invalid context argument");
}

std::unique_ptr<MessagePortData> data;
if (!port->IsDetached())
data = port->Detach();

Context::Scope context_scope(context_wrapper->context());
MessagePort* target =
MessagePort::New(env, context_wrapper->context(), std::move(data));
if (target != nullptr)
args.GetReturnValue().Set(target->object());
}

void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
Entangle(a, b->data_.get());
}
Expand Down Expand Up @@ -816,9 +847,9 @@ static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
MessagePort* port2 = MessagePort::New(env, context);
MessagePort::Entangle(port1, port2);

args.This()->Set(env->context(), env->port1_string(), port1->object())
args.This()->Set(context, env->port1_string(), port1->object())
.FromJust();
args.This()->Set(env->context(), env->port2_string(), port2->object())
args.This()->Set(context, env->port2_string(), port2->object())
.FromJust();
}

Expand All @@ -833,7 +864,7 @@ static void InitMessaging(Local<Object> target,
FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel");
Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
templ->SetClassName(message_channel_string);
target->Set(env->context(),
target->Set(context,
message_channel_string,
templ->GetFunction(context).ToLocalChecked()).FromJust();
}
Expand All @@ -847,6 +878,8 @@ static void InitMessaging(Local<Object> target,
// the browser equivalents do not provide them.
env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
env->SetMethod(target, "moveMessagePortToContext",
MessagePort::MoveToContext);
}

} // anonymous namespace
Expand Down
5 changes: 5 additions & 0 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,17 @@ class MessagePort : public HandleWrap {
// Stop processing messages on this port as a receiving end.
void Stop();

/* constructor */
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
/* prototype methods */
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);

/* static */
static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);

// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePort* a, MessagePort* b);
Expand Down
69 changes: 69 additions & 0 deletions test/parallel/test-worker-message-port-move.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* global port */
'use strict';
const common = require('../common');
const assert = require('assert');
const vm = require('vm');
const {
MessagePort, MessageChannel, moveMessagePortToContext
} = require('worker_threads');

const context = vm.createContext();
const { port1, port2 } = new MessageChannel();
context.port = moveMessagePortToContext(port1, context);
context.global = context;
Object.assign(context, {
global: context,
assert,
MessagePort,
MessageChannel
});

vm.runInContext('(' + function() {
{
assert(port.postMessage instanceof Function);
assert(port.constructor instanceof Function);
for (let obj = port; obj !== null; obj = Object.getPrototypeOf(obj)) {
for (const key of Object.getOwnPropertyNames(obj)) {
if (typeof obj[key] === 'object' && obj[key] !== null) {
assert(obj[key] instanceof Object);
} else if (typeof obj[key] === 'function') {
assert(obj[key] instanceof Function);
}
}
}

assert(!(port instanceof MessagePort));
assert.strictEqual(port.onmessage, undefined);
port.onmessage = function({ data }) {
assert(data instanceof Object);
port.postMessage(data);
};
port.start();
}

{
let threw = false;
try {
port.postMessage(global);
} catch (e) {
assert.strictEqual(e.constructor.name, 'DOMException');
assert(e instanceof Object);
assert(e instanceof Error);
threw = true;
}
assert(threw);
}

{
const newDummyPort = new (port.constructor)();
assert(!(newDummyPort instanceof MessagePort));
assert(newDummyPort.close instanceof Function);
newDummyPort.close();
}
} + ')()', context);

port2.on('message', common.mustCall((msg) => {
assert(msg instanceof Object);
port2.close();
}));
port2.postMessage({});

0 comments on commit fe8972a

Please sign in to comment.