Skip to content

Commit

Permalink
feat(job-queue-plugin): Implement Redis-based job buffering
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Oct 7, 2021
1 parent a5f3659 commit c7b91c3
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 6 deletions.
2 changes: 2 additions & 0 deletions packages/job-queue-plugin/src/bullmq/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { HealthCheckRegistryService, PluginCommonModule, VendurePlugin } from '@
import { BullMQJobQueueStrategy } from './bullmq-job-queue-strategy';
import { BULLMQ_PLUGIN_OPTIONS } from './constants';
import { RedisHealthIndicator } from './redis-health-indicator';
import { RedisJobBufferStorageStrategy } from './redis-job-buffer-storage-strategy';
import { BullMQPluginOptions } from './types';

/**
Expand Down Expand Up @@ -103,6 +104,7 @@ import { BullMQPluginOptions } from './types';
imports: [PluginCommonModule],
configuration: config => {
config.jobQueueOptions.jobQueueStrategy = new BullMQJobQueueStrategy();
config.jobQueueOptions.jobBufferStorageStrategy = new RedisJobBufferStorageStrategy();
return config;
},
providers: [
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Injector, Job, JobBufferStorageStrategy } from '@vendure/core';
import { Injector, Job, JobBufferStorageStrategy, JobConfig, Logger } from '@vendure/core';
import Redis, { Cluster, RedisOptions } from 'ioredis';

import { BULLMQ_PLUGIN_OPTIONS } from './constants';
import { BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants';
import { BullMQPluginOptions } from './types';

const BUFFER_LIST_PREFIX = 'vendure-job-buffer';

export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
private redis: Redis.Redis | Redis.Cluster;

Expand All @@ -18,15 +20,52 @@ export class RedisJobBufferStorageStrategy implements JobBufferStorageStrategy {
}
}

async add(processorId: string, job: Job<any>): Promise<Job<any>> {
async add(bufferId: string, job: Job<any>): Promise<Job<any>> {
const result = await this.redis.lpush(this.keyName(bufferId), this.toJobConfigString(job));
return job;
}

async bufferSize(bufferIds?: string[]): Promise<{ [bufferId: string]: number }> {
throw new Error('Method not implemented.');
const result: { [bufferId: string]: number } = {};
for (const id of bufferIds || []) {
const key = this.keyName(id);
const count = await this.redis.llen(key);
result[id] = count;
}
return result;
}

async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Job[] }> {
const result: { [bufferId: string]: Job[] } = {};
for (const id of bufferIds || []) {
const key = this.keyName(id);
const items = await this.redis.lrange(key, 0, -1);
await this.redis.del(key);
result[id] = items.map(item => this.toJob(item));
}
return result;
}

async flush(bufferIds?: string[]): Promise<{ [bufferId: string]: Array<Job<any>> }> {
throw new Error('Method not implemented.');
private keyName(bufferId: string) {
return `${BUFFER_LIST_PREFIX}:${bufferId}`;
}

private toJobConfigString(job: Job<any>): string {
const jobConfig: JobConfig<any> = {
...job,
data: job.data,
id: job.id ?? undefined,
};
return JSON.stringify(jobConfig);
}

private toJob(jobConfigString: string): Job {
try {
const jobConfig: JobConfig<any> = JSON.parse(jobConfigString);
return new Job(jobConfig);
} catch (e) {
Logger.error(`Could not parse buffered job:\n${e.message}`, loggerCtx, e.stack);
throw e;
}
}
}

0 comments on commit c7b91c3

Please sign in to comment.