Skip to content

Commit

Permalink
feat(queue): add a telemetry interface (#2721)
Browse files Browse the repository at this point in the history
  • Loading branch information
fgozdz authored Oct 31, 2024
1 parent d7d2a68 commit 273b574
Show file tree
Hide file tree
Showing 21 changed files with 1,397 additions and 361 deletions.
42 changes: 21 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,27 +217,27 @@ This is just scratching the surface, check all the features and more in the offi

Since there are a few job queue solutions, here is a table comparing them:

| Feature | [BullMQ-Pro](https://bullmq.io/#bullmq-pro) | [BullMQ](https://bullmq.io) | Bull | Kue | Bee | Agenda |
| :------------------------ | :-------------: | :-------------: | :-------------: | :---: | -------- | ------ |
| Backend | redis | redis | redis | redis | redis | mongo |
| Observables || | | | | |
| Group Rate Limit || | | | | |
| Group Support || | | | | |
| Batches Support || | | | | |
| Parent/Child Dependencies || | | | | |
| Debouncing || || | | |
| Priorities || ||| ||
| Concurrency || |||||
| Delayed jobs || ||| ||
| Global events || ||| | |
| Rate Limiter || || | | |
| Pause/Resume || ||| | |
| Sandboxed worker || || | | |
| Repeatable jobs || || | ||
| Atomic ops || || || |
| Persistence || |||||
| UI || ||| ||
| Optimized for | Jobs / Messages | Jobs / Messages | Jobs / Messages | Jobs | Messages | Jobs |
| Feature | [BullMQ-Pro](https://bullmq.io/#bullmq-pro) | [BullMQ](https://bullmq.io) | Bull | Kue | Bee | Agenda |
| :------------------------ | :-----------------------------------------: | :-------------------------: | :-------------: | :---: | -------- | ------ |
| Backend | redis | redis | redis | redis | redis | mongo |
| Observables | | | | | | |
| Group Rate Limit | | | | | | |
| Group Support | | | | | | |
| Batches Support | | | | | | |
| Parent/Child Dependencies | | | | | | |
| Debouncing | | || | | |
| Priorities | | ||| ||
| Concurrency | | |||||
| Delayed jobs | | ||| ||
| Global events | | ||| | |
| Rate Limiter | | || | | |
| Pause/Resume | | ||| | |
| Sandboxed worker | | || | | |
| Repeatable jobs | | || | ||
| Atomic ops | | || || |
| Persistence | | |||||
| UI | | ||| ||
| Optimized for | Jobs / Messages | Jobs / Messages | Jobs / Messages | Jobs | Messages | Jobs |

## Contributing

Expand Down
10 changes: 5 additions & 5 deletions docs/gitbook/bullmq-pro/groups/concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ The concurrency factor is configured as follows:
import { WorkerPro } from '@taskforcesh/bullmq-pro';

const worker = new WorkerPro('myQueue', processFn, {
group: {
concurrency: 3 // Limit to max 3 parallel jobs per group
},
concurrency: 100,
connection
group: {
concurrency: 3, // Limit to max 3 parallel jobs per group
},
concurrency: 100,
connection,
});
```

Expand Down
1 change: 1 addition & 0 deletions src/classes/flow-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@ export class FlowProducer extends EventEmitter {
emit: this.emit.bind(this) as any,
on: this.on.bind(this) as any,
redisVersion: this.connection.redisVersion,
trace: async (): Promise<any> => {},
};
}

Expand Down
133 changes: 80 additions & 53 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { Backoffs } from './backoffs';
import { Scripts, raw2NextJobData } from './scripts';
import { UnrecoverableError } from './errors/unrecoverable-error';
import type { QueueEvents } from './queue-events';
import { SpanKind } from '../enums';

const logger = debuglog('bull');

Expand All @@ -43,6 +44,7 @@ const optsDecodeMap = {
idof: 'ignoreDependencyOnFailure',
kl: 'keepLogs',
rdof: 'removeDependencyOnFailure',
tm: 'telemetryMetadata',
};

const optsEncodeMap = invertObject(optsDecodeMap);
Expand Down Expand Up @@ -656,6 +658,28 @@ export class Job<
return result;
}

private async shouldRetryJob(err: Error): Promise<[boolean, number]> {
if (
this.attemptsMade + 1 < this.opts.attempts &&
!this.discarded &&
!(err instanceof UnrecoverableError || err.name == 'UnrecoverableError')
) {
const opts = this.queue.opts as WorkerOptions;

const delay = await Backoffs.calculate(
<BackoffOptions>this.opts.backoff,
this.attemptsMade + 1,
err,
this,
opts.settings && opts.settings.backoffStrategy,
);

return [delay == -1 ? false : true, delay == -1 ? 0 : delay];
} else {
return [false, 0];
}
}

/**
* Moves a job to the failed queue.
*
Expand All @@ -672,7 +696,6 @@ export class Job<
const client = await this.queue.client;
const message = err?.message;

const queue = this.queue;
this.failedReason = message;

let command: string;
Expand All @@ -683,32 +706,15 @@ export class Job<
//
// Check if an automatic retry should be performed
//
let moveToFailed = false;
let finishedOn, delay;
if (
this.attemptsMade + 1 < this.opts.attempts &&
!this.discarded &&
!(err instanceof UnrecoverableError || err.name == 'UnrecoverableError')
) {
const opts = queue.opts as WorkerOptions;

// Check if backoff is needed
delay = await Backoffs.calculate(
<BackoffOptions>this.opts.backoff,
this.attemptsMade + 1,
err,
this,
opts.settings && opts.settings.backoffStrategy,
);

if (delay === -1) {
moveToFailed = true;
} else if (delay) {
let finishedOn: number;
const [shouldRetry, retryDelay] = await this.shouldRetryJob(err);
if (shouldRetry) {
if (retryDelay) {
const args = this.scripts.moveToDelayedArgs(
this.id,
Date.now(),
token,
delay,
retryDelay,
);
this.scripts.execCommand(multi, 'moveToDelayed', args);
command = 'moveToDelayed';
Expand All @@ -722,11 +728,6 @@ export class Job<
command = 'retryJob';
}
} else {
// If not, move to failed
moveToFailed = true;
}

if (moveToFailed) {
const args = this.scripts.moveToFailedArgs(
this,
message,
Expand All @@ -740,36 +741,62 @@ export class Job<
command = 'moveToFinished';
}

const results = await multi.exec();
const anyError = results.find(result => result[0]);
if (anyError) {
throw new Error(
`Error "moveToFailed" with command ${command}: ${anyError}`,
);
}
return this.queue.trace<Promise<void | any[]>>(
SpanKind.INTERNAL,
this.getSpanOperation(command),
this.queue.name,
async (span, dstPropagationMedatadata) => {
if (dstPropagationMedatadata) {
this.scripts.execCommand(multi, 'updateJobOption', [
this.toKey(this.id),
'tm',
dstPropagationMedatadata,
]);
}

const result = results[results.length - 1][1] as number;
if (result < 0) {
throw this.scripts.finishedErrors({
code: result,
jobId: this.id,
command,
state: 'active',
});
}
const results = await multi.exec();
const anyError = results.find(result => result[0]);
if (anyError) {
throw new Error(
`Error "moveToFailed" with command ${command}: ${anyError}`,
);
}

if (finishedOn && typeof finishedOn === 'number') {
this.finishedOn = finishedOn;
}
const result = results[results.length - 1][1] as number;
if (result < 0) {
throw this.scripts.finishedErrors({
code: result,
jobId: this.id,
command,
state: 'active',
});
}

if (delay && typeof delay === 'number') {
this.delay = delay;
}
if (finishedOn && typeof finishedOn === 'number') {
this.finishedOn = finishedOn;
}

this.attemptsMade += 1;
if (retryDelay && typeof retryDelay === 'number') {
this.delay = retryDelay;
}

this.attemptsMade += 1;

if (Array.isArray(result)) {
return raw2NextJobData(result);
}
},
);
}

if (Array.isArray(result)) {
return raw2NextJobData(result);
private getSpanOperation(command: string) {
switch (command) {
case 'moveToDelayed':
return 'delay';
case 'retryJob':
return 'retry';
case 'moveToFinished':
return 'fail';
}
}

Expand Down
85 changes: 84 additions & 1 deletion src/classes/queue-base.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { EventEmitter } from 'events';
import { QueueBaseOptions, RedisClient } from '../interfaces';
import { QueueBaseOptions, RedisClient, Span, Tracer } from '../interfaces';
import { MinimalQueue } from '../types';
import {
delay,
Expand All @@ -11,6 +11,7 @@ import { RedisConnection } from './redis-connection';
import { Job } from './job';
import { KeysMap, QueueKeys } from './queue-keys';
import { Scripts } from './scripts';
import { TelemetryAttributes, SpanKind } from '../enums';

/**
* @class QueueBase
Expand All @@ -30,6 +31,13 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
protected connection: RedisConnection;
public readonly qualifiedName: string;

/**
* Instance of a telemetry client
* To use it wrap the code with trace helper
* It will check if tracer is provided and if not it will continue as is
*/
private tracer: Tracer | undefined;

/**
*
* @param name - The name of the queue.
Expand Down Expand Up @@ -76,6 +84,10 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
this.keys = queueKeys.getKeys(name);
this.toKey = (type: string) => queueKeys.toKey(name, type);
this.setScripts();

if (opts?.telemetry) {
this.tracer = opts.telemetry.tracer;
}
}

/**
Expand Down Expand Up @@ -175,4 +187,75 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
}
}
}

/**
* Wraps the code with telemetry and provides a span for configuration.
*
* @param spanKind - kind of the span: Producer, Consumer, Internal
* @param operation - operation name (such as add, process, etc)
* @param destination - destination name (normally the queue name)
* @param callback - code to wrap with telemetry
* @param srcPropagationMedatada -
* @returns
*/
async trace<T>(
spanKind: SpanKind,
operation: string,
destination: string,
callback: (span?: Span, dstPropagationMetadata?: string) => Promise<T> | T,
srcPropagationMetadata?: string,
) {
if (!this.tracer) {
return callback();
}

const currentContext = this.opts.telemetry.contextManager.active();

let parentContext;
if (srcPropagationMetadata) {
parentContext = this.opts.telemetry.contextManager.fromMetadata(
currentContext,
srcPropagationMetadata,
);
}

const spanName = `${operation} ${destination}`;
const span = this.tracer.startSpan(
spanName,
{
kind: spanKind,
},
parentContext,
);

try {
span.setAttributes({
[TelemetryAttributes.QueueName]: this.name,
[TelemetryAttributes.QueueOperation]: operation,
});

let messageContext;
let dstPropagationMetadata: undefined | string;

if (spanKind === SpanKind.CONSUMER) {
messageContext = span.setSpanOnContext(parentContext);
} else {
messageContext = span.setSpanOnContext(currentContext);
}

if (callback.length == 2) {
dstPropagationMetadata =
this.opts.telemetry.contextManager.getMetadata(messageContext);
}

return await this.opts.telemetry.contextManager.with(messageContext, () =>
callback(span, dstPropagationMetadata),
);
} catch (err) {
span.recordException(err as Error);
throw err;
} finally {
span.end();
}
}
}
5 changes: 4 additions & 1 deletion src/classes/queue-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ export interface QueueEventsListener extends IoredisListener {
*
* This event is triggered when a job is deduplicated because deduplicatedId still existed.
*/
deduplicated: (args: { jobId: string; deduplicationId: string }, id: string) => void;
deduplicated: (
args: { jobId: string; deduplicationId: string },
id: string,
) => void;

/**
* Listen to 'delayed' event.
Expand Down
Loading

0 comments on commit 273b574

Please sign in to comment.