Skip to content

Commit

Permalink
test: add testing for abort reason
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 committed Sep 15, 2023
1 parent 1420904 commit 71efc92
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
7 changes: 4 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ function onabort (abortSignal : AbortSignalAny, listener : () => void) {
abortSignal.once('abort', listener);
}
}

class AbortError extends Error {
constructor (reason?: AbortSignalEventTarget['reason']) {
// TS does not recognizes the cause clause
Expand Down Expand Up @@ -830,7 +831,7 @@ class ThreadPool {
// Call reject() first to make sure we always reject with the AbortError
// if the task is aborted, not with an Error from the possible
// thread termination below.
reject(new AbortError((signal as AbortSignalEventTarget)));
reject(new AbortError((signal as AbortSignalEventTarget).reason));

if (taskInfo.workerInfo !== null) {
// Already running: We cancel the Worker this is running on.
Expand Down Expand Up @@ -952,7 +953,7 @@ class ThreadPool {
for (let i = 0; i < skipQueueLength; i++) {
const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
taskInfo.done(new AbortError('pool is closing up'));
taskInfo.done(new AbortError('pool is closed'));
} else {
this.skipQueue.push(taskInfo);
}
Expand All @@ -962,7 +963,7 @@ class ThreadPool {
for (let i = 0; i < taskQueueLength; i++) {
const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo;
if (taskInfo.workerInfo === null) {
taskInfo.done(new AbortError('pool is closing up'));
taskInfo.done(new AbortError('pool is closed'));
} else {
this.taskQueue.push(taskInfo);
}
Expand Down
49 changes: 48 additions & 1 deletion test/abort-task.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { AbortController } from 'abort-controller';
import { EventEmitter } from 'events';
import Piscina from '..';
import { test } from 'tap';
Expand Down Expand Up @@ -184,3 +183,51 @@ test('task with AbortSignal cleans up properly', async ({ equal }) => {

await pool.runTask('1+1', controller.signal);
});

test('aborted AbortSignal rejects task immediately (with reason)', async ({ match, equal }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/move.ts')
});
const customReason = new Error('custom reason');

const controller = new AbortController();
controller.abort(customReason);
equal(controller.signal.aborted, true);
equal(controller.signal.reason, customReason);

// The data won't be moved because the task will abort immediately.
const data = new Uint8Array(new SharedArrayBuffer(4));

try {
await pool.run(data, { transferList: [data.buffer], signal: controller.signal });
} catch (error) {
equal(error.message, 'The task has been aborted');
match(error.cause, customReason);
}

equal(data.length, 4);
});

test('tasks can be aborted through AbortController while running', async ({ equal, match }) => {
const pool = new Piscina({
filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
});
const reason = new Error('custom reason');

const buf = new Int32Array(new SharedArrayBuffer(4));
const abortController = new AbortController();

try {
const promise = pool.run(buf, { signal: abortController.signal });

Atomics.wait(buf, 0, 0);
equal(Atomics.load(buf, 0), 1);

abortController.abort(reason);

await promise;
} catch (error) {
equal(error.message, 'The task has been aborted');
match(error.cause, reason);
}
});
14 changes: 10 additions & 4 deletions test/pool-close.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ test('close()', async (t) => {

setImmediate(() => {
t.resolves(pool.close(), 'close is resolved when running tasks are completed');
t.rejects(pool.run({ time: 1000 }), /The task has been aborted/, 'abort any task enqueued during close');
t.resolves(pool.run({ time: 1000 }).then(null, err => {
t.equal(err.message, 'The task has been aborted');
t.equal(err.cause, 'queue is closing up');
}));
});

await t.resolves(pool.run({ time: 100 }), 'complete running task');
Expand All @@ -39,17 +42,20 @@ test('close()', async (t) => {

test('close({force: true})', async (t) => {
t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 1 });

const task1 = pool.run({ time: 100 });
const task1 = pool.run({ time: 1000 });
const task2 = pool.run({ time: 200 });

setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed'));

await Promise.all([
t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
t.resolves(task1, 'complete running task'),
t.rejects(task2, /The task has been aborted/, 'abort task that are not started yet')
t.resolves(task2.then(null, err => {
t.equal(err.message, 'The task has been aborted');
t.equal(err.cause, 'pool is closed');
}))
]);
});

Expand Down

0 comments on commit 71efc92

Please sign in to comment.