From 337be58ee606135366e14103d7776e77f4883311 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Tue, 5 Sep 2017 22:38:32 +0200 Subject: [PATCH] worker: implement `MessagePort` and `MessageChannel` Implement `MessagePort` and `MessageChannel` along the lines of the DOM classes of the same names. `MessagePort`s initially support transferring only `ArrayBuffer`s. Thanks to Stephen Belanger for reviewing this change in its original form, to Benjamin Gruenbaum for reviewing the added tests in their original form, and to Olivia Hugger for reviewing the documentation in its original form. Refs: https://github.com/ayojs/ayo/pull/98 PR-URL: https://github.com/nodejs/node/pull/20876 Reviewed-By: Gireesh Punathil Reviewed-By: Benjamin Gruenbaum Reviewed-By: Shingo Inoue Reviewed-By: Matteo Collina Reviewed-By: Tiancheng "Timothy" Gu Reviewed-By: John-David Dalton Reviewed-By: Gus Caplan --- doc/api/_toc.md | 1 + doc/api/all.md | 1 + doc/api/errors.md | 16 + doc/api/worker.md | 146 +++++ lib/internal/bootstrap/loaders.js | 3 +- lib/internal/modules/cjs/helpers.js | 5 + lib/internal/worker.js | 105 ++++ lib/worker.js | 5 + node.gyp | 4 + src/async_wrap.h | 1 + src/env.h | 5 + src/node.cc | 9 + src/node_config.cc | 3 + src/node_errors.h | 6 + src/node_internals.h | 6 + src/node_messaging.cc | 548 ++++++++++++++++++ src/node_messaging.h | 167 ++++++ src/util.h | 3 + test/parallel/test-message-channel.js | 26 + .../parallel/test-message-port-arraybuffer.js | 20 + test/parallel/test-message-port.js | 56 ++ test/sequential/test-async-wrap-getasyncid.js | 2 + tools/doc/type-parser.js | 4 +- 23 files changed, 1140 insertions(+), 2 deletions(-) create mode 100644 doc/api/worker.md create mode 100644 lib/internal/worker.js create mode 100644 lib/worker.js create mode 100644 src/node_messaging.cc create mode 100644 src/node_messaging.h create mode 100644 test/parallel/test-message-channel.js create mode 100644 test/parallel/test-message-port-arraybuffer.js create mode 100644 test/parallel/test-message-port.js diff --git a/doc/api/_toc.md b/doc/api/_toc.md index 9b487b50a55031..1b2fdea26e46eb 100644 --- a/doc/api/_toc.md +++ b/doc/api/_toc.md @@ -53,6 +53,7 @@ * [Utilities](util.html) * [V8](v8.html) * [VM](vm.html) +* [Worker](worker.html) * [ZLIB](zlib.html)
diff --git a/doc/api/all.md b/doc/api/all.md index d013f07bd328fc..6f0a21dd092105 100644 --- a/doc/api/all.md +++ b/doc/api/all.md @@ -46,4 +46,5 @@ @include util @include v8 @include vm +@include worker @include zlib diff --git a/doc/api/errors.md b/doc/api/errors.md index 0c3c5fba7d0e49..161e5eb9564614 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -650,12 +650,23 @@ Used when a child process is being forked without specifying an IPC channel. Used when the main process is trying to read data from the child process's STDERR / STDOUT, and the data's length is longer than the `maxBuffer` option. + +### ERR_CLOSED_MESSAGE_PORT + +There was an attempt to use a `MessagePort` instance in a closed +state, usually after `.close()` has been called. + ### ERR_CONSOLE_WRITABLE_STREAM `Console` was instantiated without `stdout` stream, or `Console` has a non-writable `stdout` or `stderr` stream. + +### ERR_CONSTRUCT_CALL_REQUIRED + +A constructor for a class was called without `new`. + ### ERR_CPU_USAGE @@ -1213,6 +1224,11 @@ urlSearchParams.has.call(buf, 'foo'); // Throws a TypeError with code 'ERR_INVALID_THIS' ``` + +### ERR_INVALID_TRANSFER_OBJECT + +An invalid transfer object was passed to `postMessage()`. + ### ERR_INVALID_TUPLE diff --git a/doc/api/worker.md b/doc/api/worker.md new file mode 100644 index 00000000000000..4724714cd62f26 --- /dev/null +++ b/doc/api/worker.md @@ -0,0 +1,146 @@ +# Worker + + + +> Stability: 1 - Experimental + +## Class: MessageChannel + + +Instances of the `worker.MessageChannel` class represent an asynchronous, +two-way communications channel. +The `MessageChannel` has no methods of its own. `new MessageChannel()` +yields an object with `port1` and `port2` properties, which refer to linked +[`MessagePort`][] instances. + +```js +const { MessageChannel } = require('worker'); + +const { port1, port2 } = new MessageChannel(); +port1.on('message', (message) => console.log('received', message)); +port2.postMessage({ foo: 'bar' }); +// prints: received { foo: 'bar' } +``` + +## Class: MessagePort + + +* Extends: {EventEmitter} + +Instances of the `worker.MessagePort` class represent one end of an +asynchronous, two-way communications channel. It can be used to transfer +structured data, memory regions and other `MessagePort`s between different +[`Worker`][]s. + +With the exception of `MessagePort`s being [`EventEmitter`][]s rather +than `EventTarget`s, this implementation matches [browser `MessagePort`][]s. + +### Event: 'close' + + +The `'close'` event is emitted once either side of the channel has been +disconnected. + +### Event: 'message' + + +* `value` {any} The transmitted value + +The `'message'` event is emitted for any incoming message, containing the cloned +input of [`port.postMessage()`][]. + +Listeners on this event will receive a clone of the `value` parameter as passed +to `postMessage()` and no further arguments. + +### port.close() + + +Disables further sending of messages on either side of the connection. +This method can be called once you know that no further communication +will happen over this `MessagePort`. + +### port.postMessage(value[, transferList]) + + +* `value` {any} +* `transferList` {Object[]} + +Sends a JavaScript value to the receiving side of this channel. +`value` will be transferred in a way which is compatible with +the [HTML structured clone algorithm][]. In particular, it may contain circular +references and objects like typed arrays that the `JSON` API is not able +to stringify. + +`transferList` may be a list of `ArrayBuffer` objects. +After transferring, they will not be usable on the sending side of the channel +anymore (even if they are not contained in `value`). + +`value` may still contain `ArrayBuffer` instances that are not in +`transferList`; in that case, the underlying memory is copied rather than moved. + +For more information on the serialization and deserialization mechanisms +behind this API, see the [serialization API of the `v8` module][v8.serdes]. + +Because the object cloning uses the structured clone algorithm, +non-enumerable properties, property accessors, and object prototypes are +not preserved. In particular, [`Buffer`][] objects will be read as +plain [`Uint8Array`][]s on the receiving side. + +The message object will be cloned immediately, and can be modified after +posting without having side effects. + +### port.ref() + + +Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed port will +*not* let the program exit if it's the only active handle left (the default +behavior). If the port is `ref()`ed, calling `ref()` again will have no effect. + +If listeners are attached or removed using `.on('message')`, the port will +be `ref()`ed and `unref()`ed automatically depending on whether +listeners for the event exist. + +### port.start() + + +Starts receiving messages on this `MessagePort`. When using this port +as an event emitter, this will be called automatically once `'message'` +listeners are attached. + +### port.unref() + + +Calling `unref()` on a port will allow the thread to exit if this is the only +active handle in the event system. If the port is already `unref()`ed calling +`unref()` again will have no effect. + +If listeners are attached or removed using `.on('message')`, the port will +be `ref()`ed and `unref()`ed automatically depending on whether +listeners for the event exist. + +[`Buffer`]: buffer.html +[`EventEmitter`]: events.html +[`MessagePort`]: #worker_class_messageport +[`port.postMessage()`]: #worker_port_postmessage_value_transferlist +[v8.serdes]: v8.html#v8_serialization_api +[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array +[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort +[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm diff --git a/lib/internal/bootstrap/loaders.js b/lib/internal/bootstrap/loaders.js index ff809a91291bee..417e8594e14aab 100644 --- a/lib/internal/bootstrap/loaders.js +++ b/lib/internal/bootstrap/loaders.js @@ -194,7 +194,8 @@ }; NativeModule.isInternal = function(id) { - return id.startsWith('internal/'); + return id.startsWith('internal/') || + (id === 'worker' && !process.binding('config').experimentalWorker); }; } diff --git a/lib/internal/modules/cjs/helpers.js b/lib/internal/modules/cjs/helpers.js index 60346c5841c7df..55eaed7d376506 100644 --- a/lib/internal/modules/cjs/helpers.js +++ b/lib/internal/modules/cjs/helpers.js @@ -105,6 +105,11 @@ const builtinLibs = [ 'v8', 'vm', 'zlib' ]; +if (process.binding('config').experimentalWorker) { + builtinLibs.push('worker'); + builtinLibs.sort(); +} + if (typeof process.binding('inspector').open === 'function') { builtinLibs.push('inspector'); builtinLibs.sort(); diff --git a/lib/internal/worker.js b/lib/internal/worker.js new file mode 100644 index 00000000000000..73f7525aa73cc2 --- /dev/null +++ b/lib/internal/worker.js @@ -0,0 +1,105 @@ +'use strict'; + +const EventEmitter = require('events'); +const util = require('util'); + +const { internalBinding } = require('internal/bootstrap/loaders'); +const { MessagePort, MessageChannel } = internalBinding('messaging'); +const { handle_onclose } = internalBinding('symbols'); + +util.inherits(MessagePort, EventEmitter); + +const kOnMessageListener = Symbol('kOnMessageListener'); + +const debug = util.debuglog('worker'); + +// A MessagePort consists of a handle (that wraps around an +// uv_async_t) which can receive information from other threads and emits +// .onmessage events, and a function used for sending data to a MessagePort +// in some other thread. +MessagePort.prototype[kOnMessageListener] = function onmessage(payload) { + debug('received message', payload); + // Emit the deserialized object to userland. + this.emit('message', payload); +}; + +// This is for compatibility with the Web's MessagePort API. It makes sense to +// provide it as an `EventEmitter` in Node.js, but if somebody overrides +// `onmessage`, we'll switch over to the Web API model. +Object.defineProperty(MessagePort.prototype, 'onmessage', { + enumerable: true, + configurable: true, + get() { + return this[kOnMessageListener]; + }, + set(value) { + this[kOnMessageListener] = value; + if (typeof value === 'function') { + this.ref(); + this.start(); + } else { + this.unref(); + this.stop(); + } + } +}); + +// This is called from inside the `MessagePort` constructor. +function oninit() { + setupPortReferencing(this, this, 'message'); +} + +Object.defineProperty(MessagePort.prototype, 'oninit', { + enumerable: true, + writable: false, + value: oninit +}); + +// This is called after the underlying `uv_async_t` has been closed. +function onclose() { + if (typeof this.onclose === 'function') { + // Not part of the Web standard yet, but there aren't many reasonable + // alternatives in a non-EventEmitter usage setting. + // Refs: https://github.com/whatwg/html/issues/1766 + this.onclose(); + } + this.emit('close'); +} + +Object.defineProperty(MessagePort.prototype, handle_onclose, { + enumerable: false, + writable: false, + value: onclose +}); + +const originalClose = MessagePort.prototype.close; +MessagePort.prototype.close = function(cb) { + if (typeof cb === 'function') + this.once('close', cb); + originalClose.call(this); +}; + +function setupPortReferencing(port, eventEmitter, eventName) { + // Keep track of whether there are any workerMessage listeners: + // If there are some, ref() the channel so it keeps the event loop alive. + // If there are none or all are removed, unref() the channel so the worker + // can shutdown gracefully. + port.unref(); + eventEmitter.on('newListener', (name) => { + if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + port.ref(); + port.start(); + } + }); + eventEmitter.on('removeListener', (name) => { + if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + port.stop(); + port.unref(); + } + }); +} + +module.exports = { + MessagePort, + MessageChannel +}; diff --git a/lib/worker.js b/lib/worker.js new file mode 100644 index 00000000000000..d67fb4efe40a33 --- /dev/null +++ b/lib/worker.js @@ -0,0 +1,5 @@ +'use strict'; + +const { MessagePort, MessageChannel } = require('internal/worker'); + +module.exports = { MessagePort, MessageChannel }; diff --git a/node.gyp b/node.gyp index 709ce226033af0..72a5d05c43d035 100644 --- a/node.gyp +++ b/node.gyp @@ -78,6 +78,7 @@ 'lib/util.js', 'lib/v8.js', 'lib/vm.js', + 'lib/worker.js', 'lib/zlib.js', 'lib/internal/assert.js', 'lib/internal/async_hooks.js', @@ -155,6 +156,7 @@ 'lib/internal/validators.js', 'lib/internal/stream_base_commons.js', 'lib/internal/vm/module.js', + 'lib/internal/worker.js', 'lib/internal/streams/lazy_transform.js', 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/buffer_list.js', @@ -333,6 +335,7 @@ 'src/node_file.cc', 'src/node_http2.cc', 'src/node_http_parser.cc', + 'src/node_messaging.cc', 'src/node_os.cc', 'src/node_platform.cc', 'src/node_perf.cc', @@ -390,6 +393,7 @@ 'src/node_http2_state.h', 'src/node_internals.h', 'src/node_javascript.h', + 'src/node_messaging.h', 'src/node_mutex.h', 'src/node_perf.h', 'src/node_perf_common.h', diff --git a/src/async_wrap.h b/src/async_wrap.h index 377702a8d6ef9c..cf269a4c1f5e1e 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -49,6 +49,7 @@ namespace node { V(HTTP2SETTINGS) \ V(HTTPPARSER) \ V(JSSTREAM) \ + V(MESSAGEPORT) \ V(PIPECONNECTWRAP) \ V(PIPESERVERWRAP) \ V(PIPEWRAP) \ diff --git a/src/env.h b/src/env.h index 7a432eaa3d4ff0..d87c39c5186bd7 100644 --- a/src/env.h +++ b/src/env.h @@ -193,6 +193,7 @@ struct PackageConfig { V(main_string, "main") \ V(max_buffer_string, "maxBuffer") \ V(message_string, "message") \ + V(message_port_constructor_string, "MessagePort") \ V(minttl_string, "minttl") \ V(modulus_string, "modulus") \ V(name_string, "name") \ @@ -212,6 +213,7 @@ struct PackageConfig { V(onhandshakedone_string, "onhandshakedone") \ V(onhandshakestart_string, "onhandshakestart") \ V(onheaders_string, "onheaders") \ + V(oninit_string, "oninit") \ V(onmessage_string, "onmessage") \ V(onnewsession_string, "onnewsession") \ V(onocspresponse_string, "onocspresponse") \ @@ -242,6 +244,8 @@ struct PackageConfig { V(pipe_target_string, "pipeTarget") \ V(pipe_source_string, "pipeSource") \ V(port_string, "port") \ + V(port1_string, "port1") \ + V(port2_string, "port2") \ V(preference_string, "preference") \ V(priority_string, "priority") \ V(promise_string, "promise") \ @@ -323,6 +327,7 @@ struct PackageConfig { V(http2stream_constructor_template, v8::ObjectTemplate) \ V(immediate_callback_function, v8::Function) \ V(inspector_console_api_object, v8::Object) \ + V(message_port_constructor_template, v8::FunctionTemplate) \ V(pbkdf2_constructor_template, v8::ObjectTemplate) \ V(pipe_constructor_template, v8::FunctionTemplate) \ V(performance_entry_callback, v8::Function) \ diff --git a/src/node.cc b/src/node.cc index 4973fea636afd5..281d441a26a86c 100644 --- a/src/node.cc +++ b/src/node.cc @@ -248,6 +248,11 @@ bool config_experimental_modules = false; // that is used by lib/vm.js bool config_experimental_vm_modules = false; +// Set in node.cc by ParseArgs when --experimental-worker is used. +// Used in node_config.cc to set a constant on process.binding('config') +// that is used by lib/worker.js +bool config_experimental_worker = false; + // Set in node.cc by ParseArgs when --experimental-repl-await is used. // Used in node_config.cc to set a constant on process.binding('config') // that is used by lib/repl.js. @@ -3104,6 +3109,7 @@ static void PrintHelp() { " --experimental-vm-modules experimental ES Module support\n" " in vm module\n" #endif // defined(NODE_HAVE_I18N_SUPPORT) + " --experimental-worker experimental threaded Worker support\n" #if HAVE_OPENSSL && NODE_FIPS_MODE " --force-fips force FIPS crypto (cannot be disabled)\n" #endif // HAVE_OPENSSL && NODE_FIPS_MODE @@ -3267,6 +3273,7 @@ static void CheckIfAllowedInEnv(const char* exe, bool is_env, "--experimental-modules", "--experimental-repl-await", "--experimental-vm-modules", + "--experimental-worker", "--expose-http2", // keep as a non-op through v9.x "--force-fips", "--icu-data-dir", @@ -3465,6 +3472,8 @@ static void ParseArgs(int* argc, new_v8_argc += 1; } else if (strcmp(arg, "--experimental-vm-modules") == 0) { config_experimental_vm_modules = true; + } else if (strcmp(arg, "--experimental-worker") == 0) { + config_experimental_worker = true; } else if (strcmp(arg, "--experimental-repl-await") == 0) { config_experimental_repl_await = true; } else if (strcmp(arg, "--loader") == 0) { diff --git a/src/node_config.cc b/src/node_config.cc index 603d55491a259b..dd5ee666486874 100644 --- a/src/node_config.cc +++ b/src/node_config.cc @@ -91,6 +91,9 @@ static void Initialize(Local target, if (config_experimental_vm_modules) READONLY_BOOLEAN_PROPERTY("experimentalVMModules"); + if (config_experimental_worker) + READONLY_BOOLEAN_PROPERTY("experimentalWorker"); + if (config_experimental_repl_await) READONLY_BOOLEAN_PROPERTY("experimentalREPLAwait"); diff --git a/src/node_errors.h b/src/node_errors.h index ab02421df8b36a..d119f4f7060853 100644 --- a/src/node_errors.h +++ b/src/node_errors.h @@ -19,9 +19,12 @@ namespace node { #define ERRORS_WITH_CODE(V) \ V(ERR_BUFFER_OUT_OF_BOUNDS, RangeError) \ V(ERR_BUFFER_TOO_LARGE, Error) \ + V(ERR_CLOSED_MESSAGE_PORT, Error) \ + V(ERR_CONSTRUCT_CALL_REQUIRED, Error) \ V(ERR_INDEX_OUT_OF_RANGE, RangeError) \ V(ERR_INVALID_ARG_VALUE, TypeError) \ V(ERR_INVALID_ARG_TYPE, TypeError) \ + V(ERR_INVALID_TRANSFER_OBJECT, TypeError) \ V(ERR_MEMORY_ALLOCATION_FAILED, Error) \ V(ERR_MISSING_ARGS, TypeError) \ V(ERR_MISSING_MODULE, Error) \ @@ -48,7 +51,10 @@ namespace node { // Errors with predefined static messages #define PREDEFINED_ERROR_MESSAGES(V) \ + V(ERR_CLOSED_MESSAGE_PORT, "Cannot send data on closed MessagePort") \ + V(ERR_CONSTRUCT_CALL_REQUIRED, "Cannot call constructor without `new`") \ V(ERR_INDEX_OUT_OF_RANGE, "Index out of range") \ + V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \ V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") #define V(code, message) \ diff --git a/src/node_internals.h b/src/node_internals.h index 7bf4eaf1ecf86a..a5d8ed0e5d3ad7 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -114,6 +114,7 @@ struct sockaddr; V(http_parser) \ V(inspector) \ V(js_stream) \ + V(messaging) \ V(module_wrap) \ V(os) \ V(performance) \ @@ -189,6 +190,11 @@ extern bool config_experimental_modules; // that is used by lib/vm.js extern bool config_experimental_vm_modules; +// Set in node.cc by ParseArgs when --experimental-vm-modules is used. +// Used in node_config.cc to set a constant on process.binding('config') +// that is used by lib/vm.js +extern bool config_experimental_worker; + // Set in node.cc by ParseArgs when --experimental-repl-await is used. // Used in node_config.cc to set a constant on process.binding('config') // that is used by lib/repl.js. diff --git a/src/node_messaging.cc b/src/node_messaging.cc new file mode 100644 index 00000000000000..c6e701c7d94426 --- /dev/null +++ b/src/node_messaging.cc @@ -0,0 +1,548 @@ +#include "node_messaging.h" +#include "node_internals.h" +#include "node_buffer.h" +#include "node_errors.h" +#include "util.h" +#include "util-inl.h" +#include "async_wrap.h" +#include "async_wrap-inl.h" + +using v8::Array; +using v8::ArrayBuffer; +using v8::ArrayBufferCreationMode; +using v8::Context; +using v8::EscapableHandleScope; +using v8::Exception; +using v8::Function; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::HandleScope; +using v8::Isolate; +using v8::Just; +using v8::Local; +using v8::Maybe; +using v8::MaybeLocal; +using v8::Nothing; +using v8::Object; +using v8::String; +using v8::Value; +using v8::ValueDeserializer; +using v8::ValueSerializer; + +namespace node { +namespace worker { + +Message::Message(MallocedBuffer&& buffer) + : main_message_buf_(std::move(buffer)) {} + +namespace { + +// This is used to tell V8 how to read transferred host objects, like other +// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them. +class DeserializerDelegate : public ValueDeserializer::Delegate { + public: + DeserializerDelegate(Message* m, Environment* env) + : env_(env), msg_(m) {} + + ValueDeserializer* deserializer = nullptr; + + private: + Environment* env_; + Message* msg_; +}; + +} // anonymous namespace + +MaybeLocal Message::Deserialize(Environment* env, + Local context) { + EscapableHandleScope handle_scope(env->isolate()); + Context::Scope context_scope(context); + + DeserializerDelegate delegate(this, env); + ValueDeserializer deserializer( + env->isolate(), + reinterpret_cast(main_message_buf_.data), + main_message_buf_.size, + &delegate); + delegate.deserializer = &deserializer; + + // Attach all transfered ArrayBuffers to their new Isolate. + for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) { + Local ab = + ArrayBuffer::New(env->isolate(), + array_buffer_contents_[i].release(), + array_buffer_contents_[i].size, + ArrayBufferCreationMode::kInternalized); + deserializer.TransferArrayBuffer(i, ab); + } + array_buffer_contents_.clear(); + + if (deserializer.ReadHeader(context).IsNothing()) + return MaybeLocal(); + return handle_scope.Escape( + deserializer.ReadValue(context).FromMaybe(Local())); +} + +namespace { + +// This tells V8 how to serialize objects that it does not understand +// (e.g. C++ objects) into the output buffer, in a way that our own +// DeserializerDelegate understands how to unpack. +class SerializerDelegate : public ValueSerializer::Delegate { + public: + SerializerDelegate(Environment* env, Local context, Message* m) + : env_(env), context_(context), msg_(m) {} + + void ThrowDataCloneError(Local message) override { + env_->isolate()->ThrowException(Exception::Error(message)); + } + + ValueSerializer* serializer = nullptr; + + private: + Environment* env_; + Local context_; + Message* msg_; + + friend class worker::Message; +}; + +} // anynomous namespace + +Maybe Message::Serialize(Environment* env, + Local context, + Local input, + Local transfer_list_v) { + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(context); + + // Verify that we're not silently overwriting an existing message. + CHECK(main_message_buf_.is_empty()); + + SerializerDelegate delegate(env, context, this); + ValueSerializer serializer(env->isolate(), &delegate); + delegate.serializer = &serializer; + + std::vector> array_buffers; + if (transfer_list_v->IsArray()) { + Local transfer_list = transfer_list_v.As(); + uint32_t length = transfer_list->Length(); + for (uint32_t i = 0; i < length; ++i) { + Local entry; + if (!transfer_list->Get(context, i).ToLocal(&entry)) + return Nothing(); + // Currently, we support ArrayBuffers. + if (entry->IsArrayBuffer()) { + Local ab = entry.As(); + // If we cannot render the ArrayBuffer unusable in this Isolate and + // take ownership of its memory, copying the buffer will have to do. + if (!ab->IsNeuterable() || ab->IsExternal()) + continue; + // We simply use the array index in the `array_buffers` list as the + // ID that we write into the serialized buffer. + uint32_t id = array_buffers.size(); + array_buffers.push_back(ab); + serializer.TransferArrayBuffer(id, ab); + continue; + } + + THROW_ERR_INVALID_TRANSFER_OBJECT(env); + return Nothing(); + } + } + + serializer.WriteHeader(); + if (serializer.WriteValue(context, input).IsNothing()) { + return Nothing(); + } + + for (Local ab : array_buffers) { + // If serialization succeeded, we want to take ownership of + // (a.k.a. externalize) the underlying memory region and render + // it inaccessible in this Isolate. + ArrayBuffer::Contents contents = ab->Externalize(); + ab->Neuter(); + array_buffer_contents_.push_back( + MallocedBuffer { static_cast(contents.Data()), + contents.ByteLength() }); + } + + // The serializer gave us a buffer allocated using `malloc()`. + std::pair data = serializer.Release(); + main_message_buf_ = + MallocedBuffer(reinterpret_cast(data.first), data.second); + return Just(true); +} + +MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { } + +MessagePortData::~MessagePortData() { + CHECK_EQ(owner_, nullptr); + Disentangle(); +} + +void MessagePortData::AddToIncomingQueue(Message&& message) { + // This function will be called by other threads. + Mutex::ScopedLock lock(mutex_); + incoming_messages_.emplace_back(std::move(message)); + + if (owner_ != nullptr) + owner_->TriggerAsync(); +} + +bool MessagePortData::IsSiblingClosed() const { + Mutex::ScopedLock lock(*sibling_mutex_); + return sibling_ == nullptr; +} + +void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { + CHECK_EQ(a->sibling_, nullptr); + CHECK_EQ(b->sibling_, nullptr); + a->sibling_ = b; + b->sibling_ = a; + a->sibling_mutex_ = b->sibling_mutex_; +} + +void MessagePortData::PingOwnerAfterDisentanglement() { + Mutex::ScopedLock lock(mutex_); + if (owner_ != nullptr) + owner_->TriggerAsync(); +} + +void MessagePortData::Disentangle() { + // Grab a copy of the sibling mutex, then replace it so that each sibling + // has its own sibling_mutex_ now. + std::shared_ptr sibling_mutex = sibling_mutex_; + Mutex::ScopedLock sibling_lock(*sibling_mutex); + sibling_mutex_ = std::make_shared(); + + MessagePortData* sibling = sibling_; + if (sibling_ != nullptr) { + sibling_->sibling_ = nullptr; + sibling_ = nullptr; + } + + // We close MessagePorts after disentanglement, so we trigger the + // corresponding uv_async_t to let them know that this happened. + PingOwnerAfterDisentanglement(); + if (sibling != nullptr) { + sibling->PingOwnerAfterDisentanglement(); + } +} + +MessagePort::~MessagePort() { + if (data_) + data_->owner_ = nullptr; +} + +MessagePort::MessagePort(Environment* env, + Local context, + Local wrap) + : HandleWrap(env, + wrap, + reinterpret_cast(new uv_async_t()), + AsyncWrap::PROVIDER_MESSAGEPORT), + data_(new MessagePortData(this)) { + auto onmessage = [](uv_async_t* handle) { + // Called when data has been put into the queue. + MessagePort* channel = static_cast(handle->data); + channel->OnMessage(); + }; + CHECK_EQ(uv_async_init(env->event_loop(), + async(), + onmessage), 0); + async()->data = static_cast(this); + + Local fn; + if (!wrap->Get(context, env->oninit_string()).ToLocal(&fn)) + return; + + if (fn->IsFunction()) { + Local init = fn.As(); + USE(init->Call(context, wrap, 0, nullptr)); + } +} + +void MessagePort::AddToIncomingQueue(Message&& message) { + data_->AddToIncomingQueue(std::move(message)); +} + +uv_async_t* MessagePort::async() { + return reinterpret_cast(GetHandle()); +} + +void MessagePort::TriggerAsync() { + CHECK_EQ(uv_async_send(async()), 0); +} + +void MessagePort::New(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + if (!args.IsConstructCall()) { + THROW_ERR_CONSTRUCT_CALL_REQUIRED(env); + return; + } + + Local context = args.This()->CreationContext(); + Context::Scope context_scope(context); + + new MessagePort(env, context, args.This()); +} + +MessagePort* MessagePort::New( + Environment* env, + Local context, + std::unique_ptr data) { + Context::Scope context_scope(context); + Local ctor; + if (!GetMessagePortConstructor(env, context).ToLocal(&ctor)) + return nullptr; + MessagePort* port = nullptr; + + // Construct a new instance, then assign the listener instance and possibly + // the MessagePortData to it. + Local instance; + if (!ctor->NewInstance(context).ToLocal(&instance)) + return nullptr; + ASSIGN_OR_RETURN_UNWRAP(&port, instance, nullptr); + if (data) { + port->Detach(); + port->data_ = std::move(data); + port->data_->owner_ = port; + // If the existing MessagePortData object had pending messages, this is + // the easiest way to run that queue. + port->TriggerAsync(); + } + return port; +} + +void MessagePort::OnMessage() { + HandleScope handle_scope(env()->isolate()); + Local context = object()->CreationContext(); + + // data_ can only ever be modified by the owner thread, so no need to lock. + // However, the message port may be transferred while it is processing + // messages, so we need to check that this handle still owns its `data_` field + // on every iteration. + while (data_) { + Message received; + { + // Get the head of the message queue. + Mutex::ScopedLock lock(data_->mutex_); + if (!data_->receiving_messages_) + break; + if (data_->incoming_messages_.empty()) + break; + received = std::move(data_->incoming_messages_.front()); + data_->incoming_messages_.pop_front(); + } + + if (!env()->can_call_into_js()) { + // In this case there is nothing to do but to drain the current queue. + continue; + } + + { + // Call the JS .onmessage() callback. + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(context); + Local args[] = { + received.Deserialize(env(), context).FromMaybe(Local()) + }; + + if (args[0].IsEmpty() || + !object()->Has(context, env()->onmessage_string()).FromMaybe(false) || + MakeCallback(env()->onmessage_string(), 1, args).IsEmpty()) { + // Re-schedule OnMessage() execution in case of failure. + if (data_) + TriggerAsync(); + return; + } + } + } + + if (data_ && data_->IsSiblingClosed()) { + Close(); + } +} + +bool MessagePort::IsSiblingClosed() const { + CHECK(data_); + return data_->IsSiblingClosed(); +} + +void MessagePort::OnClose() { + if (data_) { + data_->owner_ = nullptr; + data_->Disentangle(); + } + data_.reset(); + delete async(); +} + +std::unique_ptr MessagePort::Detach() { + Mutex::ScopedLock lock(data_->mutex_); + data_->owner_ = nullptr; + return std::move(data_); +} + + +void MessagePort::Send(Message&& message) { + Mutex::ScopedLock lock(*data_->sibling_mutex_); + if (data_->sibling_ == nullptr) + return; + data_->sibling_->AddToIncomingQueue(std::move(message)); +} + +void MessagePort::Send(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + Message msg; + if (msg.Serialize(env, object()->CreationContext(), args[0], args[1]) + .IsNothing()) { + return; + } + Send(std::move(msg)); +} + +void MessagePort::PostMessage(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + MessagePort* port; + ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); + if (!port->data_) { + return THROW_ERR_CLOSED_MESSAGE_PORT(env); + } + if (args.Length() == 0) { + return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to " + "MessagePort.postMessage"); + } + port->Send(args); +} + +void MessagePort::Start() { + Mutex::ScopedLock lock(data_->mutex_); + data_->receiving_messages_ = true; + if (!data_->incoming_messages_.empty()) + TriggerAsync(); +} + +void MessagePort::Stop() { + Mutex::ScopedLock lock(data_->mutex_); + data_->receiving_messages_ = false; +} + +void MessagePort::Start(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + MessagePort* port; + ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); + if (!port->data_) { + THROW_ERR_CLOSED_MESSAGE_PORT(env); + return; + } + port->Start(); +} + +void MessagePort::Stop(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + MessagePort* port; + ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); + if (!port->data_) { + THROW_ERR_CLOSED_MESSAGE_PORT(env); + return; + } + port->Stop(); +} + +size_t MessagePort::self_size() const { + Mutex::ScopedLock lock(data_->mutex_); + size_t sz = sizeof(*this) + sizeof(*data_); + for (const Message& msg : data_->incoming_messages_) + sz += sizeof(msg) + msg.main_message_buf_.size; + return sz; +} + +void MessagePort::Entangle(MessagePort* a, MessagePort* b) { + Entangle(a, b->data_.get()); +} + +void MessagePort::Entangle(MessagePort* a, MessagePortData* b) { + MessagePortData::Entangle(a->data_.get(), b); +} + +MaybeLocal GetMessagePortConstructor( + Environment* env, Local context) { + // Factor generating the MessagePort JS constructor into its own piece + // of code, because it is needed early on in the child environment setup. + Local templ = env->message_port_constructor_template(); + if (!templ.IsEmpty()) + return templ->GetFunction(context); + + { + Local m = env->NewFunctionTemplate(MessagePort::New); + m->SetClassName(env->message_port_constructor_string()); + m->InstanceTemplate()->SetInternalFieldCount(1); + + AsyncWrap::AddWrapMethods(env, m); + + env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage); + env->SetProtoMethod(m, "start", MessagePort::Start); + env->SetProtoMethod(m, "stop", MessagePort::Stop); + env->SetProtoMethod(m, "close", HandleWrap::Close); + env->SetProtoMethod(m, "unref", HandleWrap::Unref); + env->SetProtoMethod(m, "ref", HandleWrap::Ref); + env->SetProtoMethod(m, "hasRef", HandleWrap::HasRef); + + env->set_message_port_constructor_template(m); + } + + return GetMessagePortConstructor(env, context); +} + +namespace { + +static void MessageChannel(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + if (!args.IsConstructCall()) { + THROW_ERR_CONSTRUCT_CALL_REQUIRED(env); + return; + } + + Local context = args.This()->CreationContext(); + Context::Scope context_scope(context); + + MessagePort* port1 = MessagePort::New(env, context); + MessagePort* port2 = MessagePort::New(env, context); + MessagePort::Entangle(port1, port2); + + args.This()->Set(env->context(), env->port1_string(), port1->object()) + .FromJust(); + args.This()->Set(env->context(), env->port2_string(), port2->object()) + .FromJust(); +} + +static void InitMessaging(Local target, + Local unused, + Local context, + void* priv) { + Environment* env = Environment::GetCurrent(context); + + { + Local message_channel_string = + FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel"); + Local templ = env->NewFunctionTemplate(MessageChannel); + templ->SetClassName(message_channel_string); + target->Set(env->context(), + message_channel_string, + templ->GetFunction(context).ToLocalChecked()).FromJust(); + } + + target->Set(context, + env->message_port_constructor_string(), + GetMessagePortConstructor(env, context).ToLocalChecked()) + .FromJust(); +} + +} // anonymous namespace + +} // namespace worker +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging) diff --git a/src/node_messaging.h b/src/node_messaging.h new file mode 100644 index 00000000000000..7bd60163ea167c --- /dev/null +++ b/src/node_messaging.h @@ -0,0 +1,167 @@ +#ifndef SRC_NODE_MESSAGING_H_ +#define SRC_NODE_MESSAGING_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "env.h" +#include "node_mutex.h" +#include +#include + +namespace node { +namespace worker { + +class MessagePortData; +class MessagePort; + +// Represents a single communication message. +class Message { + public: + explicit Message(MallocedBuffer&& payload = MallocedBuffer()); + + Message(Message&& other) = default; + Message& operator=(Message&& other) = default; + Message& operator=(const Message&) = delete; + Message(const Message&) = delete; + + // Deserialize the contained JS value. May only be called once, and only + // after Serialize() has been called (e.g. by another thread). + v8::MaybeLocal Deserialize(Environment* env, + v8::Local context); + + // Serialize a JS value, and optionally transfer objects, into this message. + // The Message object retains ownership of all transferred objects until + // deserialization. + v8::Maybe Serialize(Environment* env, + v8::Local context, + v8::Local input, + v8::Local transfer_list); + + private: + MallocedBuffer main_message_buf_; + std::vector> array_buffer_contents_; + + friend class MessagePort; +}; + +// This contains all data for a `MessagePort` instance that is not tied to +// a specific Environment/Isolate/event loop, for easier transfer between those. +class MessagePortData { + public: + explicit MessagePortData(MessagePort* owner); + ~MessagePortData(); + + MessagePortData(MessagePortData&& other) = delete; + MessagePortData& operator=(MessagePortData&& other) = delete; + MessagePortData(const MessagePortData& other) = delete; + MessagePortData& operator=(const MessagePortData& other) = delete; + + // Add a message to the incoming queue and notify the receiver. + // This may be called from any thread. + void AddToIncomingQueue(Message&& message); + + // Returns true if and only this MessagePort is currently not entangled + // with another message port. + bool IsSiblingClosed() const; + + // Turns `a` and `b` into siblings, i.e. connects the sending side of one + // to the receiving side of the other. This is not thread-safe. + static void Entangle(MessagePortData* a, MessagePortData* b); + + // Removes any possible sibling. This is thread-safe (it acquires both + // `sibling_mutex_` and `mutex_`), and has to be because it is called once + // the corresponding JS handle handle wants to close + // which can happen on either side of a worker. + void Disentangle(); + + private: + // After disentangling this message port, the owner handle (if any) + // is asynchronously triggered, so that it can close down naturally. + void PingOwnerAfterDisentanglement(); + + // This mutex protects all fields below it, with the exception of + // sibling_. + mutable Mutex mutex_; + bool receiving_messages_ = false; + std::list incoming_messages_; + MessagePort* owner_ = nullptr; + // This mutex protects the sibling_ field and is shared between two entangled + // MessagePorts. If both mutexes are acquired, this one needs to be + // acquired first. + std::shared_ptr sibling_mutex_ = std::make_shared(); + MessagePortData* sibling_ = nullptr; + + friend class MessagePort; +}; + +// A message port that receives messages from other threads, including +// the uv_async_t handle that is used to notify the current event loop of +// new incoming messages. +class MessagePort : public HandleWrap { + public: + // Create a new MessagePort. The `context` argument specifies the Context + // instance that is used for creating the values emitted from this port. + MessagePort(Environment* env, + v8::Local context, + v8::Local wrap); + ~MessagePort(); + + // Create a new message port instance, optionally over an existing + // `MessagePortData` object. + static MessagePort* New(Environment* env, + v8::Local context, + std::unique_ptr data = nullptr); + + // Send a message, i.e. deliver it into the sibling's incoming queue. + // If there is no sibling, i.e. this port is closed, + // this message is silently discarded. + void Send(Message&& message); + void Send(const v8::FunctionCallbackInfo& args); + // Deliver a single message into this port's incoming queue. + void AddToIncomingQueue(Message&& message); + + // Start processing messages on this port as a receiving end. + void Start(); + // Stop processing messages on this port as a receiving end. + void Stop(); + + static void New(const v8::FunctionCallbackInfo& args); + static void PostMessage(const v8::FunctionCallbackInfo& args); + static void Start(const v8::FunctionCallbackInfo& args); + static void Stop(const v8::FunctionCallbackInfo& args); + + // Turns `a` and `b` into siblings, i.e. connects the sending side of one + // to the receiving side of the other. This is not thread-safe. + static void Entangle(MessagePort* a, MessagePort* b); + static void Entangle(MessagePort* a, MessagePortData* b); + + // Detach this port's data for transferring. After this, the MessagePortData + // is no longer associated with this handle, although it can still receive + // messages. + std::unique_ptr Detach(); + + bool IsSiblingClosed() const; + + size_t self_size() const override; + + private: + void OnClose() override; + void OnMessage(); + void TriggerAsync(); + inline uv_async_t* async(); + + std::unique_ptr data_ = nullptr; + + friend class MessagePortData; +}; + +v8::MaybeLocal GetMessagePortConstructor( + Environment* env, v8::Local context); + +} // namespace worker +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + + +#endif // SRC_NODE_MESSAGING_H_ diff --git a/src/util.h b/src/util.h index e272286d3e4b96..fade27458f3e16 100644 --- a/src/util.h +++ b/src/util.h @@ -436,8 +436,11 @@ struct MallocedBuffer { return ret; } + inline bool is_empty() const { return data == nullptr; } + MallocedBuffer() : data(nullptr) {} explicit MallocedBuffer(size_t size) : data(Malloc(size)), size(size) {} + MallocedBuffer(char* data, size_t size) : data(data), size(size) {} MallocedBuffer(MallocedBuffer&& other) : data(other.data), size(other.size) { other.data = nullptr; } diff --git a/test/parallel/test-message-channel.js b/test/parallel/test-message-channel.js new file mode 100644 index 00000000000000..0facaa1d835ea8 --- /dev/null +++ b/test/parallel/test-message-channel.js @@ -0,0 +1,26 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { MessageChannel } = require('worker'); + +{ + const channel = new MessageChannel(); + + channel.port1.on('message', common.mustCall(({ typedArray }) => { + assert.deepStrictEqual(typedArray, new Uint8Array([0, 1, 2, 3, 4])); + })); + + const typedArray = new Uint8Array([0, 1, 2, 3, 4]); + channel.port2.postMessage({ typedArray }, [ typedArray.buffer ]); + assert.strictEqual(typedArray.buffer.byteLength, 0); + channel.port2.close(); +} + +{ + const channel = new MessageChannel(); + + channel.port1.on('close', common.mustCall()); + channel.port2.on('close', common.mustCall()); + channel.port2.close(); +} diff --git a/test/parallel/test-message-port-arraybuffer.js b/test/parallel/test-message-port-arraybuffer.js new file mode 100644 index 00000000000000..4abeb585b4fb15 --- /dev/null +++ b/test/parallel/test-message-port-arraybuffer.js @@ -0,0 +1,20 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { MessageChannel } = require('worker'); + +{ + const { port1, port2 } = new MessageChannel(); + + const arrayBuffer = new ArrayBuffer(40); + const typedArray = new Uint32Array(arrayBuffer); + typedArray[0] = 0x12345678; + + port1.postMessage(typedArray, [ arrayBuffer ]); + port2.on('message', common.mustCall((received) => { + assert.strictEqual(received[0], 0x12345678); + port2.close(common.mustCall()); + })); +} diff --git a/test/parallel/test-message-port.js b/test/parallel/test-message-port.js new file mode 100644 index 00000000000000..8a7f3805200fa3 --- /dev/null +++ b/test/parallel/test-message-port.js @@ -0,0 +1,56 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { MessageChannel, MessagePort } = require('worker'); + +{ + const { port1, port2 } = new MessageChannel(); + assert(port1 instanceof MessagePort); + assert(port2 instanceof MessagePort); + + const input = { a: 1 }; + port1.postMessage(input); + port2.on('message', common.mustCall((received) => { + assert.deepStrictEqual(received, input); + port2.close(common.mustCall()); + })); +} + +{ + const { port1, port2 } = new MessageChannel(); + + const input = { a: 1 }; + port1.postMessage(input); + // Check that the message still gets delivered if `port2` has its + // `on('message')` handler attached at a later point in time. + setImmediate(() => { + port2.on('message', common.mustCall((received) => { + assert.deepStrictEqual(received, input); + port2.close(common.mustCall()); + })); + }); +} + +{ + const { port1, port2 } = new MessageChannel(); + + const input = { a: 1 }; + + const dummy = common.mustNotCall(); + // Check that the message still gets delivered if `port2` has its + // `on('message')` handler attached at a later point in time, even if a + // listener was removed previously. + port2.addListener('message', dummy); + setImmediate(() => { + port2.removeListener('message', dummy); + port1.postMessage(input); + setImmediate(() => { + port2.on('message', common.mustCall((received) => { + assert.deepStrictEqual(received, input); + port2.close(common.mustCall()); + })); + }); + }); +} diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 971296915ceecb..84a3e3b1f4dc05 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -35,7 +35,9 @@ common.crashOnUnhandledRejection(); delete providers.HTTP2STREAM; delete providers.HTTP2PING; delete providers.HTTP2SETTINGS; + // TODO(addaleax): Test for these delete providers.STREAMPIPE; + delete providers.MESSAGEPORT; const objKeys = Object.keys(providers); if (objKeys.length > 0) diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index e7c7aa69da5e95..be72893832373a 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -117,7 +117,9 @@ const customTypesMap = { 'Tracing': 'tracing.html#tracing_tracing_object', 'URL': 'url.html#url_the_whatwg_url_api', - 'URLSearchParams': 'url.html#url_class_urlsearchparams' + 'URLSearchParams': 'url.html#url_class_urlsearchparams', + + 'MessagePort': 'worker.html#worker_class_messageport' }; const arrayPart = /(?:\[])+$/;