Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JMX Scraper: Kafka server, producer and consumer YAMLs and integration tests added #1670

Merged
merged 8 commits into from
Feb 14, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -140,51 +140,51 @@ List<Consumer<Metric>> kafkaBrokerAssertions() {
metric,
"kafka.partition.count",
"The number of partitions on the broker",
"{partitions}"),
"{partition}"),
metric ->
assertGauge(
metric,
"kafka.partition.offline",
"The number of partitions offline",
"{partitions}"),
"{partition}"),
metric ->
assertGauge(
metric,
"kafka.partition.under_replicated",
"The number of under replicated partitions",
"{partitions}"),
"{partition}"),
metric ->
assertSumWithAttributes(
metric,
"kafka.isr.operation.count",
"The number of in-sync replica shrink and expand operations",
"{operations}",
"{operation}",
attrs -> attrs.containsOnly(entry("operation", "shrink")),
attrs -> attrs.containsOnly(entry("operation", "expand"))),
metric ->
assertGauge(
metric,
"kafka.controller.active.count",
"controller is active on broker",
"{controllers}"),
"{controller}"),
metric ->
assertSum(
metric,
"kafka.leader.election.rate",
"leader election rate - increasing indicates broker failures",
"{elections}"),
"{election}"),
metric ->
assertGauge(
metric,
"kafka.max.lag",
"max lag in messages between follower and leader replicas",
"{messages}"),
"{message}"),
metric ->
assertSum(
metric,
"kafka.unclean.election.rate",
"unclean leader election rate - increasing indicates broker failures",
"{elections}"));
"{election}"));
}

static class KafkaBrokerTargetIntegrationTest extends KafkaIntegrationTest {
Expand Down Expand Up @@ -235,52 +235,52 @@ void endToEnd() {
metric,
"kafka.consumer.bytes-consumed-rate",
"The average number of bytes consumed per second",
"by",
"By",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.fetch-rate",
"The number of fetch requests for all topics per second",
"1"),
"{request}"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.fetch-size-avg",
"The average number of bytes fetched per request",
"by",
"By",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.records-consumed-rate",
"The average number of records consumed per second",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.records-lag-max",
"Number of messages the consumer lags behind the producer",
"1"),
"{record}"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.bytes-consumed-rate",
"The average number of bytes consumed for all topics per second",
"by"),
"By"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.fetch-size-avg",
"The average number of bytes fetched per request for all topics",
"by"),
"By"),
metric ->
assertKafkaGauge(
metric,
"kafka.consumer.total.records-consumed-rate",
"The average number of records consumed for all topics per second",
"1"));
"{record}"));
}
}

Expand All @@ -300,14 +300,14 @@ void endToEnd() {
metric,
"kafka.producer.byte-rate",
"The average number of bytes sent per second for a topic",
"by",
"By",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.compression-rate",
"The average compression rate of record batches for a topic",
"1",
"{ratio}",
topics),
metric ->
assertKafkaGauge(
Expand All @@ -320,27 +320,27 @@ void endToEnd() {
metric,
"kafka.producer.outgoing-byte-rate",
"The average number of outgoing bytes sent per second to all servers",
"by"),
"By"),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-error-rate",
"The average per-second number of record sends that resulted in errors for a topic",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-retry-rate",
"The average per-second number of retried record sends for a topic",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
metric,
"kafka.producer.record-send-rate",
"The average number of records sent per second for a topic",
"1",
"{record}",
topics),
metric ->
assertKafkaGauge(
Expand All @@ -353,10 +353,13 @@ void endToEnd() {
metric,
"kafka.producer.request-rate",
"The average number of requests sent per second",
"1"),
"{request}"),
metric ->
assertKafkaGauge(
metric, "kafka.producer.response-rate", "Responses received per second", "1"));
metric,
"kafka.producer.response-rate",
"Responses received per second",
"{response}"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,45 @@

def consumerFetchManagerMetrics = otel.mbeans("kafka.consumer:client-id=*,type=consumer-fetch-manager-metrics")
otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.fetch-rate",
"The number of fetch requests for all topics per second", "1",
"The number of fetch requests for all topics per second", "{request}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"fetch-rate", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.records-lag-max",
"Number of messages the consumer lags behind the producer", "1",
"Number of messages the consumer lags behind the producer", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"records-lag-max", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.bytes-consumed-rate",
"The average number of bytes consumed for all topics per second", "by",
"The average number of bytes consumed for all topics per second", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"bytes-consumed-rate", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.fetch-size-avg",
"The average number of bytes fetched per request for all topics", "by",
"The average number of bytes fetched per request for all topics", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"fetch-size-avg", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetrics, "kafka.consumer.total.records-consumed-rate",
"The average number of records consumed for all topics per second", "1",
"The average number of records consumed for all topics per second", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"records-consumed-rate", otel.&doubleValueCallback)

def consumerFetchManagerMetricsByTopic = otel.mbeans("kafka.consumer:client-id=*,topic=*,type=consumer-fetch-manager-metrics")
otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.bytes-consumed-rate",
"The average number of bytes consumed per second", "by",
"The average number of bytes consumed per second", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"bytes-consumed-rate", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.fetch-size-avg",
"The average number of bytes fetched per request", "by",
"The average number of bytes fetched per request", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"fetch-size-avg", otel.&doubleValueCallback)

otel.instrument(consumerFetchManagerMetricsByTopic, "kafka.consumer.records-consumed-rate",
"The average number of records consumed per second", "1",
"The average number of records consumed per second", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"records-consumed-rate", otel.&doubleValueCallback)
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,45 @@ otel.instrument(producerMetrics, "kafka.producer.io-wait-time-ns-avg",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"io-wait-time-ns-avg", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.outgoing-byte-rate",
"The average number of outgoing bytes sent per second to all servers", "by",
"The average number of outgoing bytes sent per second to all servers", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"outgoing-byte-rate", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.request-latency-avg",
"The average request latency", "ms",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"request-latency-avg", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.request-rate",
"The average number of requests sent per second", "1",
"The average number of requests sent per second", "{request}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"request-rate", otel.&doubleValueCallback)
otel.instrument(producerMetrics, "kafka.producer.response-rate",
"Responses received per second", "1",
"Responses received per second", "{response}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") }],
"response-rate", otel.&doubleValueCallback)

def producerTopicMetrics = otel.mbeans("kafka.producer:client-id=*,topic=*,type=producer-topic-metrics")
otel.instrument(producerTopicMetrics, "kafka.producer.byte-rate",
"The average number of bytes sent per second for a topic", "by",
"The average number of bytes sent per second for a topic", "By",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"byte-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.compression-rate",
"The average compression rate of record batches for a topic", "1",
"The average compression rate of record batches for a topic", "{ratio}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"compression-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.record-error-rate",
"The average per-second number of record sends that resulted in errors for a topic", "1",
"The average per-second number of record sends that resulted in errors for a topic", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"record-error-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.record-retry-rate",
"The average per-second number of retried record sends for a topic", "1",
"The average per-second number of retried record sends for a topic", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"record-retry-rate", otel.&doubleValueCallback)
otel.instrument(producerTopicMetrics, "kafka.producer.record-send-rate",
"The average number of records sent per second for a topic", "1",
"The average number of records sent per second for a topic", "{record}",
["client-id" : { mbean -> mbean.name().getKeyProperty("client-id") },
"topic" : { mbean -> mbean.name().getKeyProperty("topic") }],
"record-send-rate", otel.&doubleValueCallback)
Loading