Skip to content

Commit

Permalink
feat(worker): add support for naming workers
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Feb 20, 2024
1 parent b1432ab commit 7ba2729
Show file tree
Hide file tree
Showing 11 changed files with 131 additions and 26 deletions.
9 changes: 9 additions & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ export class Job<
*/
token?: string;

/**
* The worker name that is processing or processed this job.
*/
processedBy?: string;

protected toKey: (type: string) => string;

protected discarded: boolean;
Expand Down Expand Up @@ -338,6 +343,10 @@ export class Job<
job.parent = JSON.parse(json.parent);
}

if (json.pb) {
job.processedBy = json.pb;
}

return job;
}

Expand Down
1 change: 1 addition & 0 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ export class QueueEvents extends QueueBase {
this.running = true;
const client = await this.client;

// Planed for deprecation as it has no really a use case
try {
await client.client('SETNAME', this.clientName(QUEUE_EVENT_SUFFIX));
} catch (err) {
Expand Down
30 changes: 19 additions & 11 deletions src/classes/queue-getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@

import { QueueBase } from './queue-base';
import { Job } from './job';
import {
clientCommandMessageReg,
QUEUE_EVENT_SUFFIX,
WORKER_SUFFIX,
} from '../utils';
import { clientCommandMessageReg, QUEUE_EVENT_SUFFIX } from '../utils';
import { JobState, JobType } from '../types';
import { JobJsonRaw, Metrics } from '../interfaces';

Expand Down Expand Up @@ -432,15 +428,15 @@ export class QueueGetters<
};
}

private async baseGetClients(suffix: string): Promise<
private async baseGetClients(matcher: (name: string) => boolean): Promise<
{
[index: string]: string;
}[]
> {
const client = await this.client;
const clients = (await client.client('LIST')) as string;
try {
const list = this.parseClientList(clients, suffix);
const list = this.parseClientList(clients, matcher);
return list;
} catch (err) {
if (!clientCommandMessageReg.test((<Error>err).message)) {
Expand All @@ -463,21 +459,32 @@ export class QueueGetters<
[index: string]: string;
}[]
> {
return this.baseGetClients(WORKER_SUFFIX);
const unnamedWorkerClientName = `${this.clientName()}`;
const namedWorkerClientName = `${this.clientName()}:w:`;

const matcher = (name: string) =>
name &&
(name === unnamedWorkerClientName ||
name.startsWith(namedWorkerClientName));

return this.baseGetClients(matcher);
}

/**
* Get queue events list related to the queue.
* Note: GCP does not support SETNAME, so this call will not work
*
* @deprecated do not use this method, it will be removed in the future.
*
* @returns - Returns an array with queue events info.
*/
async getQueueEvents(): Promise<
{
[index: string]: string;
}[]
> {
return this.baseGetClients(QUEUE_EVENT_SUFFIX);
const clientName = `${this.clientName()}${QUEUE_EVENT_SUFFIX}`;
return this.baseGetClients((name: string) => name === clientName);
}

/**
Expand Down Expand Up @@ -531,7 +538,7 @@ export class QueueGetters<
};
}

private parseClientList(list: string, suffix = '') {
private parseClientList(list: string, matcher: (name: string) => boolean) {
const lines = list.split('\n');
const clients: { [index: string]: string }[] = [];

Expand All @@ -545,8 +552,9 @@ export class QueueGetters<
client[key] = value;
});
const name = client['name'];
if (name && name === `${this.clientName()}${suffix ? `${suffix}` : ''}`) {
if (matcher(name)) {
client['name'] = this.name;
client['rawname'] = name;
clients.push(client);
}
});
Expand Down
3 changes: 2 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ export class Scripts {
}
}

async moveToActive(client: RedisClient, token: string) {
async moveToActive(client: RedisClient, token: string, name?: string) {
const opts = this.queue.opts as WorkerOptions;

const queueKeys = this.queue.keys;
Expand All @@ -968,6 +968,7 @@ export class Scripts {
token,
lockDuration: opts.lockDuration,
limiter: opts.limiter,
name,
}),
];

Expand Down
30 changes: 18 additions & 12 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import {
DELAY_TIME_1,
isNotConnectionError,
isRedisInstance,
WORKER_SUFFIX,
} from '../utils';
import { QueueBase } from './queue-base';
import { Repeat } from './repeat';
Expand Down Expand Up @@ -256,16 +255,21 @@ export class Worker<
}
}

const mainFile = this.opts.useWorkerThreads
? 'main-worker.js'
: 'main.js';
let mainFilePath = path.join(
path.dirname(module.filename),
`${mainFile}`,
);
// Separate paths so that bundling tools can resolve dependencies easier
const dirname = path.dirname(module.filename || __filename);
const workerThreadsMainFile = path.join(dirname, 'main-worker.js');
const spawnProcessMainFile = path.join(dirname, 'main.js');

let mainFilePath = this.opts.useWorkerThreads
? workerThreadsMainFile
: spawnProcessMainFile;

try {
fs.statSync(mainFilePath); // would throw if file not exists
} catch (_) {
const mainFile = this.opts.useWorkerThreads
? 'main-worker.js'
: 'main.js';
mainFilePath = path.join(
process.cwd(),
`dist/cjs/classes/${mainFile}`,
Expand All @@ -289,7 +293,8 @@ export class Worker<
}
}

const connectionName = this.clientName(WORKER_SUFFIX);
const connectionName =
this.clientName() + (this.opts.name ? `:w:${this.opts.name}` : '');
this.blockingConnection = new RedisConnection(
isRedisInstance(opts.connection)
? (<Redis>opts.connection).duplicate({ connectionName })
Expand Down Expand Up @@ -530,7 +535,7 @@ export class Worker<
this.blockUntil = await this.waiting;

if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 10) {
return this.moveToActive(client, token);
return this.moveToActive(client, token, this.opts.name);
}
} catch (err) {
// Swallow error if locally paused or closing since we did force a disconnection
Expand All @@ -549,7 +554,7 @@ export class Worker<
this.abortDelayController = new AbortController();
await this.delay(this.limitUntil, this.abortDelayController);
}
return this.moveToActive(client, token);
return this.moveToActive(client, token, this.opts.name);
}
}

Expand All @@ -572,9 +577,10 @@ export class Worker<
protected async moveToActive(
client: RedisClient,
token: string,
name?: string,
): Promise<Job<DataType, ResultType, NameType>> {
const [jobData, id, limitUntil, delayUntil] =
await this.scripts.moveToActive(client, token);
await this.scripts.moveToActive(client, token, name);
this.updateDelays(limitUntil, delayUntil);

return this.nextJobFromJobData(jobData, id, token);
Expand Down
5 changes: 5 additions & 0 deletions src/commands/includes/prepareJobForProcessing.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ local function prepareJobForProcessing(keyPrefix, rateLimiterKey, eventStreamKey
rcall("SET", lockKey, opts['token'], "PX", opts['lockDuration'])
end

if opts['name'] then
-- Set "processedBy" field to the worker name
rcall("HSET", jobKey, "pb", opts['name'])
end

rcall("XADD", eventStreamKey, "*", "event", "active", "jobId", jobId, "prev", "waiting")
rcall("HSET", jobKey, "processedOn", processedOn)
rcall("HINCRBY", jobKey, "ats", 1)
Expand Down
2 changes: 2 additions & 0 deletions src/interfaces/job-json.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export interface JobJson {
parent?: ParentKeys;
parentKey?: string;
repeatJobKey?: string;
processedBy?: string;
}

export interface JobJsonRaw {
Expand All @@ -39,4 +40,5 @@ export interface JobJsonRaw {
rjk?: string;
atm?: string;
ats?: string;
pb?: string; // Worker name
}
7 changes: 7 additions & 0 deletions src/interfaces/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ export type Processor<T = any, R = any, N extends string = string> = (
) => Promise<R>;

export interface WorkerOptions extends QueueBaseOptions {
/**
* Optional worker name. The name will be stored on every job
* processed by this worker instance, and can be used to monitor
* which worker is processing or has processed a given job.
*/
name?: string;

/**
* Condition to start processor at instance creation.
*
Expand Down
2 changes: 0 additions & 2 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,4 @@ export const errorToJSON = (value: any): Record<string, any> => {
return error;
};

export const WORKER_SUFFIX = '';

export const QUEUE_EVENT_SUFFIX = ':qe';
39 changes: 39 additions & 0 deletions tests/test_getters.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,45 @@ describe('Jobs getters', function () {
await worker2.close();
});

it('gets all workers including their names', async function () {
const worker = new Worker(queueName, async () => {}, {
autorun: false,
connection,
prefix,
name: 'worker1',
});
await new Promise<void>(resolve => {
worker.on('ready', () => {
resolve();
});
});

const workers = await queue.getWorkers();
expect(workers).to.have.length(1);

const worker2 = new Worker(queueName, async () => {}, {
autorun: false,
connection,
prefix,
name: 'worker2',
});
await new Promise<void>(resolve => {
worker2.on('ready', () => {
resolve();
});
});

const nextWorkers = await queue.getWorkers();
expect(nextWorkers).to.have.length(2);

// Check that the worker names are included in the response on the rawname property
expect(nextWorkers[0].rawname.endsWith('worker1')).to.be.true;
expect(nextWorkers[1].rawname.endsWith('worker2')).to.be.true;

await worker.close();
await worker2.close();
});

it('gets only workers related only to one queue', async function () {
const queueName2 = `${queueName}2`;
const queue2 = new Queue(queueName2, { connection, prefix });
Expand Down
29 changes: 29 additions & 0 deletions tests/test_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,35 @@ describe('workers', function () {
await worker.close();
});

it('sets the worker name on the job upon processing', async () => {
let worker;
const processing = new Promise<void>(async (resolve, reject) => {
worker = new Worker(
queueName,
async job => {
const fetchedJob = await queue.getJob(job.id!);

try {
expect(fetchedJob).to.be.ok;
expect(fetchedJob!.processedBy).to.be.equal(worker.opts.name);
} catch (err) {
reject(err);
}

resolve();
},
{ connection, prefix, name: 'foobar' },
);
await worker.waitUntilReady();
});

await queue.add('test', { foo: 'bar' });

await processing;

await worker.close();
});

it('retry a job that fails', async () => {
let failedOnce = false;
const notEvenErr = new Error('Not even!');
Expand Down

0 comments on commit 7ba2729

Please sign in to comment.