diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImpl.java index 3ad01a8..7735e2b 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaFederatedProducerImpl.java @@ -10,6 +10,7 @@ import com.linkedin.kafka.clients.metadataservice.MetadataServiceClientException; import com.linkedin.kafka.clients.utils.LiKafkaClientsUtils; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -211,7 +212,19 @@ public Map> partitionsFor(Set topics) { @Override public Map metrics() { - throw new UnsupportedOperationException("Not implemented yet"); + // Get an immutable copy of the current set of producers. + Map> producers = _producers; + + // Aggregate metric from clusters in the group. Since a different client id + // is used for each cluster, there should be no metric name collision. + Map aggregate = new HashMap<>(); + for (LiKafkaProducer producer : producers.values()) { + Map metrics = producer.metrics(); + if (metrics != null) { + aggregate.putAll(producer.metrics()); + } + } + return aggregate; } @Override