Skip to content

Commit

Permalink
feat(core): Allow DefaultJobQueue retries to be configured per queue
Browse files Browse the repository at this point in the history
Relates to #1111
  • Loading branch information
michaelbromley committed Oct 5, 2021
1 parent 511f04d commit 5017622
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ export class InMemoryJobQueueStrategy extends PollingJobQueueStrategy implements
.toString()
.padEnd(10, '0');
}
(job as any).retries = this.setRetries(job.queueName, job);
// tslint:disable-next-line:no-non-null-assertion
this.jobs.set(job.id!, job);
if (!this.unsettledJobs[job.queueName]) {
Expand Down
11 changes: 11 additions & 0 deletions packages/core/src/job-queue/polling-job-queue-strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,14 @@ export interface PollingJobQueueStrategyConfig {
* @description 200
*/
pollInterval?: number | ((queueName: string) => number);
/**
* @description
* When a job is added to the JobQueue using `JobQueue.add()`, the calling
* code may specify the number of retries in case of failure. This option allows
* you to override that number and specify your own number of retries based on
* the job being added.
*/
setRetries?: (queueName: string, job: Job) => number;
/**
* @description
* The strategy used to decide how long to wait before retrying a failed job.
Expand Down Expand Up @@ -183,6 +191,7 @@ class ActiveQueue<Data extends JobData<Data> = {}> {
export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy {
public concurrency: number;
public pollInterval: number | ((queueName: string) => number);
public setRetries: (queueName: string, job: Job) => number;
public backOffStrategy?: BackoffStrategy;

private activeQueues = new QueueNameProcessStorage<ActiveQueue<any>>();
Expand All @@ -196,9 +205,11 @@ export abstract class PollingJobQueueStrategy extends InjectableJobQueueStrategy
this.concurrency = concurrencyOrConfig.concurrency ?? 1;
this.pollInterval = concurrencyOrConfig.pollInterval ?? 200;
this.backOffStrategy = concurrencyOrConfig.backoffStrategy ?? (() => 1000);
this.setRetries = concurrencyOrConfig.setRetries ?? ((_, job) => job.retries);
} else {
this.concurrency = concurrencyOrConfig ?? 1;
this.pollInterval = maybePollInterval ?? 200;
this.setRetries = (_, job) => job.retries;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Type } from '@vendure/common/lib/shared-types';

import { Job } from '../../job-queue/job';
import { BackoffStrategy } from '../../job-queue/polling-job-queue-strategy';
import { PluginCommonModule } from '../plugin-common.module';
import { VendurePlugin } from '../vendure-plugin';
Expand All @@ -19,6 +20,7 @@ export interface DefaultJobQueueOptions {
pollInterval?: number | ((queueName: string) => number);
concurrency?: number;
backoffStrategy?: BackoffStrategy;
setRetries?: (queueName: string, job: Job) => number;
}

/**
Expand Down Expand Up @@ -74,7 +76,7 @@ export interface DefaultJobQueueOptions {
* Defines the backoff strategy used when retrying failed jobs. In other words, if a job fails
* and is configured to be re-tried, how long should we wait before the next attempt?
*
* By default a job will be retried as soon as possible, but in some cases this is not desirable. For example,
* By default, a job will be retried as soon as possible, but in some cases this is not desirable. For example,
* a job may interact with an unreliable 3rd-party API which is sensitive to too many requests. In this case, an
* exponential backoff may be used which progressively increases the delay between each subsequent retry.
*
Expand All @@ -94,6 +96,15 @@ export interface DefaultJobQueueOptions {
* // A default delay for all other queues
* return 1000;
* },
* retries: (queueName, job) => {
* if (queueName === 'send-email') {
* // Override the default number of retries
* // for the 'send-email' job because we have
* // a very unreliable email service.
* return 10;
* }
* return job.retries;
* }
* }),
* ],
* };
Expand All @@ -106,11 +117,13 @@ export interface DefaultJobQueueOptions {
imports: [PluginCommonModule],
entities: [JobRecord],
configuration: config => {
const { pollInterval, concurrency, backoffStrategy } = DefaultJobQueuePlugin.options ?? {};
const { pollInterval, concurrency, backoffStrategy, setRetries } =
DefaultJobQueuePlugin.options ?? {};
config.jobQueueOptions.jobQueueStrategy = new SqlJobQueueStrategy({
concurrency,
pollInterval,
backoffStrategy,
setRetries,
});
return config;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
throw new Error('Connection not available');
}
const constrainedData = this.constrainDataSize(job);
const newRecord = this.toRecord(job, constrainedData);
const newRecord = this.toRecord(job, constrainedData, this.setRetries(job.queueName, job));
const record = await this.connection.getRepository(JobRecord).save(newRecord);
return this.fromRecord(record);
}
Expand Down Expand Up @@ -222,7 +222,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
return !!this.connection && this.connection.isConnected;
}

private toRecord(job: Job<any>, data?: any): JobRecord {
private toRecord(job: Job<any>, data?: any, retries?: number): JobRecord {
return new JobRecord({
id: job.id || undefined,
queueName: job.queueName,
Expand All @@ -234,7 +234,7 @@ export class SqlJobQueueStrategy extends PollingJobQueueStrategy implements Insp
startedAt: job.startedAt,
settledAt: job.settledAt,
isSettled: job.isSettled,
retries: job.retries,
retries: retries ?? job.retries,
attempts: job.attempts,
});
}
Expand Down

0 comments on commit 5017622

Please sign in to comment.