Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Piscina.move #59

Merged
merged 1 commit into from
May 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,79 @@ Is `true` if this code runs inside a `Piscina` threadpool as a Worker.

Provides the current version of this library as a semver string.

### Static method: `move(value)`

By default, any value returned by a worker function will be cloned when
returned back to the Piscina pool, even if that object is capable of
being transfered. The `Piscina.move()` method can be used to wrap and
mark transferable values such that they will by transfered rather than
cloned.

The `value` may be any object supported by Node.js to be transferable
(e.g. `ArrayBuffer`, any `TypedArray`, or `MessagePort`), or any object
implementing the `Transferable` interface.

```js
const { move } = require('piscina');

module.exports = () => {
return move(new ArrayBuffer(10));
}
```

The `move()` method will throw if the `value` is not transferable.

The object returned by the `move()` method should not be set as a
nested value in an object. If it is used, the `move()` object itself
will be cloned as opposed to transfering the object it wraps.

#### Interface: `Transferable`

Objects may implement the `Transferable` interface to create their own
custom transferable objects. This is useful when an object being
passed into or from a worker contains a deeply nested transferable
object such as an `ArrayBuffer` or `MessagePort`.

`Transferable` objects expose two properties inspected by Piscina
to determine how to transfer the object. These properties are
named using the special static `Piscina.transferableSymbol` and
`Piscina.valueSymbol` properties:

* The `Piscina.transferableSymbol` property provides the object
(or objects) that are to be included in the `transferList`.

* The `Piscina.valueSymbol` property provides a surrogate value
to transmit in place of the `Transferable` itself.

Both properties are required.

For example,

```js
const {
move,
transferableSymbol,
valueSymbol
} = require('piscina');

module.exports = () => {
const obj = {
a: { b: new Uint8Array(5); },
c: { new Uint8Array(10); },

get [transferableSymbol]() {
// Transfer the two underlying ArrayBuffers
return [this.a.b.buffer, this.c.buffer];
}

get [valueSymbol]() {
return { a: { b: this.b }, c: this.c };
}
};
return move(obj);
};
```

## Current Limitations (Things we're working on / would love help with)

* Improved Documentation
Expand Down
15 changes: 15 additions & 0 deletions examples/move/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
const Piscina = require('../..');
const { resolve } = require('path');

const pool = new Piscina({
filename: resolve(__dirname, 'worker.js'),
idleTimeout: 1000
});

(async () => {
// The task will transfer an ArrayBuffer
// back to the main thread rather than
// cloning it.
const u8 = await pool.runTask();
console.log(u8.length);
})();
7 changes: 7 additions & 0 deletions examples/move/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
const { move } = require('../..');

module.exports = () => {
// Using move causes the Uint8Array to be
// transferred rather than cloned.
return move(new Uint8Array(10));
};
34 changes: 34 additions & 0 deletions src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,40 @@ export const commonState = {
workerData: undefined
};

// Internal symbol used to mark Transferable objects returned
// by the Piscina.move() function
const kMovable = Symbol('Piscina.kMovable');
export const kTransferable = Symbol.for('Piscina.transferable');
export const kValue = Symbol.for('Piscina.valueOf');

// True if the object implements the Transferable interface
export function isTransferable (value : any) : boolean {
return value != null &&
typeof value === 'object' &&
kTransferable in value &&
kValue in value;
}

// True if object implements Transferable and has been returned
// by the Piscina.move() function
export function isMovable (value : any) : boolean {
return isTransferable(value) && value[kMovable] === true;
}

export function markMovable (value : object) : void {
Object.defineProperty(value, kMovable, {
enumerable: false,
configurable: true,
writable: true,
value: true
});
jasnell marked this conversation as resolved.
Show resolved Hide resolved
}

export interface Transferable {
readonly [kTransferable] : object;
readonly [kValue] : object;
}

export const kRequestCountField = 0;
export const kResponseCountField = 1;
export const kFieldCount = 2;
74 changes: 72 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,27 @@ import { AsyncResource } from 'async_hooks';
import { cpus } from 'os';
import { fileURLToPath, URL } from 'url';
import { resolve } from 'path';
import { inspect } from 'util';
import { inspect, types } from 'util';
import assert from 'assert';
import { Histogram, build } from 'hdr-histogram-js';
import { performance } from 'perf_hooks';
import hdrobj from 'hdr-histogram-percentiles-obj';
import { ReadyMessage, RequestMessage, ResponseMessage, StartupMessage, commonState, kResponseCountField, kRequestCountField, kFieldCount } from './common';
import {
ReadyMessage,
RequestMessage,
ResponseMessage,
StartupMessage,
commonState,
kResponseCountField,
kRequestCountField,
kFieldCount,
Transferable,
isTransferable,
markMovable,
isMovable,
kTransferable,
kValue
} from './common';
import { version } from '../package.json';

const cpuCount : number = (() => {
Expand Down Expand Up @@ -49,6 +64,8 @@ type EnvSpecifier = typeof Worker extends {
new (filename : never, options?: { env: infer T }) : Worker;
} ? T : never;

type TransferListItem = TransferList extends (infer T)[] ? T : never;

interface Options {
filename? : string | null,
minThreads? : number,
Expand Down Expand Up @@ -84,6 +101,28 @@ const kDefaultOptions : FilledOptions = {
useAtomics: true
};

class DirectlyTransferable implements Transferable {
#value : object;
constructor (value : object) {
this.#value = value;
}

get [kTransferable] () : object { return this.#value; }

get [kValue] () : object { return this.#value; }
}

class ArrayBufferViewTransferable implements Transferable {
#view : ArrayBufferView;
constructor (view : ArrayBufferView) {
this.#view = view;
addaleax marked this conversation as resolved.
Show resolved Hide resolved
}

get [kTransferable] () : object { return this.#view.buffer; }

get [kValue] () : object { return this.#view; }
}

let taskIdCounter = 0;

type TaskCallback = (err : Error, result: any) => void;
Expand Down Expand Up @@ -121,6 +160,19 @@ class TaskInfo extends AsyncResource {
this.callback = callback;
this.task = task;
this.transferList = transferList;

// If the task is a Transferable returned by
// Piscina.move(), then add it to the transferList
// automatically
if (isMovable(task)) {
if (this.transferList === undefined) {
this.transferList = [];
}
this.transferList =
this.transferList.concat(task[kTransferable]);
this.task = task[kValue];
}

this.filename = filename;
this.taskId = taskIdCounter++;
this.abortSignal = abortSignal;
Expand Down Expand Up @@ -860,6 +912,24 @@ class Piscina extends EventEmitterAsyncResource {
static get Piscina () {
return Piscina;
}

static move (val : Transferable | TransferListItem | ArrayBufferView) {
if (val != null && typeof val === 'object' && typeof val !== 'function') {
if (!isTransferable(val)) {
if ((types as any).isArrayBufferView(val)) {
val = new ArrayBufferViewTransferable(val as ArrayBufferView);
} else {
val = new DirectlyTransferable(val);
}
}
markMovable(val);
}
jasnell marked this conversation as resolved.
Show resolved Hide resolved
return val;
}

static get transferableSymbol () { return kTransferable; }

static get valueSymbol () { return kValue; }
}

export = Piscina;
22 changes: 19 additions & 3 deletions src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
import { parentPort, MessagePort, receiveMessageOnPort, workerData } from 'worker_threads';
import { pathToFileURL } from 'url';
import { commonState, ReadyMessage, RequestMessage, ResponseMessage, StartupMessage, kResponseCountField, kRequestCountField } from './common';
import {
commonState,
ReadyMessage,
RequestMessage,
ResponseMessage,
StartupMessage,
kResponseCountField,
kRequestCountField,
isMovable,
kTransferable,
kValue
} from './common';

commonState.isWorkerThread = true;
commonState.workerData = workerData;
Expand Down Expand Up @@ -112,12 +123,17 @@ function onMessage (

(async function () {
let response : ResponseMessage;
const transferList : any[] = [];
addaleax marked this conversation as resolved.
Show resolved Hide resolved
try {
const handler = await getHandler(filename);
if (handler === null) {
throw new Error(`No handler function exported from ${filename}`);
}
const result = await handler(task);
let result = await handler(task);
if (isMovable(result)) {
transferList.concat(result[kTransferable]);
result = result[kValue];
}
response = {
taskId,
result: result,
Expand All @@ -137,7 +153,7 @@ function onMessage (
// Post the response to the parent thread, and let it know that we have
// an additional message available. If possible, use Atomics.wait()
// to wait for the next message.
port.postMessage(response);
port.postMessage(response, transferList);
Atomics.add(sharedBuffer, kResponseCountField, 1);
atomicsWaitLoop(port, sharedBuffer);
})().catch(throwInNextTick);
Expand Down
10 changes: 10 additions & 0 deletions test/fixtures/move.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import Piscina from '../..';
import assert from 'assert';
import { types } from 'util';

export default function (moved) {
if (moved !== undefined) {
assert(types.isAnyArrayBuffer(moved));
}
return Piscina.move(new ArrayBuffer(10));
}
Loading