From afd80e82fc65ccdca997c0fa2e4e6d2e4890a7b8 Mon Sep 17 00:00:00 2001 From: Juri Wiens Date: Thu, 11 Jun 2020 13:25:31 +0200 Subject: [PATCH] Add further prometheusMeter improvements --- src/batch_consumer.metrics.ts | 3 +-- src/fastify_plugin.ts | 17 ++++++++++++----- src/index.ts | 1 - src/producer.metrics.ts | 2 +- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/batch_consumer.metrics.ts b/src/batch_consumer.metrics.ts index 2c4a9b5..980e8ea 100644 --- a/src/batch_consumer.metrics.ts +++ b/src/batch_consumer.metrics.ts @@ -1,7 +1,6 @@ import range from "lodash.range" import { KafkaBatchConsumer } from "./batch_consumer" -import { getErrorType } from "./utils/error_type" -import type { PrometheusMeter } from "./prometheus-meter-interface" +import type { PrometheusMeter } from "./prometheus_meter" export class KafkaBatchConsumerMetrics { constructor( diff --git a/src/fastify_plugin.ts b/src/fastify_plugin.ts index 416bbbc..822a64a 100644 --- a/src/fastify_plugin.ts +++ b/src/fastify_plugin.ts @@ -1,20 +1,26 @@ import fastifyPlugin from "fastify-plugin" +import { librdkafkaVersion } from "node-rdkafka" import { KafkaBatchConsumer, KafkaBatchConsumerConfig } from "./batch_consumer" import { KafkaProducer, KafkaProducerConfig } from "./producer" import { KafkaProducerLogging } from "./producer.logging" import { KafkaProducerMetrics } from "./producer.metrics" import { KafkaBatchConsumerMetrics } from "./batch_consumer.metrics" -import { PrometheusMeter } from "./prometheus-meter-interface" import { KafkaBatchConsumerLogging } from "./batch_consumer.logging" +import type { PrometheusMeter } from "./prometheus_meter" export const kafkaFastifyPlugin: Plugin = fastifyPlugin( async (app, opts: KafkaPluginOptions) => { + const prometheusMeter = opts.prometheusMeter || app.prometheusMeter + app.log.info( + "Initializing kafka-plugin using librdkafka %s", + librdkafkaVersion, + ) if (opts.producer) { app.log.debug("Found kafka producer config") const producer = new KafkaProducer(opts.producer) KafkaProducerLogging.observe(app.log, producer) - if (opts.prometheusMeter) { - new KafkaProducerMetrics(opts.prometheusMeter).observe(producer) + if (prometheusMeter) { + new KafkaProducerMetrics(prometheusMeter).observe(producer) } await producer.connect() app.decorate("kafkaProducer", producer) @@ -24,8 +30,8 @@ export const kafkaFastifyPlugin: Plugin = fastifyPlugin( app.log.debug("Found kafka consumer config") const consumer = new KafkaBatchConsumer(opts.consumer) KafkaBatchConsumerLogging.observe(app.log, consumer) - if (opts.prometheusMeter) { - new KafkaBatchConsumerMetrics(opts.prometheusMeter).observe(consumer) + if (prometheusMeter) { + new KafkaBatchConsumerMetrics(prometheusMeter).observe(consumer) } await consumer.connect() app.decorate("kafkaConsumer", consumer) @@ -63,5 +69,6 @@ declare module "fastify" { interface FastifyInstance { kafkaConsumer?: KafkaBatchConsumer kafkaProducer?: KafkaProducer + prometheusMeter?: PrometheusMeter } } diff --git a/src/index.ts b/src/index.ts index a03e625..a13ec70 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,6 +9,5 @@ export * from "./partitioner" export * from "./producer" export * from "./producer.metrics" export * from "./producer.logging" -export * from "./prometheus-meter-interface" export * from "./serializer" export * from "./topic_processor" diff --git a/src/producer.metrics.ts b/src/producer.metrics.ts index 9516c32..9e4ad6b 100644 --- a/src/producer.metrics.ts +++ b/src/producer.metrics.ts @@ -1,6 +1,6 @@ import { KafkaProducer } from "./producer" import { getErrorType } from "./utils/error_type" -import type { PrometheusMeter } from "./prometheus-meter-interface" +import type { PrometheusMeter } from "./prometheus_meter" export class KafkaProducerMetrics { constructor(