From 13d9e9135f7990519e1f21606ba24788cb3b0a94 Mon Sep 17 00:00:00 2001 From: Mestery Date: Tue, 22 Mar 2022 15:04:50 +0100 Subject: [PATCH] feat(node): worker_threads (#1151) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Yoshiya Hinosawa Co-authored-by: Bartek IwaƄczuk --- node/README.md | 2 +- node/module_all.ts | 2 +- node/testdata/worker_threads.ts | 30 ++++ node/worker_threads.ts | 262 ++++++++++++++++++++++++++------ node/worker_threads_test.ts | 187 +++++++++++++++++++++++ 5 files changed, 436 insertions(+), 47 deletions(-) create mode 100644 node/testdata/worker_threads.ts create mode 100644 node/worker_threads_test.ts diff --git a/node/README.md b/node/README.md index 43d86f7eb5a0..8b5c26b695ff 100644 --- a/node/README.md +++ b/node/README.md @@ -56,7 +56,7 @@ Deno standard library as it's a compatibility module. - [x] vm _partly_ - [x] wasi - [ ] webcrypto -- [ ] worker_threads +- [x] worker_threads - [ ] zlib * [x] node globals _partly_ diff --git a/node/module_all.ts b/node/module_all.ts index 5a69f87858fc..342e978352d3 100644 --- a/node/module_all.ts +++ b/node/module_all.ts @@ -151,6 +151,6 @@ export default { v8, vm, wasi, - workerThreads, + "worker_threads": workerThreads, zlib, } as Record; diff --git a/node/testdata/worker_threads.ts b/node/testdata/worker_threads.ts new file mode 100644 index 000000000000..d1b5c4885103 --- /dev/null +++ b/node/testdata/worker_threads.ts @@ -0,0 +1,30 @@ +import { + getEnvironmentData, + isMainThread, + parentPort, + threadId, + workerData, +} from "../worker_threads.ts"; +import { once } from "../events.ts"; + +async function message(expectedMessage: string) { + const [message] = await once(parentPort, "message"); + if (message !== expectedMessage) { + console.log(`Expected the message "${expectedMessage}", but got`, message); + // fail test + parentPort.close(); + } +} + +await message("Hello, how are you my thread?"); +parentPort.postMessage("I'm fine!"); + +parentPort.postMessage({ + isMainThread, + threadId, + workerData: Array.isArray(workerData) && + workerData[workerData.length - 1] instanceof MessagePort + ? workerData.slice(0, -1) + : workerData, + envData: [getEnvironmentData("test"), getEnvironmentData(1)], +}); diff --git a/node/worker_threads.ts b/node/worker_threads.ts index 24feb70f0766..78e886bfb010 100644 --- a/node/worker_threads.ts +++ b/node/worker_threads.ts @@ -1,63 +1,235 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. // Copyright Joyent and Node contributors. All rights reserved. MIT license. +import { resolve, toFileUrl } from "../path/mod.ts"; import { notImplemented } from "./_utils.ts"; +import { EventEmitter, once } from "./events.ts"; -export function getEnvironmentData() { - notImplemented(); -} -export const isMainThread = undefined; -export function markAsUntransferable() { - notImplemented(); -} -export function moveMessagePortToContext() { - notImplemented(); -} -export const parentPort = undefined; -export function receiveMessageOnPort() { - notImplemented(); -} -export const resourceLimits = undefined; -export const SHARE_ENV = undefined; -export function setEnvironmentData() { - notImplemented(); +let environmentData = new Map(); +let threads = 0; + +export interface WorkerOptions { + // only for typings + argv?: unknown[]; + env?: Record; + execArgv?: string[]; + stdin?: boolean; + stdout?: boolean; + stderr?: boolean; + trackUnmanagedFds?: boolean; + resourceLimits?: { + maxYoungGenerationSizeMb?: number; + maxOldGenerationSizeMb?: number; + codeRangeSizeMb?: number; + stackSizeMb?: number; + }; + + eval?: boolean; + transferList?: Transferable[]; + workerData?: unknown; } -export const threadId = undefined; -export const workerData = undefined; -export class BroadcastChannel { - constructor() { - notImplemented(); + +const kHandle = Symbol("kHandle"); +class _Worker extends EventEmitter { + readonly threadId: number; + readonly resourceLimits: Required< + NonNullable + > = { + maxYoungGenerationSizeMb: -1, + maxOldGenerationSizeMb: -1, + codeRangeSizeMb: -1, + stackSizeMb: 4, + }; + private readonly [kHandle]: Worker; + + postMessage: Worker["postMessage"]; + + constructor(specifier: URL | string, options?: WorkerOptions) { + super(); + if (options?.eval === true) { + specifier = `data:text/javascript,${specifier}`; + } else if (typeof specifier === "string") { + specifier = toFileUrl(resolve(specifier)); + } + const handle = this[kHandle] = new Worker( + specifier, + { + ...(options || {}), + type: "module", + // unstable + deno: { namespace: true }, + }, + ); + handle.addEventListener( + "error", + (event) => this.emit("error", event.error || event.message), + ); + handle.addEventListener( + "messageerror", + (event) => this.emit("messageerror", event.data), + ); + handle.addEventListener( + "message", + (event) => this.emit("message", event.data), + ); + handle.postMessage({ + environmentData, + threadId: (this.threadId = ++threads), + workerData: options?.workerData, + }, options?.transferList || []); + this.postMessage = handle.postMessage.bind(handle); + this.emit("online"); } -} -export class MessageChannel { - constructor() { - notImplemented(); + + terminate() { + this[kHandle].terminate(); + this.emit("exit", 0); } + + readonly getHeapSnapshot = notImplemented; + // fake performance + readonly performance = globalThis.performance; } -export class MessagePort { - constructor() { - notImplemented(); - } + +export const isMainThread = + // deno-lint-ignore no-explicit-any + typeof (globalThis as any).DedicatedWorkerGlobalScope === "undefined" || + // deno-lint-ignore no-explicit-any + self instanceof (globalThis as any).DedicatedWorkerGlobalScope === false; + +// fake resourceLimits +export const resourceLimits = isMainThread ? {} : { + maxYoungGenerationSizeMb: 48, + maxOldGenerationSizeMb: 2048, + codeRangeSizeMb: 0, + stackSizeMb: 4, +}; + +let threadId = 0; +let workerData: unknown = null; + +// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611 +interface NodeEventTarget extends + Pick< + EventEmitter, + "eventNames" | "listenerCount" | "emit" | "removeAllListeners" + > { + setMaxListeners(n: number): void; + getMaxListeners(): number; + // deno-lint-ignore no-explicit-any + off(eventName: string, listener: (...args: any[]) => void): NodeEventTarget; + // deno-lint-ignore no-explicit-any + on(eventName: string, listener: (...args: any[]) => void): NodeEventTarget; + // deno-lint-ignore no-explicit-any + once(eventName: string, listener: (...args: any[]) => void): NodeEventTarget; + addListener: NodeEventTarget["on"]; + removeListener: NodeEventTarget["off"]; +} + +type ParentPort = typeof self & NodeEventTarget; + +// deno-lint-ignore no-explicit-any +let parentPort: ParentPort = null as any; + +if (!isMainThread) { + // deno-lint-ignore no-explicit-any + const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>(); + + parentPort = self as ParentPort; + parentPort.off = parentPort.removeListener = function ( + this: ParentPort, + name, + listener, + ) { + this.removeEventListener(name, listeners.get(listener)!); + listeners.delete(listener); + return this; + }; + parentPort.on = parentPort.addListener = function ( + this: ParentPort, + name, + listener, + ) { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); + return this; + }; + parentPort.once = function (this: ParentPort, name, listener) { + // deno-lint-ignore no-explicit-any + const _listener = (ev: any) => listener(ev.data); + listeners.set(listener, _listener); + this.addEventListener(name, _listener); + return this; + }; + + // mocks + parentPort.setMaxListeners = () => {}; + parentPort.getMaxListeners = () => Infinity; + parentPort.eventNames = () => [""]; + parentPort.listenerCount = () => 0; + + parentPort.emit = () => notImplemented(); + parentPort.removeAllListeners = () => notImplemented(); + + // Receive startup message + [{ threadId, workerData, environmentData }] = await once( + parentPort, + "message", + ); + + // alias + parentPort.addEventListener("offline", () => { + parentPort.emit("close"); + }); +} + +export function getEnvironmentData(key: unknown) { + return environmentData.get(key); } -export class Worker { - constructor() { - notImplemented(); + +export function setEnvironmentData(key: unknown, value?: unknown) { + if (value === undefined) { + environmentData.delete(key); + } else { + environmentData.set(key, value); } } + +// deno-lint-ignore no-explicit-any +const _MessagePort: typeof MessagePort = (globalThis as any).MessagePort; +const _MessageChannel: typeof MessageChannel = + // deno-lint-ignore no-explicit-any + (globalThis as any).MessageChannel; +export const BroadcastChannel = globalThis.BroadcastChannel; +export const SHARE_ENV = Symbol.for("nodejs.worker_threads.SHARE_ENV"); +export { + _MessageChannel as MessageChannel, + _MessagePort as MessagePort, + _Worker as Worker, + notImplemented as markAsUntransferable, + notImplemented as moveMessagePortToContext, + notImplemented as receiveMessageOnPort, + parentPort, + threadId, + workerData, +}; + export default { + markAsUntransferable: notImplemented, + moveMessagePortToContext: notImplemented, + receiveMessageOnPort: notImplemented, + MessagePort: _MessagePort, + MessageChannel: _MessageChannel, + BroadcastChannel, + Worker: _Worker, getEnvironmentData, - isMainThread, - markAsUntransferable, - moveMessagePortToContext, - parentPort, - receiveMessageOnPort, - resourceLimits, - SHARE_ENV, setEnvironmentData, + SHARE_ENV, threadId, workerData, - BroadcastChannel, - MessageChannel, - MessagePort, - Worker, + resourceLimits, + parentPort, + isMainThread, }; diff --git a/node/worker_threads_test.ts b/node/worker_threads_test.ts new file mode 100644 index 000000000000..7d34d7355e72 --- /dev/null +++ b/node/worker_threads_test.ts @@ -0,0 +1,187 @@ +import { assert, assertEquals, assertObjectMatch } from "../testing/asserts.ts"; +import { fromFileUrl, relative } from "../path/mod.ts"; +import { EventEmitter, once } from "./events.ts"; +import * as workerThreads from "./worker_threads.ts"; + +Deno.test({ + name: "[worker_threads] isMainThread", + fn() { + assertEquals(workerThreads.isMainThread, true); + }, +}); + +Deno.test({ + name: "[worker_threads] threadId", + fn() { + assertEquals(workerThreads.threadId, 0); + }, +}); + +Deno.test({ + name: "[worker_threads] resourceLimits", + fn() { + assertObjectMatch(workerThreads.resourceLimits, {}); + }, +}); + +Deno.test({ + name: "[worker_threads] parentPort", + fn() { + assertEquals(workerThreads.parentPort, null); + }, +}); + +Deno.test({ + name: "[worker_threads] workerData", + fn() { + assertEquals(workerThreads.workerData, null); + }, +}); + +Deno.test({ + name: "[worker_threads] setEnvironmentData / getEnvironmentData", + fn() { + workerThreads.setEnvironmentData("test", "test"); + assertEquals(workerThreads.getEnvironmentData("test"), "test"); + // delete + workerThreads.setEnvironmentData("test"); + assertEquals(workerThreads.getEnvironmentData("test"), undefined); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker threadId", + async fn() { + const worker = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + ); + worker.postMessage("Hello, how are you my thread?"); + await once(worker, "message"); + assertEquals((await once(worker, "message"))[0].threadId, 1); + worker.terminate(); + + const worker1 = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + ); + worker1.postMessage("Hello, how are you my thread?"); + await once(worker1, "message"); + assertEquals((await once(worker1, "message"))[0].threadId, 2); + worker1.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker basics", + async fn() { + workerThreads.setEnvironmentData("test", "test"); + workerThreads.setEnvironmentData(1, { + test: "random", + random: "test", + }); + const { port1 } = new MessageChannel(); + const worker = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + { + workerData: ["hey", true, false, 2, port1], + transferList: [port1], + }, + ); + worker.postMessage("Hello, how are you my thread?"); + assertEquals((await once(worker, "message"))[0], "I'm fine!"); + const data = (await once(worker, "message"))[0]; + // data.threadId can be 1 when this test is runned individually + if (data.threadId === 1) data.threadId = 3; + assertObjectMatch(data, { + isMainThread: false, + threadId: 3, + workerData: ["hey", true, false, 2], + envData: ["test", { test: "random", random: "test" }], + }); + worker.terminate(); + }, + sanitizeResources: false, +}); + +const workerThreadsURL = JSON.stringify( + new URL("./worker_threads.ts", import.meta.url).toString(), +); + +Deno.test({ + name: "[worker_threads] Worker eval", + async fn() { + const worker = new workerThreads.Worker( + ` + import { parentPort } from ${workerThreadsURL}; + parentPort.postMessage("It works!"); + `, + { + eval: true, + }, + ); + assertEquals((await once(worker, "message"))[0], "It works!"); + worker.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] inheritences", + async fn() { + const eventsURL = JSON.stringify( + new URL("./events.ts", import.meta.url).toString(), + ); + + const worker = new workerThreads.Worker( + ` + import { EventEmitter } from ${eventsURL}; + import { parentPort } from ${workerThreadsURL}; + parentPort.postMessage(parentPort instanceof EventTarget); + parentPort.postMessage(parentPort instanceof EventEmitter); + `, + { + eval: true, + }, + ); + assertEquals((await once(worker, "message"))[0], true); + assertEquals((await once(worker, "message"))[0], false); + assert(worker instanceof EventEmitter); + assert(!(worker instanceof EventTarget)); + worker.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker workerData", + async fn() { + const worker = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + { + workerData: null, + }, + ); + worker.postMessage("Hello, how are you my thread?"); + await once(worker, "message"); + assertEquals((await once(worker, "message"))[0].workerData, null); + worker.terminate(); + + const worker1 = new workerThreads.Worker( + new URL("./testdata/worker_threads.ts", import.meta.url), + ); + worker1.postMessage("Hello, how are you my thread?"); + await once(worker1, "message"); + assertEquals((await once(worker1, "message"))[0].workerData, undefined); + worker1.terminate(); + }, +}); + +Deno.test({ + name: "[worker_threads] Worker with relative path", + async fn() { + const worker = new workerThreads.Worker(relative( + Deno.cwd(), + fromFileUrl(new URL("./testdata/worker_threads.ts", import.meta.url)), + )); + worker.postMessage("Hello, how are you my thread?"); + assertEquals((await once(worker, "message"))[0], "I'm fine!"); + worker.terminate(); + }, +});