Skip to content

Commit

Permalink
feat(node): worker_threads (#1151)
Browse files Browse the repository at this point in the history
Co-authored-by: Yoshiya Hinosawa <stibium121@gmail.com>
Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
  • Loading branch information
3 people authored Mar 22, 2022
1 parent 9873958 commit 13d9e91
Show file tree
Hide file tree
Showing 5 changed files with 436 additions and 47 deletions.
2 changes: 1 addition & 1 deletion node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ Deno standard library as it's a compatibility module.
- [x] vm _partly_
- [x] wasi
- [ ] webcrypto
- [ ] worker_threads
- [x] worker_threads
- [ ] zlib

* [x] node globals _partly_
Expand Down
2 changes: 1 addition & 1 deletion node/module_all.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,6 @@ export default {
v8,
vm,
wasi,
workerThreads,
"worker_threads": workerThreads,
zlib,
} as Record<string, unknown>;
30 changes: 30 additions & 0 deletions node/testdata/worker_threads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import {
getEnvironmentData,
isMainThread,
parentPort,
threadId,
workerData,
} from "../worker_threads.ts";
import { once } from "../events.ts";

async function message(expectedMessage: string) {
const [message] = await once(parentPort, "message");
if (message !== expectedMessage) {
console.log(`Expected the message "${expectedMessage}", but got`, message);
// fail test
parentPort.close();
}
}

await message("Hello, how are you my thread?");
parentPort.postMessage("I'm fine!");

parentPort.postMessage({
isMainThread,
threadId,
workerData: Array.isArray(workerData) &&
workerData[workerData.length - 1] instanceof MessagePort
? workerData.slice(0, -1)
: workerData,
envData: [getEnvironmentData("test"), getEnvironmentData(1)],
});
262 changes: 217 additions & 45 deletions node/worker_threads.ts
Original file line number Diff line number Diff line change
@@ -1,63 +1,235 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.

import { resolve, toFileUrl } from "../path/mod.ts";
import { notImplemented } from "./_utils.ts";
import { EventEmitter, once } from "./events.ts";

export function getEnvironmentData() {
notImplemented();
}
export const isMainThread = undefined;
export function markAsUntransferable() {
notImplemented();
}
export function moveMessagePortToContext() {
notImplemented();
}
export const parentPort = undefined;
export function receiveMessageOnPort() {
notImplemented();
}
export const resourceLimits = undefined;
export const SHARE_ENV = undefined;
export function setEnvironmentData() {
notImplemented();
let environmentData = new Map();
let threads = 0;

export interface WorkerOptions {
// only for typings
argv?: unknown[];
env?: Record<string, unknown>;
execArgv?: string[];
stdin?: boolean;
stdout?: boolean;
stderr?: boolean;
trackUnmanagedFds?: boolean;
resourceLimits?: {
maxYoungGenerationSizeMb?: number;
maxOldGenerationSizeMb?: number;
codeRangeSizeMb?: number;
stackSizeMb?: number;
};

eval?: boolean;
transferList?: Transferable[];
workerData?: unknown;
}
export const threadId = undefined;
export const workerData = undefined;
export class BroadcastChannel {
constructor() {
notImplemented();

const kHandle = Symbol("kHandle");
class _Worker extends EventEmitter {
readonly threadId: number;
readonly resourceLimits: Required<
NonNullable<WorkerOptions["resourceLimits"]>
> = {
maxYoungGenerationSizeMb: -1,
maxOldGenerationSizeMb: -1,
codeRangeSizeMb: -1,
stackSizeMb: 4,
};
private readonly [kHandle]: Worker;

postMessage: Worker["postMessage"];

constructor(specifier: URL | string, options?: WorkerOptions) {
super();
if (options?.eval === true) {
specifier = `data:text/javascript,${specifier}`;
} else if (typeof specifier === "string") {
specifier = toFileUrl(resolve(specifier));
}
const handle = this[kHandle] = new Worker(
specifier,
{
...(options || {}),
type: "module",
// unstable
deno: { namespace: true },
},
);
handle.addEventListener(
"error",
(event) => this.emit("error", event.error || event.message),
);
handle.addEventListener(
"messageerror",
(event) => this.emit("messageerror", event.data),
);
handle.addEventListener(
"message",
(event) => this.emit("message", event.data),
);
handle.postMessage({
environmentData,
threadId: (this.threadId = ++threads),
workerData: options?.workerData,
}, options?.transferList || []);
this.postMessage = handle.postMessage.bind(handle);
this.emit("online");
}
}
export class MessageChannel {
constructor() {
notImplemented();

terminate() {
this[kHandle].terminate();
this.emit("exit", 0);
}

readonly getHeapSnapshot = notImplemented;
// fake performance
readonly performance = globalThis.performance;
}
export class MessagePort {
constructor() {
notImplemented();
}

export const isMainThread =
// deno-lint-ignore no-explicit-any
typeof (globalThis as any).DedicatedWorkerGlobalScope === "undefined" ||
// deno-lint-ignore no-explicit-any
self instanceof (globalThis as any).DedicatedWorkerGlobalScope === false;

// fake resourceLimits
export const resourceLimits = isMainThread ? {} : {
maxYoungGenerationSizeMb: 48,
maxOldGenerationSizeMb: 2048,
codeRangeSizeMb: 0,
stackSizeMb: 4,
};

let threadId = 0;
let workerData: unknown = null;

// Like https://github.com/nodejs/node/blob/48655e17e1d84ba5021d7a94b4b88823f7c9c6cf/lib/internal/event_target.js#L611
interface NodeEventTarget extends
Pick<
EventEmitter,
"eventNames" | "listenerCount" | "emit" | "removeAllListeners"
> {
setMaxListeners(n: number): void;
getMaxListeners(): number;
// deno-lint-ignore no-explicit-any
off(eventName: string, listener: (...args: any[]) => void): NodeEventTarget;
// deno-lint-ignore no-explicit-any
on(eventName: string, listener: (...args: any[]) => void): NodeEventTarget;
// deno-lint-ignore no-explicit-any
once(eventName: string, listener: (...args: any[]) => void): NodeEventTarget;
addListener: NodeEventTarget["on"];
removeListener: NodeEventTarget["off"];
}

type ParentPort = typeof self & NodeEventTarget;

// deno-lint-ignore no-explicit-any
let parentPort: ParentPort = null as any;

if (!isMainThread) {
// deno-lint-ignore no-explicit-any
const listeners = new WeakMap<(...args: any[]) => void, (ev: any) => any>();

parentPort = self as ParentPort;
parentPort.off = parentPort.removeListener = function (
this: ParentPort,
name,
listener,
) {
this.removeEventListener(name, listeners.get(listener)!);
listeners.delete(listener);
return this;
};
parentPort.on = parentPort.addListener = function (
this: ParentPort,
name,
listener,
) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
};
parentPort.once = function (this: ParentPort, name, listener) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
};

// mocks
parentPort.setMaxListeners = () => {};
parentPort.getMaxListeners = () => Infinity;
parentPort.eventNames = () => [""];
parentPort.listenerCount = () => 0;

parentPort.emit = () => notImplemented();
parentPort.removeAllListeners = () => notImplemented();

// Receive startup message
[{ threadId, workerData, environmentData }] = await once(
parentPort,
"message",
);

// alias
parentPort.addEventListener("offline", () => {
parentPort.emit("close");
});
}

export function getEnvironmentData(key: unknown) {
return environmentData.get(key);
}
export class Worker {
constructor() {
notImplemented();

export function setEnvironmentData(key: unknown, value?: unknown) {
if (value === undefined) {
environmentData.delete(key);
} else {
environmentData.set(key, value);
}
}

// deno-lint-ignore no-explicit-any
const _MessagePort: typeof MessagePort = (globalThis as any).MessagePort;
const _MessageChannel: typeof MessageChannel =
// deno-lint-ignore no-explicit-any
(globalThis as any).MessageChannel;
export const BroadcastChannel = globalThis.BroadcastChannel;
export const SHARE_ENV = Symbol.for("nodejs.worker_threads.SHARE_ENV");
export {
_MessageChannel as MessageChannel,
_MessagePort as MessagePort,
_Worker as Worker,
notImplemented as markAsUntransferable,
notImplemented as moveMessagePortToContext,
notImplemented as receiveMessageOnPort,
parentPort,
threadId,
workerData,
};

export default {
markAsUntransferable: notImplemented,
moveMessagePortToContext: notImplemented,
receiveMessageOnPort: notImplemented,
MessagePort: _MessagePort,
MessageChannel: _MessageChannel,
BroadcastChannel,
Worker: _Worker,
getEnvironmentData,
isMainThread,
markAsUntransferable,
moveMessagePortToContext,
parentPort,
receiveMessageOnPort,
resourceLimits,
SHARE_ENV,
setEnvironmentData,
SHARE_ENV,
threadId,
workerData,
BroadcastChannel,
MessageChannel,
MessagePort,
Worker,
resourceLimits,
parentPort,
isMainThread,
};
Loading

0 comments on commit 13d9e91

Please sign in to comment.