Skip to content

Commit

Permalink
fix(sandboxed): properly update data on wrapped job (#2739) fixes #2731
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Aug 28, 2024
1 parent b8d8886 commit 9c4b245
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 8 deletions.
2 changes: 1 addition & 1 deletion .husky/pre-commit
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env sh
. "$(dirname -- "$0")/_/husky.sh"

yarn npm-run-all pretty:quick lint:staged
npx npm-run-all pretty:quick lint:staged
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"copy:lua": "copyfiles -f ./src/commands/*.lua ./dist/cjs/commands && copyfiles -f ./src/commands/*.lua ./dist/esm/commands",
"copy:lua:python": "copyfiles -f ./rawScripts/*.lua ./python/bullmq/commands",
"copy:main:type": "copyfiles -f ./dist/esm/classes/main.d.ts ./dist/esm/classes/main-worker.d.ts ./dist/cjs/classes",
"coverage": "nyc --reporter=text --reporter=lcovonly yarn test",
"coverage": "nyc --reporter=text --reporter=lcovonly npm run test",
"cm": "git cz",
"docs": "typedoc --excludeExternals --excludeProtected --excludePrivate --readme none src/index.ts",
"docs:json": "typedoc --excludeExternals --excludeProtected --excludePrivate --readme none src/index.ts --json ./apiVersions/v5.json --name v5",
Expand All @@ -39,7 +39,7 @@
"generate:raw:scripts": "ts-node --project tsconfig-cjs.json generateRawScripts.ts",
"lint": "./node_modules/.bin/eslint . --ignore-path ./.eslintignore",
"lint:staged": "lint-staged",
"prepublishOnly": "yarn build",
"prepublishOnly": "npm run build",
"prepare": "husky install",
"pretest": "npm-run-all clean:scripts generate:raw:scripts transform:commands",
"prettier": "prettier --config package.json src/**/*.ts",
Expand Down Expand Up @@ -162,7 +162,7 @@
]
},
"lint-staged": {
"*.{js,ts}": "yarn eslint:fix"
"*.{js,ts}": "npm run eslint:fix"
},
"repository": {
"type": "git",
Expand Down
11 changes: 7 additions & 4 deletions src/classes/child-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ export class ChildProcessor {
job: JobJsonSandbox,
send: (msg: any) => Promise<void>,
): SandboxedJob {
return {
const wrappedJob = {
...job,
data: JSON.parse(job.data || '{}'),
opts: job.opts,
Expand All @@ -136,7 +136,7 @@ export class ChildProcessor {
* Emulate the real job `log` function.
*/
log: async (row: any) => {
send({
await send({
cmd: ParentCommand.Log,
value: row,
});
Expand All @@ -145,7 +145,7 @@ export class ChildProcessor {
* Emulate the real job `moveToDelayed` function.
*/
moveToDelayed: async (timestamp: number, token?: string) => {
send({
await send({
cmd: ParentCommand.MoveToDelayed,
value: { timestamp, token },
});
Expand All @@ -154,11 +154,14 @@ export class ChildProcessor {
* Emulate the real job `updateData` function.
*/
updateData: async (data: any) => {
send({
await send({
cmd: ParentCommand.Update,
value: data,
});
wrappedJob.data = data;
},
};

return wrappedJob;
}
}
37 changes: 37 additions & 0 deletions tests/fixtures/fixture_processor_steps.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
'use strict';

const delay = require('./delay');

module.exports = async function (job) {
let step = job.data.step;
while (step !== 'FINISH') {
switch (step) {
case 'INITIAL': {
await delay(200);
const data = {
...job.data,
step: 'SECOND',
extraDataSecondStep: 'second data',
};
await job.updateData(data);
step = 'SECOND';
break;
}
case 'SECOND': {
await delay(200);
const data = {
...job.data,
extraDataFinishedStep: 'finish data',
step: 'FINISH',
};

await job.updateData(data);
step = 'FINISH';
return;
}
default: {
throw new Error('invalid step');
}
}
}
};
31 changes: 31 additions & 0 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,37 @@ function sandboxProcessTests(
await worker.close();
});

it('should process steps and complete', async () => {
const processFile = __dirname + '/fixtures/fixture_processor_steps.js';

const worker = new Worker(queueName, processFile, {
connection,
prefix,
drainDelay: 1,
});

const completing = new Promise<void>((resolve, reject) => {
worker.on('completed', async (job: Job) => {
try {
expect(job.data).to.be.eql({
step: 'FINISH',
extraDataSecondStep: 'second data',
extraDataFinishedStep: 'finish data',
});
resolve();
} catch (err) {
reject(err);
}
});
});

await queue.add('test', { step: 'INITIAL' });

await completing;

await worker.close();
});

it('should process and move to delayed', async () => {
const processFile =
__dirname + '/fixtures/fixture_processor_move_to_delayed.js';
Expand Down

0 comments on commit 9c4b245

Please sign in to comment.