Skip to content

Commit

Permalink
feat(node): Add kafkajs integration (#13528)
Browse files Browse the repository at this point in the history
  • Loading branch information
onurtemizkan authored Sep 10, 2024
1 parent cef6986 commit 1285e4b
Show file tree
Hide file tree
Showing 16 changed files with 196 additions and 56 deletions.
1 change: 1 addition & 0 deletions dev-packages/node-integration-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"graphql": "^16.3.0",
"http-terminator": "^3.2.0",
"ioredis": "^5.4.1",
"kafkajs": "2.2.4",
"mongodb": "^3.7.3",
"mongodb-memory-server-global": "^7.6.3",
"mongoose": "^5.13.22",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
services:
db:
image: apache/kafka:latest
restart: always
container_name: integration-tests-kafka
ports:
- '9092:9092'
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
const { loggingTransport } = require('@sentry-internal/node-integration-tests');
const Sentry = require('@sentry/node');

Sentry.init({
dsn: 'https://public@dsn.ingest.sentry.io/1337',
release: '1.0',
tracesSampleRate: 1.0,
transport: loggingTransport,
});

// Stop the process from exiting before the transaction is sent
setInterval(() => {}, 1000);

const { Kafka } = require('kafkajs');

async function run() {
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});

const admin = kafka.admin();
await admin.connect();

const producer = kafka.producer();
await producer.connect();

await admin.createTopics({
topics: [{ topic: 'test-topic' }],
});

const consumer = kafka.consumer({
groupId: 'test-group',
});

await consumer.connect();
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true });

consumer.run({
eachMessage: async ({ message }) => {
// eslint-disable-next-line no-console
console.debug('Received message', message.value.toString());
},
});

// Wait for the consumer to be ready
await new Promise(resolve => setTimeout(resolve, 4000));

await producer.send({
topic: 'test-topic',
messages: [
{
value: 'TEST_MESSAGE',
},
],
});

// Wait for the message to be received
await new Promise(resolve => setTimeout(resolve, 5000));
}

// eslint-disable-next-line @typescript-eslint/no-floating-promises
run();
55 changes: 55 additions & 0 deletions dev-packages/node-integration-tests/suites/tracing/kafkajs/test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { cleanupChildProcesses, createRunner } from '../../../utils/runner';

// When running docker compose, we need a larger timeout, as this takes some time...
jest.setTimeout(60_000);

describe('kafkajs', () => {
afterAll(() => {
cleanupChildProcesses();
});

test('traces producers and consumers', done => {
createRunner(__dirname, 'scenario.js')
.withDockerCompose({
workingDirectory: [__dirname],
readyMatches: ['9092'],
})
.expect({
transaction: {
transaction: 'test-topic',
contexts: {
trace: expect.objectContaining({
op: 'message',
status: 'ok',
data: expect.objectContaining({
'messaging.system': 'kafka',
'messaging.destination': 'test-topic',
'otel.kind': 'PRODUCER',
'sentry.op': 'message',
'sentry.origin': 'auto.kafkajs.otel.producer',
}),
}),
},
},
})
.expect({
transaction: {
transaction: 'test-topic',
contexts: {
trace: expect.objectContaining({
op: 'message',
status: 'ok',
data: expect.objectContaining({
'messaging.system': 'kafka',
'messaging.destination': 'test-topic',
'otel.kind': 'CONSUMER',
'sentry.op': 'message',
'sentry.origin': 'auto.kafkajs.otel.consumer',
}),
}),
},
},
})
.start(done);
});
});
1 change: 1 addition & 0 deletions packages/astro/src/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export {
inboundFiltersIntegration,
initOpenTelemetry,
isInitialized,
kafkaIntegration,
koaIntegration,
lastEventId,
linkedErrorsIntegration,
Expand Down
1 change: 1 addition & 0 deletions packages/aws-serverless/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export {
fsIntegration,
genericPoolIntegration,
graphqlIntegration,
kafkaIntegration,
mongoIntegration,
mongooseIntegration,
mysqlIntegration,
Expand Down
1 change: 1 addition & 0 deletions packages/bun/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ export {
setupConnectErrorHandler,
genericPoolIntegration,
graphqlIntegration,
kafkaIntegration,
mongoIntegration,
mongooseIntegration,
mysqlIntegration,
Expand Down
1 change: 1 addition & 0 deletions packages/google-cloud-serverless/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export {
fastifyIntegration,
genericPoolIntegration,
graphqlIntegration,
kafkaIntegration,
mongoIntegration,
mongooseIntegration,
mysqlIntegration,
Expand Down
1 change: 1 addition & 0 deletions packages/node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
"@opentelemetry/instrumentation-hapi": "0.41.0",
"@opentelemetry/instrumentation-http": "0.53.0",
"@opentelemetry/instrumentation-ioredis": "0.43.0",
"@opentelemetry/instrumentation-kafkajs": "0.3.0",
"@opentelemetry/instrumentation-koa": "0.43.0",
"@opentelemetry/instrumentation-mongodb": "0.47.0",
"@opentelemetry/instrumentation-mongoose": "0.42.0",
Expand Down
1 change: 1 addition & 0 deletions packages/node/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export { anrIntegration } from './integrations/anr';
export { expressIntegration, expressErrorHandler, setupExpressErrorHandler } from './integrations/tracing/express';
export { fastifyIntegration, setupFastifyErrorHandler } from './integrations/tracing/fastify';
export { graphqlIntegration } from './integrations/tracing/graphql';
export { kafkaIntegration } from './integrations/tracing/kafka';
export { mongoIntegration } from './integrations/tracing/mongo';
export { mongooseIntegration } from './integrations/tracing/mongoose';
export { mysqlIntegration } from './integrations/tracing/mysql';
Expand Down
3 changes: 3 additions & 0 deletions packages/node/src/integrations/tracing/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { fastifyIntegration, instrumentFastify } from './fastify';
import { genericPoolIntegration, instrumentGenericPool } from './genericPool';
import { graphqlIntegration, instrumentGraphql } from './graphql';
import { hapiIntegration, instrumentHapi } from './hapi';
import { instrumentKafka, kafkaIntegration } from './kafka';
import { instrumentKoa, koaIntegration } from './koa';
import { instrumentMongo, mongoIntegration } from './mongo';
import { instrumentMongoose, mongooseIntegration } from './mongoose';
Expand Down Expand Up @@ -39,6 +40,7 @@ export function getAutoPerformanceIntegrations(): Integration[] {
koaIntegration(),
connectIntegration(),
genericPoolIntegration(),
kafkaIntegration(),
];
}

Expand All @@ -53,6 +55,7 @@ export function getOpenTelemetryInstrumentationToPreload(): (((options?: any) =>
instrumentConnect,
instrumentFastify,
instrumentHapi,
instrumentKafka,
instrumentKoa,
instrumentNest,
instrumentMongo,
Expand Down
37 changes: 37 additions & 0 deletions packages/node/src/integrations/tracing/kafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { KafkaJsInstrumentation } from '@opentelemetry/instrumentation-kafkajs';

import { defineIntegration } from '@sentry/core';
import type { IntegrationFn } from '@sentry/types';
import { generateInstrumentOnce } from '../../otel/instrument';
import { addOriginToSpan } from '../../utils/addOriginToSpan';

const INTEGRATION_NAME = 'Kafka';

export const instrumentKafka = generateInstrumentOnce(
INTEGRATION_NAME,
() =>
new KafkaJsInstrumentation({
consumerHook(span) {
addOriginToSpan(span, 'auto.kafkajs.otel.consumer');
},
producerHook(span) {
addOriginToSpan(span, 'auto.kafkajs.otel.producer');
},
}),
);

const _kafkaIntegration = (() => {
return {
name: INTEGRATION_NAME,
setupOnce() {
instrumentKafka();
},
};
}) satisfies IntegrationFn;

/**
* KafkaJs integration
*
* Capture tracing data for KafkaJs.
*/
export const kafkaIntegration = defineIntegration(_kafkaIntegration);
1 change: 1 addition & 0 deletions packages/remix/src/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export {
inboundFiltersIntegration,
initOpenTelemetry,
isInitialized,
kafkaIntegration,
koaIntegration,
lastEventId,
linkedErrorsIntegration,
Expand Down
1 change: 1 addition & 0 deletions packages/solidstart/src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ export {
inboundFiltersIntegration,
initOpenTelemetry,
isInitialized,
kafkaIntegration,
koaIntegration,
lastEventId,
linkedErrorsIntegration,
Expand Down
1 change: 1 addition & 0 deletions packages/sveltekit/src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export {
inboundFiltersIntegration,
initOpenTelemetry,
isInitialized,
kafkaIntegration,
koaIntegration,
lastEventId,
linkedErrorsIntegration,
Expand Down
Loading

0 comments on commit 1285e4b

Please sign in to comment.