Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add signal reason support #403

Merged
merged 3 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ interface AbortSignalEventTarget {
name : 'abort',
listener : () => void) => void;
aborted? : boolean;
reason?: unknown;
}
interface AbortSignalEventEmitter {
off : (name : 'abort', listener : () => void) => void;
Expand All @@ -68,9 +69,12 @@ function onabort (abortSignal : AbortSignalAny, listener : () => void) {
abortSignal.once('abort', listener);
}
}

class AbortError extends Error {
constructor () {
super('The task has been aborted');
constructor (reason?: AbortSignalEventTarget['reason']) {
// TS does not recognizes the cause clause
// @ts-expect-error
super('The task has been aborted', { cause: reason });
}

get name () { return 'AbortError'; }
Expand Down Expand Up @@ -821,13 +825,13 @@ class ThreadPool {
// If the AbortSignal has an aborted property and it's truthy,
// reject immediately.
if ((signal as AbortSignalEventTarget).aborted) {
return Promise.reject(new AbortError());
return Promise.reject(new AbortError((signal as AbortSignalEventTarget).reason));
}
taskInfo.abortListener = () => {
// Call reject() first to make sure we always reject with the AbortError
// if the task is aborted, not with an Error from the possible
// thread termination below.
reject(new AbortError());
reject(new AbortError((signal as AbortSignalEventTarget).reason));

if (taskInfo.workerInfo !== null) {
// Already running: We cancel the Worker this is running on.
Expand Down Expand Up @@ -949,7 +953,7 @@ class ThreadPool {
for (let i = 0; i < skipQueueLength; i++) {
const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
taskInfo.done(new AbortError());
taskInfo.done(new AbortError('pool is closed'));
} else {
this.skipQueue.push(taskInfo);
}
Expand All @@ -959,7 +963,7 @@ class ThreadPool {
for (let i = 0; i < taskQueueLength; i++) {
const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
taskInfo.done(new AbortError());
taskInfo.done(new AbortError('pool is closed'));
} else {
this.taskQueue.push(taskInfo);
}
Expand Down
49 changes: 48 additions & 1 deletion test/abort-task.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { AbortController } from 'abort-controller';
import { EventEmitter } from 'events';
import Piscina from '..';
import { test } from 'tap';
Expand Down Expand Up @@ -184,3 +183,51 @@ test('task with AbortSignal cleans up properly', async ({ equal }) => {

await pool.runTask('1+1', controller.signal);
});

test('aborted AbortSignal rejects task immediately (with reason)', async ({ match, equal }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/move.ts')
});
const customReason = new Error('custom reason');

const controller = new AbortController();
controller.abort(customReason);
equal(controller.signal.aborted, true);
equal(controller.signal.reason, customReason);

// The data won't be moved because the task will abort immediately.
const data = new Uint8Array(new SharedArrayBuffer(4));

try {
await pool.run(data, { transferList: [data.buffer], signal: controller.signal });
} catch (error) {
equal(error.message, 'The task has been aborted');
match(error.cause, customReason);
}

equal(data.length, 4);
});

test('tasks can be aborted through AbortController while running', async ({ equal, match }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
});
const reason = new Error('custom reason');

const buf = new Int32Array(new SharedArrayBuffer(4));
const abortController = new AbortController();

try {
const promise = pool.run(buf, { signal: abortController.signal });

Atomics.wait(buf, 0, 0);
equal(Atomics.load(buf, 0), 1);

abortController.abort(reason);

await promise;
} catch (error) {
equal(error.message, 'The task has been aborted');
match(error.cause, reason);
}
});
14 changes: 10 additions & 4 deletions test/pool-close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ test('close()', async (t) => {

setImmediate(() => {
t.resolves(pool.close(), 'close is resolved when running tasks are completed');
t.rejects(pool.run({ time: 1000 }), /The task has been aborted/, 'abort any task enqueued during close');
t.resolves(pool.run({ time: 1000 }).then(null, err => {
t.equal(err.message, 'The task has been aborted');
t.equal(err.cause, 'queue is closing up');
}));
});

await t.resolves(pool.run({ time: 100 }), 'complete running task');
Expand All @@ -39,17 +42,20 @@ test('close()', async (t) => {

test('close({force: true})', async (t) => {
t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 1 });

const task1 = pool.run({ time: 100 });
const task1 = pool.run({ time: 1000 });
const task2 = pool.run({ time: 200 });

setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed'));

await Promise.all([
t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
t.resolves(task1, 'complete running task'),
t.rejects(task2, /The task has been aborted/, 'abort task that are not started yet')
t.resolves(task2.then(null, err => {
t.equal(err.message, 'The task has been aborted');
t.equal(err.cause, 'pool is closed');
}))
]);
});

Expand Down