From 0e82025678679d9c0d083824df955409c04f3956 Mon Sep 17 00:00:00 2001 From: simon Date: Wed, 14 Oct 2020 22:34:06 +0200 Subject: [PATCH] fix: not running jobs even though concurrency is not reached --- src/JobProcessor.ts | 10 ++--- test/jobprocessor.test.ts | 82 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 7 deletions(-) create mode 100644 test/jobprocessor.test.ts diff --git a/src/JobProcessor.ts b/src/JobProcessor.ts index 5c79567..36b6d5a 100644 --- a/src/JobProcessor.ts +++ b/src/JobProcessor.ts @@ -360,7 +360,7 @@ export class JobProcessor { job.attrs.name, job.attrs._id ); - this.runOrRetry(); + this.runOrRetry(job); } else { const runIn = job.attrs.nextRunAt.getTime() - now.getTime(); log.extend('jobProcessing')( @@ -380,7 +380,7 @@ export class JobProcessor { * Internal method that tries to run a job and if it fails, retries again! * @returns {undefined} */ - private async runOrRetry() { + private async runOrRetry(job: Job) { if (!this.isRunning) { // const a = new Error(); // console.log('STACK', a.stack); @@ -391,11 +391,7 @@ export class JobProcessor { return; } - const job = this.jobQueue.pop(); - if (!job) { - console.info('empty queue'); - return; - } + this.jobQueue.remove(job); const jobDefinition = this.agenda.definitions[job.attrs.name]; const status = this.jobStatus[job.attrs.name]; diff --git a/test/jobprocessor.test.ts b/test/jobprocessor.test.ts new file mode 100644 index 0000000..1a66880 --- /dev/null +++ b/test/jobprocessor.test.ts @@ -0,0 +1,82 @@ +import * as expect from 'expect.js'; + +import { Db } from 'mongodb'; +import { Agenda } from '../src'; +import { mockMongo } from './helpers/mock-mongodb'; + +// Create agenda instances +let agenda: Agenda; +// mongo db connection db instance +let mongoDb: Db; + +const clearJobs = async () => { + if (mongoDb) { + await mongoDb.collection('agendaJobs').deleteMany({}); + } +}; + +describe('Agenda', function () { + // this.timeout(1000000); + + beforeEach(async () => { + if (!mongoDb) { + const mockedMongo = await mockMongo(); + // mongoCfg = mockedMongo.uri; + mongoDb = mockedMongo.mongo.db(); + } + + return new Promise(resolve => { + agenda = new Agenda( + { + mongo: mongoDb, + maxConcurrency: 4, + defaultConcurrency: 1, + lockLimit: 15, + defaultLockLimit: 6, + processEvery: '1 second' + }, + async () => { + await clearJobs(); + return resolve(); + } + ); + }); + }); + + afterEach(async () => { + await agenda.stop(); + await clearJobs(); + }); + + describe('configuration methods', () => { + it('ensure new jobs are always filling up running queue', async () => { + let shortOneFinished = false; + + agenda.define('test long', async () => { + await new Promise(resolve => setTimeout(resolve, 1000)); + }); + agenda.define('test short', async () => { + shortOneFinished = true; + await new Promise(resolve => setTimeout(resolve, 5)); + }); + + await agenda.start(); + + // queue up long ones + for (let i = 0; i < 100; i++) { + agenda.now('test long'); + } + + await new Promise(resolve => setTimeout(resolve, 1000)); + + // queue more short ones (they should complete first!) + for (let j = 0; j < 100; j++) { + agenda.now('test short'); + } + + await new Promise(resolve => setTimeout(resolve, 1000)); + + expect(shortOneFinished).to.be(true); + }); + }); +});