diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 9aa8c199588050..a72868916ca6cc 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -61,11 +61,11 @@ MessagePort.prototype.unref = MessagePortPrototype.unref; // uv_async_t) which can receive information from other threads and emits // .onmessage events, and a function used for sending data to a MessagePort // in some other thread. -MessagePort.prototype[kOnMessageListener] = function onmessage(payload) { - if (payload.type !== messageTypes.STDIO_WANTS_MORE_DATA) - debug(`[${threadId}] received message`, payload); +MessagePort.prototype[kOnMessageListener] = function onmessage(event) { + if (event.data && event.data.type !== messageTypes.STDIO_WANTS_MORE_DATA) + debug(`[${threadId}] received message`, event); // Emit the deserialized object to userland. - this.emit('message', payload); + this.emit('message', event.data); }; // This is for compatibility with the Web's MessagePort API. It makes sense to diff --git a/src/env.h b/src/env.h index 48aaa63a39cab4..749ad22b67267f 100644 --- a/src/env.h +++ b/src/env.h @@ -146,6 +146,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2; V(crypto_ec_string, "ec") \ V(crypto_rsa_string, "rsa") \ V(cwd_string, "cwd") \ + V(data_string, "data") \ V(dest_string, "dest") \ V(destroyed_string, "destroyed") \ V(detached_string, "detached") \ @@ -291,6 +292,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2; V(subject_string, "subject") \ V(subjectaltname_string, "subjectaltname") \ V(syscall_string, "syscall") \ + V(target_string, "target") \ V(thread_id_string, "threadId") \ V(ticketkeycallback_string, "onticketkeycallback") \ V(timeout_string, "timeout") \ @@ -359,6 +361,7 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2; V(inspector_console_extension_installer, v8::Function) \ V(libuv_stream_wrap_ctor_template, v8::FunctionTemplate) \ V(message_port, v8::Object) \ + V(message_event_object_template, v8::ObjectTemplate) \ V(message_port_constructor_template, v8::FunctionTemplate) \ V(native_module_require, v8::Function) \ V(performance_entry_callback, v8::Function) \ diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 896f43cd23d372..62f333bab0db98 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -25,6 +25,7 @@ using v8::Maybe; using v8::MaybeLocal; using v8::Nothing; using v8::Object; +using v8::ObjectTemplate; using v8::SharedArrayBuffer; using v8::String; using v8::Value; @@ -589,12 +590,19 @@ void MessagePort::OnMessage() { // Call the JS .onmessage() callback. HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(context); - Local args[] = { - received.Deserialize(env(), context).FromMaybe(Local()) - }; - if (args[0].IsEmpty() || - MakeCallback(env()->onmessage_string(), 1, args).IsEmpty()) { + Local event; + Local payload; + Local cb_args[1]; + if (!received.Deserialize(env(), context).ToLocal(&payload) || + !env()->message_event_object_template()->NewInstance(context) + .ToLocal(&event) || + event->Set(context, env()->data_string(), payload).IsNothing() || + event->Set(context, env()->target_string(), object()).IsNothing() || + (cb_args[0] = event, false) || + MakeCallback(env()->onmessage_string(), + arraysize(cb_args), + cb_args).IsEmpty()) { // Re-schedule OnMessage() execution in case of failure. if (data_) TriggerAsync(); @@ -763,6 +771,8 @@ MaybeLocal GetMessagePortConstructor( if (!templ.IsEmpty()) return templ->GetFunction(context); + Isolate* isolate = env->isolate(); + { Local m = env->NewFunctionTemplate(MessagePort::New); m->SetClassName(env->message_port_constructor_string()); @@ -775,6 +785,13 @@ MaybeLocal GetMessagePortConstructor( env->SetProtoMethod(m, "drain", MessagePort::Drain); env->set_message_port_constructor_template(m); + + Local event_ctor = FunctionTemplate::New(isolate); + event_ctor->SetClassName(FIXED_ONE_BYTE_STRING(isolate, "MessageEvent")); + Local e = event_ctor->InstanceTemplate(); + e->Set(env->data_string(), Null(isolate)); + e->Set(env->target_string(), Null(isolate)); + env->set_message_event_object_template(e); } return GetMessagePortConstructor(env, context); diff --git a/test/parallel/test-worker-message-port-transfer-self.js b/test/parallel/test-worker-message-port-transfer-self.js index f076d3cb70f733..f02f06eaaea53f 100644 --- a/test/parallel/test-worker-message-port-transfer-self.js +++ b/test/parallel/test-worker-message-port-transfer-self.js @@ -25,7 +25,7 @@ assert.throws(common.mustCall(() => { // The failed transfer should not affect the ports in anyway. port2.onmessage = common.mustCall((message) => { - assert.strictEqual(message, 2); + assert.strictEqual(message.data, 2); const inspectedPort1 = util.inspect(port1); const inspectedPort2 = util.inspect(port2); diff --git a/test/parallel/test-worker-message-port.js b/test/parallel/test-worker-message-port.js index 47a8ddf6653c53..dce9da0b3063f5 100644 --- a/test/parallel/test-worker-message-port.js +++ b/test/parallel/test-worker-message-port.js @@ -21,14 +21,15 @@ const { MessageChannel, MessagePort } = require('worker_threads'); const { port1, port2 } = new MessageChannel(); port1.onmessage = common.mustCall((message) => { - assert.strictEqual(message, 4); + assert.strictEqual(message.data, 4); + assert.strictEqual(message.target, port1); port2.close(common.mustCall()); }); port1.postMessage(2); port2.onmessage = common.mustCall((message) => { - port2.postMessage(message * 2); + port2.postMessage(message.data * 2); }); } diff --git a/test/parallel/test-worker-onmessage.js b/test/parallel/test-worker-onmessage.js index 47f6324aad6933..f315b0d2019a9a 100644 --- a/test/parallel/test-worker-onmessage.js +++ b/test/parallel/test-worker-onmessage.js @@ -14,6 +14,6 @@ if (!process.env.HAS_STARTED_WORKER) { w.postMessage(2); } else { parentPort.onmessage = common.mustCall((message) => { - parentPort.postMessage(message * 2); + parentPort.postMessage(message.data * 2); }); }