From bbb6ae7e6f24272ee2f4c013f658444bd5c37cc3 Mon Sep 17 00:00:00 2001 From: Martin Schitter Date: Thu, 7 Mar 2024 09:36:28 +0100 Subject: [PATCH 1/5] implement receiveMessageOnPort for node:worker_threads --- ext/node/polyfills/worker_threads.ts | 17 +++++++- ext/web/13_message_port.js | 12 ++++++ ext/web/internal.d.ts | 2 + ext/web/lib.deno_web.d.ts | 5 +++ ext/web/lib.rs | 2 + ext/web/message_port.rs | 20 ++++++++++ tests/node_compat/config.jsonc | 3 ++ ...est-worker-message-port-receive-message.js | 40 +++++++++++++++++++ tools/node_compat/TODO.md | 3 +- 9 files changed, 100 insertions(+), 4 deletions(-) create mode 100644 tests/node_compat/test/parallel/test-worker-message-port-receive-message.js diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 785bf021d4e702..86961186bef0d0 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -311,9 +311,22 @@ export function markAsUntransferable() { export function moveMessagePortToContext() { notImplemented("moveMessagePortToContext"); } -export function receiveMessageOnPort() { - notImplemented("receiveMessageOnPort"); + +/** + * @param { MessagePort } port + * @returns {object | undefined} + */ +export function receiveMessageOnPort(port: MessagePort): object | undefined { + if (!(port instanceof MessagePort)) { + const err = new TypeError( + 'The "port" argument must be a MessagePort instance', + ); + err["code"] = "ERR_INVALID_ARG_TYPE"; + throw err; + } + return port.receiveMessage(); } + export { _Worker as Worker, BroadcastChannel, diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index b6a308716e109c..1ef85855ef5e9c 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -11,6 +11,7 @@ import { op_message_port_create_entangled, op_message_port_post_message, op_message_port_recv_message, + op_message_port_recv_message_sync, } from "ext:core/ops"; const { ArrayBufferPrototypeGetByteLength, @@ -145,6 +146,17 @@ class MessagePort extends EventTarget { op_message_port_post_message(this[_id], data); } + /** + * This method is only provided for `receiveMessageOnPort` + * in `node:worker_threads` + * @returns {object | undefined} + */ + receiveMessage() { + const data = op_message_port_recv_message_sync(this[_id]); + if (data === null) return undefined; + return { message: deserializeJsMessageData(data)[0] }; + } + start() { webidl.assertBranded(this, MessagePortPrototype); if (this[_enabled]) return; diff --git a/ext/web/internal.d.ts b/ext/web/internal.d.ts index 4af04b07135a8e..336f5837cf71b9 100644 --- a/ext/web/internal.d.ts +++ b/ext/web/internal.d.ts @@ -110,4 +110,6 @@ declare module "ext:deno_web/13_message_port.js" { data: Uint8Array; transferables: Transferable[]; } + const MessageChannel: typeof MessageChannel; + const MessagePort: typeof MessagePort; } diff --git a/ext/web/lib.deno_web.d.ts b/ext/web/lib.deno_web.d.ts index d28e33f58df567..65cb340418603f 100644 --- a/ext/web/lib.deno_web.d.ts +++ b/ext/web/lib.deno_web.d.ts @@ -1059,6 +1059,11 @@ declare interface MessagePort extends EventTarget { */ postMessage(message: any, transfer: Transferable[]): void; postMessage(message: any, options?: StructuredSerializeOptions): void; + /** + * Sync message access. Provided only for use in + * `node:worker_threads.receiveMessageOnPort` + */ + receiveMessage(): object | undefined; /** * Begins dispatching messages received on the port. This is implicitly called * when assigning a value to `this.onmessage`. diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 332102fcc9031a..60a0cc0d72d4af 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -46,6 +46,7 @@ pub use crate::message_port::create_entangled_message_port; use crate::message_port::op_message_port_create_entangled; use crate::message_port::op_message_port_post_message; use crate::message_port::op_message_port_recv_message; +use crate::message_port::op_message_port_recv_message_sync; pub use crate::message_port::JsMessageData; pub use crate::message_port::MessagePort; @@ -78,6 +79,7 @@ deno_core::extension!(deno_web, op_message_port_create_entangled, op_message_port_post_message, op_message_port_recv_message, + op_message_port_recv_message_sync, compression::op_compression_new, compression::op_compression_write, compression::op_compression_finish, diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs index 5560309657a0d3..18429a17951876 100644 --- a/ext/web/message_port.rs +++ b/ext/web/message_port.rs @@ -17,6 +17,7 @@ use deno_core::Resource; use deno_core::ResourceId; use serde::Deserialize; use serde::Serialize; +use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; @@ -227,3 +228,22 @@ pub async fn op_message_port_recv_message( let cancel = RcRef::map(resource.clone(), |r| &r.cancel); resource.port.recv(state).or_cancel(cancel).await? } + +#[op2] +#[serde] +pub fn op_message_port_recv_message_sync( + state: &mut OpState, // Rc>, + #[smi] rid: ResourceId, +) -> Result, AnyError> { + let resource = state.resource_table.get::(rid)?; + let mut rx = resource.port.rx.borrow_mut(); + + match rx.try_recv() { + Ok((d, t)) => Ok(Some(JsMessageData { + data: d, + transferables: serialize_transferables(state, t), + })), + Err(TryRecvError::Empty) => Ok(None), + Err(TryRecvError::Disconnected) => Ok(None), + } +} diff --git a/tests/node_compat/config.jsonc b/tests/node_compat/config.jsonc index d84cc4dd2c3f8e..9bec0e68f733bf 100644 --- a/tests/node_compat/config.jsonc +++ b/tests/node_compat/config.jsonc @@ -104,6 +104,8 @@ "test-util.js", "test-webcrypto-sign-verify.js", "test-whatwg-url-properties.js", + // needs replace ".on" => ".addEventListener" in L29 + "test-worker-message-port-receive-message.js", "test-zlib-convenience-methods.js", "test-zlib-empty-buffer.js", "test-zlib-invalid-input.js", @@ -664,6 +666,7 @@ "test-whatwg-url-custom-tostringtag.js", "test-whatwg-url-override-hostname.js", "test-whatwg-url-properties.js", + "test-worker-message-port-receive-message.js", "test-zlib-close-after-error.js", "test-zlib-close-after-write.js", "test-zlib-convenience-methods.js", diff --git a/tests/node_compat/test/parallel/test-worker-message-port-receive-message.js b/tests/node_compat/test/parallel/test-worker-message-port-receive-message.js new file mode 100644 index 00000000000000..3945a8a1fca984 --- /dev/null +++ b/tests/node_compat/test/parallel/test-worker-message-port-receive-message.js @@ -0,0 +1,40 @@ +// deno-fmt-ignore-file +// deno-lint-ignore-file + +// Copyright Joyent and Node contributors. All rights reserved. MIT license. +// Taken from Node 18.12.1 +// This file is automatically generated by `tools/node_compat/setup.ts`. Do not modify this file manually. + +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { MessageChannel, receiveMessageOnPort } = require('worker_threads'); + +const { port1, port2 } = new MessageChannel(); + +const message1 = { hello: 'world' }; +const message2 = { foo: 'bar' }; + +// Make sure receiveMessageOnPort() works in a FIFO way, the same way it does +// when we’re using events. +assert.strictEqual(receiveMessageOnPort(port2), undefined); +port1.postMessage(message1); +port1.postMessage(message2); +assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 }); +assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message2 }); +assert.strictEqual(receiveMessageOnPort(port2), undefined); +assert.strictEqual(receiveMessageOnPort(port2), undefined); + +// Make sure message handlers aren’t called. +port2.addEventListener('message', common.mustNotCall()); +port1.postMessage(message1); +assert.deepStrictEqual(receiveMessageOnPort(port2), { message: message1 }); +port1.close(); + +for (const value of [null, 0, -1, {}, []]) { + assert.throws(() => receiveMessageOnPort(value), { + name: 'TypeError', + code: 'ERR_INVALID_ARG_TYPE', + message: 'The "port" argument must be a MessagePort instance' + }); +} diff --git a/tools/node_compat/TODO.md b/tools/node_compat/TODO.md index eb288c65e3a8ae..139a55dcbd5f9b 100644 --- a/tools/node_compat/TODO.md +++ b/tools/node_compat/TODO.md @@ -3,7 +3,7 @@ NOTE: This file should not be manually edited. Please edit `tests/node_compat/config.json` and run `deno task setup` in `tools/node_compat` dir instead. -Total: 2999 +Total: 2998 - [abort/test-abort-backtrace.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-backtrace.js) - [abort/test-abort-fatal-error.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-fatal-error.js) @@ -2711,7 +2711,6 @@ Total: 2999 - [parallel/test-worker-message-port-message-port-transferring.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-message-port-transferring.js) - [parallel/test-worker-message-port-move.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-move.js) - [parallel/test-worker-message-port-multiple-sharedarraybuffers.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-multiple-sharedarraybuffers.js) -- [parallel/test-worker-message-port-receive-message.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-receive-message.js) - [parallel/test-worker-message-port-terminate-transfer-list.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-terminate-transfer-list.js) - [parallel/test-worker-message-port-transfer-closed.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-transfer-closed.js) - [parallel/test-worker-message-port-transfer-duplicate.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-worker-message-port-transfer-duplicate.js) From f101602b8e0786955e1f5d1f6f3af34da5fb67cb Mon Sep 17 00:00:00 2001 From: Martin Schitter Date: Sat, 9 Mar 2024 18:55:27 +0100 Subject: [PATCH 2/5] move receiveMessage code to node:worker_threads --- ext/node/polyfills/worker_threads.ts | 12 ++++++++++-- ext/web/13_message_port.js | 14 ++------------ ext/web/internal.d.ts | 4 ++++ ext/web/lib.deno_web.d.ts | 5 ----- 4 files changed, 16 insertions(+), 19 deletions(-) diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 86961186bef0d0..5eb03a5a63e4ff 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -11,7 +11,13 @@ import { isAbsolute, resolve } from "node:path"; import { notImplemented } from "ext:deno_node/_utils.ts"; import { EventEmitter, once } from "node:events"; import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; -import { MessageChannel, MessagePort } from "ext:deno_web/13_message_port.js"; +import { op_message_port_recv_message_sync } from "ext:core/ops"; +import { + deserializeJsMessageData, + MessageChannel, + MessagePort, + MessagePortIdSymbol, +} from "ext:deno_web/13_message_port.js"; let environmentData = new Map(); let threads = 0; @@ -324,7 +330,9 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined { err["code"] = "ERR_INVALID_ARG_TYPE"; throw err; } - return port.receiveMessage(); + const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]); + if (data === null) return undefined; + return { message: deserializeJsMessageData(data)[0] }; } export { diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index 1ef85855ef5e9c..bd7712c5f261ad 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -11,7 +11,6 @@ import { op_message_port_create_entangled, op_message_port_post_message, op_message_port_recv_message, - op_message_port_recv_message_sync, } from "ext:core/ops"; const { ArrayBufferPrototypeGetByteLength, @@ -84,6 +83,7 @@ webidl.configureInterface(MessageChannel); const MessageChannelPrototype = MessageChannel.prototype; const _id = Symbol("id"); +const MessagePortIdSymbol = _id; const _enabled = Symbol("enabled"); /** @@ -146,17 +146,6 @@ class MessagePort extends EventTarget { op_message_port_post_message(this[_id], data); } - /** - * This method is only provided for `receiveMessageOnPort` - * in `node:worker_threads` - * @returns {object | undefined} - */ - receiveMessage() { - const data = op_message_port_recv_message_sync(this[_id]); - if (data === null) return undefined; - return { message: deserializeJsMessageData(data)[0] }; - } - start() { webidl.assertBranded(this, MessagePortPrototype); if (this[_enabled]) return; @@ -392,6 +381,7 @@ export { deserializeJsMessageData, MessageChannel, MessagePort, + MessagePortIdSymbol, MessagePortPrototype, serializeJsMessageData, structuredClone, diff --git a/ext/web/internal.d.ts b/ext/web/internal.d.ts index 336f5837cf71b9..b2aea80d9fb75a 100644 --- a/ext/web/internal.d.ts +++ b/ext/web/internal.d.ts @@ -112,4 +112,8 @@ declare module "ext:deno_web/13_message_port.js" { } const MessageChannel: typeof MessageChannel; const MessagePort: typeof MessagePort; + const MessagePortIdSymbol: typeof MessagePortIdSymbol; + function deserializeJsMessageData( + messageData: messagePort.MessageData, + ): [object, object[]]; } diff --git a/ext/web/lib.deno_web.d.ts b/ext/web/lib.deno_web.d.ts index 65cb340418603f..d28e33f58df567 100644 --- a/ext/web/lib.deno_web.d.ts +++ b/ext/web/lib.deno_web.d.ts @@ -1059,11 +1059,6 @@ declare interface MessagePort extends EventTarget { */ postMessage(message: any, transfer: Transferable[]): void; postMessage(message: any, options?: StructuredSerializeOptions): void; - /** - * Sync message access. Provided only for use in - * `node:worker_threads.receiveMessageOnPort` - */ - receiveMessage(): object | undefined; /** * Begins dispatching messages received on the port. This is implicitly called * when assigning a value to `this.onmessage`. From 7053ee717ee4605cd712a32cd1f529ffe4da295d Mon Sep 17 00:00:00 2001 From: Martin Schitter Date: Sun, 10 Mar 2024 12:35:19 +0100 Subject: [PATCH 3/5] fix linters primordials warning --- ext/node/polyfills/worker_threads.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index b3284a05922173..ba22e2c5117834 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -15,6 +15,7 @@ import { MessageChannel, MessagePort, MessagePortIdSymbol, + MessagePortPrototype, serializeJsMessageData, } from "ext:deno_web/13_message_port.js"; import * as webidl from "ext:deno_webidl/00_webidl.js"; @@ -25,6 +26,7 @@ import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channe import { op_message_port_recv_message_sync } from "ext:core/ops"; import { isAbsolute, resolve } from "node:path"; +const { ObjectPrototypeIsPrototypeOf } = primordials; const { Error, Symbol, @@ -504,7 +506,7 @@ export function moveMessagePortToContext() { * @returns {object | undefined} */ export function receiveMessageOnPort(port: MessagePort): object | undefined { - if (!(port instanceof MessagePort)) { + if (!(ObjectPrototypeIsPrototypeOf(MessagePortPrototype, port))) { const err = new TypeError( 'The "port" argument must be a MessagePort instance', ); From 7ed9164e4a025053fbf17777abd4b568b3eb7e42 Mon Sep 17 00:00:00 2001 From: Martin Schitter Date: Sun, 10 Mar 2024 19:13:25 +0100 Subject: [PATCH 4/5] fixing ESM detection and cosmetic move of import line --- ext/node/polyfills/worker_threads.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index ba22e2c5117834..a946e387d25372 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -8,6 +8,7 @@ import { op_host_recv_ctrl, op_host_recv_message, op_host_terminate_worker, + op_message_port_recv_message_sync, op_require_read_closest_package_json, } from "ext:core/ops"; import { @@ -23,7 +24,6 @@ import { log } from "ext:runtime/06_util.js"; import { notImplemented } from "ext:deno_node/_utils.ts"; import { EventEmitter, once } from "node:events"; import { BroadcastChannel } from "ext:deno_broadcast_channel/01_broadcast_channel.js"; -import { op_message_port_recv_message_sync } from "ext:core/ops"; import { isAbsolute, resolve } from "node:path"; const { ObjectPrototypeIsPrototypeOf } = primordials; @@ -175,17 +175,17 @@ class NodeWorker extends EventEmitter { // empty catch block when package json might not be present } if ( - !(StringPrototypeEndsWith( + (StringPrototypeEndsWith( StringPrototypeToString(specifier), ".mjs", )) || (pkg && pkg.exists && pkg.typ == "module") ) { + specifier = toFileUrl(specifier as string); + } else { const cwdFileUrl = toFileUrl(Deno.cwd()); specifier = `data:text/javascript,(async function() {const { createRequire } = await import("node:module");const require = createRequire("${cwdFileUrl}");require("${specifier}");})();`; - } else { - specifier = toFileUrl(specifier as string); } } From 062ed22fe534d6930f2c9c91fe0c8a77f77d767b Mon Sep 17 00:00:00 2001 From: Martin Schitter Date: Sun, 10 Mar 2024 23:40:54 +0100 Subject: [PATCH 5/5] revert critical change --- ext/node/polyfills/worker_threads.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index a946e387d25372..74abf5bb5f0fcf 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -175,17 +175,17 @@ class NodeWorker extends EventEmitter { // empty catch block when package json might not be present } if ( - (StringPrototypeEndsWith( + !(StringPrototypeEndsWith( StringPrototypeToString(specifier), ".mjs", )) || (pkg && pkg.exists && pkg.typ == "module") ) { - specifier = toFileUrl(specifier as string); - } else { const cwdFileUrl = toFileUrl(Deno.cwd()); specifier = `data:text/javascript,(async function() {const { createRequire } = await import("node:module");const require = createRequire("${cwdFileUrl}");require("${specifier}");})();`; + } else { + specifier = toFileUrl(specifier as string); } }