From b274b8ef5ef3403aca17130a39986b89995ae5f7 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Sat, 30 Nov 2024 13:33:12 -0500 Subject: [PATCH 01/30] feat: add std stream event emit for abort handlers --- src/WorkerHandler.js | 13 ++++++++----- test/Pool.test.js | 21 ++++++++++++++++++++- test/workers/cleanup-abort.js | 18 ++++++++++++++++-- 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 1d855e41..46878ab6 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -220,13 +220,15 @@ function objectToError (obj) { function handleEmittedStdPayload(handler, payload) { // TODO: refactor if parallel task execution gets added - if (Object.keys(handler.processing).length !== 1) { - return; - } var task = Object.values(handler.processing)[0] - if (task.options && typeof task.options.on === 'function') { + if (task && task.options && typeof task.options.on === 'function') { task.options.on(payload); } + + task = Object.values(handler.tracking)[0]; + if (task && task.options && typeof task.options.on === "function") { + task.options.on(payload); + } } /** @@ -422,7 +424,8 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { if (error instanceof Promise.CancellationError || error instanceof Promise.TimeoutError) { me.tracking[id] = { id, - resolver: Promise.defer() + resolver: Promise.defer(), + options: me.processing[id].options, }; // remove this task from the queue. It is already rejected (hence this diff --git a/test/Pool.test.js b/test/Pool.test.js index 8775c074..0b6a202f 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1354,7 +1354,7 @@ describe('Pool', function () { maxWorkers: 1, onCreateWorker: () => { workerCount += 1; - } + }, }); let task = pool.exec('asyncTimeout', [], {}); @@ -1559,6 +1559,25 @@ describe('Pool', function () { }); }); }); + + it('should trigger event stdout in abort handler', function (done) { + var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + maxWorkers: 1, + workerType: 'process', + emitStdStreams: true, + workerTerminateTimeout: 1000, + }); + + pool.exec('stdoutStreamOnAbort', [], { + on: function (payload) { + assert.strictEqual(payload.stdout.trim(), "Hello, world!"); + console.log(payload); + pool.terminate(); + done(); + } + }).timeout(100); + + }); }); describe('validate', () => { diff --git a/test/workers/cleanup-abort.js b/test/workers/cleanup-abort.js index 29709ed6..4c8c0b9b 100644 --- a/test/workers/cleanup-abort.js +++ b/test/workers/cleanup-abort.js @@ -5,8 +5,7 @@ function asyncTimeout() { return new Promise(function (resolve) { let timeout = setTimeout(() => { resolve(); - }, 5000); - + }, 5000); me.worker.addAbortListener(async function () { clearTimeout(timeout); resolve(); @@ -34,11 +33,26 @@ function asyncAbortHandlerNeverResolves() { }); } +function stdoutStreamOnAbort() { + var me = this; + return new Promise(function (resolve) { + let timeout = setTimeout(() => { + resolve(); + }, 5000); + + me.worker.addAbortListener(async function () { + console.log("Hello, world!"); + resolve(); + }); + }); +} + // create a worker and register public functions workerpool.worker( { asyncTimeout: asyncTimeout, asyncAbortHandlerNeverResolves: asyncAbortHandlerNeverResolves, + stdoutStreamOnAbort: stdoutStreamOnAbort, }, { abortListenerTimeout: 1000 From ba382280f5c728626c68ba3f828bd67b5f23a06d Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Sat, 30 Nov 2024 13:34:46 -0500 Subject: [PATCH 02/30] ref: remove log --- test/Pool.test.js | 1 - 1 file changed, 1 deletion(-) diff --git a/test/Pool.test.js b/test/Pool.test.js index 0b6a202f..3436fc66 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1571,7 +1571,6 @@ describe('Pool', function () { pool.exec('stdoutStreamOnAbort', [], { on: function (payload) { assert.strictEqual(payload.stdout.trim(), "Hello, world!"); - console.log(payload); pool.terminate(); done(); } From f9b4919ecdd96c932addab1bfdb40b9f99897f60 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Sun, 1 Dec 2024 20:24:38 -0500 Subject: [PATCH 03/30] feat: add event emit from abort handler --- src/WorkerHandler.js | 10 ++++++++++ test/Pool.test.js | 16 ++++++++++++++++ test/workers/cleanup-abort.js | 18 ++++++++++++++---- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 46878ab6..d54407c1 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -301,6 +301,16 @@ function WorkerHandler(script, _options) { task.resolver.resolve(response.result); } } + }else { + // if the task is not the current, it might be tracked for cleanup + var task = me.tracking[id]; + if (task !== undefined) { + if (response.isEvent) { + if (task.options && typeof task.options.on === 'function') { + task.options.on(response.payload); + } + } + } } if (response.method === CLEANUP_METHOD_ID) { diff --git a/test/Pool.test.js b/test/Pool.test.js index 3436fc66..5eafd945 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1575,7 +1575,23 @@ describe('Pool', function () { done(); } }).timeout(100); + }); + it('should trigger event in abort handler', function (done) { + var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + maxWorkers: 1, + workerType: 'process', + emitStdStreams: true, + workerTerminateTimeout: 1000, + }); + + pool.exec('eventEmitOnAbort', [], { + on: function (payload) { + assert.strictEqual(payload.status, 'cleanup_success'); + pool.terminate(); + done(); + } + }).timeout(100); }); }); diff --git a/test/workers/cleanup-abort.js b/test/workers/cleanup-abort.js index 4c8c0b9b..4e8d6a7a 100644 --- a/test/workers/cleanup-abort.js +++ b/test/workers/cleanup-abort.js @@ -36,10 +36,6 @@ function asyncAbortHandlerNeverResolves() { function stdoutStreamOnAbort() { var me = this; return new Promise(function (resolve) { - let timeout = setTimeout(() => { - resolve(); - }, 5000); - me.worker.addAbortListener(async function () { console.log("Hello, world!"); resolve(); @@ -47,12 +43,26 @@ function stdoutStreamOnAbort() { }); } +function eventEmitOnAbort() { + var me = this; + return new Promise(function (resolve) { + me.worker.addAbortListener(async function () { + workerpool.workerEmit({ + status: 'cleanup_success', + }); + resolve(); + }); + }); +} + + // create a worker and register public functions workerpool.worker( { asyncTimeout: asyncTimeout, asyncAbortHandlerNeverResolves: asyncAbortHandlerNeverResolves, stdoutStreamOnAbort: stdoutStreamOnAbort, + eventEmitOnAbort: eventEmitOnAbort, }, { abortListenerTimeout: 1000 From 1f235f38226ff3f5d95fbc5eb84dfa8628caae91 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Sun, 1 Dec 2024 20:35:34 -0500 Subject: [PATCH 04/30] ref: lower test timeouts --- test/Pool.test.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/Pool.test.js b/test/Pool.test.js index 5eafd945..4b118aec 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1574,7 +1574,7 @@ describe('Pool', function () { pool.terminate(); done(); } - }).timeout(100); + }).timeout(50); }); it('should trigger event in abort handler', function (done) { @@ -1591,7 +1591,7 @@ describe('Pool', function () { pool.terminate(); done(); } - }).timeout(100); + }).timeout(50); }); }); From 9ac04b4139357f3a61a9f97162748db455c9239d Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Mon, 2 Dec 2024 21:23:59 -0500 Subject: [PATCH 05/30] ref: change option assignment to tracked task --- src/WorkerHandler.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index d54407c1..5aca0a4e 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -435,7 +435,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { me.tracking[id] = { id, resolver: Promise.defer(), - options: me.processing[id].options, + options: options, }; // remove this task from the queue. It is already rejected (hence this From 650128dd0171292c84e03b46d14c3bf827f29552 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Wed, 4 Dec 2024 18:01:36 -0500 Subject: [PATCH 06/30] fmt --- src/WorkerHandler.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 5aca0a4e..c1fdf87d 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -301,7 +301,7 @@ function WorkerHandler(script, _options) { task.resolver.resolve(response.result); } } - }else { + } else { // if the task is not the current, it might be tracked for cleanup var task = me.tracking[id]; if (task !== undefined) { From 85de98559b32e0287a335e52e895517f7fc45193 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Sat, 7 Dec 2024 09:12:08 -0500 Subject: [PATCH 07/30] ref: change abort listener execution to fuffil on first promise to resolve or reject --- src/worker.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/worker.js b/src/worker.js index 3d8bad41..cf2a101c 100644 --- a/src/worker.js +++ b/src/worker.js @@ -192,7 +192,7 @@ worker.cleanup = function(requestId) { error: convertError(new Error('Worker terminating')), }); - // If there are no handlers registered, reject the promise with an error as we want the handler to be notified + // If there are no handlers registered, as we want the handler to be notified // that cleanup should begin and the handler should be GCed. return new Promise(function(resolve) { resolve(); }); } @@ -231,10 +231,10 @@ worker.cleanup = function(requestId) { // - Reject if one or more handlers reject // Upon one of the above cases a message will be sent to the handler with the result of the handler execution // which will either kill the worker if the result contains an error, or - return Promise.all([ - settlePromise, - timeoutPromise - ]).then(function() { + return new Promise(function (resolve, reject) { + settlePromise.then(resolve, reject); + timeoutPromise.then(resolve, reject); + }).then(function() { worker.send({ id: requestId, method: CLEANUP_METHOD_ID, From 18e48535978402df13fcec5d12befee23b4fa319 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Sun, 8 Dec 2024 10:43:09 -0500 Subject: [PATCH 08/30] feat: add tracking tasks to busy check --- src/WorkerHandler.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index c1fdf87d..44641211 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -488,7 +488,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { * @return {boolean} Returns true if the worker is busy */ WorkerHandler.prototype.busy = function () { - return this.cleaning || Object.keys(this.processing).length > 0; + return this.cleaning || Object.keys(this.processing).length > 0 || Object.keys(this.tracking).length > 0; }; /** From 0685ae3eef0436ff834dc7e8b4b2e85d104cd3f7 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Tue, 10 Dec 2024 22:16:55 -0500 Subject: [PATCH 09/30] feat: add event handlers for abort start and resolution --- src/Pool.js | 3 +++ src/WorkerHandler.js | 19 ++++++++++++++++--- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/Pool.js b/src/Pool.js index 3f762b2f..c8bed8ed 100644 --- a/src/Pool.js +++ b/src/Pool.js @@ -52,6 +52,8 @@ function Pool(script, options) { this.onCreateWorker = options.onCreateWorker || (() => null); /** @readonly */ this.onTerminateWorker = options.onTerminateWorker || (() => null); + /** @readonly */ + this.onAbortResolution = options.onAbortResolution || (() => null); /** @readonly */ this.emitStdStreams = options.emitStdStreams || false @@ -430,6 +432,7 @@ Pool.prototype._createWorkerHandler = function () { workerType: this.workerType, workerTerminateTimeout: this.workerTerminateTimeout, emitStdStreams: this.emitStdStreams, + onAbortResolution: this.onAbortResolution }); } diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 44641211..8206c426 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -251,6 +251,7 @@ function WorkerHandler(script, _options) { this.workerOpts = options.workerOpts; this.workerThreadOpts = options.workerThreadOpts this.workerTerminateTimeout = options.workerTerminateTimeout; + this.onAbortResolution = options.onAbortResolution; // The ready message is only sent if the worker.add method is called (And the default script is not used) if (!script) { @@ -318,10 +319,19 @@ function WorkerHandler(script, _options) { if (trackedTask !== undefined) { if (response.error) { clearTimeout(trackedTask.timeoutId); - trackedTask.resolver.reject(objectToError(response.error)) + trackedTask.resolver.reject(objectToError(response.error)); + me.onAbortResolution({ + error: response.error, + id, + isTerminating: true + }); } else { me.tracking && clearTimeout(trackedTask.timeoutId); - trackedTask.resolver.resolve(trackedTask.result); + trackedTask.resolver.resolve(trackedTask.result); + me.onAbortResolution({ + id, + isTerminating: false, + }) } } delete me.tracking[id]; @@ -459,7 +469,10 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { id, method: CLEANUP_METHOD_ID }); - + me.tracking[id].options.workerAbortStart({ + id, + taskResolver: me.tracking[id].resolver, + }); /** * Sets a timeout to reject the cleanup operation if the message sent to the worker From 4a892d292e1faf777fd34a5529a951d9a1d2d268 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Tue, 10 Dec 2024 22:17:18 -0500 Subject: [PATCH 10/30] test: abort test refactor (wip) --- test/Pool.test.js | 184 ++++++++++++++++++++++++---------------------- 1 file changed, 95 insertions(+), 89 deletions(-) diff --git a/test/Pool.test.js b/test/Pool.test.js index 4b118aec..98225f13 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1318,51 +1318,70 @@ describe('Pool', function () { describe('abort handler', () => { - it('should not terminate worker if abort listener is defined dedicated worker with Timeout', function () { + it('should not terminate worker if abort listener is defined dedicated worker with Timeout', function (done) { var workerCount = 0; var pool = createPool(__dirname + '/workers/cleanup-abort.js', { maxWorkers: 1, onCreateWorker: () => { workerCount += 1; + }, + onTerminateWorker: function() { + workerCount -= 1; } }); - return pool.exec('asyncTimeout', []) + pool.exec('asyncTimeout', [],{ + workerAbortStart: async function(args) { + // wait for the promise to resolve, + // then check pool stats + await args.taskResolver.promise; + var stats = pool.stats(); + assert.strictEqual(stats.busyWorkers, 0); + + pool.terminate(); + done(); + } + }) .timeout(200) .catch(function (err) { assert(err instanceof Promise.TimeoutError); let stats = pool.stats(); assert.strictEqual(workerCount, 1); assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.busyWorkers, 0); - }).then(function() { - return pool.exec(add, [1, 2]) - }).then(function() { - var stats = pool.stats(); - assert.strictEqual(workerCount, 1); - assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.busyWorkers, 0); - + assert.strictEqual(stats.idleWorkers, 0); + assert.strictEqual(stats.busyWorkers, 1); }); }); - it('should not terminate worker if abort listener is defined dedicated worker with Cancellation', function () { + it('should not terminate worker if abort listener is defined dedicated worker with Cancellation', function (done) { var workerCount = 0; var pool = createPool(__dirname + '/workers/cleanup-abort.js', { maxWorkers: 1, onCreateWorker: () => { workerCount += 1; }, + onTerminateWorker: function() { + workerCount -= 1; + } }); - let task = pool.exec('asyncTimeout', [], {}); + let task = pool.exec('asyncTimeout', [], { + workerAbortStart: async function(args) { + // wait for the promise to resolve, + // then check pool stats. + await args.taskResolver.promise; + var stats = pool.stats(); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + pool.terminate(); + done(); + } + }); // Wrap in a new promise which waits 50ms // in order to allow the function executing in the - // worker to - return new Promise(function(resolve) { + // worker to finish. + const _ = new Promise(function(resolve) { setTimeout(function() { resolve(); }, 50); @@ -1374,29 +1393,23 @@ describe('Pool', function () { let stats = pool.stats(); assert.strictEqual(workerCount, 1); assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.busyWorkers, 0); - }).then(function() { - return pool.exec(add, [1, 2]) - }).then(function() { - var stats = pool.stats(); - assert.strictEqual(workerCount, 1); - assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.busyWorkers, 0); - + assert.strictEqual(stats.idleWorkers, 0); + assert.strictEqual(stats.busyWorkers, 1); }); }); }); - it('should not terminate worker if abort listener is defined inline worker with Timeout', function () { + it('should not terminate worker if abort listener is defined inline worker with Timeout', function (done) { var workerCount = 0; var pool = createPool({ onCreateWorker: () => { workerCount += 1; }, maxWorkers: 1, + onTerminateWorker: function() { + workerCount -= 1; + } }); function asyncTimeout() { var me = this; @@ -1412,8 +1425,18 @@ describe('Pool', function () { }); }); } - function add(a, b) { } - return pool.exec(asyncTimeout, [], { + + const _ = pool.exec(asyncTimeout, [], { + workerAbortStart: async function(args) { + // wait for the promise to resolve, + // then check pool stats. + await args.taskResolver.promise; + var stats = pool.stats(); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + pool.terminate(); + done(); + } }) .timeout(200) .catch(function(err) { @@ -1423,25 +1446,19 @@ describe('Pool', function () { assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(stats.idleWorkers, 1); assert.strictEqual(stats.busyWorkers, 0); - }).always(function () { - return pool.exec(add, [1, 2]).then(function () { - var stats = pool.stats(); - assert.strictEqual(workerCount, 1); - assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.busyWorkers, 0); - - }); }); }); - it('should not terminate worker if abort listener is defined inline worker with Cancellation', function () { + it('should not terminate worker if abort listener is defined inline worker with Cancellation', function (done) { var workerCount = 0; var pool = createPool({ onCreateWorker: () => { workerCount += 1; }, maxWorkers: 1, + onTerminateWorker: function() { + workerCount -= 1; + } }); function asyncTimeout() { @@ -1458,10 +1475,20 @@ describe('Pool', function () { }); }); } - function add(a, b) { } + const task = pool.exec(asyncTimeout, [], { + workerAbortStart: async function(args) { + // wait for the promise to resolve, + // then check pool stats. + await args.taskResolver.promise; + var stats = pool.stats(); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + pool.terminate(); + done(); + } }) - return new Promise(function(resolve) { + const _ = new Promise(function(resolve) { setTimeout(function() { resolve(); }, 50); @@ -1473,89 +1500,68 @@ describe('Pool', function () { var stats = pool.stats(); assert.strictEqual(stats.busyWorkers, 1); assert.strictEqual(stats.totalWorkers, 1); - }).always(function () { - return pool.exec(add, [1, 2]).then(function () { - var stats = pool.stats(); - assert.strictEqual(workerCount, 1); - assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.busyWorkers, 0); - - }); }); }); }); - it('should invoke timeout for abort handler if timeout period is reached with Timeout', function () { + it('should invoke termination timeout for abort handler if timeout period is reached from task timeout', function (done) { var workerCount = 0; var pool = createPool(__dirname + '/workers/cleanup-abort.js', { maxWorkers: 2, onCreateWorker: function() { workerCount += 1; + }, + workerTerminateTimeout: 1000, + onAbortResolution: function(args) { + const stats = pool.stats(); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(args.isTerminating, true); + pool.terminate(); + done(); } }); - return pool.exec('asyncAbortHandlerNeverResolves', []) - .timeout(1000) + const _ = pool.exec('asyncAbortHandlerNeverResolves', [], { + + }) + .timeout(200) .catch(function (err) { assert(err instanceof Promise.TimeoutError); - var stats = pool.stats(); assert.strictEqual(stats.busyWorkers, 1); assert.strictEqual(stats.totalWorkers, 1); - }).always(function() { - var stats = pool.stats(); - assert.strictEqual(stats.busyWorkers, 0); - assert.strictEqual(stats.totalWorkers, 1); - return pool.exec(add, [1, 2]).then(function() { - var stats = pool.stats(); - assert.strictEqual(workerCount, 1); - assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.busyWorkers, 0); - - }); }); }); - it('should invoke timeout for abort handler if timeout period is reached with Cancellation', function () { + it('should invoke timeout for abort handler if timeout period is reached with Cancellation', function (done) { var workerCount = 0; var pool = createPool(__dirname + '/workers/cleanup-abort.js', { maxWorkers: 1, onCreateWorker: function() { workerCount += 1; - } + }, + onAbortResolution: function(args) { + const stats = pool.stats(); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(args.isTerminating, true); + pool.terminate(); + done(); + }, + workerTerminateTimeout: 500, }); const task = pool.exec('asyncAbortHandlerNeverResolves', []) - return new Promise(function(resolve) { - setTimeout(function() { + const _ = new Promise(function(resolve) { resolve(); - }, 50); }).then(function() { return task.cancel() .catch(function (err) { - assert(err instanceof Promise.TimeoutError); + assert(err instanceof Promise.CancellationError); var stats = pool.stats(); assert(stats.busyWorkers === 1); - }).always(function() { - assert.strictEqual(workerCount, 1); - - var stats = pool.stats(); - assert.strictEqual(stats.busyWorkers, 0); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.totalWorkers, 1); - return pool.exec(add, [1, 2]).then(function() { - assert.strictEqual(workerCount, 1); - var stats = pool.stats(); - - assert.strictEqual(stats.busyWorkers, 0); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.totalWorkers, 1); - }); }); }); }); From e824a79cd46ba0f4c4ff0ac425b42dc88325da81 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Tue, 10 Dec 2024 22:31:37 -0500 Subject: [PATCH 11/30] ref: add default for tracking task options --- src/WorkerHandler.js | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 8206c426..a5fe5e59 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -445,7 +445,9 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { me.tracking[id] = { id, resolver: Promise.defer(), - options: options, + options: options ?? { + workerAbortStart: () => {} + }, }; // remove this task from the queue. It is already rejected (hence this From efb12c6f2f001204a75906b401980e1005ff6dd5 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Tue, 10 Dec 2024 22:36:09 -0500 Subject: [PATCH 12/30] ref: rename event handler --- src/WorkerHandler.js | 4 ++-- test/Pool.test.js | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index a5fe5e59..2105ec09 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -446,7 +446,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { id, resolver: Promise.defer(), options: options ?? { - workerAbortStart: () => {} + onAbortStart: () => {} }, }; @@ -471,7 +471,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { id, method: CLEANUP_METHOD_ID }); - me.tracking[id].options.workerAbortStart({ + me.tracking[id].options.onAbortStart({ id, taskResolver: me.tracking[id].resolver, }); diff --git a/test/Pool.test.js b/test/Pool.test.js index 98225f13..634392d0 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1331,7 +1331,7 @@ describe('Pool', function () { }); pool.exec('asyncTimeout', [],{ - workerAbortStart: async function(args) { + onAbortStart: async function(args) { // wait for the promise to resolve, // then check pool stats await args.taskResolver.promise; @@ -1366,7 +1366,7 @@ describe('Pool', function () { }); let task = pool.exec('asyncTimeout', [], { - workerAbortStart: async function(args) { + onAbortStart: async function(args) { // wait for the promise to resolve, // then check pool stats. await args.taskResolver.promise; @@ -1427,7 +1427,7 @@ describe('Pool', function () { } const _ = pool.exec(asyncTimeout, [], { - workerAbortStart: async function(args) { + onAbortStart: async function(args) { // wait for the promise to resolve, // then check pool stats. await args.taskResolver.promise; @@ -1477,7 +1477,7 @@ describe('Pool', function () { } const task = pool.exec(asyncTimeout, [], { - workerAbortStart: async function(args) { + onAbortStart: async function(args) { // wait for the promise to resolve, // then check pool stats. await args.taskResolver.promise; From 8987fa3fbf5531176d51ff12c24b40f038c21a9a Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Wed, 11 Dec 2024 20:48:01 -0500 Subject: [PATCH 13/30] ref: refactor std handler --- src/WorkerHandler.js | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index c1fdf87d..fdb1bb40 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -220,15 +220,11 @@ function objectToError (obj) { function handleEmittedStdPayload(handler, payload) { // TODO: refactor if parallel task execution gets added - var task = Object.values(handler.processing)[0] - if (task && task.options && typeof task.options.on === 'function') { - task.options.on(payload); - } + Object.values(handler.processing) + .forEach(task => task?.options?.on(payload)); - task = Object.values(handler.tracking)[0]; - if (task && task.options && typeof task.options.on === "function") { - task.options.on(payload); - } + Object.values(handler.tracking) + .forEach(task => task?.options?.on(payload)); } /** From f5c38fd1860e600cc055aef27f8a485cbc6f73c3 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Wed, 11 Dec 2024 20:57:09 -0500 Subject: [PATCH 14/30] ref: move tracking task deletion after task check --- src/WorkerHandler.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 2105ec09..b123598f 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -317,6 +317,7 @@ function WorkerHandler(script, _options) { if (response.method === CLEANUP_METHOD_ID) { var trackedTask = me.tracking[response.id]; if (trackedTask !== undefined) { + delete me.tracking[id]; if (response.error) { clearTimeout(trackedTask.timeoutId); trackedTask.resolver.reject(objectToError(response.error)); @@ -334,7 +335,6 @@ function WorkerHandler(script, _options) { }) } } - delete me.tracking[id]; } } }); From 9d8566369e2b79ffd0d987bc7dce31e71671c689 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Thu, 12 Dec 2024 21:13:39 -0500 Subject: [PATCH 15/30] ref: move abort resolver handler call to worker termination if rejecting --- src/WorkerHandler.js | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index db530ac9..348c3684 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -317,11 +317,6 @@ function WorkerHandler(script, _options) { if (response.error) { clearTimeout(trackedTask.timeoutId); trackedTask.resolver.reject(objectToError(response.error)); - me.onAbortResolution({ - error: response.error, - id, - isTerminating: true - }); } else { me.tracking && clearTimeout(trackedTask.timeoutId); trackedTask.resolver.resolve(trackedTask.result); @@ -454,9 +449,19 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { delete me.tracking[id]; var promise = me.terminateAndNotify(true) - .then(function() { + .then(function(args) { + me.onAbortResolution({ + error: args, + id, + isTerminating: true + }); throw err; }, function(err) { + me.onAbortResolution({ + error: err, + id, + isTerminating: true + }); throw err; }); From 7c1e099fd471ccf7426621fc9b632211e30200fe Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Thu, 12 Dec 2024 21:50:06 -0500 Subject: [PATCH 16/30] dev: refactor abort example to show implemented changes --- examples/abort.js | 54 +++++++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/examples/abort.js b/examples/abort.js index 9345da15..172bc756 100644 --- a/examples/abort.js +++ b/examples/abort.js @@ -1,16 +1,21 @@ const workerpool = require(".."); -var workerCount = 0; - // create a worker pool const pool = workerpool.pool(__dirname + "/workers/cleanupAbort.js", { // maximum time to wait for worker to cleanup it's resources // on termination before forcefully stopping the worker workerTerminateTimeout: 1000, - onCreateWorker: (args) => { + onCreateWorker: function (args) { console.log("New worker created"); - workerCount += 1; - } + }, + onAbortResolution: function(args) { + console.log("abort operation concluded for task:", args.id); + console.log("is worker terminating", args.isTerminating); + }, + onTerminateWorker: function() { + console.log("worker terminated"); + }, + maxWorkers: 1, }); function add (a, b) { @@ -18,29 +23,36 @@ function add (a, b) { } const main = async () => { - const cleanedUpTask = pool.exec('asyncTimeout', []).timeout(1_000).catch((err) => { - console.log("task timeout"); + let abortResolverSuccess; + await pool.exec('asyncTimeout', [], { + onAbortStart: async function(args) { + console.log("abort operation started"); + abortResolverSuccess = args.taskResolver.promise; + } + }).timeout(100).catch((err) => { console.log("timeout occured: ", err.message); - console.log("worker count ", workerCount); - return pool.exec(add, [1, 2]).then((sum) => { - console.log('add result', sum); - console.log("worker count: ", workerCount); - }); }); - await cleanedUpTask; - const canceledTask = pool.exec('asyncAbortHandlerNeverResolves').cancel().catch((err) => { + await abortResolverSuccess.catch((err) => { + console.log("", err); + }); + + console.log("pool status after abort operation:", pool.stats()); + + + let abortResolverFailure; + await pool.exec('asyncAbortHandlerNeverResolves', [], { + onAbortStart: function(args) { + console.log("abort operation started"); + abortResolverFailure = args.taskResolver.promise; + } + }).cancel().catch((err) => { console.log("task canceled"); console.log("cancel occured: ", err.message); - console.log("worker count ", workerCount); - return pool.exec(add, [1, 2]).then((sum) => { - console.log('add result', sum); - console.log("worker count: ", workerCount); - }); }); - await canceledTask; + console.log("final pool stats", pool.stats()); + // we dont need to terminate the pool, since all workers should be terminated by this point even though there is a handler. } - main(); From ad9534de0be99dfff906d3c1610137b7eff0ee95 Mon Sep 17 00:00:00 2001 From: joshLong145 Date: Wed, 1 Jan 2025 15:46:31 -0500 Subject: [PATCH 17/30] ref: add default for abort resolution handler --- src/WorkerHandler.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 348c3684..9e72d60c 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -247,7 +247,7 @@ function WorkerHandler(script, _options) { this.workerOpts = options.workerOpts; this.workerThreadOpts = options.workerThreadOpts this.workerTerminateTimeout = options.workerTerminateTimeout; - this.onAbortResolution = options.onAbortResolution; + this.onAbortResolution = options.onAbortResolution || (() => null); // The ready message is only sent if the worker.add method is called (And the default script is not used) if (!script) { From 7d24650e205bd6e08f1a4eae7eb7e17c00ac54d5 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Wed, 5 Feb 2025 21:42:42 -0500 Subject: [PATCH 18/30] move to terminationHandler over pool controlled worker cleanup --- src/Pool.js | 16 ++++++++-------- src/WorkerHandler.js | 17 ++++++++++++++--- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/src/Pool.js b/src/Pool.js index c8bed8ed..4e4792fa 100644 --- a/src/Pool.js +++ b/src/Pool.js @@ -231,18 +231,18 @@ Pool.prototype._next = function () { // get the first task from the queue var me = this; var task = this.tasks.shift(); - + var terminationHandler = function() { + // if the worker crashed and terminated, remove it from the pool + if (worker.terminated) { + return me._removeWorker(worker); + } + } // check if the task is still pending (and not cancelled -> promise rejected) if (task.resolver.promise.pending) { // send the request to the worker - var promise = worker.exec(task.method, task.params, task.resolver, task.options) + var promise = worker.exec(task.method, task.params, task.resolver, task.options, terminationHandler) .then(me._boundNext) - .catch(function () { - // if the worker crashed and terminated, remove it from the pool - if (worker.terminated) { - return me._removeWorker(worker); - } - }).then(function() { + .then(function() { me._next(); // trigger next task in the queue }); diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 9e72d60c..5adc203f 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -395,7 +395,7 @@ WorkerHandler.prototype.methods = function () { * @param {import('./types.js').ExecOptions} [options] * @return {Promise.<*, Error>} result */ -WorkerHandler.prototype.exec = function(method, params, resolver, options) { +WorkerHandler.prototype.exec = function(method, params, resolver, options, terminationHandler) { if (!resolver) { resolver = Promise.defer(); } @@ -455,14 +455,22 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { id, isTerminating: true }); - throw err; + if (terminationHandler) { + return terminationHandler(); + } else { + throw err; + } }, function(err) { me.onAbortResolution({ error: err, id, isTerminating: true }); - throw err; + if (terminationHandler) { + return terminationHandler(); + } else { + throw err; + } }); return promise; @@ -494,6 +502,9 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) { return me.tracking[id].resolver.promise; } else { + if (terminationHandler) { + return terminationHandler(); + } throw error; } }) From 0e0448aa0d011d657cb3dea0b0506b3d2121761e Mon Sep 17 00:00:00 2001 From: Josh Long Date: Wed, 5 Feb 2025 21:43:16 -0500 Subject: [PATCH 19/30] clarify example --- examples/abort.js | 73 ++++++++++++++++++++++++++++------------------- 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/examples/abort.js b/examples/abort.js index 172bc756..be78ea8f 100644 --- a/examples/abort.js +++ b/examples/abort.js @@ -8,51 +8,64 @@ const pool = workerpool.pool(__dirname + "/workers/cleanupAbort.js", { onCreateWorker: function (args) { console.log("New worker created"); }, - onAbortResolution: function(args) { + onAbortResolution: function (args) { console.log("abort operation concluded for task:", args.id); console.log("is worker terminating", args.isTerminating); }, - onTerminateWorker: function() { + onTerminateWorker: function () { console.log("worker terminated"); }, maxWorkers: 1, }); -function add (a, b) { - return a + b; +function add(a, b) { + return a + b; } const main = async () => { - let abortResolverSuccess; - await pool.exec('asyncTimeout', [], { - onAbortStart: async function(args) { - console.log("abort operation started"); - abortResolverSuccess = args.taskResolver.promise; - } - }).timeout(100).catch((err) => { - console.log("timeout occured: ", err.message); + let abortResolverSuccess; + await pool + .exec("asyncTimeout", [], { + onAbortStart: async function (args) { + console.log( + "abort operation started from task timeout, in onAbortStart", + ); + abortResolverSuccess = args.taskResolver.promise; + }, + }) + .timeout(100) + .catch((err) => { + console.log("timeout handled: ", err.message); }); - await abortResolverSuccess.catch((err) => { - console.log("", err); - }); + await abortResolverSuccess.catch((err) => { + console.log("", err); + }); + + console.log("pool status after abort operation:", pool.stats()); - console.log("pool status after abort operation:", pool.stats()); - - - let abortResolverFailure; - await pool.exec('asyncAbortHandlerNeverResolves', [], { - onAbortStart: function(args) { - console.log("abort operation started"); - abortResolverFailure = args.taskResolver.promise; - } - }).cancel().catch((err) => { - console.log("task canceled"); - console.log("cancel occured: ", err.message); + let abortResolverFailure; + await pool + .exec("asyncAbortHandlerNeverResolves", [], { + onAbortStart: function (args) { + console.log( + "abort operation started from task cancel, in onAbortStart", + ); + abortResolverFailure = args.taskResolver.promise; + }, + }) + .cancel() + .catch((err) => { + console.log("task canceled"); + console.log("cancel occured: ", err.message); }); - console.log("final pool stats", pool.stats()); - // we dont need to terminate the pool, since all workers should be terminated by this point even though there is a handler. -} + await abortResolverFailure.catch((e) => { + console.log("cancelation handled: ", e.message); + }); + + console.log("final pool stats", pool.stats()); + // we dont need to terminate the pool, since all workers should be terminated by this point even though there is a handler. +}; main(); From e1837861d13aa983c782acb5fce461c186190068 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sun, 9 Feb 2025 17:25:34 -0500 Subject: [PATCH 20/30] refactor event listeners for abort op lifecycle update types remove logging --- src/Pool.js | 2 -- src/WorkerHandler.js | 59 ++++++++++++++++++++++++++------------------ src/types.js | 4 ++- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/Pool.js b/src/Pool.js index 4e4792fa..10121701 100644 --- a/src/Pool.js +++ b/src/Pool.js @@ -52,8 +52,6 @@ function Pool(script, options) { this.onCreateWorker = options.onCreateWorker || (() => null); /** @readonly */ this.onTerminateWorker = options.onTerminateWorker || (() => null); - /** @readonly */ - this.onAbortResolution = options.onAbortResolution || (() => null); /** @readonly */ this.emitStdStreams = options.emitStdStreams || false diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 5adc203f..7967f088 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -247,7 +247,6 @@ function WorkerHandler(script, _options) { this.workerOpts = options.workerOpts; this.workerThreadOpts = options.workerThreadOpts this.workerTerminateTimeout = options.workerTerminateTimeout; - this.onAbortResolution = options.onAbortResolution || (() => null); // The ready message is only sent if the worker.add method is called (And the default script is not used) if (!script) { @@ -290,7 +289,7 @@ function WorkerHandler(script, _options) { me.terminate(); } - // resolve the task's promise + // resolve the task's promis if (response.error) { task.resolver.reject(objectToError(response.error)); } @@ -319,11 +318,13 @@ function WorkerHandler(script, _options) { trackedTask.resolver.reject(objectToError(response.error)); } else { me.tracking && clearTimeout(trackedTask.timeoutId); - trackedTask.resolver.resolve(trackedTask.result); - me.onAbortResolution({ - id, - isTerminating: false, - }) + trackedTask.resolver.resolve(trackedTask.result); + if (trackedTask.options) { + trackedTask.options.onAbortResolution && trackedTask.options.onAbortResolution({ + id, + isTerminating: false, + }) + } } } } @@ -437,7 +438,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi id, resolver: Promise.defer(), options: options ?? { - onAbortStart: () => {} + onAbortResolution: () => {}, }, }; @@ -450,22 +451,26 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi var promise = me.terminateAndNotify(true) .then(function(args) { - me.onAbortResolution({ - error: args, - id, - isTerminating: true - }); - if (terminationHandler) { + if (options) { + options.onAbortResolution && options.onAbortResolution({ + error: err, + id, + isTerminating: true + }); + } + if (terminationHandler) { return terminationHandler(); } else { throw err; } }, function(err) { - me.onAbortResolution({ - error: err, - id, - isTerminating: true - }); + if (options) { + options.onAbortResolution && options.onAbortResolution({ + error: err, + id, + isTerminating: true + }); + } if (terminationHandler) { return terminationHandler(); } else { @@ -480,11 +485,13 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi id, method: CLEANUP_METHOD_ID }); - me.tracking[id].options.onAbortStart({ - id, - taskResolver: me.tracking[id].resolver, - }); + if (options) { + options.onAbortStart && options.onAbortStart({ + id, + taskResolver: me.tracking[id].resolver, + }); + } /** * Sets a timeout to reject the cleanup operation if the message sent to the worker * does not receive a response. see worker.tryCleanup for worker cleanup operations. @@ -498,14 +505,18 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi */ me.tracking[id].timeoutId = setTimeout(function() { me.tracking[id].resolver.reject(error); + if (terminationHandler) { + return terminationHandler(); + } }, me.workerTerminateTimeout); return me.tracking[id].resolver.promise; } else { if (terminationHandler) { return terminationHandler(); + } else { + throw error; } - throw error; } }) }; diff --git a/src/types.js b/src/types.js index c4709fe9..2fbaf5ec 100644 --- a/src/types.js +++ b/src/types.js @@ -24,13 +24,15 @@ * @property {import('worker_threads').WorkerOptions} [workerThreadOpts] Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options). * @property {boolean} [emitStdStreams] Capture stdout and stderr from the worker and emit them via the `stdout` and `stderr` events. Not supported by the `web` worker type. * @property { (arg: WorkerArg) => WorkerArg | undefined } [onCreateWorker] A callback that is called whenever a worker is being created. It can be used to allocate resources for each worker for example. Optionally, this callback can return an object containing one or more of the `WorkerArg` properties. The provided properties will be used to override the Pool properties for the worker being created. - * @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. + * @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. */ /** * @typedef {Object} ExecOptions * @property {(payload: any) => unknown} [on] An event listener, to handle events sent by the worker for this execution. * @property {Object[]} [transfer] A list of transferable objects to send to the worker. Not supported by `process` worker type. See ./examples/transferableObjects.js for usage. + * @property {(payload: {id: number, error: Error | undefined, isTerminating: boolean}) => void } [onAbortResolution] An event listener triggered when whenever an abort operation concludes. + * @property {(payload: {id number, taskResolver: any) => void } [onAbortStart] an event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. */ /** From 7e9fa7e6b6c35a866f26738e7e53333d5a79989d Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sun, 9 Feb 2025 17:52:50 -0500 Subject: [PATCH 21/30] cleanup tests --- test/Pool.test.js | 1726 ++++++++++++++++++++++++--------------------- 1 file changed, 919 insertions(+), 807 deletions(-) diff --git a/test/Pool.test.js b/test/Pool.test.js index 3fc54871..a2ee43b2 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1,17 +1,17 @@ -var assert = require('assert'); -var {Promise} = require('../src/Promise'); -var Pool = require('../src/Pool'); -var tryRequire = require('./utils').tryRequire +var assert = require("assert"); +var { Promise } = require("../src/Promise"); +var Pool = require("../src/Pool"); +var tryRequire = require("./utils").tryRequire; function add(a, b) { return a + b; } -describe('Pool', function () { +describe("Pool", function () { // Creating pool with this function ensures that the pool is terminated // at the end of the test, which avoid hanging the test suite if terminate() // hadn't been called for some reason - let createdPools = [] + let createdPools = []; function createPool(script, options) { const pool = new Pool(script, options); createdPools.push(pool); @@ -24,26 +24,25 @@ describe('Pool', function () { } }); - describe('nodeWorker', function() { - function add(a,b) { - return a+b; + describe("nodeWorker", function () { + function add(a, b) { + return a + b; } - it('supports process', function() { - var pool = createPool({ workerType: 'process' }); + it("supports process", function () { + var pool = createPool({ workerType: "process" }); - return pool.exec(add, [3, 4]) - .then(function (result) { - assert.strictEqual(result, 7); + return pool.exec(add, [3, 4]).then(function (result) { + assert.strictEqual(result, 7); - return pool.terminate(); - }); + return pool.terminate(); + }); }); - var WorkerThreads = tryRequire('worker_threads'); + var WorkerThreads = tryRequire("worker_threads"); - it('supports auto', function() { - var pool = createPool({ workerType: 'auto' }); + it("supports auto", function () { + var pool = createPool({ workerType: "auto" }); var promise = pool.exec(add, [3, 4]); assert.strictEqual(pool.workers.length, 1); var worker = pool.workers[0].worker; @@ -62,8 +61,8 @@ describe('Pool', function () { }); if (WorkerThreads) { - it('supports thread', function() { - var pool = createPool({ workerType: 'thread' }); + it("supports thread", function () { + var pool = createPool({ workerType: "thread" }); var promise = pool.exec(add, [3, 4]); assert.strictEqual(pool.workers.length, 1); @@ -77,58 +76,80 @@ describe('Pool', function () { }); }); - it('supports passing options to threads', function() { - const maxYoungGenerationSizeMb = 200 - var pool = createPool({ minWorkers:1, workerType: 'thread', workerThreadOpts: { resourceLimits: { maxYoungGenerationSizeMb } } }); + it("supports passing options to threads", function () { + const maxYoungGenerationSizeMb = 200; + var pool = createPool({ + minWorkers: 1, + workerType: "thread", + workerThreadOpts: { resourceLimits: { maxYoungGenerationSizeMb } }, + }); var worker = pool.workers[0].worker; assert.strictEqual(worker.isWorkerThread, true); - assert.strictEqual(worker.resourceLimits.maxYoungGenerationSizeMb, maxYoungGenerationSizeMb); + assert.strictEqual( + worker.resourceLimits.maxYoungGenerationSizeMb, + maxYoungGenerationSizeMb, + ); }); - it('supports passing options to threads via mode auto', function() { - const maxYoungGenerationSizeMb = 200 - var pool = createPool({ minWorkers:1, workerType: 'auto', workerThreadOpts: { resourceLimits: { maxYoungGenerationSizeMb } } }); + it("supports passing options to threads via mode auto", function () { + const maxYoungGenerationSizeMb = 200; + var pool = createPool({ + minWorkers: 1, + workerType: "auto", + workerThreadOpts: { resourceLimits: { maxYoungGenerationSizeMb } }, + }); var worker = pool.workers[0].worker; assert.strictEqual(worker.isWorkerThread, true); - assert.strictEqual(worker.resourceLimits.maxYoungGenerationSizeMb, maxYoungGenerationSizeMb); + assert.strictEqual( + worker.resourceLimits.maxYoungGenerationSizeMb, + maxYoungGenerationSizeMb, + ); }); } else { - it('errors when not supporting worker thread', function() { - assert.throws(function() { - createPool({ workerType: 'thread' }); - }, /WorkerPool: workerType = 'thread' is not supported, Node >= 11\.7\.0 required/) + it("errors when not supporting worker thread", function () { + assert.throws(function () { + createPool({ workerType: "thread" }); + }, /WorkerPool: workerType = 'thread' is not supported, Node >= 11\.7\.0 required/); }); } - }) + }); - it('supports forkOpts parameter to pass options to fork', function() { - var pool = createPool({ workerType: 'process', forkOpts: { env: { TEST_ENV: 'env_value'} } }); + it("supports forkOpts parameter to pass options to fork", function () { + var pool = createPool({ + workerType: "process", + forkOpts: { env: { TEST_ENV: "env_value" } }, + }); function getEnv() { return process.env.TEST_ENV; } - return pool.exec(getEnv, []) - .then(function (result) { - assert.strictEqual(result, 'env_value'); + return pool.exec(getEnv, []).then(function (result) { + assert.strictEqual(result, "env_value"); - return pool.terminate(); - }); + return pool.terminate(); + }); }); - it('supports worker creation hook to pass dynamic options to fork (for example)', function() { + it("supports worker creation hook to pass dynamic options to fork (for example)", function () { var counter = 0; var terminatedWorkers = []; var pool = createPool({ - workerType: 'process', + workerType: "process", maxWorkers: 4, // make sure we can create enough workers (otherwise we could be limited by the number of CPUs) onCreateWorker: (opts) => { - return {...opts, forkOpts: {...opts.forkOpts, env: { TEST_ENV: `env_value${counter++}` }}} + return { + ...opts, + forkOpts: { + ...opts.forkOpts, + env: { TEST_ENV: `env_value${counter++}` }, + }, + }; }, onTerminateWorker: (opts) => { terminatedWorkers.push(opts.forkOpts.env.TEST_ENV); - } + }, }); function getEnv() { @@ -138,33 +159,67 @@ describe('Pool', function () { return Promise.all([ pool.exec(getEnv, []), pool.exec(getEnv, []), - pool.exec(getEnv, []) - ]).then(function (result) { - assert.strictEqual(result.length, 3, 'The creation hook should be called 3 times'); - assert(result.includes('env_value0'), 'result should include the value with counter = 0'); - assert(result.includes('env_value1'), 'result should include the value with counter = 1'); - assert(result.includes('env_value2'), 'result should include the value with counter = 2'); - return pool.terminate(); - }).then(function () { - assert.strictEqual(terminatedWorkers.length, 3, 'The termination hook should be called 3 times'); - assert(terminatedWorkers.includes('env_value0'), 'terminatedWorkers should include the value with counter = 0'); - assert(terminatedWorkers.includes('env_value1'), 'terminatedWorkers should include the value with counter = 1'); - assert(terminatedWorkers.includes('env_value2'), 'terminatedWorkers should include the value with counter = 2'); - }); + pool.exec(getEnv, []), + ]) + .then(function (result) { + assert.strictEqual( + result.length, + 3, + "The creation hook should be called 3 times", + ); + assert( + result.includes("env_value0"), + "result should include the value with counter = 0", + ); + assert( + result.includes("env_value1"), + "result should include the value with counter = 1", + ); + assert( + result.includes("env_value2"), + "result should include the value with counter = 2", + ); + return pool.terminate(); + }) + .then(function () { + assert.strictEqual( + terminatedWorkers.length, + 3, + "The termination hook should be called 3 times", + ); + assert( + terminatedWorkers.includes("env_value0"), + "terminatedWorkers should include the value with counter = 0", + ); + assert( + terminatedWorkers.includes("env_value1"), + "terminatedWorkers should include the value with counter = 1", + ); + assert( + terminatedWorkers.includes("env_value2"), + "terminatedWorkers should include the value with counter = 2", + ); + }); }); - it('supports worker creation hook to pass dynamic options to threads (for example)', function() { + it("supports worker creation hook to pass dynamic options to threads (for example)", function () { var counter = 0; var terminatedWorkers = []; var pool = createPool({ - workerType: 'thread', + workerType: "thread", maxWorkers: 4, // make sure we can create enough workers (otherwise we could be limited by the number of CPUs) onCreateWorker: (opts) => { - return {...opts, workerThreadOpts: {...opts.workerThreadOpts, env: { TEST_ENV: `env_value${counter++}` }}} + return { + ...opts, + workerThreadOpts: { + ...opts.workerThreadOpts, + env: { TEST_ENV: `env_value${counter++}` }, + }, + }; }, onTerminateWorker: (opts) => { terminatedWorkers.push(opts.workerThreadOpts.env.TEST_ENV); - } + }, }); function getEnv() { @@ -174,123 +229,171 @@ describe('Pool', function () { return Promise.all([ pool.exec(getEnv, []), pool.exec(getEnv, []), - pool.exec(getEnv, []) - ]).then(function (result) { - assert.strictEqual(result.length, 3, 'The creation hook should be called 3 times'); - assert(result.includes('env_value0'), 'result should include the value with counter = 0'); - assert(result.includes('env_value1'), 'result should include the value with counter = 1'); - assert(result.includes('env_value2'), 'result should include the value with counter = 2'); - return pool.terminate(); - }).then(function () { - assert.strictEqual(terminatedWorkers.length, 3, 'The termination hook should be called 3 times'); - assert(terminatedWorkers.includes('env_value0'), 'terminatedWorkers should include the value with counter = 0'); - assert(terminatedWorkers.includes('env_value1'), 'terminatedWorkers should include the value with counter = 1'); - assert(terminatedWorkers.includes('env_value2'), 'terminatedWorkers should include the value with counter = 2'); - }); + pool.exec(getEnv, []), + ]) + .then(function (result) { + assert.strictEqual( + result.length, + 3, + "The creation hook should be called 3 times", + ); + assert( + result.includes("env_value0"), + "result should include the value with counter = 0", + ); + assert( + result.includes("env_value1"), + "result should include the value with counter = 1", + ); + assert( + result.includes("env_value2"), + "result should include the value with counter = 2", + ); + return pool.terminate(); + }) + .then(function () { + assert.strictEqual( + terminatedWorkers.length, + 3, + "The termination hook should be called 3 times", + ); + assert( + terminatedWorkers.includes("env_value0"), + "terminatedWorkers should include the value with counter = 0", + ); + assert( + terminatedWorkers.includes("env_value1"), + "terminatedWorkers should include the value with counter = 1", + ); + assert( + terminatedWorkers.includes("env_value2"), + "terminatedWorkers should include the value with counter = 2", + ); + }); }); - - it('supports stdout/stderr capture via fork', function(done) { - var pool = createPool(__dirname + '/workers/console.js', {workerType: 'process', emitStdStreams: true}); - - var receivedEvents = [] - pool.exec("stdStreams", [], { - on: function (payload) { - receivedEvents.push(payload) - } - }) - .then(function (result) { - assert.strictEqual(result, 'done'); - assert.deepStrictEqual(receivedEvents, [{ - stdout: 'stdout message\n' - }, { - stderr: 'stderr message\n' - }]); - pool.terminate(); - done(); - }) - .catch(function (err) { - console.log(err); - assert.fail('Should not throw an error'); - done(err); + it("supports stdout/stderr capture via fork", function (done) { + var pool = createPool(__dirname + "/workers/console.js", { + workerType: "process", + emitStdStreams: true, }); - }) - - it('excludes stdout/stderr capture via fork', function(done) { - var pool = createPool(__dirname + '/workers/console.js', {workerType: 'process'}); - var receivedEvents = [] - pool.exec("stdStreams", [], { - on: function (payload) { - receivedEvents.push(payload) - } - }) - .then(function (result) { - assert.strictEqual(result, 'done'); - assert.deepStrictEqual(receivedEvents, []); + var receivedEvents = []; + pool + .exec("stdStreams", [], { + on: function (payload) { + receivedEvents.push(payload); + }, + }) + .then(function (result) { + assert.strictEqual(result, "done"); + assert.deepStrictEqual(receivedEvents, [ + { + stdout: "stdout message\n", + }, + { + stderr: "stderr message\n", + }, + ]); + + pool.terminate(); + done(); + }) + .catch(function (err) { + console.log(err); + assert.fail("Should not throw an error"); + done(err); + }); + }); - pool.terminate(); - done(); - }) - .catch(function (err) { - console.log(err); - assert.fail('Should not throw an error'); - done(err); + it("excludes stdout/stderr capture via fork", function (done) { + var pool = createPool(__dirname + "/workers/console.js", { + workerType: "process", }); - }) - it('supports stdout/stderr capture via threads', function(done) { - var pool = createPool(__dirname + '/workers/console.js', {workerType: 'threads', emitStdStreams: true}); + var receivedEvents = []; + pool + .exec("stdStreams", [], { + on: function (payload) { + receivedEvents.push(payload); + }, + }) + .then(function (result) { + assert.strictEqual(result, "done"); + assert.deepStrictEqual(receivedEvents, []); - var receivedEvents = [] - pool.exec("stdStreams", [], { - on: function (payload) { - receivedEvents.push(payload) - } - }) - .then(function (result) { - assert.strictEqual(result, 'done'); - assert.deepStrictEqual(receivedEvents, [{ - stdout: 'stdout message\n' - }, { - stderr: 'stderr message\n' - }]); + pool.terminate(); + done(); + }) + .catch(function (err) { + console.log(err); + assert.fail("Should not throw an error"); + done(err); + }); + }); - pool.terminate(); - done(); - }) - .catch(function (err) { - console.log(err); - assert.fail('Should not throw an error'); - done(err); + it("supports stdout/stderr capture via threads", function (done) { + var pool = createPool(__dirname + "/workers/console.js", { + workerType: "threads", + emitStdStreams: true, }); - }) - - it('excludes stdout/stderr capture via threads', function(done) { - var pool = createPool(__dirname + '/workers/console.js', {workerType: 'threads'}); - var receivedEvents = [] - pool.exec("stdStreams", [], { - on: function (payload) { - receivedEvents.push(payload) - } - }) - .then(function (result) { - assert.strictEqual(result, 'done'); - assert.deepStrictEqual(receivedEvents, []); + var receivedEvents = []; + pool + .exec("stdStreams", [], { + on: function (payload) { + receivedEvents.push(payload); + }, + }) + .then(function (result) { + assert.strictEqual(result, "done"); + assert.deepStrictEqual(receivedEvents, [ + { + stdout: "stdout message\n", + }, + { + stderr: "stderr message\n", + }, + ]); + + pool.terminate(); + done(); + }) + .catch(function (err) { + console.log(err); + assert.fail("Should not throw an error"); + done(err); + }); + }); - pool.terminate(); - done(); - }) - .catch(function (err) { - console.log(err); - assert.fail('Should not throw an error'); - done(err); + it("excludes stdout/stderr capture via threads", function (done) { + var pool = createPool(__dirname + "/workers/console.js", { + workerType: "threads", }); - }) - it('should offload a function to a worker', function (done) { - var pool = createPool({maxWorkers: 10}); + var receivedEvents = []; + pool + .exec("stdStreams", [], { + on: function (payload) { + receivedEvents.push(payload); + }, + }) + .then(function (result) { + assert.strictEqual(result, "done"); + assert.deepStrictEqual(receivedEvents, []); + + pool.terminate(); + done(); + }) + .catch(function (err) { + console.log(err); + assert.fail("Should not throw an error"); + done(err); + }); + }); + + it("should offload a function to a worker", function (done) { + var pool = createPool({ maxWorkers: 10 }); function add(a, b) { return a + b; @@ -298,23 +401,24 @@ describe('Pool', function () { assert.strictEqual(pool.workers.length, 0); - pool.exec(add, [3, 4]) - .then(function (result) { - assert.strictEqual(result, 7); - assert.strictEqual(pool.workers.length, 1); - return pool.terminate(); - }) - .then(function() { - assert.strictEqual(pool.workers.length, 0); - done(); - }) - .catch(done); + pool + .exec(add, [3, 4]) + .then(function (result) { + assert.strictEqual(result, 7); + assert.strictEqual(pool.workers.length, 1); + return pool.terminate(); + }) + .then(function () { + assert.strictEqual(pool.workers.length, 0); + done(); + }) + .catch(done); assert.strictEqual(pool.workers.length, 1); }); - it('should offload functions to multiple workers', function (done) { - var pool = createPool({maxWorkers: 10}); + it("should offload functions to multiple workers", function (done) { + var pool = createPool({ maxWorkers: 10 }); function add(a, b) { return a + b; @@ -322,23 +426,21 @@ describe('Pool', function () { assert.strictEqual(pool.workers.length, 0); - Promise.all([ - pool.exec(add, [3, 4]), - pool.exec(add, [2, 3]) - ]) - .then(function (results) { - assert.deepStrictEqual(results, [7, 5]); - assert.strictEqual(pool.workers.length, 2); + Promise.all([pool.exec(add, [3, 4]), pool.exec(add, [2, 3])]).then( + function (results) { + assert.deepStrictEqual(results, [7, 5]); + assert.strictEqual(pool.workers.length, 2); - pool.terminate(); - done(); - }); + pool.terminate(); + done(); + }, + ); assert.strictEqual(pool.workers.length, 2); }); - it('should put tasks in queue when all workers are busy', function (done) { - var pool = createPool({maxWorkers: 2}); + it("should put tasks in queue when all workers are busy", function (done) { + var pool = createPool({ maxWorkers: 2 }); function add(a, b) { return a + b; @@ -359,161 +461,166 @@ describe('Pool', function () { assert.strictEqual(pool.tasks.length, 2); assert.strictEqual(pool.workers.length, 2); - Promise.all([ - task1, - task2, - task3, - task4 - ]) - .then(function (results) { - assert.deepStrictEqual(results, [7, 5, 12, 2]); - assert.strictEqual(pool.tasks.length, 0); - assert.strictEqual(pool.workers.length, 2); + Promise.all([task1, task2, task3, task4]).then(function (results) { + assert.deepStrictEqual(results, [7, 5, 12, 2]); + assert.strictEqual(pool.tasks.length, 0); + assert.strictEqual(pool.workers.length, 2); - pool.terminate(); - done(); - }); + pool.terminate(); + done(); + }); }); - it('should create a proxy', function (done) { + it("should create a proxy", function (done) { var pool = createPool(); pool.proxy().then(function (proxy) { - assert.deepStrictEqual(Object.keys(proxy).sort(), ['methods', 'run']); + assert.deepStrictEqual(Object.keys(proxy).sort(), ["methods", "run"]); - proxy.methods() - .then(function (methods) { - assert.deepStrictEqual(methods.sort(), ['methods', 'run']); + proxy + .methods() + .then(function (methods) { + assert.deepStrictEqual(methods.sort(), ["methods", "run"]); - pool.terminate(); - done(); - }) - .catch(function () { - assert.fail('Should not throw an error'); - }); + pool.terminate(); + done(); + }) + .catch(function () { + assert.fail("Should not throw an error"); + }); }); }); - it('should create a proxy of a custom worker', function (done) { - var pool = createPool(__dirname + '/workers/simple.js'); + it("should create a proxy of a custom worker", function (done) { + var pool = createPool(__dirname + "/workers/simple.js"); pool.proxy().then(function (proxy) { - assert.deepStrictEqual(Object.keys(proxy).sort(), ['add','methods','multiply','run','timeout']); + assert.deepStrictEqual(Object.keys(proxy).sort(), [ + "add", + "methods", + "multiply", + "run", + "timeout", + ]); pool.terminate(); done(); }); }); - it('should invoke a method via a proxy', function (done) { - var pool = createPool(__dirname + '/workers/simple.js'); + it("should invoke a method via a proxy", function (done) { + var pool = createPool(__dirname + "/workers/simple.js"); pool.proxy().then(function (proxy) { - proxy.multiply(4, 3) - .then(function (result) { - assert.strictEqual(result, 12); + proxy + .multiply(4, 3) + .then(function (result) { + assert.strictEqual(result, 12); - pool.terminate(); - done(); - }) - .catch(function (err) { - console.log(err); - assert.fail('Should not throw an error'); - }); + pool.terminate(); + done(); + }) + .catch(function (err) { + console.log(err); + assert.fail("Should not throw an error"); + }); }); }); - it('should invoke an async method via a proxy', function (done) { - var pool = createPool(__dirname + '/workers/simple.js'); + it("should invoke an async method via a proxy", function (done) { + var pool = createPool(__dirname + "/workers/simple.js"); pool.proxy().then(function (proxy) { - proxy.timeout(100) - .then(function (result) { - assert.strictEqual(result, 'done'); + proxy + .timeout(100) + .then(function (result) { + assert.strictEqual(result, "done"); - pool.terminate(); - done(); - }) - .catch(function (err) { - console.log(err); - assert.fail('Should not throw an error'); - }); + pool.terminate(); + done(); + }) + .catch(function (err) { + console.log(err); + assert.fail("Should not throw an error"); + }); }); }); - it('should handle errors thrown by a worker', function (done) { - var pool = createPool({maxWorkers: 10}); + it("should handle errors thrown by a worker", function (done) { + var pool = createPool({ maxWorkers: 10 }); function test() { - throw new TypeError('Test error'); + throw new TypeError("Test error"); } - pool.exec(test) - .catch(function (err) { - assert.ok(err instanceof Error); - assert.strictEqual(err.message, 'Test error') + pool.exec(test).catch(function (err) { + assert.ok(err instanceof Error); + assert.strictEqual(err.message, "Test error"); - pool.terminate(); - done(); - }); + pool.terminate(); + done(); + }); }); - it('should execute a function returning a Promise', function (done) { - var pool = createPool({maxWorkers: 10}); + it("should execute a function returning a Promise", function (done) { + var pool = createPool({ maxWorkers: 10 }); function testAsync() { - return Promise.resolve('done'); + return Promise.resolve("done"); } - pool.exec(testAsync) - .then(function (result) { - assert.strictEqual(result, 'done'); + pool + .exec(testAsync) + .then(function (result) { + assert.strictEqual(result, "done"); - pool.terminate(); - done(); - }) - .catch(function () { - assert.fail('Should not throw an error'); - }); + pool.terminate(); + done(); + }) + .catch(function () { + assert.fail("Should not throw an error"); + }); }); - it('should propagate a rejected Promise', function (done) { - var pool = createPool({maxWorkers: 10}); + it("should propagate a rejected Promise", function (done) { + var pool = createPool({ maxWorkers: 10 }); function testAsync() { - return Promise.reject(new Error('I reject!')); + return Promise.reject(new Error("I reject!")); } - pool.exec(testAsync) - .then(function () { - assert.fail('Should not resolve'); - }) - .catch(function (err) { - assert.strictEqual(err.toString(), 'Error: I reject!'); + pool + .exec(testAsync) + .then(function () { + assert.fail("Should not resolve"); + }) + .catch(function (err) { + assert.strictEqual(err.toString(), "Error: I reject!"); - pool.terminate(); - done(); - }); + pool.terminate(); + done(); + }); }); - it('should cancel a task', function (done) { - var pool = createPool({maxWorkers: 10}); + it("should cancel a task", function (done) { + var pool = createPool({ maxWorkers: 10 }); function forever() { while (1 > 0) {} // runs forever } - var promise = pool.exec(forever) - .then(function () { - done(new Error('promise should not resolve!')); - }) + var promise = pool + .exec(forever) + .then(function () { + done(new Error("promise should not resolve!")); + }) //.catch(Promise.CancellationError, function (err) { // TODO: not yet supported - .catch(function (err) { - assert(err instanceof Promise.CancellationError); - // we cannot assert that no workers remain in the pool, because that happens - // on a different promise chain (termination is now async) - done(); - }); + .catch(function (err) { + assert(err instanceof Promise.CancellationError); + // we cannot assert that no workers remain in the pool, because that happens + // on a different promise chain (termination is now async) + done(); + }); // cancel the task setTimeout(function () { @@ -521,8 +628,8 @@ describe('Pool', function () { }, 0); }); - it('should cancel a queued task', function (done) { - var pool = createPool({maxWorkers: 1}); + it("should cancel a queued task", function (done) { + var pool = createPool({ maxWorkers: 1 }); var reachedTheEnd = false; function delayed() { @@ -537,20 +644,21 @@ describe('Pool', function () { return 1; } - var p1 = pool.exec(delayed) - .then(function (result) { - assert.strictEqual(result, 1); - assert.strictEqual(reachedTheEnd, true); + var p1 = pool + .exec(delayed) + .then(function (result) { + assert.strictEqual(result, 1); + assert.strictEqual(reachedTheEnd, true); - assert.strictEqual(pool.workers.length, 1); - assert.strictEqual(pool.tasks.length, 0); + assert.strictEqual(pool.workers.length, 1); + assert.strictEqual(pool.tasks.length, 0); - return pool.terminate(); - }) - .then(function() { - done(); - }) - .catch(done); + return pool.terminate(); + }) + .then(function () { + done(); + }) + .catch(done); assert.strictEqual(pool.workers.length, 1); assert.strictEqual(pool.tasks.length, 0); @@ -559,16 +667,15 @@ describe('Pool', function () { assert.strictEqual(pool.workers.length, 1); assert.strictEqual(pool.tasks.length, 1); - p2.cancel(); // cancel immediately + p2.cancel(); // cancel immediately assert.strictEqual(pool.workers.length, 1); assert.strictEqual(pool.tasks.length, 1); reachedTheEnd = true; }); - it('should run following tasks if a previous queued task is cancelled', function (done) { - - var pool = createPool({maxWorkers: 1}); + it("should run following tasks if a previous queued task is cancelled", function (done) { + var pool = createPool({ maxWorkers: 1 }); var reachedTheEnd = false; function delayed() { @@ -595,26 +702,26 @@ describe('Pool', function () { var twoDone = false; function checkDone() { if (oneDone && twoDone) { - return pool.terminate() - .then(function() { + return pool + .terminate() + .then(function () { done(); }) .catch(done); } } - var p1 = pool.exec(delayed) - .then(function (result) { - assert.strictEqual(result, 1); - assert.strictEqual(reachedTheEnd, true); + var p1 = pool.exec(delayed).then(function (result) { + assert.strictEqual(result, 1); + assert.strictEqual(reachedTheEnd, true); - oneDone = true; + oneDone = true; - assert.strictEqual(pool.workers.length, 1); - assert.strictEqual(pool.tasks.length, 1); + assert.strictEqual(pool.workers.length, 1); + assert.strictEqual(pool.tasks.length, 1); - checkDone(); - }); + checkDone(); + }); assert.strictEqual(pool.workers.length, 1); assert.strictEqual(pool.tasks.length, 0); @@ -623,124 +730,133 @@ describe('Pool', function () { assert.strictEqual(pool.workers.length, 1); assert.strictEqual(pool.tasks.length, 1); - var p3 = pool.exec(two) - .then(function (result) { - assert.strictEqual(result, 2); - assert.strictEqual(reachedTheEnd, true); + var p3 = pool.exec(two).then(function (result) { + assert.strictEqual(result, 2); + assert.strictEqual(reachedTheEnd, true); - twoDone = true; + twoDone = true; - assert.strictEqual(pool.workers.length, 1); - assert.strictEqual(pool.tasks.length, 0); + assert.strictEqual(pool.workers.length, 1); + assert.strictEqual(pool.tasks.length, 0); - checkDone(); - }); + checkDone(); + }); - p2.cancel(); // cancel immediately + p2.cancel(); // cancel immediately assert.strictEqual(pool.workers.length, 1); assert.strictEqual(pool.tasks.length, 2); reachedTheEnd = true; }); - // TODO: test whether a task in the queue can be neatly cancelled + // TODO: test whether a task in the queue can be neatly cancelled - it('should timeout a task', function () { - var pool = createPool({maxWorkers: 10}); + it("should timeout a task", function () { + var pool = createPool({ maxWorkers: 10 }); function forever() { while (1 > 0) {} // runs forever } - return pool.exec(forever) + return ( + pool + .exec(forever) .timeout(50) .then(function (result) { - assert.fail('promise should never resolve'); + assert.fail("promise should never resolve"); }) - //.catch(Promise.CancellationError, function (err) { // TODO: not yet supported + //.catch(Promise.CancellationError, function (err) { // TODO: not yet supported .catch(function (err) { assert(err instanceof Promise.TimeoutError); // we cannot assert that no workers remain in the pool, because that happens // on a different promise chain (termination is now async) - - }); + }) + ); }); - it('should start timeout timer of a task once the task is taken from the queue (1)', function (done) { - var pool = createPool({maxWorkers: 1}); - var delay = 50 + it("should start timeout timer of a task once the task is taken from the queue (1)", function (done) { + var pool = createPool({ maxWorkers: 1 }); + var delay = 50; function sleep() { return new Promise(function (resolve, reject) { setTimeout(function () { - resolve ('done :)') - }, 100) // 2 * delay - }) + resolve("done :)"); + }, 100); // 2 * delay + }); } function doNothing() { - return 'ready' + return "ready"; } // add a task - pool.exec(sleep) + pool.exec(sleep); // add a second task, will be queued until the first finishes // the timeout is shorter than the currently executing task and longer than // the queued task, so it should not timeout - pool.exec(doNothing) - .timeout(delay) - .then(function (result) { - assert.strictEqual(result, 'ready'); + pool + .exec(doNothing) + .timeout(delay) + .then(function (result) { + assert.strictEqual(result, "ready"); - return pool.terminate() - .then(function() { - done(); - }) - .catch(done); - }) - .catch(function (err) { - assert.fail('promise should not throw'); - }); + return pool + .terminate() + .then(function () { + done(); + }) + .catch(done); + }) + .catch(function (err) { + assert.fail("promise should not throw"); + }); }); - it('should start timeout timer of a task once the task is taken from the queue (2)', function (done) { - var pool = createPool({maxWorkers: 1}); - var delay = 50 + it("should start timeout timer of a task once the task is taken from the queue (2)", function (done) { + var pool = createPool({ maxWorkers: 1 }); + var delay = 50; function sleep() { return new Promise(function (resolve, reject) { setTimeout(function () { - resolve ('done :)') - }, 100) // 2 * delay - }) + resolve("done :)"); + }, 100); // 2 * delay + }); } // add a task - pool.exec(sleep) + pool.exec(sleep); // add a second task, will be queued until the first finishes - pool.exec(sleep) - .timeout(delay) - .then(function (result) { - assert.fail('promise should never resolve'); - }) - .catch(function (err) { - assert(err instanceof Promise.TimeoutError); + pool + .exec(sleep) + .timeout(delay) + .then(function (result) { + assert.fail("promise should never resolve"); + }) + .catch(function (err) { + assert(err instanceof Promise.TimeoutError); - done(); - }); + done(); + }); }); - it('should handle crashed workers', function () { - var pool = createPool({maxWorkers: 1}); + it("should handle crashed workers", function () { + var pool = createPool({ maxWorkers: 1 }); - var promise = pool.exec(add) + var promise = pool + .exec(add) .then(function () { - throw new Error('Promise should not be resolved'); + throw new Error("Promise should not be resolved"); }) .catch(function (err) { - assert.ok(err.toString().match(/Error: Workerpool Worker terminated Unexpectedly/)); + assert.ok( + err + .toString() + .match(/Error: Workerpool Worker terminated Unexpectedly/), + ); assert.ok(err.toString().match(/exitCode: `.*`/)); assert.ok(err.toString().match(/signalCode: `.*`/)); assert.ok(err.toString().match(/workerpool.script: `.*\.js`/)); @@ -748,7 +864,7 @@ describe('Pool', function () { assert.strictEqual(pool.workers.length, 0); // validate whether a new worker is spawned - var promise2 = pool.exec(add, [2,3]) + var promise2 = pool.exec(add, [2, 3]); assert.strictEqual(pool.workers.length, 1); return promise2; }) @@ -770,36 +886,35 @@ describe('Pool', function () { return promise; }); - describe('options', function () { - - it('should throw an error on invalid type or number of maxWorkers', function () { + describe("options", function () { + it("should throw an error on invalid type or number of maxWorkers", function () { assert.throws(function () { - createPool({maxWorkers: 'a string'}); + createPool({ maxWorkers: "a string" }); }, TypeError); assert.throws(function () { - createPool({maxWorkers: 2.5}); + createPool({ maxWorkers: 2.5 }); }, TypeError); assert.throws(function () { - createPool({maxWorkers: 0}); + createPool({ maxWorkers: 0 }); }, TypeError); assert.throws(function () { - createPool({maxWorkers: -1}); + createPool({ maxWorkers: -1 }); }, TypeError); }); - it('should limit to the configured number of max workers', function () { - var pool = createPool({maxWorkers: 2}); + it("should limit to the configured number of max workers", function () { + var pool = createPool({ maxWorkers: 2 }); var tasks = [ pool.exec(add, [1, 2]), pool.exec(add, [3, 4]), pool.exec(add, [5, 6]), pool.exec(add, [7, 8]), - pool.exec(add, [9, 0]) - ] + pool.exec(add, [9, 0]), + ]; assert.strictEqual(pool.maxWorkers, 2); assert.strictEqual(pool.workers.length, 2); @@ -810,47 +925,47 @@ describe('Pool', function () { }); }); - it('should take number of cpus minus one as default maxWorkers', function () { + it("should take number of cpus minus one as default maxWorkers", function () { var pool = createPool(); - var cpus = require('os').cpus(); + var cpus = require("os").cpus(); assert.strictEqual(pool.maxWorkers, cpus.length - 1); return pool.terminate(); }); - it('should throw an error on invalid type or number of minWorkers', function () { + it("should throw an error on invalid type or number of minWorkers", function () { assert.throws(function () { - createPool({minWorkers: 'a string'}); + createPool({ minWorkers: "a string" }); }, TypeError); assert.throws(function () { - createPool({minWorkers: 2.5}); + createPool({ minWorkers: 2.5 }); }, TypeError); assert.throws(function () { - createPool({maxWorkers: -1}); + createPool({ maxWorkers: -1 }); }, TypeError); }); - it('should create number of cpus minus one when minWorkers set to \'max\'', function () { - var pool = createPool({minWorkers:'max'}); + it("should create number of cpus minus one when minWorkers set to 'max'", function () { + var pool = createPool({ minWorkers: "max" }); - var cpus = require('os').cpus(); + var cpus = require("os").cpus(); assert.strictEqual(pool.workers.length, cpus.length - 1); return pool.terminate(); }); - it('should increase maxWorkers to match minWorkers', function () { - var cpus = require('os').cpus(); + it("should increase maxWorkers to match minWorkers", function () { + var cpus = require("os").cpus(); var count = cpus.length + 2; var tasksCount = cpus.length * 2; - var pool = createPool({minWorkers: count}); + var pool = createPool({ minWorkers: count }); - var tasks = [] - for(var i=0;i 500) { + if (new Date().getTime() - start > 500) { break; } } - return 'test 1 ok'; + return "test 1 ok"; } function test2() { var start = new Date().getTime(); for (var i = 0; i < 1e7; i++) { - if ((new Date().getTime() - start) > 1000) { + if (new Date().getTime() - start > 1000) { break; } } - return 'test 2 ok'; + return "test 2 ok"; } function test3() { var start = new Date().getTime(); for (var i = 0; i < 1e7; i++) { - if ((new Date().getTime() - start) > 200) { + if (new Date().getTime() - start > 200) { break; } } - return 'test 3 ok'; + return "test 3 ok"; } - var promises = [ - pool.exec(test1), - pool.exec(test2), - pool.exec(test3) - ]; + var promises = [pool.exec(test1), pool.exec(test2), pool.exec(test3)]; Promise.all(promises) .then(function (results) { - assert.strictEqual(results[0], 'test 1 ok'); - assert.strictEqual(results[1], 'test 2 ok'); - assert.strictEqual(results[3], 'test 3 ok'); + assert.strictEqual(results[0], "test 1 ok"); + assert.strictEqual(results[1], "test 2 ok"); + assert.strictEqual(results[3], "test 3 ok"); }) - .catch(function(error) { + .catch(function (error) { assert.fail(error); }); assert.strictEqual(pool.workers.length, 3); - pool.terminate(false, 2000) - .then(function() { - assert.strictEqual(pool.workers.length, 0); - done(); - }); + pool.terminate(false, 2000).then(function () { + assert.strictEqual(pool.workers.length, 0); + done(); + }); }); - it ('should wait for all workers if pool is terminated before tasks are finished, even if a task fails', function (done) { - - var pool = createPool({maxWorkers: 10}); + it("should wait for all workers if pool is terminated before tasks are finished, even if a task fails", function (done) { + var pool = createPool({ maxWorkers: 10 }); assert.strictEqual(pool.workers.length, 0); function test1() { var start = new Date().getTime(); for (var i = 0; i < 1e7; i++) { - if ((new Date().getTime() - start) > 1000) { + if (new Date().getTime() - start > 1000) { break; } } - return 'test 1 ok'; + return "test 1 ok"; } function test2() { var start = new Date().getTime(); for (var i = 0; i < 1e7; i++) { - if ((new Date().getTime() - start) > 100) { + if (new Date().getTime() - start > 100) { break; } } - throw new Error('test 2 error'); + throw new Error("test 2 error"); } - var promises = [ - pool.exec(test1), - pool.exec(test2) - ]; + var promises = [pool.exec(test1), pool.exec(test2)]; Promise.all(promises) .then(function (results) { - assert.fail('test2 should have been rejected'); + assert.fail("test2 should have been rejected"); }) - .catch(function(error) { - assert.strictEqual(error.message, 'test 2 error'); + .catch(function (error) { + assert.strictEqual(error.message, "test 2 error"); }); assert.strictEqual(pool.workers.length, 2); - pool.terminate(false, 2000) - .then(function() { - assert.strictEqual(pool.workers.length, 0); - done(); - }); + pool.terminate(false, 2000).then(function () { + assert.strictEqual(pool.workers.length, 0); + done(); + }); }); - it ('should cancel any pending tasks when terminating a pool', function () { - var pool = createPool({maxWorkers: 1}); + it("should cancel any pending tasks when terminating a pool", function () { + var pool = createPool({ maxWorkers: 1 }); assert.strictEqual(pool.workers.length, 0); - function test1 () { - return 'test 1 ok'; + function test1() { + return "test 1 ok"; } - function test2 () { - return 'test 2 ok'; + function test2() { + return "test 2 ok"; } var promise1 = pool.exec(test1); @@ -1076,24 +1177,23 @@ describe('Pool', function () { assert.strictEqual(pool.workers.length, 1); assert.strictEqual(pool.tasks.length, 1); - return pool.terminate(false) - .then(function() { - assert.strictEqual(pool.workers.length, 0); - assert.strictEqual(pool.tasks.length, 0); - - return Promise.all([ - promise1.then(function (result) { - assert.strictEqual(result, 'test 1 ok'); - }), - promise2.catch(function (err) { - assert.strictEqual(err.message, 'Pool terminated'); - }) - ]) - }); + return pool.terminate(false).then(function () { + assert.strictEqual(pool.workers.length, 0); + assert.strictEqual(pool.tasks.length, 0); + + return Promise.all([ + promise1.then(function (result) { + assert.strictEqual(result, "test 1 ok"); + }), + promise2.catch(function (err) { + assert.strictEqual(err.message, "Pool terminated"); + }), + ]); + }); }); - it('should return statistics', function () { - var pool = createPool({maxWorkers: 4}); + it("should return statistics", function () { + var pool = createPool({ maxWorkers: 4 }); function test() { return new Promise(function (resolve, reject) { @@ -1103,57 +1203,104 @@ describe('Pool', function () { function testError() { return new Promise(function (resolve, reject) { - throw new Error('Test error') + throw new Error("Test error"); }); } - assert.deepStrictEqual(pool.stats(), {totalWorkers: 0, busyWorkers: 0, idleWorkers: 0, pendingTasks: 0, activeTasks: 0}); - - var promise = pool.exec(test) - .then(function () { - assert.deepStrictEqual(pool.stats(), {totalWorkers: 1, busyWorkers: 0, idleWorkers: 1, pendingTasks: 0, activeTasks: 0 }); + assert.deepStrictEqual(pool.stats(), { + totalWorkers: 0, + busyWorkers: 0, + idleWorkers: 0, + pendingTasks: 0, + activeTasks: 0, + }); - // start six tasks (max workers is 4, so we should get pending tasks) - var all = Promise.all([ - pool.exec(test), - pool.exec(test), - pool.exec(test), - pool.exec(test), - pool.exec(test), - pool.exec(test) - ]); + var promise = pool + .exec(test) + .then(function () { + assert.deepStrictEqual(pool.stats(), { + totalWorkers: 1, + busyWorkers: 0, + idleWorkers: 1, + pendingTasks: 0, + activeTasks: 0, + }); - assert.deepStrictEqual(pool.stats(), {totalWorkers: 4, busyWorkers: 4, idleWorkers: 0, pendingTasks: 2, activeTasks: 4}); + // start six tasks (max workers is 4, so we should get pending tasks) + var all = Promise.all([ + pool.exec(test), + pool.exec(test), + pool.exec(test), + pool.exec(test), + pool.exec(test), + pool.exec(test), + ]); + + assert.deepStrictEqual(pool.stats(), { + totalWorkers: 4, + busyWorkers: 4, + idleWorkers: 0, + pendingTasks: 2, + activeTasks: 4, + }); - return all; - }) - .then(function () { - assert.deepStrictEqual(pool.stats(), {totalWorkers: 4, busyWorkers: 0, idleWorkers: 4, pendingTasks: 0, activeTasks: 0 }); + return all; + }) + .then(function () { + assert.deepStrictEqual(pool.stats(), { + totalWorkers: 4, + busyWorkers: 0, + idleWorkers: 4, + pendingTasks: 0, + activeTasks: 0, + }); - return pool.exec(testError) - }) - .catch(function () { - assert.deepStrictEqual(pool.stats(), {totalWorkers: 4, busyWorkers: 0, idleWorkers: 4, pendingTasks: 0, activeTasks: 0}); + return pool.exec(testError); + }) + .catch(function () { + assert.deepStrictEqual(pool.stats(), { + totalWorkers: 4, + busyWorkers: 0, + idleWorkers: 4, + pendingTasks: 0, + activeTasks: 0, }); + }); - assert.deepStrictEqual(pool.stats(), {totalWorkers: 1, busyWorkers: 1, idleWorkers: 0, pendingTasks: 0, activeTasks: 1}); + assert.deepStrictEqual(pool.stats(), { + totalWorkers: 1, + busyWorkers: 1, + idleWorkers: 0, + pendingTasks: 0, + activeTasks: 1, + }); return promise.then(function () { return pool.terminate(); }); }); - it('should throw an error in case of wrong type of arguments in function exec', function () { + it("should throw an error in case of wrong type of arguments in function exec", function () { var pool = createPool(); - assert.throws(function () {pool.exec()}, TypeError); - assert.throws(function () {pool.exec(23)}, TypeError); - assert.throws(function () {pool.exec(add, {})}, TypeError); - assert.throws(function () {pool.exec(add, 2, 3)}, TypeError); - assert.throws(function () {pool.exec(add, 'a string')}, TypeError); + assert.throws(function () { + pool.exec(); + }, TypeError); + assert.throws(function () { + pool.exec(23); + }, TypeError); + assert.throws(function () { + pool.exec(add, {}); + }, TypeError); + assert.throws(function () { + pool.exec(add, 2, 3); + }, TypeError); + assert.throws(function () { + pool.exec(add, "a string"); + }, TypeError); }); - it('should throw an error when the tasks queue is full', function () { - var pool = createPool({maxWorkers: 2, maxQueueSize: 3}); + it("should throw an error when the tasks queue is full", function () { + var pool = createPool({ maxWorkers: 2, maxQueueSize: 3 }); function add(a, b) { return a + b; @@ -1175,92 +1322,90 @@ describe('Pool', function () { assert.strictEqual(pool.tasks.length, 3); assert.strictEqual(pool.workers.length, 2); - assert.throws(function () {pool.exec(add, [9, 4])}, Error); + assert.throws(function () { + pool.exec(add, [9, 4]); + }, Error); - return Promise.all([ - task1, - task2, - task3, - task4, - task5 - ]) - .then(function () { - assert.strictEqual(pool.tasks.length, 0); - assert.strictEqual(pool.workers.length, 2); + return Promise.all([task1, task2, task3, task4, task5]).then(function () { + assert.strictEqual(pool.tasks.length, 0); + assert.strictEqual(pool.workers.length, 2); - return pool.terminate(); - }); + return pool.terminate(); + }); }); - it('should receive events from worker', function (done) { - var pool = createPool(__dirname + '/workers/emit.js'); + it("should receive events from worker", function (done) { + var pool = createPool(__dirname + "/workers/emit.js"); - var receivedEvent + var receivedEvent; - pool.exec('sendEvent', [], { - on: function (payload) { - receivedEvent = payload - } - }) - .then(function (result) { - assert.strictEqual(result, 'done'); - assert.deepStrictEqual(receivedEvent, { - foo: 'bar' - }); + pool + .exec("sendEvent", [], { + on: function (payload) { + receivedEvent = payload; + }, + }) + .then(function (result) { + assert.strictEqual(result, "done"); + assert.deepStrictEqual(receivedEvent, { + foo: "bar", + }); - pool.terminate(); - done(); - }) - .catch(function (err) { - console.log(err); - assert.fail('Should not throw an error'); - done(err); - }); + pool.terminate(); + done(); + }) + .catch(function (err) { + console.log(err); + assert.fail("Should not throw an error"); + done(err); + }); }); - it('should support sending transferable object to worker', function (done) { - var pool = createPool(__dirname + '/workers/transfer-to.js'); + it("should support sending transferable object to worker", function (done) { + var pool = createPool(__dirname + "/workers/transfer-to.js"); var size = 8; var uInt8Array = new Uint8Array(size).map((_v, i) => i); - pool.exec('transfer', [uInt8Array], { - transfer: [uInt8Array.buffer] - }) - .then(function (result) { - assert.strictEqual(result, size); - // original buffer should be transferred thus empty - assert.strictEqual(uInt8Array.byteLength, 0); + pool + .exec("transfer", [uInt8Array], { + transfer: [uInt8Array.buffer], + }) + .then(function (result) { + assert.strictEqual(result, size); + // original buffer should be transferred thus empty + assert.strictEqual(uInt8Array.byteLength, 0); - pool.terminate(); - done(); - }) - .catch(function (err) { - console.log(err); - assert.fail('Should not throw an error'); - done(err); - }); + pool.terminate(); + done(); + }) + .catch(function (err) { + console.log(err); + assert.fail("Should not throw an error"); + done(err); + }); }); - it('should support sending transferable object from worker', function (done) { - var pool = createPool(__dirname + '/workers/transfer-from.js'); + it("should support sending transferable object from worker", function (done) { + var pool = createPool(__dirname + "/workers/transfer-from.js"); var size = 8; - pool.exec('transfer', [size]) - .then(function (result) { - assert.strictEqual(result.byteLength, size); + pool + .exec("transfer", [size]) + .then(function (result) { + assert.strictEqual(result.byteLength, size); - pool.terminate(); - done(); - }) - .catch(function (err) { - console.log(err); - assert.fail('Should not throw an error'); - done(err); - }); + pool.terminate(); + done(); + }) + .catch(function (err) { + console.log(err); + assert.fail("Should not throw an error"); + done(err); + }); }); - it('should call worker termination handler', function () { - var pool = createPool(__dirname + '/workers/cleanup.js'); + it("should call worker termination handler", function () { + var pool = createPool(__dirname + "/workers/cleanup.js"); var handlerCalled = false; var channel = new MessageChannel(); @@ -1269,7 +1414,7 @@ describe('Pool', function () { handlerCalled = true; }; - pool.exec('asyncAdd', [1, 2, channel.port2], { + pool.exec("asyncAdd", [1, 2, channel.port2], { transfer: [channel.port2], }); @@ -1278,8 +1423,8 @@ describe('Pool', function () { }); }); - it('should call worker termination async handler', function () { - var pool = createPool(__dirname + '/workers/cleanup-async.js'); + it("should call worker termination async handler", function () { + var pool = createPool(__dirname + "/workers/cleanup-async.js"); var handlerCalled = false; var channel = new MessageChannel(); @@ -1288,7 +1433,7 @@ describe('Pool', function () { handlerCalled = true; }; - pool.exec('asyncAdd', [1, 2, channel.port2], { + pool.exec("asyncAdd", [1, 2, channel.port2], { transfer: [channel.port2], }); @@ -1297,8 +1442,8 @@ describe('Pool', function () { }); }); - it('should not call worker termination async handler after timeout', function () { - var pool = createPool(__dirname + '/workers/cleanup-async.js', { + it("should not call worker termination async handler after timeout", function () { + var pool = createPool(__dirname + "/workers/cleanup-async.js", { workerTerminateTimeout: 1, }); @@ -1309,7 +1454,7 @@ describe('Pool', function () { handlerCalled = true; }; - pool.exec('asyncAdd', [1, 2, channel.port2], { + pool.exec("asyncAdd", [1, 2, channel.port2], { transfer: [channel.port2], }); @@ -1318,32 +1463,24 @@ describe('Pool', function () { }); }); - - describe('abort handler', () => { - it('should not terminate worker if abort listener is defined dedicated worker with Timeout', function (done) { - var workerCount = 0; - var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + describe("abort handler", () => { + it("should not terminate worker if abort listener is defined dedicated worker with Timeout", function (done) { + var pool = createPool(__dirname + "/workers/cleanup-abort.js", { maxWorkers: 1, - onCreateWorker: () => { - workerCount += 1; - }, - onTerminateWorker: function() { - workerCount -= 1; - } }); - - pool.exec('asyncTimeout', [],{ - onAbortStart: async function(args) { - // wait for the promise to resolve, - // then check pool stats - await args.taskResolver.promise; - var stats = pool.stats(); - assert.strictEqual(stats.busyWorkers, 0); - pool.terminate(); - done(); - } - }) + pool + .exec("asyncTimeout", [], { + onAbortStart: async function (args) { + // wait for the promise to resolve, + // then check pool stats + await args.taskResolver.promise; + var stats = pool.stats(); + assert.strictEqual(stats.busyWorkers, 0); + pool.terminate(); + done(); + }, + }) .timeout(200) .catch(function (err) { assert(err instanceof Promise.TimeoutError); @@ -1355,20 +1492,14 @@ describe('Pool', function () { }); }); - it('should not terminate worker if abort listener is defined dedicated worker with Cancellation', function (done) { - var workerCount = 0; - var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + it("should not terminate worker if abort listener is defined dedicated worker with Cancellation", function (done) { + + var pool = createPool(__dirname + "/workers/cleanup-abort.js", { maxWorkers: 1, - onCreateWorker: () => { - workerCount += 1; - }, - onTerminateWorker: function() { - workerCount -= 1; - } }); - - let task = pool.exec('asyncTimeout', [], { - onAbortStart: async function(args) { + + let task = pool.exec("asyncTimeout", [], { + onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats. await args.taskResolver.promise; @@ -1377,48 +1508,38 @@ describe('Pool', function () { assert.strictEqual(stats.busyWorkers, 0); pool.terminate(); done(); - } + }, }); // Wrap in a new promise which waits 50ms // in order to allow the function executing in the // worker to finish. - const _ = new Promise(function(resolve) { - setTimeout(function() { + new Promise(function (resolve) { + setTimeout(function () { resolve(); }, 50); - }).then(function() { - return task - .cancel() - .catch(function (err) { - assert(err instanceof Promise.CancellationError); - let stats = pool.stats(); - assert.strictEqual(workerCount, 1); - assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(stats.idleWorkers, 0); - assert.strictEqual(stats.busyWorkers, 1); - }); + }).then(function () { + return task.cancel().catch(function (err) { + assert(err instanceof Promise.CancellationError); + let stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 0); + assert.strictEqual(stats.busyWorkers, 1); + }); }); }); - - it('should not terminate worker if abort listener is defined inline worker with Timeout', function (done) { - var workerCount = 0; + it("should not terminate worker if abort listener is defined inline worker with Timeout", function (done) { var pool = createPool({ - onCreateWorker: () => { - workerCount += 1; - }, maxWorkers: 1, - onTerminateWorker: function() { - workerCount -= 1; - } }); function asyncTimeout() { var me = this; return new Promise(function () { - let timeout = setTimeout(function() { - resolve(); - }, 5000); + let timeout = setTimeout(function () { + resolve(); + }, 5000); me.worker.addAbortListener(function () { return new Promise(function (resolve) { clearTimeout(timeout); @@ -1428,47 +1549,41 @@ describe('Pool', function () { }); } - const _ = pool.exec(asyncTimeout, [], { - onAbortStart: async function(args) { - // wait for the promise to resolve, - // then check pool stats. - await args.taskResolver.promise; + pool + .exec(asyncTimeout, [], { + onAbortStart: async function (args) { + // wait for the promise to resolve, + // then check pool stats. + await args.taskResolver.promise; + var stats = pool.stats(); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + pool.terminate(); + done(); + }, + }) + .timeout(200) + .catch(function (err) { + assert(err instanceof Promise.TimeoutError); var stats = pool.stats(); + assert.strictEqual(workerCount, 1); assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); assert.strictEqual(stats.busyWorkers, 0); - pool.terminate(); - done(); - } - }) - .timeout(200) - .catch(function(err) { - assert(err instanceof Promise.TimeoutError); - var stats = pool.stats(); - assert.strictEqual(workerCount, 1); - assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(stats.idleWorkers, 1); - assert.strictEqual(stats.busyWorkers, 0); - }); + }); }); - it('should not terminate worker if abort listener is defined inline worker with Cancellation', function (done) { - var workerCount = 0; + it("should not terminate worker if abort listener is defined inline worker with Cancellation", function (done) { var pool = createPool({ - onCreateWorker: () => { - workerCount += 1; - }, maxWorkers: 1, - onTerminateWorker: function() { - workerCount -= 1; - } }); function asyncTimeout() { var me = this; return new Promise(function (_resolve, reject) { - let timeout = setTimeout(function() { - reject(new Error("should not be thrown")); - }, 5000); + let timeout = setTimeout(function () { + reject(new Error("should not be thrown")); + }, 5000); me.worker.addAbortListener(function () { return new Promise(function (resolve) { clearTimeout(timeout); @@ -1478,89 +1593,78 @@ describe('Pool', function () { }); } - const task = pool.exec(asyncTimeout, [], { - onAbortStart: async function(args) { + const task = pool.exec(asyncTimeout, [], { + onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats. - await args.taskResolver.promise; + await args.taskResolver.promise; var stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(stats.busyWorkers, 0); pool.terminate(); done(); - } - }) - const _ = new Promise(function(resolve) { - setTimeout(function() { + }, + }); + new Promise(function (resolve) { + setTimeout(function () { resolve(); }, 50); - }).then(function() { - return task - .cancel() - .catch(function(err) { + }).then(function () { + return task.cancel().catch(function (err) { assert(err instanceof Promise.TimeoutError); var stats = pool.stats(); assert.strictEqual(stats.busyWorkers, 1); assert.strictEqual(stats.totalWorkers, 1); }); }); - }); - it('should invoke termination timeout for abort handler if timeout period is reached from task timeout', function (done) { - var workerCount = 0; - var pool = createPool(__dirname + '/workers/cleanup-abort.js', { - maxWorkers: 2, - onCreateWorker: function() { - workerCount += 1; - }, + it("should invoke termination timeout for abort handler if timeout period is reached from task timeout", function (done) { + var pool = createPool(__dirname + "/workers/cleanup-abort.js", { + maxWorkers: 1, workerTerminateTimeout: 1000, - onAbortResolution: function(args) { - const stats = pool.stats(); - assert.strictEqual(stats.totalWorkers, 1); - assert.strictEqual(args.isTerminating, true); - pool.terminate(); - done(); - } }); - - const _ = pool.exec('asyncAbortHandlerNeverResolves', [], { - }) - .timeout(200) - .catch(function (err) { - assert(err instanceof Promise.TimeoutError); - var stats = pool.stats(); - assert.strictEqual(stats.busyWorkers, 1); - assert.strictEqual(stats.totalWorkers, 1); - }); + pool + .exec("asyncAbortHandlerNeverResolves", [], { + onAbortResolution: function (args) { + const stats = pool.stats(); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(args.isTerminating, true); + pool.terminate(); + done(); + } + }) + .timeout(200) + .catch(function (err) { + assert(err instanceof Promise.TimeoutError); + var stats = pool.stats(); + console.log(stats); + assert.strictEqual(stats.busyWorkers, 1); + assert.strictEqual(stats.totalWorkers, 1); + }); }); - - it('should invoke timeout for abort handler if timeout period is reached with Cancellation', function (done) { - var workerCount = 0; - var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + it("should invoke timeout for abort handler if timeout period is reached with Cancellation", function (done) { + var pool = createPool(__dirname + "/workers/cleanup-abort.js", { maxWorkers: 1, - onCreateWorker: function() { - workerCount += 1; - }, - onAbortResolution: function(args) { + workerTerminateTimeout: 500, + }); + + const task = pool.exec("asyncAbortHandlerNeverResolves", [], { + onAbortResolution: function (args) { const stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(args.isTerminating, true); pool.terminate(); done(); }, - workerTerminateTimeout: 500, }); - - const task = pool.exec('asyncAbortHandlerNeverResolves', []) - - const _ = new Promise(function(resolve) { - resolve(); - }).then(function() { - return task.cancel() - .catch(function (err) { + + new Promise(function (resolve) { + resolve(); + }).then(function () { + return task.cancel().catch(function (err) { assert(err instanceof Promise.CancellationError); var stats = pool.stats(); assert(stats.busyWorkers === 1); @@ -1568,46 +1672,50 @@ describe('Pool', function () { }); }); - it('should trigger event stdout in abort handler', function (done) { - var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + it("should trigger event stdout in abort handler", function (done) { + var pool = createPool(__dirname + "/workers/cleanup-abort.js", { maxWorkers: 1, - workerType: 'process', - emitStdStreams: true, + workerType: "process", + emitStdStreams: true, workerTerminateTimeout: 1000, }); - pool.exec('stdoutStreamOnAbort', [], { - on: function (payload) { - assert.strictEqual(payload.stdout.trim(), "Hello, world!"); - pool.terminate(); - done(); - } - }).timeout(50); + pool + .exec("stdoutStreamOnAbort", [], { + on: function (payload) { + assert.strictEqual(payload.stdout.trim(), "Hello, world!"); + pool.terminate(); + done(); + }, + }) + .timeout(50); }); - it('should trigger event in abort handler', function (done) { - var pool = createPool(__dirname + '/workers/cleanup-abort.js', { + it("should trigger event in abort handler", function (done) { + var pool = createPool(__dirname + "/workers/cleanup-abort.js", { maxWorkers: 1, - workerType: 'process', - emitStdStreams: true, + workerType: "process", + emitStdStreams: true, workerTerminateTimeout: 1000, }); - pool.exec('eventEmitOnAbort', [], { - on: function (payload) { - assert.strictEqual(payload.status, 'cleanup_success'); - pool.terminate(); - done(); - } - }).timeout(50); + pool + .exec("eventEmitOnAbort", [], { + on: function (payload) { + assert.strictEqual(payload.status, "cleanup_success"); + pool.terminate(); + done(); + }, + }) + .timeout(50); }); }); - describe('validate', () => { - it('should not allow unknown properties in forkOpts', function() { + describe("validate", () => { + it("should not allow unknown properties in forkOpts", function () { var pool = createPool({ - workerType: 'process', - forkOpts: { foo: 42 } + workerType: "process", + forkOpts: { foo: 42 }, }); assert.throws(function () { @@ -1615,26 +1723,28 @@ describe('Pool', function () { }, /Error: Object "forkOpts" contains an unknown option "foo"/); }); - it('should not allow inherited properties in forkOpts', function() { + it("should not allow inherited properties in forkOpts", function () { var pool = createPool({ - workerType: 'process' + workerType: "process", }); // prototype pollution - Object.prototype.env = { NODE_OPTIONS: '--inspect-brk=0.0.0.0:1337' }; + Object.prototype.env = { NODE_OPTIONS: "--inspect-brk=0.0.0.0:1337" }; assert.throws(function () { pool.exec(add, [3, 4]); }, /Error: Object "forkOpts" contains an inherited option "env"/); delete Object.prototype.env; - after(() => { delete Object.prototype.env }); + after(() => { + delete Object.prototype.env; + }); }); - it('should not allow unknown properties in workerThreadOpts', function() { + it("should not allow unknown properties in workerThreadOpts", function () { var pool = createPool({ - workerType: 'thread', - workerThreadOpts: { foo: 42 } + workerType: "thread", + workerThreadOpts: { foo: 42 }, }); assert.throws(function () { @@ -1642,20 +1752,22 @@ describe('Pool', function () { }, /Error: Object "workerThreadOpts" contains an unknown option "foo"/); }); - it('should not allow inherited properties in workerThreadOpts', function() { + it("should not allow inherited properties in workerThreadOpts", function () { var pool = createPool({ - workerType: 'thread' + workerType: "thread", }); // prototype pollution - Object.prototype.env = { NODE_OPTIONS: '--inspect-brk=0.0.0.0:1337' }; + Object.prototype.env = { NODE_OPTIONS: "--inspect-brk=0.0.0.0:1337" }; assert.throws(function () { pool.exec(add, [3, 4]); }, /Error: Object "workerThreadOpts" contains an inherited option "env"/); delete Object.prototype.env; - after(() => { delete Object.prototype.env }); + after(() => { + delete Object.prototype.env; + }); }); }); }); From 90c86ac60784afd930122b5f49eb9ca2b6c19493 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Tue, 11 Feb 2025 21:05:58 -0500 Subject: [PATCH 22/30] update example --- examples/abort.js | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/examples/abort.js b/examples/abort.js index be78ea8f..4704e64e 100644 --- a/examples/abort.js +++ b/examples/abort.js @@ -8,10 +8,6 @@ const pool = workerpool.pool(__dirname + "/workers/cleanupAbort.js", { onCreateWorker: function (args) { console.log("New worker created"); }, - onAbortResolution: function (args) { - console.log("abort operation concluded for task:", args.id); - console.log("is worker terminating", args.isTerminating); - }, onTerminateWorker: function () { console.log("worker terminated"); }, @@ -26,11 +22,15 @@ const main = async () => { let abortResolverSuccess; await pool .exec("asyncTimeout", [], { - onAbortStart: async function (args) { - console.log( - "abort operation started from task timeout, in onAbortStart", - ); - abortResolverSuccess = args.taskResolver.promise; + onAbortResolution: function (args) { + console.log("abort operation concluded for task:", args.id); + console.log("is worker terminating", args.isTerminating); + }, + onAbortStart: async function (args) { + console.log( + "abort operation started from task timeout, in onAbortStart", + ); + abortResolverSuccess = args.taskResolver.promise; }, }) .timeout(100) @@ -53,6 +53,10 @@ const main = async () => { ); abortResolverFailure = args.taskResolver.promise; }, + onAbortResolution: function (args) { + console.log("abort operation concluded for task:", args.id); + console.log("is worker terminating", args.isTerminating); + } }) .cancel() .catch((err) => { From 518e7ad503c1c519d3e472662a4a80c24e284926 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sun, 16 Feb 2025 18:09:00 -0500 Subject: [PATCH 23/30] cleanup WorkerHandler --- src/WorkerHandler.js | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 7967f088..02db303b 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -437,9 +437,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi me.tracking[id] = { id, resolver: Promise.defer(), - options: options ?? { - onAbortResolution: () => {}, - }, + options: options, }; // remove this task from the queue. It is already rejected (hence this @@ -450,7 +448,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi delete me.tracking[id]; var promise = me.terminateAndNotify(true) - .then(function(args) { + .then(function() { if (options) { options.onAbortResolution && options.onAbortResolution({ error: err, @@ -504,10 +502,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi * operations will occure. */ me.tracking[id].timeoutId = setTimeout(function() { - me.tracking[id].resolver.reject(error); - if (terminationHandler) { - return terminationHandler(); - } + me.tracking[id].resolver.reject(error); }, me.workerTerminateTimeout); return me.tracking[id].resolver.promise; From 700be64a727b37188dc93ce5c3f63512f59b0bfd Mon Sep 17 00:00:00 2001 From: Josh Long Date: Mon, 17 Feb 2025 08:52:57 -0500 Subject: [PATCH 24/30] rename taskResolver abortResolver change type to promise update execOptions update abort tests --- src/WorkerHandler.js | 2 +- src/types.js | 2 +- test/Pool.test.js | 28 +++++++++++++--------------- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 02db303b..8c6f9a77 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -487,7 +487,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi if (options) { options.onAbortStart && options.onAbortStart({ id, - taskResolver: me.tracking[id].resolver, + abortResolver: me.tracking[id].resolver.promise, }); } /** diff --git a/src/types.js b/src/types.js index 2fbaf5ec..cfef5d29 100644 --- a/src/types.js +++ b/src/types.js @@ -32,7 +32,7 @@ * @property {(payload: any) => unknown} [on] An event listener, to handle events sent by the worker for this execution. * @property {Object[]} [transfer] A list of transferable objects to send to the worker. Not supported by `process` worker type. See ./examples/transferableObjects.js for usage. * @property {(payload: {id: number, error: Error | undefined, isTerminating: boolean}) => void } [onAbortResolution] An event listener triggered when whenever an abort operation concludes. - * @property {(payload: {id number, taskResolver: any) => void } [onAbortStart] an event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. + * @property {(payload: {id number, AbortResolver: import('./Promise.js').Promise}) => void } [onAbortStart] an event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. */ /** diff --git a/test/Pool.test.js b/test/Pool.test.js index a2ee43b2..806fe6c8 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1474,7 +1474,7 @@ describe("Pool", function () { onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats - await args.taskResolver.promise; + await args.abortResolver; var stats = pool.stats(); assert.strictEqual(stats.busyWorkers, 0); pool.terminate(); @@ -1493,16 +1493,15 @@ describe("Pool", function () { }); it("should not terminate worker if abort listener is defined dedicated worker with Cancellation", function (done) { - - var pool = createPool(__dirname + "/workers/cleanup-abort.js", { - maxWorkers: 1, - }); + // don't set the 'minWorker' count so the pool does not recreate the worker upon termination + // doing so makes assertion on the pool recovering from the abort operation + var pool = createPool(__dirname + "/workers/cleanup-abort.js"); let task = pool.exec("asyncTimeout", [], { onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats. - await args.taskResolver.promise; + await args.abortResolver; var stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(stats.busyWorkers, 0); @@ -1531,9 +1530,9 @@ describe("Pool", function () { }); it("should not terminate worker if abort listener is defined inline worker with Timeout", function (done) { - var pool = createPool({ - maxWorkers: 1, - }); + // don't set the 'minWorker' count so the pool does not recreate the worker upon termination + // doing so makes assertion on the pool recovering from the abort operation + var pool = createPool(); function asyncTimeout() { var me = this; return new Promise(function () { @@ -1554,7 +1553,7 @@ describe("Pool", function () { onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats. - await args.taskResolver.promise; + await args.abortResolver; var stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(stats.busyWorkers, 0); @@ -1574,9 +1573,9 @@ describe("Pool", function () { }); it("should not terminate worker if abort listener is defined inline worker with Cancellation", function (done) { - var pool = createPool({ - maxWorkers: 1, - }); + // don't set the 'minWorker' count so the pool does not recreate the worker upon termination + // doing so makes assertion on the pool recovering from the abort operation + var pool = createPool(); function asyncTimeout() { var me = this; @@ -1597,7 +1596,7 @@ describe("Pool", function () { onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats. - await args.taskResolver.promise; + await args.abortResolver; var stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(stats.busyWorkers, 0); @@ -1639,7 +1638,6 @@ describe("Pool", function () { .catch(function (err) { assert(err instanceof Promise.TimeoutError); var stats = pool.stats(); - console.log(stats); assert.strictEqual(stats.busyWorkers, 1); assert.strictEqual(stats.totalWorkers, 1); }); From d331a58b68565c3986c66ca2f9d35845f896ebcb Mon Sep 17 00:00:00 2001 From: Josh Long Date: Mon, 17 Feb 2025 10:36:07 -0500 Subject: [PATCH 25/30] remove event argument types for test compilation --- src/types.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/types.js b/src/types.js index cfef5d29..190e7842 100644 --- a/src/types.js +++ b/src/types.js @@ -31,8 +31,8 @@ * @typedef {Object} ExecOptions * @property {(payload: any) => unknown} [on] An event listener, to handle events sent by the worker for this execution. * @property {Object[]} [transfer] A list of transferable objects to send to the worker. Not supported by `process` worker type. See ./examples/transferableObjects.js for usage. - * @property {(payload: {id: number, error: Error | undefined, isTerminating: boolean}) => void } [onAbortResolution] An event listener triggered when whenever an abort operation concludes. - * @property {(payload: {id number, AbortResolver: import('./Promise.js').Promise}) => void } [onAbortStart] an event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. + * @property {(payload: any) => void} [onAbortResolution] An event listener triggered when whenever an abort operation concludes. + * @property {(payload: any) => void} [onAbortStart] An event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. */ /** From a1c7ccf93b7dcaff633f08282c6baf3333c6a474 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Mon, 17 Feb 2025 11:53:00 -0500 Subject: [PATCH 26/30] update new event handler argument types --- src/types.js | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/types.js b/src/types.js index 190e7842..49ac6eda 100644 --- a/src/types.js +++ b/src/types.js @@ -27,12 +27,27 @@ * @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. */ + +/** + * @typedef {Object} AbortStartArgs + * @property {number} [id] identifier of the task which is starting its abort operation. + * @property {PromiseLike} [abortResolver] PromiseLike Object which resolves or rejects when the abort operation concludes. + * + */ + +/** + * @typedef {Object} AbortResolutionArgs + * @property {Error | undefined} [error] An error which occured during the abort operation. If an error did not occure the value will be `undefined`. + * @property {number} [id] identifier of the task. + * @property {boolean} [isTerminating] A flag which indicates the termination status of the worker which ececuted the task. + */ + /** * @typedef {Object} ExecOptions * @property {(payload: any) => unknown} [on] An event listener, to handle events sent by the worker for this execution. * @property {Object[]} [transfer] A list of transferable objects to send to the worker. Not supported by `process` worker type. See ./examples/transferableObjects.js for usage. - * @property {(payload: any) => void} [onAbortResolution] An event listener triggered when whenever an abort operation concludes. - * @property {(payload: any) => void} [onAbortStart] An event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. + * @property {(payload: any) => AbortResolutionArgs} [onAbortResolution] An event listener triggered when whenever an abort operation concludes. + * @property {(payload: AbortStartArgs) => void} [onAbortStart] An event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. */ /** From e44d73a0c3107fed2ad13d84a1edaeb6b099ea01 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sun, 23 Feb 2025 15:05:40 -0500 Subject: [PATCH 27/30] ref: delete tracking task on abort timeout --- src/WorkerHandler.js | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 8c6f9a77..74c7366a 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -222,9 +222,9 @@ function handleEmittedStdPayload(handler, payload) { // TODO: refactor if parallel task execution gets added Object.values(handler.processing) .forEach(task => task?.options?.on(payload)); - + Object.values(handler.tracking) - .forEach(task => task?.options?.on(payload)); + .forEach(task => task?.options?.on(payload)); } /** @@ -306,7 +306,7 @@ function WorkerHandler(script, _options) { task.options.on(response.payload); } } - } + } } if (response.method === CLEANUP_METHOD_ID) { @@ -318,12 +318,12 @@ function WorkerHandler(script, _options) { trackedTask.resolver.reject(objectToError(response.error)); } else { me.tracking && clearTimeout(trackedTask.timeoutId); - trackedTask.resolver.resolve(trackedTask.result); + trackedTask.resolver.resolve(trackedTask.result); if (trackedTask.options) { trackedTask.options.onAbortResolution && trackedTask.options.onAbortResolution({ id, isTerminating: false, - }) + }) } } } @@ -340,7 +340,7 @@ function WorkerHandler(script, _options) { me.processing[id].resolver.reject(error); } } - + me.processing = Object.create(null); } @@ -439,7 +439,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi resolver: Promise.defer(), options: options, }; - + // remove this task from the queue. It is already rejected (hence this // catch event), and else it will be rejected again when terminating delete me.processing[id]; @@ -478,12 +478,12 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi return promise; }); - + me.worker.send({ id, - method: CLEANUP_METHOD_ID + method: CLEANUP_METHOD_ID }); - + if (options) { options.onAbortStart && options.onAbortStart({ id, @@ -494,15 +494,16 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi * Sets a timeout to reject the cleanup operation if the message sent to the worker * does not receive a response. see worker.tryCleanup for worker cleanup operations. * Here we use the workerTerminateTimeout as the worker will be terminated if the timeout does invoke. - * + * * We need this timeout in either case of a Timeout or Cancellation Error as if * the worker does not send a message we still need to give a window of time for a response. - * + * * The workerTermniateTimeout is used here if this promise is rejected the worker cleanup * operations will occure. */ me.tracking[id].timeoutId = setTimeout(function() { - me.tracking[id].resolver.reject(error); + me.tracking[id].resolver.reject(error); + delete me.tracking[id]; }, me.workerTerminateTimeout); return me.tracking[id].resolver.promise; From 0b1b9f9c41bbbcac8aead711fc0e270b9cc46765 Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sun, 23 Feb 2025 16:00:10 -0500 Subject: [PATCH 28/30] feat: add abortResolver passthrough to WorkerHandler from pool exec options --- src/WorkerHandler.js | 10 ++++++++-- src/types.js | 7 ++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 74c7366a..4967acd3 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -434,9 +434,15 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi var me = this; return resolver.promise.catch(function (error) { if (error instanceof Promise.CancellationError || error instanceof Promise.TimeoutError) { + let abortResolver; + if (options && options.abortResolver) + abortResolver = options.abortResolver; + else + abortResolver = Promise.defer(); + me.tracking[id] = { id, - resolver: Promise.defer(), + resolver: abortResolver, options: options, }; @@ -502,7 +508,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi * operations will occure. */ me.tracking[id].timeoutId = setTimeout(function() { - me.tracking[id].resolver.reject(error); + me.tracking[id].resolver.reject(error); delete me.tracking[id]; }, me.workerTerminateTimeout); diff --git a/src/types.js b/src/types.js index 49ac6eda..f9a149d0 100644 --- a/src/types.js +++ b/src/types.js @@ -24,7 +24,7 @@ * @property {import('worker_threads').WorkerOptions} [workerThreadOpts] Object`. For `worker` worker type. An object passed to [worker_threads.options](https://nodejs.org/api/worker_threads.html#new-workerfilename-options). * @property {boolean} [emitStdStreams] Capture stdout and stderr from the worker and emit them via the `stdout` and `stderr` events. Not supported by the `web` worker type. * @property { (arg: WorkerArg) => WorkerArg | undefined } [onCreateWorker] A callback that is called whenever a worker is being created. It can be used to allocate resources for each worker for example. Optionally, this callback can return an object containing one or more of the `WorkerArg` properties. The provided properties will be used to override the Pool properties for the worker being created. - * @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. + * @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated. */ @@ -39,7 +39,7 @@ * @typedef {Object} AbortResolutionArgs * @property {Error | undefined} [error] An error which occured during the abort operation. If an error did not occure the value will be `undefined`. * @property {number} [id] identifier of the task. - * @property {boolean} [isTerminating] A flag which indicates the termination status of the worker which ececuted the task. + * @property {boolean} [isTerminating] A flag which indicates the termination status of the worker which ececuted the task. */ /** @@ -47,7 +47,8 @@ * @property {(payload: any) => unknown} [on] An event listener, to handle events sent by the worker for this execution. * @property {Object[]} [transfer] A list of transferable objects to send to the worker. Not supported by `process` worker type. See ./examples/transferableObjects.js for usage. * @property {(payload: any) => AbortResolutionArgs} [onAbortResolution] An event listener triggered when whenever an abort operation concludes. - * @property {(payload: AbortStartArgs) => void} [onAbortStart] An event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. + * @property {(payload: AbortStartArgs) => void} [onAbortStart] An event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting. + * @property {{ promise: import('./Promise.js').Promise; resolve: Function; reject: Function; }} [abortResolver] Defered Promise which resolves or rejects when the abort operation for the task concludes. */ /** From 9e9f7a9251eddbcd0d1499ae877c52d439d255dd Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sun, 23 Feb 2025 16:00:22 -0500 Subject: [PATCH 29/30] test: abortResolver tests --- test/Pool.test.js | 113 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 108 insertions(+), 5 deletions(-) diff --git a/test/Pool.test.js b/test/Pool.test.js index 806fe6c8..8fe2a21e 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1529,6 +1529,48 @@ describe("Pool", function () { }); }); + it("should not terminate worker if abort listener is defined dedicated worker with Cancellation (abortResolver)", function (done) { + // don't set the 'minWorker' count so the pool does not recreate the worker upon termination + // doing so makes assertion on the pool recovering from the abort operation + var pool = createPool(__dirname + "/workers/cleanup-abort.js"); + + let abortResolver = Promise.defer(); + let task = pool.exec("asyncTimeout", [], { + onAbortStart: function (args) { + // wait for the promise to resolve, + // then check pool stats. + assert.doesNotReject(args.abortResolver); + }, + abortResolver + }); + + // Wrap in a new promise which waits 50ms + // in order to allow the function executing in the + // worker to finish. + new Promise(function (resolve) { + setTimeout(function () { + resolve(); + }, 50); + }).then(function () { + return task.cancel().catch(function (err) { + assert(err instanceof Promise.CancellationError); + let stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 0); + assert.strictEqual(stats.busyWorkers, 1); + }); + }); + + abortResolver.promise.then(function() { + var stats = pool.stats(); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + pool.terminate(); + done(); + }) + }); + it("should not terminate worker if abort listener is defined inline worker with Timeout", function (done) { // don't set the 'minWorker' count so the pool does not recreate the worker upon termination // doing so makes assertion on the pool recovering from the abort operation @@ -1572,6 +1614,52 @@ describe("Pool", function () { }); }); + it("should not terminate worker if abort listener is defined inline worker with Timeout (abortHandler)", function (done) { + // don't set the 'minWorker' count so the pool does not recreate the worker upon termination + // doing so makes assertion on the pool recovering from the abort operation + var pool = createPool(); + function asyncTimeout() { + var me = this; + return new Promise(function (resolve) { + let timeout = setTimeout(function () { + resolve(); + }, 5000); + me.worker.addAbortListener(function () { + return new Promise(function (resolve) { + clearTimeout(timeout); + resolve(); + }); + }); + }); + } + + var abortResolver = Promise.defer(); + pool + .exec(asyncTimeout, [], { + onAbortStart: function (args) { + assert.doesNotReject(args.abortResolver) + }, + abortResolver + }) + .timeout(200) + .catch(function (err) { + assert(err instanceof Promise.TimeoutError); + var stats = pool.stats(); + assert.strictEqual(workerCount, 1); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.idleWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + }); + + abortResolver.promise.then(function () { + var stats = pool.stats(); + assert.strictEqual(stats.totalWorkers, 1); + assert.strictEqual(stats.busyWorkers, 0); + pool.terminate(); + done(); + }); + }); + it("should not terminate worker if abort listener is defined inline worker with Cancellation", function (done) { // don't set the 'minWorker' count so the pool does not recreate the worker upon termination // doing so makes assertion on the pool recovering from the abort operation @@ -1596,7 +1684,7 @@ describe("Pool", function () { onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats. - await args.abortResolver; + await args.abortResolver; var stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(stats.busyWorkers, 0); @@ -1624,15 +1712,16 @@ describe("Pool", function () { workerTerminateTimeout: 1000, }); + let abortResolver = Promise.defer(); pool .exec("asyncAbortHandlerNeverResolves", [], { onAbortResolution: function (args) { const stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(args.isTerminating, true); - pool.terminate(); - done(); - } + + }, + abortResolver }) .timeout(200) .catch(function (err) { @@ -1641,14 +1730,21 @@ describe("Pool", function () { assert.strictEqual(stats.busyWorkers, 1); assert.strictEqual(stats.totalWorkers, 1); }); + + // resolve the test once the abort resolver rejects on the abort handler timeout + assert.rejects(abortResolver.promise).then(function() { + pool.terminate(); + done(); + }); }); - it("should invoke timeout for abort handler if timeout period is reached with Cancellation", function (done) { + it("should invoke timeout for abort handler if timeout period is reached with Cancellation", function (done) { var pool = createPool(__dirname + "/workers/cleanup-abort.js", { maxWorkers: 1, workerTerminateTimeout: 500, }); + var abortResolver = Promise.defer(); const task = pool.exec("asyncAbortHandlerNeverResolves", [], { onAbortResolution: function (args) { const stats = pool.stats(); @@ -1657,6 +1753,7 @@ describe("Pool", function () { pool.terminate(); done(); }, + abortResolver }); new Promise(function (resolve) { @@ -1668,6 +1765,12 @@ describe("Pool", function () { assert(stats.busyWorkers === 1); }); }); + + // resolve the test once the abort resolver rejects on the abort handler timeout + assert.rejects(abortResolver.promise).then(function() { + pool.terminate(); + done(); + }); }); it("should trigger event stdout in abort handler", function (done) { From 1023cc818d2ad274587c234af876a251262eb2de Mon Sep 17 00:00:00 2001 From: Josh Long Date: Sun, 23 Feb 2025 16:39:14 -0500 Subject: [PATCH 30/30] ref: rename AbortStartArgs.abortResolver to abortPromise --- src/WorkerHandler.js | 2 +- src/types.js | 2 +- test/Pool.test.js | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/WorkerHandler.js b/src/WorkerHandler.js index 4967acd3..9920605a 100644 --- a/src/WorkerHandler.js +++ b/src/WorkerHandler.js @@ -493,7 +493,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options, termi if (options) { options.onAbortStart && options.onAbortStart({ id, - abortResolver: me.tracking[id].resolver.promise, + abortPromise: me.tracking[id].resolver.promise, }); } /** diff --git a/src/types.js b/src/types.js index f9a149d0..205f054b 100644 --- a/src/types.js +++ b/src/types.js @@ -31,7 +31,7 @@ /** * @typedef {Object} AbortStartArgs * @property {number} [id] identifier of the task which is starting its abort operation. - * @property {PromiseLike} [abortResolver] PromiseLike Object which resolves or rejects when the abort operation concludes. + * @property {PromiseLike} [abortPromise] PromiseLike Object which resolves or rejects when the abort operation concludes. * */ diff --git a/test/Pool.test.js b/test/Pool.test.js index 8fe2a21e..23e4c99d 100644 --- a/test/Pool.test.js +++ b/test/Pool.test.js @@ -1474,7 +1474,7 @@ describe("Pool", function () { onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats - await args.abortResolver; + await args.abortPromise; var stats = pool.stats(); assert.strictEqual(stats.busyWorkers, 0); pool.terminate(); @@ -1501,7 +1501,7 @@ describe("Pool", function () { onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats. - await args.abortResolver; + await args.abortPromise; var stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(stats.busyWorkers, 0); @@ -1539,7 +1539,7 @@ describe("Pool", function () { onAbortStart: function (args) { // wait for the promise to resolve, // then check pool stats. - assert.doesNotReject(args.abortResolver); + assert.doesNotReject(args.abortPromise); }, abortResolver }); @@ -1595,7 +1595,7 @@ describe("Pool", function () { onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats. - await args.abortResolver; + await args.abortPromise; var stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(stats.busyWorkers, 0); @@ -1637,7 +1637,7 @@ describe("Pool", function () { pool .exec(asyncTimeout, [], { onAbortStart: function (args) { - assert.doesNotReject(args.abortResolver) + assert.doesNotReject(args.abortPromise) }, abortResolver }) @@ -1684,7 +1684,7 @@ describe("Pool", function () { onAbortStart: async function (args) { // wait for the promise to resolve, // then check pool stats. - await args.abortResolver; + await args.abortPromise; var stats = pool.stats(); assert.strictEqual(stats.totalWorkers, 1); assert.strictEqual(stats.busyWorkers, 0);