feat(queue): observability for queue #2721
src/classes/queue.ts
Outdated
@@ -250,6 +270,11 @@ export class Queue<
},
);
this.emit('waiting', job);
if (this.tracer) {
Wouldn't it be useful to store the job.id in the span? (I have no idea; I am just wondering whether it is needed to match this trace when it continues in the worker.)
The id won't be available, as the trace starts before the job is added.
@roggervalf yes, but it is an attribute that can be added to the span using setAttributes
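A minimal sketch of this suggestion, assuming a hypothetical `Span` shape with `setAttributes` and `end` (not necessarily the interface this PR ends up with). `createJob`, `addWithSpan`, and the attribute keys are made up for illustration. The queue name is set up front, and the job id is added once the job exists:

```typescript
// Hypothetical span shape, for illustration only.
interface Span {
  setAttributes(attributes: Record<string, string>): void;
  end(): void;
}

// Test double that records what was set on the span.
class RecordingSpan implements Span {
  attributes: Record<string, string> = {};
  ended = false;

  setAttributes(attributes: Record<string, string>): void {
    Object.assign(this.attributes, attributes);
  }

  end(): void {
    this.ended = true;
  }
}

// Stand-in for job creation: the id is only known once this resolves.
async function createJob(name: string): Promise<{ id: string; name: string }> {
  return { id: '1', name };
}

async function addWithSpan(span: Span, queueName: string, jobName: string) {
  span.setAttributes({ 'queue.name': queueName }); // known before the add
  try {
    const job = await createJob(jobName);
    span.setAttributes({ 'job.id': job.id }); // available only after the add
    return job;
  } finally {
    span.end();
  }
}
```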
src/classes/queue.ts
Outdated
});
}
const bulk = this.Job.createBulk<DataType, ResultType, NameType>(
I think we need to await here, as createBulk is an asynchronous operation; otherwise the span will end before the jobs are actually added to Redis.
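To make the ordering concrete, here is a small sketch. `createBulk` below is a hypothetical stand-in for the real asynchronous call, and the `events` array only logs the order in which things happen:

```typescript
const events: string[] = [];

// Hypothetical stand-in for an asynchronous bulk insert.
async function createBulk(count: number): Promise<number> {
  await Promise.resolve(); // simulates the round trip to Redis
  events.push('jobs added');
  return count;
}

// The span is ended right after the call is issued, before it completes.
function addBulkWithoutAwait(count: number): Promise<number> {
  const bulk = createBulk(count);
  events.push('span ended');
  return bulk;
}

// The span is ended only after the jobs have been added.
async function addBulkWithAwait(count: number): Promise<number> {
  const bulk = await createBulk(count);
  events.push('span ended');
  return bulk;
}
```

In the first variant the log reads "span ended" then "jobs added"; in the second the order is reversed, which is what the span should measure.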
src/classes/queue.ts
Outdated
if (!this.closing) {
if (this._repeat) {
await this._repeat.close();
}
}
return super.close();
super.close();
We need to await here as well, as super.close() is asynchronous.
@@ -0,0 +1,18 @@
export enum OpenTelemetryAttributes {
As this interface is generic, it should be called just TelemetryAttributes
src/interfaces/opentelemetry.ts
Outdated
@@ -0,0 +1,25 @@
export interface Telemetry {
opentelemetry.ts -> telemetry.ts
src/interfaces/opentelemetry.ts
Outdated
}
export interface Tracer {
startSpan(name: string): Span;
I wonder if this method startSpan wouldn't need to be asynchronous?
(same with end() method)
looks like it's synchronous so no need to use async
There are a few other packages/repos that you can use as a basis, like:
https://github.com/appsignal/opentelemetry-instrumentation-bullmq
https://github.com/jenniferplusplus/opentelemetry-instrumentation-bullmq
where the authors were trying to follow OpenTelemetry conventions.
src/classes/queue.ts
Outdated
const spanName = `${this.name}.${name} Queue.add`;
span = this.tracer.startSpan(spanName);
span.setAttributes({
[OpenTelemetryAttributes.QueueName]: this.name,
there are other attributes that can be added like:
span.setAttributes({
[SemanticAttributes.MESSAGING_SYSTEM]: 'BullMQ', // it could be a constant
[SemanticAttributes.MESSAGING_DESTINATION]: this.name,
[OpenTelemetryAttributes.JOB_NAME]: name
});
You can use this package https://www.npmjs.com/package/@opentelemetry/semantic-conventions for semantic conventions
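A rough sketch of what building those attributes could look like. To keep it runnable without the package installed, the two keys are copied locally; to my knowledge they match the values of `SemanticAttributes.MESSAGING_SYSTEM` and `SemanticAttributes.MESSAGING_DESTINATION`. `JOB_NAME` and `buildAddAttributes` are hypothetical names, not part of any convention:

```typescript
// Local copies of two keys from @opentelemetry/semantic-conventions,
// so the sketch runs without the package installed.
const MESSAGING_SYSTEM = 'messaging.system';
const MESSAGING_DESTINATION = 'messaging.destination';

// Hypothetical library-specific key; not an official convention.
const JOB_NAME = 'bullmq.job.name';

// Kept as a constant, as suggested in the comment above.
const SYSTEM_NAME = 'BullMQ';

function buildAddAttributes(
  queueName: string,
  jobName: string,
): Record<string, string> {
  return {
    [MESSAGING_SYSTEM]: SYSTEM_NAME,
    [MESSAGING_DESTINATION]: queueName,
    [JOB_NAME]: jobName,
  };
}
```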
src/classes/queue.ts
Outdated
return this.Job.createBulk<DataType, ResultType, NameType>(
let span;
if (this.tracer) {
const jobsInBulk = jobs.map(job => job.name);
It seems there is a way to create spans for the different values when doing a bulk operation: https://github.com/appsignal/opentelemetry-instrumentation-bullmq/blob/main/src/instrumentation.ts#L285-L325
src/classes/queue.ts
Outdated
@@ -299,12 +356,26 @@ export class Queue<
*
*/
async close(): Promise<void> {
let span;
if (this.tracer) {
const spanName = `${this.name} Queue.close`;
Not sure we need this span for now 🤔. Maybe we can start with the add operations and then expand to others as we receive more requests.
I think I will leave it for now and delete it if it causes problems.
I still think we should expose only a few metrics for now, as needed; I don't see the other packages exposing so many. If we expose them now and later want to remove them, we will have to wait until the next breaking change. Could you consider adding spans to the same methods as in these packages (#2721 (review)), which people seem to be using now?
My two cents: close in particular may not be so useful. However, I do not think we need to limit ourselves to the methods covered by the existing packages; we can make it more powerful right from the first version. In the future we may even make it configurable which methods get spans and which do not.
One thing that I forgot to mention: the interface should be properly documented using typedoc syntax, similar to how we document the rest of the API: https://typedoc.org/example/
src/classes/worker.ts
Outdated
} catch (error) {
this.running = false;
throw error;
} finally {
This made me think, why do we need to make sure the span ends even in the case of an exception, but not in all other places? As most functions perform asynchronous calls, they could also potentially raise an exception. Have you thought about that?
Also, in the case of an error, shouldn't we store this information in the span? Just wondering: how does the current OpenTelemetry package for BullMQ handle exceptions?
src/classes/worker.ts
Outdated
span.setAttributes({
[TelemetryAttributes.WorkerName]: this.name,
[TelemetryAttributes.WorkerId]: this.id,
[TelemetryAttributes.WorkerJobsInvolved]: JSON.stringify(jobs),
wouldn't we also want to store all the IDs of the jobs whose locks we are extending?
src/classes/worker.ts
Outdated
await this.client,
await this.blockingConnection.client,
token,
{ block },
);
if (this.tracer) {
In order to match the job that was added to the queue we also need to specify the job.id here in the span attributes.
src/classes/worker.ts
Outdated
@@ -1065,6 +1239,10 @@ will never work with more accuracy than 1ms. */
}
this.notifyFailedJobs(await Promise.all(jobPromises));
I think we need to store the ids of the failed/stalled jobs here, because they need to be mapped to the jobs added to the queue so that we can trace a job all the way from being added to the queue to the place where it completes/fails/stalls.
[TelemetryAttributes.BulkCount]: jobs.length,
});
return await this.Job.createBulk<DataType, ResultType, NameType>(
In the case of addBulk, we are adding a bunch of jobs, so createBulk will return an array of jobs including their ids. However, later on, in the worker, these jobs are not processed in bulk, so the question is how we handle that. Should the span created by the worker when processing a job from this bulk be mapped to the bulk span somehow? Can a span have children?
Indeed, spans seem to form a parent-child hierarchy, in Otel at least: https://opentelemetry.io/docs/concepts/signals/traces/
Does it make sense for the span created when adding a job to the queue (or a bunch of jobs in a bulk) to be the parent of the spans created later when the jobs are processed?
The question is how would a worker when processing a job, find the parent span for that job, unless we could use the job Id for the span id...
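One possible answer, sketched under heavy assumptions: instead of reusing the job id as the span id, the producer could serialize its span context and store it with the job, and the worker could restore it to start a child span. Everything below (`SpanRecord`, `startSpan`, `toMetadata`, `fromMetadata`, the id scheme) is a simplified, hypothetical model; real tracers generate ids and propagate context themselves:

```typescript
// Hypothetical, simplified span model.
interface SpanContext {
  traceId: string;
  spanId: string;
}

interface SpanRecord extends SpanContext {
  name: string;
  parentSpanId?: string;
}

let counter = 0;
const nextId = (): string => `id-${++counter}`;

// A span started with a parent joins the parent's trace.
function startSpan(name: string, parent?: SpanContext): SpanRecord {
  return {
    name,
    traceId: parent ? parent.traceId : nextId(),
    spanId: nextId(),
    parentSpanId: parent ? parent.spanId : undefined,
  };
}

// Producer side: serialize the span context so it can travel with the job.
function toMetadata(span: SpanContext): string {
  return JSON.stringify({ traceId: span.traceId, spanId: span.spanId });
}

// Worker side: restore the context that was stored with the job.
function fromMetadata(metadata: string): SpanContext {
  return JSON.parse(metadata) as SpanContext;
}

const addSpan = startSpan('Queue.addBulk');
const storedWithJob = toMetadata(addSpan);
const processSpan = startSpan('Worker.process', fromMetadata(storedWithJob));
```

With this shape every job from a bulk add can carry the same stored context, so each worker span becomes a child of the single bulk span.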
src/classes/queue.ts
Outdated
() => `${this.name}.${name} Queue.add`,
async span => {
span?.setAttributes({
[TelemetryAttributes.QueueName]: this.name,
It looks like this line:
span?.setAttributes({
[TelemetryAttributes.QueueName]: this.name,
});
is repeated on every span, couldn't we just move it to the "trace" method in QueueBase?
src/classes/queue.ts
Outdated
let deletedCount = 0;
const deletedJobsIds: string[] = [];
span?.setAttributes({
Better to move all attributes here instead of splitting in 2 calls to setAttributes
src/classes/queue.ts
Outdated
if (jobId == '0' || jobId?.startsWith('0:')) {
throw new Error("JobId cannot be '0' or start with 0:");
}
return this.trace(
for all the calls to trace we need to specify the correct return type, for example in this method it would look like this:
return this.trace<Job<DataType, ResultType, NameType>>(
This will help in reducing issues derived from returning the wrong type in the trace callback.
src/classes/worker.ts
Outdated
token,
{ block },
);
it is possible that nextJob is undefined here, so we need to check before trying to set the attribute.
Actually I am not sure we need to create a span at all if there is no next job 🤔
src/classes/worker.ts
Outdated
span?.setAttributes({
[TelemetryAttributes.WorkerName]: this.name,
[TelemetryAttributes.WorkerId]: this.id,
[TelemetryAttributes.WorkerJobsInvolved]: JSON.stringify(jobs),
Not sure we want to store the whole jobs here, job ids should be enough, otherwise the amount of memory required can be huge.
Another thing, why would this attribute be called WorkerJobsIdsInvolved? what's its purpose?
src/classes/queue.ts
Outdated
@@ -221,13 +221,9 @@ export class Queue<
data: DataType,
opts?: JobsOptions,
): Promise<Job<DataType, ResultType, NameType>> {
return this.trace(
return await this.trace<Job<DataType, ResultType, NameType>>(
There is no need to await if we are only returning and not waiting for the result. Same for all the places where there is a return followed by an await.
src/classes/queue-base.ts
Outdated
@@ -191,6 +192,10 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
const span = this.tracer.startSpan(getSpanName());
I think we should also set the span type, either producer (queue) or consumer (worker), as this is useful later on, especially when implementing Otel. Maybe a new argument to trace like spanType:
protected trace<T>(
spanType: SpanType, // Producer, Consumer or Internal.
getSpanName: () => string,
callback: (span?: Span) => Promise<T> | T,
) {
src/classes/queue-base.ts
Outdated
* @returns
*/
protected trace<T>(
getSpanType: () => SpanKind,
We do not need a function for getting the span type here. The reason for having one for the span name is that the span name usually requires some computation (concatenating strings, etc.); passing it as a callback avoids performing those computations when telemetry is not enabled. For the span type we are just passing a constant, so this will not impact performance in any way.
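A minimal sketch of that distinction, assuming a hypothetical free-standing `trace` helper and a simplified `Tracer` (the real helper is a method on the queue class and has a different signature):

```typescript
// Hypothetical minimal tracer; the SpanKind values are illustrative.
enum SpanKind {
  INTERNAL,
  PRODUCER,
  CONSUMER,
}

interface Tracer {
  startSpan(name: string, kind: SpanKind): { end(): void };
}

function trace<T>(
  tracer: Tracer | undefined,
  spanKind: SpanKind, // plain constant: nothing to defer
  getSpanName: () => string, // callback: evaluated only when tracing is on
  callback: () => T,
): T {
  if (!tracer) {
    return callback(); // getSpanName is never called
  }
  const span = tracer.startSpan(getSpanName(), spanKind);
  try {
    return callback();
  } finally {
    span.end();
  }
}
```

When no tracer is configured the name callback is skipped entirely, so the string concatenation costs nothing on the hot path.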
src/classes/queue-base.ts
Outdated
activeTelemetryHeaders,
);
return this.contextManager.with(activeContext, () => callback(span));
I assume contextManager.with will (for the Otel case), create a parent-child relationship for the spans?
src/classes/queue-base.ts
Outdated
activeTelemetryHeaders,
);
return this.contextManager.with(activeContext, () => callback(span));
Also, in this case we need to await the call, like this:
return await this.contextManager.with(activeContext, () => callback(span));
The reason is that we want to catch potential exceptions thrown by the callback; without this await the exception will just bubble up and we lose the ability to record it in the tracer.
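The difference can be seen in isolation with a small sketch. The function names and the `recorded` array are hypothetical, standing in for the try/catch in the trace helper that records the exception:

```typescript
const recorded: string[] = [];

// Hypothetical callback that fails asynchronously.
async function failingCallback(): Promise<number> {
  throw new Error('boom');
}

// Without await, the rejected promise is returned as-is, so the catch
// block never runs and nothing is recorded.
async function traceWithoutAwait(
  callback: () => Promise<number>,
): Promise<number> {
  try {
    return callback();
  } catch (err) {
    recorded.push('without await');
    throw err;
  }
}

// With await, the rejection is raised inside the try block and is caught.
async function traceWithAwait(
  callback: () => Promise<number>,
): Promise<number> {
  try {
    return await callback();
  } catch (err) {
    recorded.push('with await');
    throw err;
  }
}
```

Both variants still reject towards the caller; only the awaiting one gets the chance to record the error first.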
src/classes/queue-base.ts
Outdated
this.propagation.inject(this.contextManager.active(), telemetryHeaders);
return this.contextManager.with(
Same issue here: we need to await before returning.
src/classes/queue.ts
Outdated
if (+new Date(opts.repeat.endDate) < Date.now()) {
throw new Error('End date must be greater than current timestamp');
return this.trace<Job<DataType, ResultType, NameType>>(
() => 3,
We should always use enums or constants instead of magic numbers.
In this case SpanType.PRODUCER (I think)
src/classes/queue-base.ts
Outdated
const telemetryHeaders: Record<string, string> = {};
this.propagation.inject(this.contextManager.active(), telemetryHeaders);
Ok, so we are always going to inject the headers. Hmm, we may need to give this a bit more thought; I am not sure it is 100% correct.
src/classes/queue.ts
Outdated
() => `${this.name}.${name} Queue.add`,
async (span, telemetryHeaders) => {
if (telemetryHeaders) {
data = { ...data, telemetryHeaders };
The telemetry data should be stored in the options, as the data is user data which we should not tamper with.
we probably need to define a "telemetryData" or similar optional option in JobOpts with type "any" so that we can store the telemetry data there.
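A sketch of the idea, with hypothetical shapes: `JobOptions`, `JobPayload`, and `buildPayload` are illustrative, and `telemetryData` simply follows the name floated above (typed `unknown` here rather than `any`). The user's data object is passed through untouched and the telemetry payload rides on the options:

```typescript
// Hypothetical option shapes, for illustration only.
interface JobOptions {
  delay?: number;
  telemetryData?: unknown;
}

interface JobPayload<D> {
  name: string;
  data: D;
  opts: JobOptions;
}

function buildPayload<D>(
  name: string,
  data: D,
  opts: JobOptions = {},
  telemetryData?: unknown,
): JobPayload<D> {
  return {
    name,
    data, // user data is passed through untouched
    opts: telemetryData === undefined ? opts : { ...opts, telemetryData },
  };
}
```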
src/classes/queue.ts
Outdated
name: job.name,
data: {
...job.data,
...(span && telemetryHeaders),
Should go on the options object.
}
export interface ContextManager {
with<A extends (...args: any[]) => any>(
The documentation for "with":
Creates a new context and sets it as active for the fn passed as last argument
src/interfaces/telemetry.ts
Outdated
type HighResolutionTime = [number, number];
export interface Propagation {
inject<T>(context: Context, carrier: T, setter?: TextMapSetter): void;
As we are using the propagation in a very controlled environment I am not sure we need the setter, if so better remove it to make integrations easier.
src/interfaces/worker-options.ts
Outdated
@@ -144,6 +145,11 @@ export interface WorkerOptions extends QueueBaseOptions {
* @default false
*/
useWorkerThreads?: boolean;
/**
* Telemetry client
Rename comment to "Telemetry Addon"
src/interfaces/telemetry.ts
Outdated
get<T>(carrier: T, key: string): undefined | string | string[];
}
export interface JobDataWithHeaders {
We can move telemetryHeaders to BaseJobOptions: https://github.com/taskforcesh/bullmq/blob/master/src/interfaces/base-job-options.ts#L77
I would rename it to maybe "telemetryMetadata", later when serialising the options before storing in Redis we will optimise it to a shorter name.
or maybe a better name: telemetryPropagation, not sure.
…tests for telemetry interface
Looks good! But please address the new and previous comments.
src/classes/queue-base.ts
Outdated
@@ -30,6 +40,16 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
protected connection: RedisConnection;
public readonly qualifiedName: string;
/**
* Instance of a telemetry client
* To use it create if statement in a method to observe with start and end of a span
I do not think this comment is correct anymore as we are using the trace helper.
src/classes/queue-base.ts
Outdated
this.propagation = opts.telemetry.propagation;
this.contextManager.getMetadata = (context: Context) => {
const metadata = {};
getMetadata and fromMetadata should be implemented in the integration. For Otel for example they will use the propagation module, but we do not need to expose the propagation module in our generic telemetry interface.
src/classes/queue-base.ts
Outdated
@@ -175,4 +216,61 @@ export class QueueBase extends EventEmitter implements MinimalQueue {
}
}
}
/**
* Wraps the code with telemetry and provides span for configuration.
"... provides a span for configuration."
src/classes/queue.ts
Outdated
await this.scripts.pause(true);
this.emit('paused');
await this.trace<void>(
SpanKind.PRODUCER,
In this case I do not think the kind is PRODUCER, as we are just pausing the queue, not producing any jobs. Probably INTERNAL is the correct one, not sure.
src/classes/queue.ts
Outdated
}
return super.close();
await this.trace<void>(
SpanKind.PRODUCER,
Same here, should not be PRODUCER
src/classes/queue.ts
Outdated
const client = await this.client;
return client.xtrim(this.keys.events, 'MAXLEN', '~', maxLength);
return this.trace<number>(
SpanKind.PRODUCER,
I doubt whether it is correct or necessary to have trace on all these methods, actually. The most important duty of the telemetry is to be able to trace a job from its creation to its completion. Sure, there may be situations where a job gets removed manually and things like that, and we probably want to trace those too. But "pausing" a queue, or "trimEvents", does not look like something we want to have spans for.
trimEvents I guess is not needed for tracing at all.
src/classes/queue.ts
Outdated
@@ -424,7 +510,18 @@ export class Queue<
jobId: string,
progress: number | object,
): Promise<void> {
return this.scripts.updateProgress(jobId, progress);
await this.trace<void>(
SpanKind.PRODUCER,
Updating a job progress should somehow also map to the span that created the job. Also the SpanKind cannot be PRODUCER.
tests/test_telemetry_interface.ts
Outdated
end(): void {}
}
class MockPropagation implements Propagation {
Should not be needed as Propagation is OTel implementation specific
tests/test_telemetry_interface.ts
Outdated
expect.fail('Expected an error to be thrown');
} catch (error) {
span.recordException(error);
We should not need to call recordException here as this is supposed to be called inside queue.add when an exception occurs.
I see, the issue you have here is that you are throwing an exception as soon as queue.add is called, but by doing so we are not really testing the try/catch inside queue.add
So you could try to throw an error when calling Job.create instead, which is called inside queue.add.
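The shape of such a test could look roughly like the sketch below. It swaps in plain dependency injection for a sinon stub so that it stays self-contained; `MockSpan`, `TinyQueue`, and `CreateJob` are hypothetical stand-ins, not the real classes:

```typescript
// Hypothetical mock span that collects recorded exceptions.
class MockSpan {
  exceptions: Error[] = [];

  recordException(error: Error): void {
    this.exceptions.push(error);
  }

  end(): void {}
}

type CreateJob = (name: string) => Promise<string>;

// Tiny stand-in for a queue whose add() wraps job creation in a try/catch.
class TinyQueue {
  constructor(
    private readonly span: MockSpan,
    private readonly createJob: CreateJob,
  ) {}

  async add(name: string): Promise<string> {
    try {
      return await this.createJob(name);
    } catch (err) {
      this.span.recordException(err as Error);
      throw err;
    } finally {
      this.span.end();
    }
  }
}

// The test makes the inner dependency fail, then checks that add() itself
// recorded the exception; the test never calls recordException directly.
async function testAddRecordsException(): Promise<boolean> {
  const span = new MockSpan();
  const queue = new TinyQueue(span, async () => {
    throw new Error('create failed');
  });
  try {
    await queue.add('welcome');
    return false; // add() should have thrown
  } catch (_err) {
    return span.exceptions.length === 1;
  }
}
```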
tests/test_telemetry_interface.ts
Outdated
const span = telemetryClient.trace
.getTracer('testtracer')
.startSpan('Queue.addBulk.error') as MockSpan;
const recordExceptionSpy = sinon.spy(span, 'recordException');
Same comment as above, in order for this test to be meaningful it should throw an exception from inside addBulk.
src/classes/queue.ts
Outdated
cursor = await this.scripts.promoteJobs(opts.count);
} while (cursor);
await this.trace<void>(
3,
Make sure to change the 3 to the SpanKind enum.
… for shorter names
src/classes/worker.ts
Outdated
const client = await this.client;
const bclient = await this.blockingConnection.client;
await this.trace<void>(
SpanKind.CONSUMER,
I do not think this is the correct type, as this span is not consuming but starting the main worker loop.
src/classes/worker.ts
Outdated
token,
{ block },
return this.trace<Job<DataType, ResultType, NameType>>(
SpanKind.CONSUMER,
I think this should still be kind INTERNAL, because I do not think we should have more than one kind CONSUMER per job, and in this case getNextJob would lead to processJob which also has kind CONSUMER
src/classes/worker.ts
Outdated
if (!job || this.closing || this.paused) {
return;
}
const { telemetryMetadata: dstPropagationMedatada } = job.opts;
Shouldn't this actually be the "source propagation metadata" ?
}
const { telemetryMetadata: dstPropagationMedatada } = job.opts;
return this.trace<void | Job<DataType, ResultType, NameType>>(
Shouldn't we need to pass the source propagation metadata as last parameter here so that it maps the job added by the PRODUCER?
As the job can be completed or failed in this case, shouldn't we mark the span with this information somehow? Could you please check the old Otel implementation to see how completed and failed jobs are handled?
Please check the above comment I think you missed it before.
I wrote some comments, please do not forget to address the older comments as well.
src/classes/queue-base.ts
Outdated
{
kind: spanKind,
},
currentContext,
If srcPropagationMetadata is not defined, currentContext will not be defined either. Is this correct?
Yes, the startSpan method will be called without an existing context; later the span is bound to a new active context with the setSpan method.
But setSpan is only called if spanKind is of type PRODUCER.
I addressed it in the newest commit.
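For readers following this thread, one possible shape of the behaviour under discussion is sketched below. It is an assumption for illustration, not the actual change in the commit: `Ctx`, `SketchSpan`, `extractContext`, `startSpan`, `setSpan` and `traceWith` are hypothetical simplified stand-ins, not the BullMQ or OpenTelemetry types. The sketch falls back to the active context when no propagation metadata is present, and binds the span to a new context regardless of span kind.

```typescript
// Hypothetical, simplified stand-ins; not the BullMQ or OpenTelemetry types.
type SpanKind = 'PRODUCER' | 'CONSUMER' | 'INTERNAL';

interface Ctx {
  parentSpanId: string | undefined;
}

interface SketchSpan {
  id: string;
  parentId: string | undefined;
  kind: SpanKind;
}

let spanCounter = 0;

// Stands in for the ambient active context (no parent span here).
const activeContext: Ctx = { parentSpanId: undefined };

// Returns a context only when propagation metadata was provided.
function extractContext(metadata: string | undefined): Ctx | undefined {
  return metadata ? { parentSpanId: metadata } : undefined;
}

function startSpan(kind: SpanKind, ctx: Ctx): SketchSpan {
  spanCounter += 1;
  return { id: `span-${spanCounter}`, parentId: ctx.parentSpanId, kind };
}

// Returns a new context in which `span` is the current span.
function setSpan(ctx: Ctx, span: SketchSpan): Ctx {
  return { ...ctx, parentSpanId: span.id };
}

function traceWith<T>(
  kind: SpanKind,
  srcPropagationMetadata: string | undefined,
  callback: (span: SketchSpan, ctx: Ctx) => T,
): T {
  // Fall back to the active context when nothing was propagated.
  const parentContext = extractContext(srcPropagationMetadata) ?? activeContext;
  const span = startSpan(kind, parentContext);
  // Bind the span to a new context for every span kind, not only PRODUCER.
  const spanContext = setSpan(parentContext, span);
  return callback(span, spanContext);
}
```

With this shape, a consumer span started without metadata becomes a root span, while one started with metadata is parented to the producer's span.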
src/classes/worker.ts
Outdated
this.emit('completed', job, result, 'active');
const [jobData, jobId, limitUntil, delayUntil] = completed || [];
this.updateDelays(limitUntil, delayUntil);
if (!job || this.closing || this.paused) {
I think this is wrong: if there is no job, we should not create a span here. It is common for there to be no job to process when this function is called, so this should be left as it was, i.e. returning before creating a new span.
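A minimal sketch of the guard-first ordering requested above, under the assumption that the tracing helper takes a span name and a callback. `trace`, `startedSpans`, `JobLike`, `nextJobFromData` and the span name are hypothetical names for illustration, not the actual methods in worker.ts.

```typescript
// Hypothetical stand-in for a tracing helper; records the spans it opens.
const startedSpans: string[] = [];

function trace<T>(name: string, callback: () => T): T {
  startedSpans.push(name); // a real tracer would start and later end a span here
  return callback();
}

interface JobLike {
  id: string;
}

// Returns before any span is created when there is nothing to process.
function nextJobFromData(
  job: JobLike | undefined,
  closing: boolean,
  paused: boolean,
): JobLike | undefined {
  if (!job || closing || paused) {
    return undefined; // common case: no job, so no span is opened
  }
  return trace('process-next-job', () => job);
}
```

Keeping the guard outside the traced callback means the frequent "no job" path produces no telemetry at all, which is the behaviour the original code had before the span was introduced.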
}
const { telemetryMetadata: dstPropagationMedatada } = job.opts;

return this.trace<void | Job<DataType, ResultType, NameType>>(
Please check the comment above; I think you missed it before.
src/classes/worker.ts
Outdated
[TelemetryAttributes.WorkerId]: this.id,
});

if (this.resumeWorker) {
Same here: we should not create a span if we are not resuming (as this check is used to see whether the worker was paused to begin with).
} catch (err) {
this.emit('error', <Error>err);
async close(force = false): Promise<void> {
await this.trace<void>(
We should not create a span if we are already closing (as somebody already called this method before and is wrongly calling it again).
@@ -958,20 +1044,32 @@ will never work with more accuracy than 1ms. */
* @see {@link https://docs.bullmq.io/patterns/manually-fetching-jobs}
*/
async startStalledCheckTimer(): Promise<void> {
if (!this.opts.skipStalledCheck) {
Same here: do not create a span if we are skipping the stalled checker. Spans are not just for wrapping methods; they should only be created when there is actually something useful to do in them! :)
…there is a point to
…or every SpanKind
…eat/opentelemetry
LGTM
…eat/opentelemetry
# [5.22.0](v5.21.2...v5.22.0) (2024-10-31)

### Bug Fixes

* **commands:** add missing build statement when releasing [python] ([#2869](#2869)) fixes [#2868](#2868) ([ff2a47b](ff2a47b))

### Features

* **job:** add getChildrenValues method [python] ([#2853](#2853)) ([0f25213](0f25213))
* **queue:** add a telemetry interface ([#2721](#2721)) ([273b574](273b574))
This PR contains the following updates:

| Package | Type | Update | Change |
|---|---|---|---|
| [bullmq](https://bullmq.io/) ([source](https://github.com/taskforcesh/bullmq)) | dependencies | minor | [`5.21.2` -> `5.23.0`](https://renovatebot.com/diffs/npm/bullmq/5.21.2/5.23.0) |

---

### Release Notes

<details>
<summary>taskforcesh/bullmq (bullmq)</summary>

### [`v5.23.0`](https://github.com/taskforcesh/bullmq/releases/tag/v5.23.0)

[Compare Source](taskforcesh/bullmq@v5.22.0...v5.23.0)

##### Features

- **scheduler:** add getJobScheduler method ([#2877](taskforcesh/bullmq#2877)) ref [#2875](taskforcesh/bullmq#2875) ([956d98c](taskforcesh/bullmq@956d98c))

### [`v5.22.0`](https://github.com/taskforcesh/bullmq/releases/tag/v5.22.0)

[Compare Source](taskforcesh/bullmq@v5.21.2...v5.22.0)

##### Bug Fixes

- **commands:** add missing build statement when releasing \[python] ([#2869](taskforcesh/bullmq#2869)) fixes [#2868](taskforcesh/bullmq#2868) ([ff2a47b](taskforcesh/bullmq@ff2a47b))

##### Features

- **job:** add getChildrenValues method \[python] ([#2853](taskforcesh/bullmq#2853)) ([0f25213](taskforcesh/bullmq@0f25213))
- **queue:** add a telemetry interface ([#2721](taskforcesh/bullmq#2721)) ([273b574](taskforcesh/bullmq@273b574))

</details>

---

### Configuration

📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined).

🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied.

♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about this update again.

---

- [ ] If you want to rebase/retry this PR, check this box

---

This PR has been generated by [Renovate Bot](https://github.com/renovatebot/renovate).
Reviewed-on: https://git.tristess.app/alexandresoro/ouca/pulls/290
Reviewed-by: Alexandre Soro <code@soro.dev>
Co-authored-by: renovate <renovate@git.tristess.app>
Co-committed-by: renovate <renovate@git.tristess.app>
For the record, we are trying to adhere to the following guidelines: https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#conventions