From 2772bfb9f443f6d31fe44da3878b6266a6431fd7 Mon Sep 17 00:00:00 2001 From: Manuel Astudillo Date: Fri, 20 Dec 2024 16:38:32 +0100 Subject: [PATCH] fix(sandbox): fix issue where job could stay in active forever --- src/classes/child-pool.ts | 11 +- src/classes/child.ts | 4 +- src/classes/sandbox.ts | 7 +- tests/fixtures/fixture_processor.js | 2 +- tests/fixtures/fixture_processor_exit.js | 2 +- tests/test_sandboxed_process.ts | 153 ++++++++++++++++++++--- 6 files changed, 150 insertions(+), 29 deletions(-) diff --git a/src/classes/child-pool.ts b/src/classes/child-pool.ts index 66147977aa..381ebb608e 100644 --- a/src/classes/child-pool.ts +++ b/src/classes/child-pool.ts @@ -27,7 +27,7 @@ export class ChildPool { }; } - async retain(processFile: string, exitHandler: any): Promise { + async retain(processFile: string): Promise { let child = this.getFree(processFile).pop(); if (child) { @@ -41,10 +41,17 @@ export class ChildPool { workerThreadsOptions: this.opts.workerThreadsOptions, }); - child.on('exit', exitHandler); + child.on('exit', this.remove.bind(this, child)); try { await child.init(); + + // Check status here as well, in case the child exited before we could + // retain it. + if (child.exitCode !== null || child.signalCode !== null) { + throw new Error('Child exited before it could be retained'); + } + this.retained[child.pid] = child; return child; diff --git a/src/classes/child.ts b/src/classes/child.ts index 45413409ba..e9bae5fb8c 100644 --- a/src/classes/child.ts +++ b/src/classes/child.ts @@ -52,7 +52,9 @@ export class Child extends EventEmitter { if (this.childProcess) { return this.childProcess.pid; } else if (this.worker) { - return this.worker.threadId; + // Worker threads pids can become negative when they are terminated + // so we need to use the absolute value to index the retained object + return Math.abs(this.worker.threadId); } else { throw new Error('No child process or worker thread'); } diff --git a/src/classes/sandbox.ts b/src/classes/sandbox.ts index e9793714d3..a6caad6d12 100644 --- a/src/classes/sandbox.ts +++ b/src/classes/sandbox.ts @@ -24,7 +24,8 @@ const sandbox = ( ); }; - child = await childPool.retain(processFile, exitHandler); + child = await childPool.retain(processFile); + child.on('exit', exitHandler); msgHandler = async (msg: ChildMessage) => { switch (msg.cmd) { @@ -76,9 +77,7 @@ const sandbox = ( if (child) { child.off('message', msgHandler); child.off('exit', exitHandler); - if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) { - childPool.remove(child); - } else { + if (child.exitCode === null && child.signalCode === null) { childPool.release(child); } } diff --git a/tests/fixtures/fixture_processor.js b/tests/fixtures/fixture_processor.js index aafade19f3..e1ae18fc63 100644 --- a/tests/fixtures/fixture_processor.js +++ b/tests/fixtures/fixture_processor.js @@ -7,7 +7,7 @@ const delay = require('./delay'); module.exports = function (/*job*/) { - return delay(500).then(() => { + return delay(1000).then(() => { return 42; }); }; diff --git a/tests/fixtures/fixture_processor_exit.js b/tests/fixtures/fixture_processor_exit.js index ba520c35c4..47f1e18303 100644 --- a/tests/fixtures/fixture_processor_exit.js +++ b/tests/fixtures/fixture_processor_exit.js @@ -7,7 +7,7 @@ const delay = require('./delay'); module.exports = function (/*job*/) { - return delay(500).then(() => { + return delay(200).then(() => { delay(100).then(() => { process.exit(0); }); diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index c246a06458..d8877a92ff 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -81,8 +81,11 @@ describe('Sandboxed process using child processes', () => { it('should allow to pass workerForkOptions with timeout', async function () { const processFile = __dirname + '/fixtures/fixture_processor.js'; + // Note that this timeout will not kill the child process immediately, but + // will wait for the child process to resolve all its promises before killing it. + // Therefore the job will not be "cancelled" but will be completed. const workerForkOptions = { - timeout: 50, + timeout: 1000, } as any; const worker = new Worker(queueName, processFile, { autorun: false, @@ -93,13 +96,13 @@ describe('Sandboxed process using child processes', () => { workerForkOptions, }); - const failing = new Promise((resolve, reject) => { - worker.on('failed', async (job, error) => { + const completing = new Promise((resolve, reject) => { + worker.on('completed', async (job, error) => { try { - expect([ - 'Unexpected exit code: null signal: SIGTERM', - 'Unexpected exit code: 0 signal: null', - ]).to.include(error.message); + const retainedChild = Object.values( + worker['childPool'].retained, + )[0]; + expect(retainedChild).to.be.undefined; resolve(); } catch (err) { reject(err); @@ -107,11 +110,15 @@ describe('Sandboxed process using child processes', () => { }); }); + await delay(500); + await queue.add('test', { foo: 'bar' }); worker.run(); - await failing; + await delay(600); + + await completing; await worker.close(); }); @@ -1116,6 +1123,116 @@ function sandboxProcessTests( await worker.close(); }); + describe('when child process a job and its killed direcly after completing', () => { + it('should process the next job in a new child process', async () => { + const processFile = __dirname + '/fixtures/fixture_processor.js'; + const worker = new Worker(queueName, processFile, { + connection, + prefix, + drainDelay: 1, + useWorkerThreads, + }); + + let counter = 0; + let completing; + const completing2 = new Promise((resolve2, reject2) => { + completing = new Promise((resolve, reject) => { + worker.on('completed', async (job: Job, value: any) => { + try { + expect(job.data).to.be.eql({ foo: 'bar' }); + expect(value).to.be.eql(42); + expect( + Object.keys(worker['childPool'].retained), + ).to.have.lengthOf(0); + expect(worker['childPool'].free[processFile]).to.have.lengthOf( + 1, + ); + if (counter == 0) { + counter++; + resolve(); + } else { + resolve2(); + } + } catch (err) { + if (counter == 0) { + return reject(err); + } + reject2(err); + } + }); + }); + }); + + await queue.add('foobar', { foo: 'bar' }); + + await completing; + + const child1 = worker['childPool'].free[processFile][0]; + + await child1.kill('SIGTERM'); + + await queue.add('foobar', { foo: 'bar' }); + + await completing2; + + const child2 = worker['childPool'].free[processFile][0]; + + expect(child1).to.not.equal(child2); + + await worker.close(); + }); + }); + + describe('when child process a job and its killed with SIGKILL while processing', () => { + it('should fail with an unexpected error', async function () { + const processFile = __dirname + '/fixtures/fixture_processor.js'; + + const worker = new Worker(queueName, processFile, { + autorun: false, + connection, + prefix, + drainDelay: 1, + }); + + const started = new Promise((resolve, reject) => { + worker.on('active', async (job, prev) => { + expect(prev).to.be.equal('waiting'); + resolve(); + }); + }); + + const failing = new Promise((resolve, reject) => { + worker.on('failed', async (job, error) => { + try { + expect([ + 'Unexpected exit code: null signal: SIGKILL', + 'Unexpected exit code: 0 signal: null', + ]).to.include(error.message); + resolve(); + } catch (err) { + reject(err); + } + }); + }); + + await queue.add('test', { foo: 'bar' }); + + worker.run(); + + await started; + + // Need some time to create the child job and start processing + await delay(250); + + const retainedChild = Object.values(worker['childPool'].retained)[0]; + await retainedChild.kill('SIGKILL'); + + await failing; + + await worker.close(); + }); + }); + describe('when function is not exported', () => { it('throws an error', async () => { const processFile = @@ -1151,21 +1268,10 @@ function sandboxProcessTests( const completing = new Promise((resolve, reject) => { worker.on('completed', async job => { try { - expect(job.returnvalue).to.be.undefined; - expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf( - 0, - ); - expect(worker['childPool'].getAllFree()).to.have.lengthOf(1); - await delay(500); - expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf( - 0, - ); - expect(worker['childPool'].getAllFree()).to.have.lengthOf(1); + expect(job!.returnvalue).to.be.undefined; resolve(); } catch (err) { reject(err); - } finally { - await worker.close(); } }); }); @@ -1173,6 +1279,13 @@ function sandboxProcessTests( await queue.add('test', { foo: 'bar' }); await completing; + + await delay(200); + + expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(0); + expect(worker['childPool'].getAllFree()).to.have.lengthOf(0); + + await worker.close(); }); it('should allow the job to complete and then exit on worker close', async function () {