Skip to content

Commit

Permalink
feat(queue): add version support
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Oct 14, 2024
1 parent d77d012 commit a600463
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 6 deletions.
29 changes: 23 additions & 6 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { Job } from './job';
import { QueueGetters } from './queue-getters';
import { Repeat } from './repeat';
import { RedisConnection } from './redis-connection';
import { readPackageJson } from '../utils';

export interface ObliterateOpts {
/**
Expand Down Expand Up @@ -96,6 +97,7 @@ export class Queue<
> extends QueueGetters<DataType, ResultType, NameType> {
token = v4();
jobsOpts: BaseJobOptions;
opts: QueueOptions;
private _repeat?: Repeat;

constructor(
Expand All @@ -116,12 +118,8 @@ export class Queue<

this.waitUntilReady()
.then(client => {
if (!this.closing) {
client.hset(
this.keys.meta,
'opts.maxLenEvents',
get(opts, 'streams.events.maxLen', 10000),
);
if (!this.closing && !opts?.skipMetasUpdate) {
return client.hmset(this.keys.meta, this.metaValues);
}
})
.catch(err => {
Expand Down Expand Up @@ -168,6 +166,25 @@ export class Queue<
return { ...this.jobsOpts };
}

get metaValues(): Record<string, string | number> {
const { name, version } = readPackageJson();

return {
'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000,
version: `${name}:${version}`,
};
}

/**
* Get library version.
*
* @returns the content of the meta.library field.
*/
async getVersion(): Promise<string> {
const client = await this.client;
return await client.hget(this.keys.meta, 'version');
}

get repeat(): Promise<Repeat> {
return new Promise<Repeat>(async resolve => {
if (!this._repeat) {
Expand Down
13 changes: 13 additions & 0 deletions src/interfaces/queue-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ export interface QueueOptions extends QueueBaseOptions {
};
};

/**
* Skip Meta update.
*
* If true, the queue will not update the metadata of the queue.
* Useful for read-only systems that do should not update the metadata.
*
* @defaultValue false
*/
skipMetasUpdate?: boolean;

/**
* Advanced options for the repeatable jobs.
*/
settings?: AdvancedRepeatOptions;
}

Expand Down
23 changes: 23 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import { CONNECTION_CLOSED_ERROR_MSG } from 'ioredis/built/utils';
import { ChildMessage, RedisClient } from './interfaces';
import { EventEmitter } from 'events';
import * as semver from 'semver';
import { join } from 'path';
import { readFileSync } from 'fs';

export const errorObject: { [index: string]: any } = { value: null };

Expand Down Expand Up @@ -215,3 +217,24 @@ export const errorToJSON = (value: any): Record<string, any> => {
export const WORKER_SUFFIX = '';

export const QUEUE_EVENT_SUFFIX = ':qe';

export const readPackageJson: () => { name: string; version: string } = () => {
const packageJsonPossiblePaths = [
join(__dirname, '../package.json'),
join(__dirname, '../../package.json'),
join(__dirname, '../../../package.json'),
];

for (const path of packageJsonPossiblePaths) {
try {
return JSON.parse(readFileSync(path, 'utf-8'));
} catch (err) {
if ((<any>err).code === 'ENOENT') {
continue;
}
console.log(err);
}
}

return { name: 'bullmq', version: '0.0.0' };
};
17 changes: 17 additions & 0 deletions tests/test_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,23 @@ describe('queues', function () {
await connection.quit();
});

it('should return the queue version', async () => {
const queue = new Queue(queueName, { connection });
const version = await queue.getVersion();
const { version: pkgJsonVersion, name } = require('../package.json');
expect(version).to.be.equal(`${name}:${pkgJsonVersion}`);
return queue.close();
});

it('should return default library version when using skipMetasUpdate', async () => {
const exQueueName = `test-${v4()}`;
const queue = new Queue(exQueueName, { connection, skipMetasUpdate: true });
const version = await queue.getVersion();
expect(version).to.be.equal(null);
await queue.close();
await removeAllQueueData(new IORedis(redisHost), exQueueName);
});

//TODO: restore this tests in next breaking change
describe.skip('.add', () => {
describe('when jobId is provided as integer', () => {
Expand Down

0 comments on commit a600463

Please sign in to comment.