Insights: Auto-instrument BullMQ for Queue Insights #12956
Comments
There is OTel instrumentation we can leverage here: https://github.com/appsignal/opentelemetry-instrumentation-bullmq
In the meantime, we can probably write some docs that use the OTel instrumentation manually with BullMQ. Users would have to install opentelemetry-instrumentation-bullmq themselves and then register it with the SDK. @bcoe, what do you think about that as an intermediate solution?
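A minimal sketch of that interim setup, assuming a recent @sentry/node release that exposes addOpenTelemetryInstrumentation and that @appsignal/opentelemetry-instrumentation-bullmq is installed alongside it:

import * as Sentry from '@sentry/node'
import { BullMQInstrumentation } from '@appsignal/opentelemetry-instrumentation-bullmq'

Sentry.init({
  dsn: process.env.SENTRY_DSN,
  tracesSampleRate: 1.0,
})

// Register the third-party OTel instrumentation with Sentry's OTel setup.
// This should run before `bullmq` is first imported so the module gets patched.
Sentry.addOpenTelemetryInstrumentation(new BullMQInstrumentation())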
Hi! Our team is also interested in instrumenting our BullMQ workloads running inside NestJS. I know close to nothing about OTel or about how Sentry works, but I've been playing with the OTel instrumentation package that you referenced and got something, but not much. What I've done is basically try to integrate the OTel instrumentation into a Sentry integration:

import { BullMQInstrumentation } from '@appsignal/opentelemetry-instrumentation-bullmq'
import { defineIntegration } from '@sentry/core'
import { generateInstrumentOnce } from '@sentry/node'
import type { IntegrationFn } from '@sentry/types'
const INTEGRATION_NAME = 'BullMQ'
export const instrumentBullMQ = generateInstrumentOnce(
INTEGRATION_NAME,
() =>
new BullMQInstrumentation({}),
)
const _bullmqIntegration = (() => {
return {
name: INTEGRATION_NAME,
setupOnce() {
instrumentBullMQ()
},
}
}) satisfies IntegrationFn
/**
* BullMQ integration
*
* Capture tracing data for BullMQ.
*/
export const bullmqIntegration = defineIntegration(_bullmqIntegration)

With this snippet I can add the integration to my Sentry.init() and a transaction shows up for each processed job.
However, this transaction is empty. Its duration is just a few milliseconds (shorter than the job duration), and it contains no spans at all, even though there are PG queries inside (PG queries are correctly reported in our regular application tracing). We also see no reference to the producing side of the job, nor do we see anything under the Queues section of Sentry. Not sure if you were able to get started on those docs you mentioned, but if you have some tips that could unblock us, we'd appreciate it 🙏
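For reference, wiring the integration from the snippet above into the SDK would look roughly like this (the import path is hypothetical and the DSN handling is illustrative):

import * as Sentry from '@sentry/node'
// Hypothetical path to the file containing the snippet above.
import { bullmqIntegration } from './bullmq-integration'

Sentry.init({
  dsn: process.env.SENTRY_DSN,
  tracesSampleRate: 1.0,
  // defineIntegration returns a factory, so the integration is called here.
  integrations: [bullmqIntegration()],
})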
Hello, thanks for writing in. We're on a company-wide hack week and thus on limited support this week. We'll take a look at this next week.
Oh cool, have fun and thanks!
Actually, upon further debugging, I came across a very subtle bug in our instrumented code where we were not using async functions correctly. Once I fixed that, I do see database spans inside of the transaction. We still don't see anything under the Queues feature, so we'd still appreciate a note in that regard, but my guess is that the OTel instrumentation is not exactly compatible with what Sentry expects in order to show data under Queues?
@iacobus Thanks for the patience! Very cool that you got the integration working yourself. The likely reason you're not getting any data in the queues module yet is that the performance data needs to have a certain schema to show up under that module. Usually it is the span op plus a few specific data attributes. For queues, this convention can be found here: https://develop.sentry.dev/sdk/telemetry/traces/modules/queues/ (these are our developer docs). Until we provide support for BullMQ out of the box, you may be able to hack this in. Keep us posted!
Thanks @lforst. As relevant context for your team, it looks like the instrumentation library under discussion has opted to follow the OpenTelemetry semantic conventions for messaging spans, which seem to be slightly different from what Sentry Queues defines in its spec. The OTel standard makes intuitive sense to me; curious to hear your thoughts. Getting Queues to work is not a high priority for us right now, and this library doesn't seem easily customizable, so we're going to punt on this for some time. I'll stay tuned to this issue. Thanks for the support.
As far as I can tell, we are also adhering to the OTel semantic conventions, except for the span op, which is not an OTel concept.
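To make the schema requirement concrete, a hand-rolled producer span that follows the queues convention linked above would look roughly like this; `queue` is assumed to be an existing BullMQ Queue named 'my-queue', and the next comment shows a much fuller version that also covers the consumer side:

import * as Sentry from '@sentry/node'

// Assumption: `queue` is an existing BullMQ Queue instance named 'my-queue'.
async function publishWithSpan(payload) {
  return Sentry.startSpan(
    {
      name: 'my-queue publish',
      op: 'queue.publish', // the Sentry-specific span op the Queues module keys on
      attributes: {
        'messaging.system': 'bullmq',
        'messaging.destination.name': 'my-queue',
      },
    },
    async span => {
      const job = await queue.add('my-job', payload)
      // The BullMQ job id doubles as the message id once the job exists.
      span.setAttribute('messaging.message.id', String(job.id))
      return job
    },
  )
}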
I was just looking into this as well this week. I tried forking and working on the appsignal version, but it resulted in events being mixed up in the Sentry UI (some queue events would show up under another queue). I came up with this snippet of code. It's very hacky, but it's working somehow. It handles add(), addBulk() and processJob(). I'm not using flows, so I probably won't implement them. I'm sure there are some things to fix in here, especially around the duplication of span attributes.

const crypto = require('crypto');
const Sentry = require('@sentry/node');
const { nodeProfilingIntegration } = require('@sentry/profiling-node');
const { Queue, Worker } = require('bullmq');
// `redis` is assumed to be an existing Redis client instance (e.g. ioredis).
const bullMQIntegration = {
name: 'bullmq',
setupOnce() {
const originalAddBulkQueue = Queue.prototype.addBulk;
Queue.prototype.addBulk = async function (...args) {
const jobs = args[0];
const messageId = crypto.randomBytes(8).toString('hex');
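// Hand the trace context to the consumer out-of-band: it is pushed onto a
// Redis list keyed by the job name below and popped again in processJob.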
return Sentry.startSpan({
name: 'queue_producer_transaction',
op: 'queue.publish',
attributes: {
'sentry.op': 'queue.publish',
'messaging.message.id': messageId,
'messaging.destination.name': this.name,
'messaging.system': 'bullmq'
}
},
async parent => {
const promises = [];
for (let i = 0; i < jobs.length; i++) {
const job = jobs[i];
promises.push(
Sentry.startSpan(
{
name: 'queue_producer',
op: 'queue.create',
attributes: {
'sentry.op': 'queue.create',
'messaging.message.id': messageId,
'messaging.destination.name': this.name,
'messaging.system': 'bullmq'
}
},
async span => {
const traceHeader = Sentry.spanToTraceHeader(span);
const baggageHeader = Sentry.spanToBaggageHeader(span);
const instrumentationData = { traceHeader, baggageHeader, timestamp: Date.now(), messageId };
await redis.lpush(`span:${job.name}`, JSON.stringify(instrumentationData));
}
)
);
}
await Promise.all(promises);
return originalAddBulkQueue.apply(this, args);
});
}
const originalAddQueue = Queue.prototype.add;
Queue.prototype.add = async function (...args) {
const messageId = crypto.randomBytes(8).toString('hex');
return Sentry.startSpan({
name: 'queue_producer_transaction',
op: 'queue.publish',
attributes: {
'sentry.op': 'queue.publish',
'messaging.message.id': messageId,
'messaging.destination.name': this.name,
'messaging.system': 'bullmq'
}
},
parent => {
return Sentry.startSpan(
{
name: 'queue_producer',
op: 'queue.publish',
attributes: {
'sentry.op': 'queue.publish',
'messaging.message.id': messageId,
'messaging.destination.name': this.name,
'messaging.system': 'bullmq'
}
},
async span => {
const traceHeader = Sentry.spanToTraceHeader(span);
const baggageHeader = Sentry.spanToBaggageHeader(span);
const instrumentationData = { traceHeader, baggageHeader, timestamp: Date.now(), messageId };
await redis.lpush(`span:${args[0]}`, JSON.stringify(instrumentationData));
return originalAddQueue.apply(this, args);
}
);
});
};
const originalRunWorker = Worker.prototype.processJob;
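// Patch the worker side: pop the producer's trace context for this job name
// and continue that trace so producer and consumer spans end up linked.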
Worker.prototype.processJob = async function(...args) {
const message = JSON.parse(await redis.lpop(`span:${args[0].name}`));
if (!message)
return originalRunWorker.apply(this, args);
const latency = Date.now() - message.timestamp;
return Sentry.continueTrace(
{ sentryTrace: message.traceHeader, baggage: message.baggageHeader },
() => {
return Sentry.startSpan({
name: 'queue_consumer_transaction',
op: 'queue.process',
attributes: {
'sentry.op': 'queue.process',
'messaging.message.id': message.messageId,
'messaging.destination.name': args[0].queue.name,
'messaging.message.receive.latency': latency,
'messaging.system': 'bullmq'
}
},
parent => {
return Sentry.startSpan({
name: 'queue_consumer',
op: 'queue.process',
attributes: {
'sentry.op': 'queue.process',
'messaging.message.id': message.messageId,
'messaging.destination.name': args[0].queue.name,
'messaging.message.receive.latency': latency,
'messaging.system': 'bullmq'
}
}, async span => {
// Await the actual job processing and return its result to BullMQ.
const result = await originalRunWorker.apply(this, args);
parent.setStatus({ code: 1, message: 'ok' });
return result;
});
})
}
);
};
},
};
Sentry.init({
dsn: getSentryDsn(),
environment: getNodeEnv() || 'development',
release: `ethernal@${getVersion()}`,
integrations: [
nodeProfilingIntegration(),
Sentry.postgresIntegration(),
bullMQIntegration,
],
tracesSampleRate: 1.0,
profilesSampleRate: 1.0,
debug: true
});

Here is what it looks like in Sentry: each destination contains two lines, queue_producer_transaction (type "Producer", with the number of published events / error rate / time spent set) and queue_consumer_transaction (type "Consumer", with the avg time in queue / processing time / processed / error rate / time spent set).
Announcing BullMQ Telemetry Support: https://bullmq.io/news/241104/telemetry-support/
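For reference, enabling that looks roughly like the following; the telemetry option and the companion bullmq-otel package are taken from the BullMQ documentation, so treat the exact names and constructor arguments as assumptions and check the linked post:

import { Queue, Worker } from 'bullmq'
// Assumption: the helper class shipped in the separate bullmq-otel package.
import { BullMQOtel } from 'bullmq-otel'

const connection = { host: 'localhost', port: 6379 }

// Passing a telemetry implementation makes BullMQ emit OTel spans itself.
const queue = new Queue('my-queue', {
  connection,
  telemetry: new BullMQOtel('my-app'),
})

const worker = new Worker(
  'my-queue',
  async job => {
    // job handler
  },
  {
    connection,
    telemetry: new BullMQOtel('my-app'),
  },
)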
@WilliamBlais nice! Your OTel spans should be picked up automatically by Sentry 👍
Hey @andreiborza, any updates on this? It seems like Sentry is really close to supporting it: it even picks up the OTel spans from BullMQ when the telemetry configuration is provided, but unfortunately they're not shown on the Queues tab.
Hey @ThallesP, no update at this time! You'll have to add some manual instrumentation for now, using the queue span conventions linked above.
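A sketch of what the consumer side of that manual instrumentation could look like, following the queue conventions linked earlier; the sentryTrace, baggage and publishedAt fields are assumed to have been put into the job payload by the producer (for the producer side, see the earlier snippets in this thread):

import * as Sentry from '@sentry/node'
import { Worker } from 'bullmq'

const worker = new Worker(
  'my-queue',
  async job => {
    // Assumption: the producer attached these fields when publishing the job.
    const { sentryTrace, baggage, publishedAt } = job.data

    return Sentry.continueTrace({ sentryTrace, baggage }, () =>
      Sentry.startSpan(
        {
          name: 'my-queue process',
          op: 'queue.process', // consumer op expected by the Queues module
          attributes: {
            'messaging.system': 'bullmq',
            'messaging.destination.name': 'my-queue',
            'messaging.message.id': String(job.id),
            'messaging.message.receive.latency': Date.now() - publishedAt,
          },
        },
        async () => {
          // Actual job handling goes here.
        },
      ),
    )
  },
  { connection: { host: 'localhost', port: 6379 } },
)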
Problem Statement
Similar to Celery for Python, BullMQ seems like it would be a good choice to auto-instrument for queue insights; see #10148, along with conversations on Twitter.
Solution Brainstorm
We should model an implementation after the existing Celery implementation.