Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/abort listener execution #484

Open
wants to merge 32 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
b274b8e
feat: add std stream event emit for abort handlers
joshLong145 Nov 30, 2024
ba38228
ref: remove log
joshLong145 Nov 30, 2024
f9b4919
feat: add event emit from abort handler
joshLong145 Dec 2, 2024
1f235f3
ref: lower test timeouts
joshLong145 Dec 2, 2024
9ac04b4
ref: change option assignment to tracked task
joshLong145 Dec 3, 2024
650128d
fmt
joshLong145 Dec 4, 2024
85de985
ref: change abort listener execution to fuffil on first promise to re…
joshLong145 Dec 7, 2024
18e4853
feat: add tracking tasks to busy check
joshLong145 Dec 8, 2024
0685ae3
feat: add event handlers for abort start and resolution
joshLong145 Dec 11, 2024
4a892d2
test: abort test refactor (wip)
joshLong145 Dec 11, 2024
e824a79
ref: add default for tracking task options
joshLong145 Dec 11, 2024
efb12c6
ref: rename event handler
joshLong145 Dec 11, 2024
8987fa3
ref: refactor std handler
joshLong145 Dec 12, 2024
f5c38fd
ref: move tracking task deletion after task check
joshLong145 Dec 12, 2024
622ed7d
Merge branch 'feat/std-streams-task-abort' of github.com:joshLong145/…
joshLong145 Dec 12, 2024
9d85663
ref: move abort resolver handler call to worker termination if rejecting
joshLong145 Dec 13, 2024
7c1e099
dev: refactor abort example to show implemented changes
joshLong145 Dec 13, 2024
29c2a51
Merge branch 'master' of github.com:josdejong/workerpool into fix/abo…
joshLong145 Jan 1, 2025
ad9534d
ref: add default for abort resolution handler
joshLong145 Jan 1, 2025
7d24650
move to terminationHandler over pool controlled worker cleanup
joshLong145 Feb 6, 2025
0e0448a
clarify example
joshLong145 Feb 6, 2025
e183786
refactor event listeners for abort op lifecycle
joshLong145 Feb 9, 2025
7e9fa7e
cleanup tests
joshLong145 Feb 9, 2025
90c86ac
update example
joshLong145 Feb 12, 2025
518e7ad
cleanup WorkerHandler
joshLong145 Feb 16, 2025
700be64
rename taskResolver abortResolver change type to promise
joshLong145 Feb 17, 2025
d331a58
remove event argument types for test compilation
joshLong145 Feb 17, 2025
a1c7ccf
update new event handler argument types
joshLong145 Feb 17, 2025
e44d73a
ref: delete tracking task on abort timeout
joshLong145 Feb 23, 2025
0b1b9f9
feat: add abortResolver passthrough to WorkerHandler from pool exec o…
joshLong145 Feb 23, 2025
9e9f7a9
test: abortResolver tests
joshLong145 Feb 23, 2025
1023cc8
ref: rename AbortStartArgs.abortResolver to abortPromise
joshLong145 Feb 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 56 additions & 27 deletions examples/abort.js
Original file line number Diff line number Diff line change
@@ -1,46 +1,75 @@
const workerpool = require("..");

var workerCount = 0;

// create a worker pool
const pool = workerpool.pool(__dirname + "/workers/cleanupAbort.js", {
// maximum time to wait for worker to cleanup it's resources
// on termination before forcefully stopping the worker
workerTerminateTimeout: 1000,
onCreateWorker: (args) => {
onCreateWorker: function (args) {
console.log("New worker created");
workerCount += 1;
}
},
onTerminateWorker: function () {
console.log("worker terminated");
},
maxWorkers: 1,
});

function add (a, b) {
return a + b;
function add(a, b) {
return a + b;
}

const main = async () => {
const cleanedUpTask = pool.exec('asyncTimeout', []).timeout(1_000).catch((err) => {
console.log("task timeout");
console.log("timeout occured: ", err.message);
console.log("worker count ", workerCount);
return pool.exec(add, [1, 2]).then((sum) => {
console.log('add result', sum);
console.log("worker count: ", workerCount);
});
let abortResolverSuccess;
await pool
.exec("asyncTimeout", [], {
onAbortResolution: function (args) {
console.log("abort operation concluded for task:", args.id);
console.log("is worker terminating", args.isTerminating);
},
onAbortStart: async function (args) {
console.log(
"abort operation started from task timeout, in onAbortStart",
);
abortResolverSuccess = args.taskResolver.promise;
},
})
.timeout(100)
.catch((err) => {
console.log("timeout handled: ", err.message);
});
await cleanedUpTask;

const canceledTask = pool.exec('asyncAbortHandlerNeverResolves').cancel().catch((err) => {
console.log("task canceled");
console.log("cancel occured: ", err.message);
console.log("worker count ", workerCount);
return pool.exec(add, [1, 2]).then((sum) => {
console.log('add result', sum);
console.log("worker count: ", workerCount);
});

await abortResolverSuccess.catch((err) => {
console.log("", err);
});

console.log("pool status after abort operation:", pool.stats());

let abortResolverFailure;
await pool
.exec("asyncAbortHandlerNeverResolves", [], {
onAbortStart: function (args) {
console.log(
"abort operation started from task cancel, in onAbortStart",
);
abortResolverFailure = args.taskResolver.promise;
},
onAbortResolution: function (args) {
console.log("abort operation concluded for task:", args.id);
console.log("is worker terminating", args.isTerminating);
}
})
.cancel()
.catch((err) => {
console.log("task canceled");
console.log("cancel occured: ", err.message);
});

await canceledTask;
}
await abortResolverFailure.catch((e) => {
console.log("cancelation handled: ", e.message);
});

console.log("final pool stats", pool.stats());
// we dont need to terminate the pool, since all workers should be terminated by this point even though there is a handler.
};

main();
17 changes: 9 additions & 8 deletions src/Pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,18 +229,18 @@ Pool.prototype._next = function () {
// get the first task from the queue
var me = this;
var task = this.tasks.shift();

var terminationHandler = function() {
// if the worker crashed and terminated, remove it from the pool
if (worker.terminated) {
return me._removeWorker(worker);
}
}
// check if the task is still pending (and not cancelled -> promise rejected)
if (task.resolver.promise.pending) {
// send the request to the worker
var promise = worker.exec(task.method, task.params, task.resolver, task.options)
var promise = worker.exec(task.method, task.params, task.resolver, task.options, terminationHandler)
.then(me._boundNext)
.catch(function () {
// if the worker crashed and terminated, remove it from the pool
if (worker.terminated) {
return me._removeWorker(worker);
}
}).then(function() {
.then(function() {
me._next(); // trigger next task in the queue
});

Expand Down Expand Up @@ -430,6 +430,7 @@ Pool.prototype._createWorkerHandler = function () {
workerType: this.workerType,
workerTerminateTimeout: this.workerTerminateTimeout,
emitStdStreams: this.emitStdStreams,
onAbortResolution: this.onAbortResolution
});
}

Expand Down
90 changes: 67 additions & 23 deletions src/WorkerHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ function handleEmittedStdPayload(handler, payload) {
// TODO: refactor if parallel task execution gets added
Object.values(handler.processing)
.forEach(task => task?.options?.on(payload));

Object.values(handler.tracking)
.forEach(task => task?.options?.on(payload));
.forEach(task => task?.options?.on(payload));
}

/**
Expand Down Expand Up @@ -289,7 +289,7 @@ function WorkerHandler(script, _options) {
me.terminate();
}

// resolve the task's promise
// resolve the task's promis
if (response.error) {
task.resolver.reject(objectToError(response.error));
}
Expand All @@ -306,21 +306,27 @@ function WorkerHandler(script, _options) {
task.options.on(response.payload);
}
}
}
}
}

if (response.method === CLEANUP_METHOD_ID) {
var trackedTask = me.tracking[response.id];
if (trackedTask !== undefined) {
delete me.tracking[id];
if (response.error) {
clearTimeout(trackedTask.timeoutId);
trackedTask.resolver.reject(objectToError(response.error))
trackedTask.resolver.reject(objectToError(response.error));
} else {
me.tracking && clearTimeout(trackedTask.timeoutId);
trackedTask.resolver.resolve(trackedTask.result);
trackedTask.resolver.resolve(trackedTask.result);
if (trackedTask.options) {
trackedTask.options.onAbortResolution && trackedTask.options.onAbortResolution({
id,
isTerminating: false,
})
}
}
}
delete me.tracking[id];
}
}
});
Expand All @@ -334,7 +340,7 @@ function WorkerHandler(script, _options) {
me.processing[id].resolver.reject(error);
}
}

me.processing = Object.create(null);
}

Expand Down Expand Up @@ -390,7 +396,7 @@ WorkerHandler.prototype.methods = function () {
* @param {import('./types.js').ExecOptions} [options]
* @return {Promise.<*, Error>} result
*/
WorkerHandler.prototype.exec = function(method, params, resolver, options) {
WorkerHandler.prototype.exec = function(method, params, resolver, options, terminationHandler) {
if (!resolver) {
resolver = Promise.defer();
}
Expand Down Expand Up @@ -428,12 +434,18 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
var me = this;
return resolver.promise.catch(function (error) {
if (error instanceof Promise.CancellationError || error instanceof Promise.TimeoutError) {
let abortResolver;
if (options && options.abortResolver)
abortResolver = options.abortResolver;
else
abortResolver = Promise.defer();

me.tracking[id] = {
id,
resolver: Promise.defer(),
resolver: abortResolver,
options: options,
};

// remove this task from the queue. It is already rejected (hence this
// catch event), and else it will be rejected again when terminating
delete me.processing[id];
Expand All @@ -442,39 +454,71 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
delete me.tracking[id];

var promise = me.terminateAndNotify(true)
.then(function() {
throw err;
.then(function() {
if (options) {
options.onAbortResolution && options.onAbortResolution({
error: err,
id,
isTerminating: true
});
}
if (terminationHandler) {
return terminationHandler();
} else {
throw err;
}
}, function(err) {
throw err;
if (options) {
options.onAbortResolution && options.onAbortResolution({
error: err,
id,
isTerminating: true
});
}
if (terminationHandler) {
return terminationHandler();
} else {
throw err;
}
});

return promise;
});

me.worker.send({
id,
method: CLEANUP_METHOD_ID
method: CLEANUP_METHOD_ID
});



if (options) {
options.onAbortStart && options.onAbortStart({
id,
abortPromise: me.tracking[id].resolver.promise,
});
}
/**
* Sets a timeout to reject the cleanup operation if the message sent to the worker
* does not receive a response. see worker.tryCleanup for worker cleanup operations.
* Here we use the workerTerminateTimeout as the worker will be terminated if the timeout does invoke.
*
*
* We need this timeout in either case of a Timeout or Cancellation Error as if
* the worker does not send a message we still need to give a window of time for a response.
*
*
* The workerTermniateTimeout is used here if this promise is rejected the worker cleanup
* operations will occure.
*/
me.tracking[id].timeoutId = setTimeout(function() {
me.tracking[id].resolver.reject(error);
me.tracking[id].resolver.reject(error);
delete me.tracking[id];
}, me.workerTerminateTimeout);

return me.tracking[id].resolver.promise;
} else {
throw error;
if (terminationHandler) {
return terminationHandler();
} else {
throw error;
}
}
})
};
Expand All @@ -484,7 +528,7 @@ WorkerHandler.prototype.exec = function(method, params, resolver, options) {
* @return {boolean} Returns true if the worker is busy
*/
WorkerHandler.prototype.busy = function () {
return this.cleaning || Object.keys(this.processing).length > 0;
return this.cleaning || Object.keys(this.processing).length > 0 || Object.keys(this.tracking).length > 0;
};

/**
Expand Down
18 changes: 18 additions & 0 deletions src/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,28 @@
* @property { (arg: WorkerArg) => void } [onTerminateWorker] A callback that is called whenever a worker is being terminated. It can be used to release resources that might have been allocated for this specific worker. The callback is passed as argument an object as described for `onCreateWorker`, with each property sets with the value for the worker being terminated.
*/


/**
* @typedef {Object} AbortStartArgs
* @property {number} [id] identifier of the task which is starting its abort operation.
* @property {PromiseLike<void>} [abortPromise] PromiseLike Object which resolves or rejects when the abort operation concludes.
*
*/

/**
* @typedef {Object} AbortResolutionArgs
* @property {Error | undefined} [error] An error which occured during the abort operation. If an error did not occure the value will be `undefined`.
* @property {number} [id] identifier of the task.
* @property {boolean} [isTerminating] A flag which indicates the termination status of the worker which ececuted the task.
*/

/**
* @typedef {Object} ExecOptions
* @property {(payload: any) => unknown} [on] An event listener, to handle events sent by the worker for this execution.
* @property {Object[]} [transfer] A list of transferable objects to send to the worker. Not supported by `process` worker type. See ./examples/transferableObjects.js for usage.
* @property {(payload: any) => AbortResolutionArgs} [onAbortResolution] An event listener triggered when whenever an abort operation concludes.
* @property {(payload: AbortStartArgs) => void} [onAbortStart] An event listener triggered when a task throws a Timeout or Canceltion exception and cleanup is starting.
* @property {{ promise: import('./Promise.js').Promise<any, Error>; resolve: Function; reject: Function; }} [abortResolver] Defered Promise which resolves or rejects when the abort operation for the task concludes.
*/

/**
Expand Down
10 changes: 5 additions & 5 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ worker.cleanup = function(requestId) {
error: convertError(new Error('Worker terminating')),
});

// If there are no handlers registered, reject the promise with an error as we want the handler to be notified
// If there are no handlers registered, as we want the handler to be notified
// that cleanup should begin and the handler should be GCed.
return new Promise(function(resolve) { resolve(); });
}
Expand Down Expand Up @@ -236,10 +236,10 @@ worker.cleanup = function(requestId) {
// - Reject if one or more handlers reject
// Upon one of the above cases a message will be sent to the handler with the result of the handler execution
// which will either kill the worker if the result contains an error, or
return Promise.all([
settlePromise,
timeoutPromise
]).then(function() {
return new Promise(function (resolve, reject) {
settlePromise.then(resolve, reject);
timeoutPromise.then(resolve, reject);
}).then(function() {
worker.send({
id: requestId,
method: CLEANUP_METHOD_ID,
Expand Down
Loading