Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resourceLimits support #21

Merged
merged 2 commits into from
May 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* ✔ Proper async tracking integration
* ✔ Tracking statistics for run and wait times
* ✔ Cancelation Support
* ✔ Supports enforcing memory resource limits

For Node.js 12.x and higher.

Expand Down Expand Up @@ -147,6 +148,16 @@ This class extends [`EventEmitter`][] from Node.js.
handling I/O in parallel.
* `useAtomics`: (`boolean`) Use the [`Atomics`][] API for faster communication
between threads. This is on by default.
* `resourceLimits`: (`object`) See [Node.js new Worker options][]
* `maxOldGenerationSizeMb`: (`number`) The maximum size of each worker threads
main heap in MB.
* `maxYoungGenerationSizeMb`: (`number`) The maximum size of a heap space for
recently created objects.
* `codeRangeSizeMb`: (`number`) The size of a pre-allocated memory range used
for generated code.

Use caution when setting resource limits. Setting limits that are too low may
result in the `Piscina` worker threads being unusable.

### Method: `runTask(task[, transferList][, filename][, abortSignal])`

Expand Down Expand Up @@ -335,5 +346,6 @@ Piscina development is sponsored by [NearForm Research][].
[`Atomics`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Atomics
[`EventEmitter`]: https://nodejs.org/api/events.html
[`postMessage`]: https://nodejs.org/api/worker_threads.html#worker_threads_port_postmessage_value_transferlist
[Node.js new Worker options]: https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options
[MIT Licensed]: LICENSE.md
[NearForm Research]: https://www.nearform.com/research/
23 changes: 23 additions & 0 deletions examples/resourceLimits/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
'use strict';

const Piscina = require('../..');
const { resolve } = require('path');
const { strictEqual } = require('assert');

const piscina = new Piscina({
filename: resolve(__dirname, 'worker.js'),
resourceLimits: {
maxOldGenerationSizeMb: 16,
maxYoungGenerationSizeMb: 4,
codeRangeSizeMb: 16
}
});

(async function() {
try {
await piscina.runTask();
} catch (err) {
console.log('Worker terminated due to resource limits');
strictEqual(err.code, 'ERR_WORKER_OUT_OF_MEMORY');
}
})();
8 changes: 8 additions & 0 deletions examples/resourceLimits/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict';

module.exports = () => {
const array = [];
while (true) {
array.push([array]);
}
};
18 changes: 15 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,20 @@ class AbortError extends Error {
}
}

type ResourceLimits = Worker extends {
resourceLimits? : infer T;
} ? T : {};

interface Options {
// Probably also support URL here
filename? : string | null,
minThreads? : number,
maxThreads? : number,
idleTimeout? : number,
maxQueue? : number,
concurrentTasksPerWorker? : number
useAtomics? : boolean
concurrentTasksPerWorker? : number,
useAtomics? : boolean,
resourceLimits? : ResourceLimits
}
jasnell marked this conversation as resolved.
Show resolved Hide resolved

interface FilledOptions extends Options {
Expand Down Expand Up @@ -279,7 +284,9 @@ class ThreadPool {

_addNewWorker () : WorkerInfo {
const pool = this;
const worker = new Worker(resolve(__dirname, 'worker.js'));
const worker = new Worker(resolve(__dirname, 'worker.js'), {
resourceLimits: this.options.resourceLimits
});

const { port1, port2 } = new MessageChannel();
const workerInfo = new WorkerInfo(worker, port1, onMessage);
Expand Down Expand Up @@ -542,6 +549,11 @@ class Piscina extends EventEmitter {
typeof options.useAtomics !== 'boolean') {
throw new TypeError('options.useAtomics must be a boolean value');
}
if (options.resourceLimits !== undefined &&
(typeof options.resourceLimits !== 'object' ||
options.resourceLimits === null)) {
throw new TypeError('options.resourceLimits must be an object');
}

this.#pool = new ThreadPool(this, options);
}
Expand Down
8 changes: 8 additions & 0 deletions test/fixtures/resource-limits.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
'use strict';

module.exports = () => {
const array = [];
while (true) {
array.push([array]);
}
};
6 changes: 6 additions & 0 deletions test/option-validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,9 @@ test('useAtomics must be a boolean', async ({ throws }) => {
useAtomics: 'string'
}) as any), /options.useAtomics must be a boolean/);
});

test('resourceLimits must be an object', async ({ throws }) => {
throws(() => new Piscina(({
resourceLimits: 0
}) as any), /options.resourceLimits must be an object/);
});
34 changes: 34 additions & 0 deletions test/test-resourcelimits.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import Piscina from '..';
import { test } from 'tap';
import { resolve } from 'path';

test('resourceLimits causes task to reject', async ({ is, rejects }) => {
const worker = new Piscina({
filename: resolve(__dirname, 'fixtures/resource-limits.js'),
resourceLimits: {
maxOldGenerationSizeMb: 16,
maxYoungGenerationSizeMb: 4,
codeRangeSizeMb: 16
}
});
worker.on('error', () => {
// Ignore any additional errors that may occur.
// This may happen because when the Worker is
// killed a new worker is created that may hit
// the memory limits immediately. When that
// happens, there is no associated Promise to
// reject so we emit an error event instead.
// We don't care so much about that here. We
// could potentially avoid the issue by setting
// higher limits above but rather than try to
// guess at limits that may work consistently,
// let's just ignore the additional error for
// now.
});
const limits : any = worker.options.resourceLimits;
is(limits.maxOldGenerationSizeMb, 16);
is(limits.maxYoungGenerationSizeMb, 4);
is(limits.codeRangeSizeMb, 16);
rejects(worker.runTask(null),
/Worker terminated due to reaching memory limit: JS heap out of memory/);
});