Skip to content

Commit

Permalink
fix(sandbox): catch exit errors (#2800)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Oct 7, 2024
1 parent e1a26b0 commit 6babb9e
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 81 deletions.
5 changes: 3 additions & 2 deletions src/classes/child-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class ChildPool {
};
}

async retain(processFile: string): Promise<Child> {
async retain(processFile: string, exitHandler: any): Promise<Child> {
let child = this.getFree(processFile).pop();

if (child) {
Expand All @@ -40,7 +40,8 @@ export class ChildPool {
workerForkOptions: this.opts.workerForkOptions,
workerThreadsOptions: this.opts.workerThreadsOptions,
});
child.on('exit', this.remove.bind(this, child));

child.on('exit', exitHandler);

try {
await child.init();
Expand Down
112 changes: 63 additions & 49 deletions src/classes/sandbox.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ChildCommand, ParentCommand } from '../enums';
import { ChildMessage } from '../interfaces';
import { Child } from './child';
import { ChildPool } from './child-pool';
import { Job } from './job';

Expand All @@ -8,65 +9,78 @@ const sandbox = <T, R, N extends string>(
childPool: ChildPool,
) => {
return async function process(job: Job<T, R, N>, token?: string): Promise<R> {
const child = await childPool.retain(processFile);
let child: Child;
let msgHandler: any;
let exitHandler: any;
try {
const done: Promise<R> = new Promise((resolve, reject) => {
const initChild = async () => {
try {
exitHandler = (exitCode: any, signal: any) => {
reject(
new Error(
'Unexpected exit code: ' + exitCode + ' signal: ' + signal,
),
);
};

await child.send({
cmd: ChildCommand.Start,
job: job.asJSONSandbox(),
token,
});
child = await childPool.retain(processFile, exitHandler);

const done: Promise<R> = new Promise((resolve, reject) => {
msgHandler = async (msg: ChildMessage) => {
switch (msg.cmd) {
case ParentCommand.Completed:
resolve(msg.value);
break;
case ParentCommand.Failed:
case ParentCommand.Error: {
const err = new Error();
Object.assign(err, msg.value);
reject(err);
break;
}
case ParentCommand.Progress:
await job.updateProgress(msg.value);
break;
case ParentCommand.Log:
await job.log(msg.value);
break;
case ParentCommand.MoveToDelayed:
await job.moveToDelayed(msg.value?.timestamp, msg.value?.token);
break;
case ParentCommand.Update:
await job.updateData(msg.value);
break;
}
};
msgHandler = async (msg: ChildMessage) => {
switch (msg.cmd) {
case ParentCommand.Completed:
resolve(msg.value);
break;
case ParentCommand.Failed:
case ParentCommand.Error: {
const err = new Error();
Object.assign(err, msg.value);
reject(err);
break;
}
case ParentCommand.Progress:
await job.updateProgress(msg.value);
break;
case ParentCommand.Log:
await job.log(msg.value);
break;
case ParentCommand.MoveToDelayed:
await job.moveToDelayed(
msg.value?.timestamp,
msg.value?.token,
);
break;
case ParentCommand.Update:
await job.updateData(msg.value);
break;
}
};

exitHandler = (exitCode: any, signal: any) => {
reject(
new Error('Unexpected exit code: ' + exitCode + ' signal: ' + signal),
);
};
child.on('message', msgHandler);

child.on('message', msgHandler);
child.on('exit', exitHandler);
});
child.send({
cmd: ChildCommand.Start,
job: job.asJSONSandbox(),
token,
});
} catch (error) {
reject(error);
}
};
initChild();
});

try {
await done;
return done;
} finally {
child.off('message', msgHandler);
child.off('exit', exitHandler);

if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) {
childPool.remove(child);
} else {
childPool.release(child);
if (child) {
child.off('message', msgHandler);
child.off('exit', exitHandler);
if (child.exitCode !== null || /SIG.*/.test(`${child.signalCode}`)) {
childPool.remove(child);
} else {
childPool.release(child);
}
}
}
};
Expand Down
47 changes: 24 additions & 23 deletions tests/test_child-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { expect } from 'chai';
import { ChildPool } from '../src/classes';
import { join } from 'path';

const NoopProc = () => {};
describe('Child pool for Child Processes', () => {
sandboxProcessTests();
});
Expand Down Expand Up @@ -32,70 +33,70 @@ function sandboxProcessTests(

it('should return same child if free', async () => {
const processor = __dirname + '/fixtures/fixture_processor_bar.js';
const child = await pool.retain(processor);
const child = await pool.retain(processor, NoopProc);
expect(child).to.be.ok;
pool.release(child);
expect(pool.retained).to.be.empty;
const newChild = await pool.retain(processor);
const newChild = await pool.retain(processor, NoopProc);
expect(child).to.be.eql(newChild);
});

it('should return a new child if reused the last free one', async () => {
const processor = __dirname + '/fixtures/fixture_processor_bar.js';
let child = await pool.retain(processor);
let child = await pool.retain(processor, NoopProc);
expect(child).to.be.ok;
pool.release(child);
expect(pool.retained).to.be.empty;
let newChild = await pool.retain(processor);
let newChild = await pool.retain(processor, NoopProc);
expect(child).to.be.eql(newChild);
child = newChild;
newChild = await pool.retain(processor);
newChild = await pool.retain(processor, NoopProc);
expect(child).not.to.be.eql(newChild);
});

it('should return a new child if none free', async () => {
const processor = __dirname + '/fixtures/fixture_processor_bar.js';
const child = await pool.retain(processor);
const child = await pool.retain(processor, NoopProc);
expect(child).to.be.ok;
expect(pool.retained).not.to.be.empty;
const newChild = await pool.retain(processor);
const newChild = await pool.retain(processor, NoopProc);
expect(child).to.not.be.eql(newChild);
});

it('should return a new child if killed', async () => {
const processor = __dirname + '/fixtures/fixture_processor_bar.js';
const child = await pool.retain(processor);
const child = await pool.retain(processor, NoopProc);
expect(child).to.be.ok;
await pool.kill(child);
expect(pool.retained).to.be.empty;
const newChild = await pool.retain(processor);
const newChild = await pool.retain(processor, NoopProc);
expect(child).to.not.be.eql(newChild);
});

it('should return a new child if many retained and none free', async () => {
const processor = __dirname + '/fixtures/fixture_processor_bar.js';
const children = await Promise.all([
pool.retain(processor),
pool.retain(processor),
pool.retain(processor),
pool.retain(processor),
pool.retain(processor),
pool.retain(processor),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
]);
expect(children).to.have.length(6);
const child = await pool.retain(processor);
const child = await pool.retain(processor, NoopProc);
expect(children).not.to.include(child);
}).timeout(10000);

it('should return an old child if many retained and one free', async () => {
const processor = __dirname + '/fixtures/fixture_processor_bar.js';
const children = await Promise.all([
pool.retain(processor),
pool.retain(processor),
pool.retain(processor),
pool.retain(processor),
pool.retain(processor),
pool.retain(processor),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
pool.retain(processor, NoopProc),
]);

expect(children).to.have.length(6);
Expand All @@ -108,7 +109,7 @@ function sandboxProcessTests(
const processor = __dirname + '/fixtures/fixture_processor_bar.js';
process.execArgv.push('--no-warnings');

const child = await pool.retain(processor);
const child = await pool.retain(processor, NoopProc);
expect(child).to.be.ok;
if (!useWorkerThreads) {
expect(child.childProcess.spawnargs).to.include('--no-warnings');
Expand Down
63 changes: 56 additions & 7 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,44 @@ describe('Sandboxed process using child processes', () => {

await worker.close();
});

it('should allow to pass workerForkOptions with timeout', async function () {
const processFile = __dirname + '/fixtures/fixture_processor.js';

const workerForkOptions = {
timeout: 50,
} as any;
const worker = new Worker(queueName, processFile, {
autorun: false,
connection,
prefix,
drainDelay: 1,
useWorkerThreads: false,
workerForkOptions,
});

const failing = new Promise<void>((resolve, reject) => {
worker.on('failed', async (job, error) => {
try {
expect([
'Unexpected exit code: null signal: SIGTERM',
'Unexpected exit code: 0 signal: null',
]).to.include(error.message);
resolve();
} catch (err) {
reject(err);
}
});
});

await queue.add('test', { foo: 'bar' });

worker.run();

await failing;

await worker.close();
});
});
});

Expand Down Expand Up @@ -1019,12 +1057,20 @@ function sandboxProcessTests(
useWorkerThreads,
});

const job = await queue.add('test', { exitCode: 1 });
const failing = new Promise<void>((resolve, reject) => {
worker.on('failed', async (job, error) => {
try {
expect(error.message).to.be.equal('Broken file processor');
resolve();
} catch (err) {
reject(err);
}
});
});

await expect(job.waitUntilFinished(queueEvents)).to.be.rejectedWith(
'Broken file processor',
);
await queue.add('test', { exitCode: 1 });

await failing;
await worker.close();
});

Expand All @@ -1050,7 +1096,7 @@ function sandboxProcessTests(
});
});

it('should remove exited process', async () => {
it('should release exited process', async () => {
const processFile = __dirname + '/fixtures/fixture_processor_exit.js';

const worker = new Worker(queueName, processFile, {
Expand All @@ -1072,7 +1118,7 @@ function sandboxProcessTests(
expect(Object.keys(worker['childPool'].retained)).to.have.lengthOf(
0,
);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(0);
expect(worker['childPool'].getAllFree()).to.have.lengthOf(1);
resolve();
} catch (err) {
reject(err);
Expand All @@ -1097,7 +1143,10 @@ function sandboxProcessTests(
});

// acquire and release a child here so we know it has it's full termination handler setup
const initializedChild = await worker['childPool'].retain(processFile);
const initializedChild = await worker['childPool'].retain(
processFile,
() => {},
);
await worker['childPool'].release(initializedChild);

// await this After we've added the job
Expand Down

0 comments on commit 6babb9e

Please sign in to comment.