Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Add further prometheusMeter improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
juriwiens committed Jun 11, 2020
1 parent 5be2373 commit afd80e8
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 9 deletions.
3 changes: 1 addition & 2 deletions src/batch_consumer.metrics.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import range from "lodash.range"
import { KafkaBatchConsumer } from "./batch_consumer"
import { getErrorType } from "./utils/error_type"
import type { PrometheusMeter } from "./prometheus-meter-interface"
import type { PrometheusMeter } from "./prometheus_meter"

export class KafkaBatchConsumerMetrics {
constructor(
Expand Down
17 changes: 12 additions & 5 deletions src/fastify_plugin.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
import fastifyPlugin from "fastify-plugin"
import { librdkafkaVersion } from "node-rdkafka"
import { KafkaBatchConsumer, KafkaBatchConsumerConfig } from "./batch_consumer"
import { KafkaProducer, KafkaProducerConfig } from "./producer"
import { KafkaProducerLogging } from "./producer.logging"
import { KafkaProducerMetrics } from "./producer.metrics"
import { KafkaBatchConsumerMetrics } from "./batch_consumer.metrics"
import { PrometheusMeter } from "./prometheus-meter-interface"
import { KafkaBatchConsumerLogging } from "./batch_consumer.logging"
import type { PrometheusMeter } from "./prometheus_meter"

export const kafkaFastifyPlugin: Plugin = fastifyPlugin(
async (app, opts: KafkaPluginOptions) => {
const prometheusMeter = opts.prometheusMeter || app.prometheusMeter
app.log.info(
"Initializing kafka-plugin using librdkafka %s",
librdkafkaVersion,
)
if (opts.producer) {
app.log.debug("Found kafka producer config")
const producer = new KafkaProducer(opts.producer)
KafkaProducerLogging.observe(app.log, producer)
if (opts.prometheusMeter) {
new KafkaProducerMetrics(opts.prometheusMeter).observe(producer)
if (prometheusMeter) {
new KafkaProducerMetrics(prometheusMeter).observe(producer)
}
await producer.connect()
app.decorate("kafkaProducer", producer)
Expand All @@ -24,8 +30,8 @@ export const kafkaFastifyPlugin: Plugin = fastifyPlugin(
app.log.debug("Found kafka consumer config")
const consumer = new KafkaBatchConsumer(opts.consumer)
KafkaBatchConsumerLogging.observe(app.log, consumer)
if (opts.prometheusMeter) {
new KafkaBatchConsumerMetrics(opts.prometheusMeter).observe(consumer)
if (prometheusMeter) {
new KafkaBatchConsumerMetrics(prometheusMeter).observe(consumer)
}
await consumer.connect()
app.decorate("kafkaConsumer", consumer)
Expand Down Expand Up @@ -63,5 +69,6 @@ declare module "fastify" {
interface FastifyInstance {
kafkaConsumer?: KafkaBatchConsumer
kafkaProducer?: KafkaProducer
prometheusMeter?: PrometheusMeter
}
}
1 change: 0 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@ export * from "./partitioner"
export * from "./producer"
export * from "./producer.metrics"
export * from "./producer.logging"
export * from "./prometheus-meter-interface"
export * from "./serializer"
export * from "./topic_processor"
2 changes: 1 addition & 1 deletion src/producer.metrics.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { KafkaProducer } from "./producer"
import { getErrorType } from "./utils/error_type"
import type { PrometheusMeter } from "./prometheus-meter-interface"
import type { PrometheusMeter } from "./prometheus_meter"

export class KafkaProducerMetrics {
constructor(
Expand Down

0 comments on commit afd80e8

Please sign in to comment.