Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/abort listener #448

Open
wants to merge 44 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
eef94c6
feat: add abort listener
joshLong145 Jun 11, 2024
a00079e
test: add abort listener test
joshLong145 Jun 11, 2024
0085ded
doc: add comments
joshLong145 Jun 11, 2024
5207481
test: fix abort test case and add comment
joshLong145 Jun 12, 2024
138dcb6
ref: remove global handler
joshLong145 Jun 21, 2024
36e5256
ref: add abort handler to worker api binded to method
joshLong145 Jun 21, 2024
a6bba86
fix: abort timeout option
joshLong145 Jun 25, 2024
313f8d3
test: repro tests
joshLong145 Jun 25, 2024
284e03d
chore: update embedded
joshLong145 Jun 25, 2024
d9f023b
dev(worker): add message passing for communicating clean up operations
joshLong145 Aug 3, 2024
1afd385
test: add tests
joshLong145 Aug 3, 2024
7982f99
chore: update embedded worker
joshLong145 Aug 3, 2024
70cd321
dev: Implemnet message passing for timeout and cancellation cleanup o…
joshLong145 Aug 5, 2024
3fbb219
test: update test cases
joshLong145 Aug 5, 2024
cf4a15e
chore update embedded worker
joshLong145 Aug 5, 2024
507a9d6
Merge branch 'master' of github.com:josdejong/workerpool into feat/ab…
joshLong145 Aug 5, 2024
39bd5bb
ref: move worker api to static obect definition
joshLong145 Aug 5, 2024
26aba2a
chore: update codegen
joshLong145 Aug 5, 2024
7a23cf2
ref: remove delay in TimeoutError
joshLong145 Aug 6, 2024
6e8e525
chore: update embedded worker
joshLong145 Aug 6, 2024
ac89dfc
ref: cleanup per review comments
joshLong145 Aug 28, 2024
fdbf443
test: updates per PR comments
joshLong145 Aug 30, 2024
dc22cdd
test: change max worker count for testing worker reuse
joshLong145 Aug 30, 2024
6711fdf
docs: add example and README info for new abort handler feature
joshLong145 Sep 1, 2024
2c38089
ref: comments and logging fixes
joshLong145 Sep 2, 2024
357af05
ref: fix terminationHandler override
joshLong145 Sep 4, 2024
d290b1e
docs: fix typo in comment
joshLong145 Sep 4, 2024
64e3b9b
test: update test cases
joshLong145 Sep 4, 2024
c68cc9b
ref: migrate timeout set to event handler to prevent infinite recursi…
joshLong145 Sep 10, 2024
eb40937
test: add only to timeout test for reproduction test
joshLong145 Sep 14, 2024
003adf6
test: remove forbid-only for test
joshLong145 Sep 14, 2024
bf5676e
ref: revert changes for isolation test
joshLong145 Sep 14, 2024
52c66b6
ref: add termination case for tracked tasks on pool termination and f…
joshLong145 Sep 14, 2024
9f2e281
chore: update code gen
joshLong145 Sep 14, 2024
abddfe3
Merge branch 'master' of github.com:josdejong/workerpool into feat/ab…
joshLong145 Sep 18, 2024
bcb62ce
ref: remove returning itself on promise settling causing recursive calls
joshLong145 Sep 29, 2024
0453c88
Merge branch 'master' of github.com:josdejong/workerpool into feat/ab…
joshLong145 Oct 2, 2024
877d88a
ref: revert timeout change
joshLong145 Oct 2, 2024
1ffe80a
dev: review comments
joshLong145 Oct 2, 2024
9fa09af
ref: revert `after` hook change to `afterEach`
joshLong145 Oct 3, 2024
35a08d2
ref: add error throw to promise reject and remove call to `worker.exit`
joshLong145 Oct 3, 2024
0e84084
chore: update embeddedWorker
joshLong145 Oct 3, 2024
8692d53
docs: fix comment on timeout in worker handler
joshLong145 Oct 3, 2024
a286a89
chore: update embeddedWorker
joshLong145 Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/generated/embeddedWorker.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ exports.Promise = Promise;

exports.Transfer = require('./transfer');

/**
* Registers a handler for worker timeout / cancelation
* which will be run to cleanup a given worker.
* Allowing cleanup to resolve with out worker termination
* @param {() => Promise<void>} listener
*/
exports.addAbortListener = function (listener) {
joshLong145 marked this conversation as resolved.
Show resolved Hide resolved
var worker = require('./worker');
worker.addAbortListener(listener);
}

exports.platform = platform;
exports.isMainThread = isMainThread;
exports.cpus = cpus;
34 changes: 27 additions & 7 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ worker.methods.methods = function methods() {
*/
worker.terminationHandler = undefined;

/**
* abort listener for handling worker timeout / canceling
*/
worker.onAbort = undefined;

/**
* Cleanup and exit the worker.
* @param {Number} code
Expand All @@ -121,16 +126,28 @@ worker.cleanupAndExit = function(code) {
var _exit = function() {
worker.exit(code);
}

if(!worker.terminationHandler) {
return _exit();
var _abort = function() {
worker.onAbort = undefined;
}

var result = worker.terminationHandler(code);
if (isPromise(result)) {
result.then(_exit, _exit);
if (worker.onAbort) {
let handler = worker.onAbort();
if(isPromise(handler)) {
handler.then(_abort);
josdejong marked this conversation as resolved.
Show resolved Hide resolved
} else {
_abort();
}
} else {
_exit();
if(!worker.terminationHandler) {
return _exit();
}

var result = worker.terminationHandler(code);
if (isPromise(result)) {
result.then(_exit, _exit);
} else {
_exit();
}
}
}

Expand Down Expand Up @@ -253,4 +270,7 @@ worker.emit = function (payload) {
if (typeof exports !== 'undefined') {
exports.add = worker.register;
exports.emit = worker.emit;
exports.addAbortListener = function(listener) {
worker.onAbort = listener;
josdejong marked this conversation as resolved.
Show resolved Hide resolved
};
}
16 changes: 16 additions & 0 deletions test/Pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1315,6 +1315,22 @@ describe('Pool', function () {
});
});

it('should not terminate worker if abort listener is defined', function () {
josdejong marked this conversation as resolved.
Show resolved Hide resolved
var pool = createPool(__dirname + '/workers/cleanup-abort.js');

pool.exec('asyncTimeout', [])
.timeout(200)
.catch((err) => {
assert(err instanceof Promise.TimeoutError);
// We should still have a busy worker since the worker should still be
// active and the promise chain for the timeout has yet to resolve
// which keeps the task active.
assert(pool.stats().busyWorkers === 1);
josdejong marked this conversation as resolved.
Show resolved Hide resolved
pool.terminate();
done();
});
});

describe('validate', () => {
it('should not allow unknown properties in forkOpts', function() {
var pool = createPool({
Expand Down
21 changes: 21 additions & 0 deletions test/workers/cleanup-abort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
var workerpool = require("../..");

function asyncTimeout() {
return new Promise(function (resolve) {
josdejong marked this conversation as resolved.
Show resolved Hide resolved

let timeout = setTimeout(function () {
resolve();
}, 5000);
workerpool.addAbortListener(function () {
clearTimeout(timeout);
resolve();
});
});
}

// create a worker and register public functions
workerpool.worker(
{
asyncTimeout: asyncTimeout,
}
);