Skip to content

Commit

Permalink
feat(flows): add telemetry support (#2879)
Browse files Browse the repository at this point in the history
  • Loading branch information
manast authored Nov 5, 2024
1 parent 855aa5e commit 5ed154b
Show file tree
Hide file tree
Showing 5 changed files with 428 additions and 157 deletions.
247 changes: 158 additions & 89 deletions src/classes/flow-producer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import {
IoredisListener,
QueueBaseOptions,
RedisClient,
Tracer,
ContextManager,
} from '../interfaces';
import { getParentKey, isRedisInstance } from '../utils';
import { getParentKey, isRedisInstance, trace } from '../utils';
import { Job } from './job';
import { KeysMap, QueueKeys } from './queue-keys';
import { RedisConnection } from './redis-connection';
import { SpanKind, TelemetryAttributes } from '../enums';

export interface AddNodeOpts {
multi: ChainableCommander;
Expand Down Expand Up @@ -95,6 +98,10 @@ export class FlowProducer extends EventEmitter {
queueKeys: QueueKeys;

protected connection: RedisConnection;
protected telemetry: {
tracer: Tracer | undefined;
contextManager: ContextManager | undefined;
};

constructor(
public opts: QueueBaseOptions = { connection: {} },
Expand Down Expand Up @@ -122,6 +129,10 @@ export class FlowProducer extends EventEmitter {
});

this.queueKeys = new QueueKeys(opts.prefix);

if (opts?.telemetry) {
this.telemetry = opts.telemetry;
}
}

emit<U extends keyof FlowProducerListener>(
Expand Down Expand Up @@ -196,19 +207,32 @@ export class FlowProducer extends EventEmitter {
? `${parentKey}:dependencies`
: undefined;

const jobsTree = this.addNode({
multi,
node: flow,
queuesOpts: opts?.queuesOptions,
parent: {
parentOpts,
parentDependenciesKey,
},
});
return trace<Promise<JobNode>>(
this.telemetry,
SpanKind.PRODUCER,
flow.queueName,
'addFlow',
flow.queueName,
async span => {
span?.setAttributes({
[TelemetryAttributes.FlowName]: flow.name,
});

const jobsTree = await this.addNode({
multi,
node: flow,
queuesOpts: opts?.queuesOptions,
parent: {
parentOpts,
parentDependenciesKey,
},
});

await multi.exec();
await multi.exec();

return jobsTree;
return jobsTree;
},
);
}

/**
Expand Down Expand Up @@ -255,11 +279,27 @@ export class FlowProducer extends EventEmitter {
const client = await this.connection.client;
const multi = client.multi();

const jobsTrees = this.addNodes(multi, flows);

await multi.exec();

return jobsTrees;
return trace<Promise<JobNode[]>>(
this.telemetry,
SpanKind.PRODUCER,
'',
'addBulkFlows',
'',
async span => {
span?.setAttributes({
[TelemetryAttributes.BulkCount]: flows.length,
[TelemetryAttributes.BulkNames]: flows
.map(flow => flow.name)
.join(','),
});

const jobsTrees = await this.addNodes(multi, flows);

await multi.exec();

return jobsTrees;
},
);
}

/**
Expand All @@ -273,70 +313,92 @@ export class FlowProducer extends EventEmitter {
* @param parent - parent data sent to children to create the "links" to their parent
* @returns
*/
protected addNode({ multi, node, parent, queuesOpts }: AddNodeOpts): JobNode {
protected async addNode({
multi,
node,
parent,
queuesOpts,
}: AddNodeOpts): Promise<JobNode> {
const prefix = node.prefix || this.opts.prefix;
const queue = this.queueFromNode(node, new QueueKeys(prefix), prefix);
const queueOpts = queuesOpts && queuesOpts[node.queueName];

const jobsOpts = queueOpts?.defaultJobOptions ?? {};
const jobId = node.opts?.jobId || v4();

const job = new this.Job(
queue,
return trace<Promise<JobNode>>(
this.telemetry,
SpanKind.PRODUCER,
node.name,
node.data,
{
...jobsOpts,
...node.opts,
parent: parent?.parentOpts,
},
jobId,
);

const parentKey = getParentKey(parent?.parentOpts);

if (node.children && node.children.length > 0) {
// Create parent job, will be a job in status "waiting-children".
const parentId = jobId;
const queueKeysParent = new QueueKeys(node.prefix || this.opts.prefix);
const waitChildrenKey = queueKeysParent.toKey(
node.queueName,
'waiting-children',
);

job.addJob(<Redis>(multi as unknown), {
parentDependenciesKey: parent?.parentDependenciesKey,
waitChildrenKey,
parentKey,
});

const parentDependenciesKey = `${queueKeysParent.toKey(
node.queueName,
parentId,
)}:dependencies`;

const children = this.addChildren({
multi,
nodes: node.children,
parent: {
parentOpts: {
id: parentId,
queue: queueKeysParent.getQueueQualifiedName(node.queueName),
'addNode',
node.queueName,
async (span, dstPropagationMetadata) => {
span?.setAttributes({
[TelemetryAttributes.JobName]: node.name,
[TelemetryAttributes.JobId]: jobId,
});

const job = new this.Job(
queue,
node.name,
node.data,
{
...jobsOpts,
...node.opts,
parent: parent?.parentOpts,
telemetryMetadata: dstPropagationMetadata,
},
parentDependenciesKey,
},
queuesOpts,
});

return { job, children };
} else {
job.addJob(<Redis>(multi as unknown), {
parentDependenciesKey: parent?.parentDependenciesKey,
parentKey,
});
jobId,
);

return { job };
}
const parentKey = getParentKey(parent?.parentOpts);

if (node.children && node.children.length > 0) {
// Create the parent job, it will be a job in status "waiting-children".
const parentId = jobId;
const queueKeysParent = new QueueKeys(
node.prefix || this.opts.prefix,
);
const waitChildrenKey = queueKeysParent.toKey(
node.queueName,
'waiting-children',
);

await job.addJob(<Redis>(multi as unknown), {
parentDependenciesKey: parent?.parentDependenciesKey,
waitChildrenKey,
parentKey,
});

const parentDependenciesKey = `${queueKeysParent.toKey(
node.queueName,
parentId,
)}:dependencies`;

const children = await this.addChildren({
multi,
nodes: node.children,
parent: {
parentOpts: {
id: parentId,
queue: queueKeysParent.getQueueQualifiedName(node.queueName),
},
parentDependenciesKey,
},
queuesOpts,
});

return { job, children };
} else {
await job.addJob(<Redis>(multi as unknown), {
parentDependenciesKey: parent?.parentDependenciesKey,
parentKey,
});

return { job };
}
},
);
}

/**
Expand All @@ -349,23 +411,28 @@ export class FlowProducer extends EventEmitter {
* @param nodes - the nodes representing jobs to be added to some queue
* @returns
*/
protected addNodes(multi: ChainableCommander, nodes: FlowJob[]): JobNode[] {
return nodes.map(node => {
const parentOpts = node?.opts?.parent;
const parentKey = getParentKey(parentOpts);
const parentDependenciesKey = parentKey
? `${parentKey}:dependencies`
: undefined;

return this.addNode({
multi,
node,
parent: {
parentOpts,
parentDependenciesKey,
},
});
});
protected addNodes(
multi: ChainableCommander,
nodes: FlowJob[],
): Promise<JobNode[]> {
return Promise.all(
nodes.map(node => {
const parentOpts = node?.opts?.parent;
const parentKey = getParentKey(parentOpts);
const parentDependenciesKey = parentKey
? `${parentKey}:dependencies`
: undefined;

return this.addNode({
multi,
node,
parent: {
parentOpts,
parentDependenciesKey,
},
});
}),
);
}

private async getNode(client: RedisClient, node: NodeOpts): Promise<JobNode> {
Expand Down Expand Up @@ -406,7 +473,9 @@ export class FlowProducer extends EventEmitter {
}

private addChildren({ multi, nodes, parent, queuesOpts }: AddChildrenOpts) {
return nodes.map(node => this.addNode({ multi, node, parent, queuesOpts }));
return Promise.all(
nodes.map(node => this.addNode({ multi, node, parent, queuesOpts })),
);
}

private getChildren(
Expand Down
Loading

0 comments on commit 5ed154b

Please sign in to comment.