Skip to content
This repository has been archived by the owner on Aug 31, 2018. It is now read-only.

worker: implement MessagePort and MessageChannel #98

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/internal/module.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ const builtinLibs = [
'assert', 'async_hooks', 'buffer', 'child_process', 'cluster', 'crypto',
'dgram', 'dns', 'domain', 'events', 'fs', 'http', 'http2', 'https', 'net',
'os', 'path', 'perf_hooks', 'punycode', 'querystring', 'readline', 'repl',
'stream', 'string_decoder', 'tls', 'tty', 'url', 'util', 'v8', 'vm', 'zlib'
'stream', 'string_decoder', 'tls', 'tty', 'url', 'util', 'v8', 'vm', 'worker',
'zlib'
];

if (typeof process.binding('inspector').connect === 'function') {
Expand Down
83 changes: 83 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
'use strict';

const EventEmitter = require('events');
const util = require('util');

const { MessagePort, MessageChannel } = process.binding('messaging');
util.inherits(MessagePort, EventEmitter);

const debug = util.debuglog('worker');

// A MessagePort consists of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
function onmessage(payload, flag) {
debug(`[${process.threadId}] received message`, flag, payload);
// Emit the flag and deserialized object to userland.
if (flag === 0 || flag === undefined)
this.emit('message', payload);
else
this.emit('flaggedMessage', flag, payload);
}

Object.defineProperty(MessagePort.prototype, 'onmessage', {
enumerable: true,
configurable: true,
get() { return onmessage; },
set(value) {
Object.defineProperty(this, {
writable: true,
enumerable: true,
configurable: true,
value
});
this.ref();
this.start();
}
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing onmessage will stop the message and flaggedMessage events from being emitted. Is that intended behaviour?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I would say so. This is only there to match the Web spec more closely, but in that case you don’t want the EventEmitter interface anyways I guess.


function oninit() {
setupPortReferencing(this, this, 'message');
}

Object.defineProperty(MessagePort.prototype, 'oninit', {
enumerable: true,
writable: false,
value: oninit
});

function onclose() {
this.emit('close');
}

Object.defineProperty(MessagePort.prototype, 'onclose', {
enumerable: true,
writable: false,
value: onclose
});

function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
port.unref();
eventEmitter.on('newListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
port.start();
}
});
eventEmitter.on('removeListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.stop();
port.unref();
}
});
}

module.exports = {
MessagePort,
MessageChannel
};
5 changes: 5 additions & 0 deletions lib/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
'use strict';

const { MessagePort, MessageChannel } = require('internal/worker');

module.exports = { MessagePort, MessageChannel };
4 changes: 4 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
'lib/util.js',
'lib/v8.js',
'lib/vm.js',
'lib/worker.js',
'lib/zlib.js',
'lib/internal/buffer.js',
'lib/internal/child_process.js',
Expand Down Expand Up @@ -129,6 +130,7 @@
'lib/internal/http2/util.js',
'lib/internal/v8_prof_polyfill.js',
'lib/internal/v8_prof_processor.js',
'lib/internal/worker.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/BufferList.js',
'lib/internal/streams/legacy.js',
Expand Down Expand Up @@ -208,6 +210,7 @@
'src/node_http2.cc',
'src/node_http_parser.cc',
'src/node_main.cc',
'src/node_messaging.cc',
'src/node_os.cc',
'src/node_platform.cc',
'src/node_perf.cc',
Expand Down Expand Up @@ -259,6 +262,7 @@
'src/node_http2_state.h',
'src/node_internals.h',
'src/node_javascript.h',
'src/node_messaging.h',
'src/node_mutex.h',
'src/node_platform.h',
'src/node_perf.h',
Expand Down
1 change: 1 addition & 0 deletions src/async-wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace node {
V(HTTP2SESSIONSHUTDOWNWRAP) \
V(HTTPPARSER) \
V(JSSTREAM) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
Expand Down
5 changes: 5 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class ModuleWrap;
V(mac_string, "mac") \
V(max_buffer_string, "maxBuffer") \
V(message_string, "message") \
V(message_port_constructor_string, "MessagePort") \
V(minttl_string, "minttl") \
V(model_string, "model") \
V(modulus_string, "modulus") \
Expand All @@ -208,6 +209,7 @@ class ModuleWrap;
V(onhandshakedone_string, "onhandshakedone") \
V(onhandshakestart_string, "onhandshakestart") \
V(onheaders_string, "onheaders") \
V(oninit_string, "oninit") \
V(onmessage_string, "onmessage") \
V(onnewsession_string, "onnewsession") \
V(onnewsessiondone_string, "onnewsessiondone") \
Expand Down Expand Up @@ -235,6 +237,8 @@ class ModuleWrap;
V(pid_string, "pid") \
V(pipe_string, "pipe") \
V(port_string, "port") \
V(port1_string, "port1") \
V(port2_string, "port2") \
V(preference_string, "preference") \
V(priority_string, "priority") \
V(produce_cached_data_string, "produceCachedData") \
Expand Down Expand Up @@ -308,6 +312,7 @@ class ModuleWrap;
V(domain_array, v8::Array) \
V(domains_stack_array, v8::Array) \
V(inspector_console_api_object, v8::Object) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(module_load_list_array, v8::Array) \
V(pbkdf2_constructor_template, v8::ObjectTemplate) \
V(pipe_constructor_template, v8::FunctionTemplate) \
Expand Down
Loading