diff --git a/src/index.ts b/src/index.ts index f2b3f4c3..e630dcac 100644 --- a/src/index.ts +++ b/src/index.ts @@ -69,6 +69,7 @@ function onabort (abortSignal : AbortSignalAny, listener : () => void) { abortSignal.once('abort', listener); } } + class AbortError extends Error { constructor (reason?: AbortSignalEventTarget['reason']) { // TS does not recognizes the cause clause @@ -830,7 +831,7 @@ class ThreadPool { // Call reject() first to make sure we always reject with the AbortError // if the task is aborted, not with an Error from the possible // thread termination below. - reject(new AbortError((signal as AbortSignalEventTarget))); + reject(new AbortError((signal as AbortSignalEventTarget).reason)); if (taskInfo.workerInfo !== null) { // Already running: We cancel the Worker this is running on. @@ -952,7 +953,7 @@ class ThreadPool { for (let i = 0; i < skipQueueLength; i++) { const taskInfo : TaskInfo = this.skipQueue.shift() as TaskInfo; if (taskInfo.workerInfo === null) { - taskInfo.done(new AbortError('pool is closing up')); + taskInfo.done(new AbortError('pool is closed')); } else { this.skipQueue.push(taskInfo); } @@ -962,7 +963,7 @@ class ThreadPool { for (let i = 0; i < taskQueueLength; i++) { const taskInfo : TaskInfo = this.taskQueue.shift() as TaskInfo; if (taskInfo.workerInfo === null) { - taskInfo.done(new AbortError('pool is closing up')); + taskInfo.done(new AbortError('pool is closed')); } else { this.taskQueue.push(taskInfo); } diff --git a/test/abort-task.ts b/test/abort-task.ts index c1f58cba..1ab7dab8 100644 --- a/test/abort-task.ts +++ b/test/abort-task.ts @@ -1,4 +1,3 @@ -import { AbortController } from 'abort-controller'; import { EventEmitter } from 'events'; import Piscina from '..'; import { test } from 'tap'; @@ -184,3 +183,51 @@ test('task with AbortSignal cleans up properly', async ({ equal }) => { await pool.runTask('1+1', controller.signal); }); + +test('aborted AbortSignal rejects task immediately (with reason)', async ({ match, equal }) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/move.ts') + }); + const customReason = new Error('custom reason'); + + const controller = new AbortController(); + controller.abort(customReason); + equal(controller.signal.aborted, true); + equal(controller.signal.reason, customReason); + + // The data won't be moved because the task will abort immediately. + const data = new Uint8Array(new SharedArrayBuffer(4)); + + try { + await pool.run(data, { transferList: [data.buffer], signal: controller.signal }); + } catch (error) { + equal(error.message, 'The task has been aborted'); + match(error.cause, customReason); + } + + equal(data.length, 4); +}); + +test('tasks can be aborted through AbortController while running', async ({ equal, match }) => { + const pool = new Piscina({ + filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts') + }); + const reason = new Error('custom reason'); + + const buf = new Int32Array(new SharedArrayBuffer(4)); + const abortController = new AbortController(); + + try { + const promise = pool.run(buf, { signal: abortController.signal }); + + Atomics.wait(buf, 0, 0); + equal(Atomics.load(buf, 0), 1); + + abortController.abort(reason); + + await promise; + } catch (error) { + equal(error.message, 'The task has been aborted'); + match(error.cause, reason); + } +}); diff --git a/test/pool-close.ts b/test/pool-close.ts index c6734440..e842c8a8 100644 --- a/test/pool-close.ts +++ b/test/pool-close.ts @@ -30,7 +30,10 @@ test('close()', async (t) => { setImmediate(() => { t.resolves(pool.close(), 'close is resolved when running tasks are completed'); - t.rejects(pool.run({ time: 1000 }), /The task has been aborted/, 'abort any task enqueued during close'); + t.resolves(pool.run({ time: 1000 }).then(null, err => { + t.equal(err.message, 'The task has been aborted'); + t.equal(err.cause, 'queue is closing up'); + })); }); await t.resolves(pool.run({ time: 100 }), 'complete running task'); @@ -39,9 +42,9 @@ test('close()', async (t) => { test('close({force: true})', async (t) => { t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => { - const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 }); + const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 1 }); - const task1 = pool.run({ time: 100 }); + const task1 = pool.run({ time: 1000 }); const task2 = pool.run({ time: 200 }); setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed')); @@ -49,7 +52,10 @@ test('close({force: true})', async (t) => { await Promise.all([ t.resolves(once(pool, 'close'), 'handler is called when pool is closed'), t.resolves(task1, 'complete running task'), - t.rejects(task2, /The task has been aborted/, 'abort task that are not started yet') + t.resolves(task2.then(null, err => { + t.equal(err.message, 'The task has been aborted'); + t.equal(err.cause, 'pool is closed'); + })) ]); });