Skip to content

Commit

Permalink
fix(job-queue-plugin): More accurate determination of BullMQ job state
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Oct 5, 2021
1 parent 73fa278 commit 3b3bb3b
Showing 1 changed file with 28 additions and 16 deletions.
44 changes: 28 additions & 16 deletions packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
);
const processFn = this.queueNameProcessFnMap.get(queueName);
if (processFn) {
const job = this.createVendureJob(bullJob);
const job = await this.createVendureJob(bullJob);
try {
job.on('progress', _job => bullJob.updateProgress(_job.progress));
const result = await processFn(job);
Expand Down Expand Up @@ -100,11 +100,18 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
async cancelJob(jobId: string): Promise<Job | undefined> {
const bullJob = await this.queue.getJob(jobId);
if (bullJob) {
if (await bullJob.isActive()) {
// Not yet possible in BullMQ, see
// https://github.com/taskforcesh/bullmq/issues/632
throw new InternalServerError(`Cannot cancel a running job`);
}
try {
await bullJob.remove();
return this.createVendureJob(bullJob);
} catch (e) {
Logger.error(`Error when cancelling job: ${e.message}`, loggerCtx);
const message = `Error when cancelling job: ${e.message}`;
Logger.error(message, loggerCtx);
throw new InternalServerError(message);
}
}
}
Expand Down Expand Up @@ -157,17 +164,19 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
}
const totalItems = Object.values(jobCounts).reduce((sum, count) => sum + count, 0);

return Promise.resolve({
items: items
.sort((a, b) => b.timestamp - a.timestamp)
.map(bullJob => this.createVendureJob(bullJob)),
return {
items: await Promise.all(
items
.sort((a, b) => b.timestamp - a.timestamp)
.map(bullJob => this.createVendureJob(bullJob)),
),
totalItems,
});
};
}

async findManyById(ids: ID[]): Promise<Job[]> {
const bullJobs = await Promise.all(ids.map(id => this.queue.getJob(id.toString())));
return bullJobs.filter(notNullOrUndefined).map(j => this.createVendureJob(j));
return Promise.all(bullJobs.filter(notNullOrUndefined).map(j => this.createVendureJob(j)));
}

async findOne(id: ID): Promise<Job | undefined> {
Expand Down Expand Up @@ -226,12 +235,12 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
await this.worker.disconnect();
}

private createVendureJob(bullJob: Bull.Job): Job {
private async createVendureJob(bullJob: Bull.Job): Promise<Job> {
const jobJson = bullJob.toJSON();
return new Job({
queueName: bullJob.name,
id: bullJob.id,
state: this.getState(bullJob),
state: await this.getState(bullJob),
data: bullJob.data,
attempts: bullJob.attemptsMade,
createdAt: new Date(jobJson.timestamp),
Expand All @@ -244,24 +253,27 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy {
});
}

private getState(bullJob: Bull.Job): JobState {
private async getState(bullJob: Bull.Job): Promise<JobState> {
const jobJson = bullJob.toJSON();

if (!jobJson.processedOn && !jobJson.failedReason) {
if ((await bullJob.isWaiting()) || (await bullJob.isWaitingChildren())) {
return JobState.PENDING;
}
if (!jobJson.finishedOn) {
if (await bullJob.isActive()) {
return JobState.RUNNING;
}
if (jobJson.failedReason && bullJob.attemptsMade < (bullJob.opts.attempts ?? 0)) {
if (await bullJob.isDelayed()) {
return JobState.RETRYING;
}
if (jobJson.failedReason) {
if (await bullJob.isFailed()) {
return JobState.FAILED;
}
if (jobJson.finishedOn) {
if (await bullJob.isCompleted()) {
return JobState.COMPLETED;
}
if (!jobJson.finishedOn) {
return JobState.CANCELLED;
}
throw new InternalServerError('Could not determine job state');
// TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it.
}
Expand Down

0 comments on commit 3b3bb3b

Please sign in to comment.