From f0a53abf209a38efcf005f8af29ff8bff452eaa8 Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Tue, 15 Feb 2022 23:50:48 -0500 Subject: [PATCH] KAFKA-10000: Add new metrics for source task transactions --- checkstyle/suppressions.xml | 2 +- .../connect/runtime/ConnectMetricsRegistry.java | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index a53a50a75fcd3..09c87da1e4833 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -133,7 +133,7 @@ files="(JsonConverter|Values|ConnectHeaders).java"/> + files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/> diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java index cf567450fe2a0..f301439da8356 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java @@ -112,6 +112,9 @@ public class ConnectMetricsRegistry { public final MetricNameTemplate dlqProduceRequests; public final MetricNameTemplate dlqProduceFailures; public final MetricNameTemplate lastErrorTimestamp; + public final MetricNameTemplate transactionSizeMin; + public final MetricNameTemplate transactionSizeMax; + public final MetricNameTemplate transactionSizeAvg; public Map connectorStatusMetrics; @@ -207,6 +210,16 @@ public ConnectMetricsRegistry(Set tags) { "completely written to Kafka.", sourceTaskTags); + transactionSizeMin = createTemplate("transaction-size-min", SOURCE_TASK_GROUP_NAME, + "The number of records in the smallest transaction the task has committed so far. ", + sourceTaskTags); + transactionSizeMax = createTemplate("transaction-size-max", SOURCE_TASK_GROUP_NAME, + "The number of records in the largest transaction the task has committed so far.", + sourceTaskTags); + transactionSizeAvg = createTemplate("transaction-size-avg", SOURCE_TASK_GROUP_NAME, + "The average number of records in the transactions the task has committed so far.", + sourceTaskTags); + /***** Sink worker task level *****/ Set sinkTaskTags = new LinkedHashSet<>(tags); sinkTaskTags.add(CONNECTOR_TAG_NAME);