diff --git a/README.md b/README.md index c22a6749..b653ccde 100644 --- a/README.md +++ b/README.md @@ -424,6 +424,79 @@ Is `true` if this code runs inside a `Piscina` threadpool as a Worker. Provides the current version of this library as a semver string. +### Static method: `move(value)` + +By default, any value returned by a worker function will be cloned when +returned back to the Piscina pool, even if that object is capable of +being transfered. The `Piscina.move()` method can be used to wrap and +mark transferable values such that they will by transfered rather than +cloned. + +The `value` may be any object supported by Node.js to be transferable +(e.g. `ArrayBuffer`, any `TypedArray`, or `MessagePort`), or any object +implementing the `Transferable` interface. + +```js +const { move } = require('piscina'); + +module.exports = () => { + return move(new ArrayBuffer(10)); +} +``` + +The `move()` method will throw if the `value` is not transferable. + +The object returned by the `move()` method should not be set as a +nested value in an object. If it is used, the `move()` object itself +will be cloned as opposed to transfering the object it wraps. + +#### Interface: `Transferable` + +Objects may implement the `Transferable` interface to create their own +custom transferable objects. This is useful when an object being +passed into or from a worker contains a deeply nested transferable +object such as an `ArrayBuffer` or `MessagePort`. + +`Transferable` objects expose two properties inspected by Piscina +to determine how to transfer the object. These properties are +named using the special static `Piscina.transferableSymbol` and +`Piscina.valueSymbol` properties: + +* The `Piscina.transferableSymbol` property provides the object + (or objects) that are to be included in the `transferList`. + +* The `Piscina.valueSymbol` property provides a surrogate value + to transmit in place of the `Transferable` itself. + +Both properties are required. + +For example, + +```js +const { + move, + transferableSymbol, + valueSymbol +} = require('piscina'); + +module.exports = () => { + const obj = { + a: { b: new Uint8Array(5); }, + c: { new Uint8Array(10); }, + + get [transferableSymbol]() { + // Transfer the two underlying ArrayBuffers + return [this.a.b.buffer, this.c.buffer]; + } + + get [valueSymbol]() { + return { a: { b: this.b }, c: this.c }; + } + }; + return move(obj); +}; +``` + ## Current Limitations (Things we're working on / would love help with) * Improved Documentation diff --git a/examples/move/index.js b/examples/move/index.js new file mode 100644 index 00000000..f6f589e6 --- /dev/null +++ b/examples/move/index.js @@ -0,0 +1,15 @@ +const Piscina = require('../..'); +const { resolve } = require('path'); + +const pool = new Piscina({ + filename: resolve(__dirname, 'worker.js'), + idleTimeout: 1000 +}); + +(async () => { + // The task will transfer an ArrayBuffer + // back to the main thread rather than + // cloning it. + const u8 = await pool.runTask(); + console.log(u8.length); +})(); diff --git a/examples/move/worker.js b/examples/move/worker.js new file mode 100644 index 00000000..303d2109 --- /dev/null +++ b/examples/move/worker.js @@ -0,0 +1,7 @@ +const { move } = require('../..'); + +module.exports = () => { + // Using move causes the Uint8Array to be + // transferred rather than cloned. + return move(new Uint8Array(10)); +}; diff --git a/src/common.ts b/src/common.ts index 15c216dd..12827369 100644 --- a/src/common.ts +++ b/src/common.ts @@ -28,6 +28,40 @@ export const commonState = { workerData: undefined }; +// Internal symbol used to mark Transferable objects returned +// by the Piscina.move() function +const kMovable = Symbol('Piscina.kMovable'); +export const kTransferable = Symbol.for('Piscina.transferable'); +export const kValue = Symbol.for('Piscina.valueOf'); + +// True if the object implements the Transferable interface +export function isTransferable (value : any) : boolean { + return value != null && + typeof value === 'object' && + kTransferable in value && + kValue in value; +} + +// True if object implements Transferable and has been returned +// by the Piscina.move() function +export function isMovable (value : any) : boolean { + return isTransferable(value) && value[kMovable] === true; +} + +export function markMovable (value : object) : void { + Object.defineProperty(value, kMovable, { + enumerable: false, + configurable: true, + writable: true, + value: true + }); +} + +export interface Transferable { + readonly [kTransferable] : object; + readonly [kValue] : object; +} + export const kRequestCountField = 0; export const kResponseCountField = 1; export const kFieldCount = 2; diff --git a/src/index.ts b/src/index.ts index 8c5e0158..358ff4bc 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,12 +5,27 @@ import { AsyncResource } from 'async_hooks'; import { cpus } from 'os'; import { fileURLToPath, URL } from 'url'; import { resolve } from 'path'; -import { inspect } from 'util'; +import { inspect, types } from 'util'; import assert from 'assert'; import { Histogram, build } from 'hdr-histogram-js'; import { performance } from 'perf_hooks'; import hdrobj from 'hdr-histogram-percentiles-obj'; -import { ReadyMessage, RequestMessage, ResponseMessage, StartupMessage, commonState, kResponseCountField, kRequestCountField, kFieldCount } from './common'; +import { + ReadyMessage, + RequestMessage, + ResponseMessage, + StartupMessage, + commonState, + kResponseCountField, + kRequestCountField, + kFieldCount, + Transferable, + isTransferable, + markMovable, + isMovable, + kTransferable, + kValue +} from './common'; import { version } from '../package.json'; const cpuCount : number = (() => { @@ -49,6 +64,8 @@ type EnvSpecifier = typeof Worker extends { new (filename : never, options?: { env: infer T }) : Worker; } ? T : never; +type TransferListItem = TransferList extends (infer T)[] ? T : never; + interface Options { filename? : string | null, minThreads? : number, @@ -84,6 +101,28 @@ const kDefaultOptions : FilledOptions = { useAtomics: true }; +class DirectlyTransferable implements Transferable { + #value : object; + constructor (value : object) { + this.#value = value; + } + + get [kTransferable] () : object { return this.#value; } + + get [kValue] () : object { return this.#value; } +} + +class ArrayBufferViewTransferable implements Transferable { + #view : ArrayBufferView; + constructor (view : ArrayBufferView) { + this.#view = view; + } + + get [kTransferable] () : object { return this.#view.buffer; } + + get [kValue] () : object { return this.#view; } +} + let taskIdCounter = 0; type TaskCallback = (err : Error, result: any) => void; @@ -121,6 +160,19 @@ class TaskInfo extends AsyncResource { this.callback = callback; this.task = task; this.transferList = transferList; + + // If the task is a Transferable returned by + // Piscina.move(), then add it to the transferList + // automatically + if (isMovable(task)) { + if (this.transferList === undefined) { + this.transferList = []; + } + this.transferList = + this.transferList.concat(task[kTransferable]); + this.task = task[kValue]; + } + this.filename = filename; this.taskId = taskIdCounter++; this.abortSignal = abortSignal; @@ -860,6 +912,24 @@ class Piscina extends EventEmitterAsyncResource { static get Piscina () { return Piscina; } + + static move (val : Transferable | TransferListItem | ArrayBufferView) { + if (val != null && typeof val === 'object' && typeof val !== 'function') { + if (!isTransferable(val)) { + if ((types as any).isArrayBufferView(val)) { + val = new ArrayBufferViewTransferable(val as ArrayBufferView); + } else { + val = new DirectlyTransferable(val); + } + } + markMovable(val); + } + return val; + } + + static get transferableSymbol () { return kTransferable; } + + static get valueSymbol () { return kValue; } } export = Piscina; diff --git a/src/worker.ts b/src/worker.ts index 1d79dcb1..b88f0bf4 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -1,6 +1,17 @@ import { parentPort, MessagePort, receiveMessageOnPort, workerData } from 'worker_threads'; import { pathToFileURL } from 'url'; -import { commonState, ReadyMessage, RequestMessage, ResponseMessage, StartupMessage, kResponseCountField, kRequestCountField } from './common'; +import { + commonState, + ReadyMessage, + RequestMessage, + ResponseMessage, + StartupMessage, + kResponseCountField, + kRequestCountField, + isMovable, + kTransferable, + kValue +} from './common'; commonState.isWorkerThread = true; commonState.workerData = workerData; @@ -112,12 +123,17 @@ function onMessage ( (async function () { let response : ResponseMessage; + const transferList : any[] = []; try { const handler = await getHandler(filename); if (handler === null) { throw new Error(`No handler function exported from ${filename}`); } - const result = await handler(task); + let result = await handler(task); + if (isMovable(result)) { + transferList.concat(result[kTransferable]); + result = result[kValue]; + } response = { taskId, result: result, @@ -137,7 +153,7 @@ function onMessage ( // Post the response to the parent thread, and let it know that we have // an additional message available. If possible, use Atomics.wait() // to wait for the next message. - port.postMessage(response); + port.postMessage(response, transferList); Atomics.add(sharedBuffer, kResponseCountField, 1); atomicsWaitLoop(port, sharedBuffer); })().catch(throwInNextTick); diff --git a/test/fixtures/move.ts b/test/fixtures/move.ts new file mode 100644 index 00000000..86ac095e --- /dev/null +++ b/test/fixtures/move.ts @@ -0,0 +1,10 @@ +import Piscina from '../..'; +import assert from 'assert'; +import { types } from 'util'; + +export default function (moved) { + if (moved !== undefined) { + assert(types.isAnyArrayBuffer(moved)); + } + return Piscina.move(new ArrayBuffer(10)); +} diff --git a/test/move-test.ts b/test/move-test.ts new file mode 100644 index 00000000..0bbd02bb --- /dev/null +++ b/test/move-test.ts @@ -0,0 +1,91 @@ +import Piscina from '..'; +import { + isMovable, + markMovable, + isTransferable +} from '../dist/src/common'; +import { test } from 'tap'; +import { types } from 'util'; +import { MessageChannel, MessagePort } from 'worker_threads'; +import { resolve } from 'path'; + +const { + transferableSymbol, + valueSymbol +} = Piscina; + +test('Marking an object as movable works as expected', async ({ ok }) => { + const obj : any = { + get [transferableSymbol] () : object { return {}; }, + get [valueSymbol] () : object { return {}; } + }; + ok(isTransferable(obj)); + ok(!isMovable(obj)); // It's not movable initially + markMovable(obj); + ok(isMovable(obj)); // It is movable now +}); + +test('Marking primitives and null works as expected', async ({ is }) => { + is(Piscina.move(null), null); + is(Piscina.move(1 as any), 1); + is(Piscina.move(false as any), false); + is(Piscina.move('test' as any), 'test'); +}); + +test('Using Piscina.move() returns a movable object', async ({ ok }) => { + const obj : any = { + get [transferableSymbol] () : object { return {}; }, + get [valueSymbol] () : object { return {}; } + }; + ok(!isMovable(obj)); // It's not movable initially + const movable = Piscina.move(obj); + ok(isMovable(movable)); // It is movable now +}); + +test('Using ArrayBuffer works as expected', async ({ ok, is }) => { + const ab = new ArrayBuffer(5); + const movable = Piscina.move(ab); + ok(isMovable(movable)); + ok(types.isAnyArrayBuffer(movable[valueSymbol])); + ok(types.isAnyArrayBuffer(movable[transferableSymbol])); + is(movable[transferableSymbol], ab); +}); + +test('Using TypedArray works as expected', async ({ ok, is }) => { + const ab = new Uint8Array(5); + const movable = Piscina.move(ab); + ok(isMovable(movable)); + ok((types as any).isArrayBufferView(movable[valueSymbol])); + ok(types.isAnyArrayBuffer(movable[transferableSymbol])); + is(movable[transferableSymbol], ab.buffer); +}); + +test('Using MessagePort works as expected', async ({ ok, is }) => { + const mc = new MessageChannel(); + const movable = Piscina.move(mc.port1); + ok(isMovable(movable)); + ok(movable[valueSymbol] instanceof MessagePort); + ok(movable[transferableSymbol] instanceof MessagePort); + is(movable[transferableSymbol], mc.port1); +}); + +test('Moving works', async ({ is, ok }) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/move.ts') + }); + + { + const ab = new ArrayBuffer(10); + const ret = await pool.runTask(Piscina.move(ab)); + is(ab.byteLength, 0); // It was moved + ok(types.isAnyArrayBuffer(ret)); + } + + { + // Test with empty transferList + const ab = new ArrayBuffer(10); + const ret = await pool.runTask(Piscina.move(ab), []); + is(ab.byteLength, 0); // It was moved + ok(types.isAnyArrayBuffer(ret)); + } +});