diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a53a50a75fcd3..09c87da1e4833 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -133,7 +133,7 @@
files="(JsonConverter|Values|ConnectHeaders).java"/>
+ files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
index cf567450fe2a0..f301439da8356 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetricsRegistry.java
@@ -112,6 +112,9 @@ public class ConnectMetricsRegistry {
public final MetricNameTemplate dlqProduceRequests;
public final MetricNameTemplate dlqProduceFailures;
public final MetricNameTemplate lastErrorTimestamp;
+ public final MetricNameTemplate transactionSizeMin;
+ public final MetricNameTemplate transactionSizeMax;
+ public final MetricNameTemplate transactionSizeAvg;
public Map connectorStatusMetrics;
@@ -207,6 +210,16 @@ public ConnectMetricsRegistry(Set tags) {
"completely written to Kafka.",
sourceTaskTags);
+ transactionSizeMin = createTemplate("transaction-size-min", SOURCE_TASK_GROUP_NAME,
+ "The number of records in the smallest transaction the task has committed so far. ",
+ sourceTaskTags);
+ transactionSizeMax = createTemplate("transaction-size-max", SOURCE_TASK_GROUP_NAME,
+ "The number of records in the largest transaction the task has committed so far.",
+ sourceTaskTags);
+ transactionSizeAvg = createTemplate("transaction-size-avg", SOURCE_TASK_GROUP_NAME,
+ "The average number of records in the transactions the task has committed so far.",
+ sourceTaskTags);
+
/***** Sink worker task level *****/
Set sinkTaskTags = new LinkedHashSet<>(tags);
sinkTaskTags.add(CONNECTOR_TAG_NAME);