
Merge pull request #16 from jeremydavis02/jeremydavis02-lag-merge-branch
Merging our consumer lag metric additions
jeremydavis02 authored Jan 28, 2025
2 parents 12bd166 + fc2cd73 commit 8a36bd9
Showing 5 changed files with 235 additions and 37 deletions.
7 changes: 6 additions & 1 deletion pom.xml
@@ -4,7 +4,7 @@

<groupId>com.appdynamics.extensions</groupId>
<artifactId>kafka-monitoring-extension</artifactId>
<version>2.0.6</version>
<version>2.0.8</version>
<packaging>jar</packaging>
<name>kafka-monitoring-extension</name>
<url>http://maven.apache.org</url>
@@ -28,6 +28,11 @@
<version>3.7.11</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
125 changes: 125 additions & 0 deletions src/main/java/com/appdynamics/extensions/kafka/ConsumerGroupLag.java
@@ -0,0 +1,125 @@
package com.appdynamics.extensions.kafka;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerGroupLag {
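// the random UUID suffix keeps this probe group id distinct from any real consumer group's id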
private final String monitoringConsumerGroupID = "monitoring_consumer_" + UUID.randomUUID().toString();
private AdminClient adminClient;

public Map<TopicPartition, PartionOffsets> getConsumerGroupOffsets(String host, String topic, String groupId) {
Map<TopicPartition, Long> logEndOffset = getLogEndOffset(topic, host);

Set<TopicPartition> topicPartitions = new HashSet<>();
for (Entry<TopicPartition, Long> s : logEndOffset.entrySet()) {
topicPartitions.add(s.getKey());
}

KafkaConsumer<String, Object> consumer = createNewConsumer(groupId, host);
Map<TopicPartition, OffsetAndMetadata> comittedOffsetMeta = consumer.committed(topicPartitions);
consumer.close(); // release the probe consumer once the committed offsets have been fetched

// Collectors.toMap requires a merge function; TopicPartition keys are unique
// here, so a collision would indicate a bug upstream.
BinaryOperator<PartionOffsets> mergeFunction = (a, b) -> {
throw new IllegalStateException();
};
Map<TopicPartition, PartionOffsets> result = logEndOffset.entrySet().stream()
.collect(Collectors.toMap(entry -> (entry.getKey()), entry -> {
OffsetAndMetadata committed = comittedOffsetMeta.get(entry.getKey());
long currentOffset = 0;
if(committed != null) { //committed offset will be null for unknown consumer groups
currentOffset = committed.offset();
} else {
currentOffset = entry.getValue(); //set current offset to same as end offset for unknown consumer groups
}
return new PartionOffsets(entry.getValue(), currentOffset, entry.getKey().partition(), topic);
}, mergeFunction));

return result;
}

public Map<TopicPartition, Long> getLogEndOffset(String topic, String host) {
Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>();
KafkaConsumer<?, ?> consumer = createNewConsumer(monitoringConsumerGroupID, host);
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = partitionInfoList.stream()
.map(pi -> new TopicPartition(topic, pi.partition())).collect(Collectors.toList());
consumer.assign(topicPartitions);
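// seekToEnd is evaluated lazily; the position() calls below force the actual end-offset lookups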
consumer.seekToEnd(topicPartitions);
topicPartitions.forEach(topicPartition -> endOffsets.put(topicPartition, consumer.position(topicPartition)));
consumer.close();
return endOffsets;
}

private static KafkaConsumer<String, Object> createNewConsumer(String groupId, String host) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new KafkaConsumer<>(properties);
}

private AdminClient getAdminClient(String host) {
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host);
return AdminClient.create(properties);
}


static class PartionOffsets {
public long lag;
private long timestamp = System.currentTimeMillis();
public long endOffset;
public long currentOffset;
public int partition;
public String topic;

public PartionOffsets(long endOffset, long currentOffset, int partition, String topic) {
this.endOffset = endOffset;
this.currentOffset = currentOffset;
this.partition = partition;
this.topic = topic;
this.lag = endOffset - currentOffset;
}
public long getLag() {
return lag;
}
public long getCurrentOffset() {
return currentOffset;
}
public long getEndOffset() {
return endOffset;
}
public int getPartition() {
return partition;
}
public String getTopic() {
return topic;
}

@Override
public String toString() {
return "PartionOffsets [lag=" + lag + ", timestamp=" + timestamp + ", endOffset=" + endOffset
+ ", currentOffset=" + currentOffset + ", partion=" + partition + ", topic=" + topic + "]";
}

}
}
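
For reference, here is a minimal, hypothetical driver showing how ConsumerGroupLag can be exercised on its own; it is not part of this commit. The broker address, topic, and group id are placeholders, and the class must live in the same package because PartionOffsets is package-private:

package com.appdynamics.extensions.kafka;

import java.util.Map;

import org.apache.kafka.common.TopicPartition;

public class ConsumerGroupLagDemo {
    public static void main(String[] args) {
        ConsumerGroupLag cgl = new ConsumerGroupLag();
        // placeholder connection details: substitute values for your own cluster
        Map<TopicPartition, ConsumerGroupLag.PartionOffsets> offsets =
                cgl.getConsumerGroupOffsets("localhost:9092", "orders", "orders-processor");
        for (Map.Entry<TopicPartition, ConsumerGroupLag.PartionOffsets> e : offsets.entrySet()) {
            ConsumerGroupLag.PartionOffsets po = e.getValue();
            System.out.println(po.getTopic() + " partition " + po.getPartition() + " lag=" + po.getLag());
        }
    }
}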
src/main/java/com/appdynamics/extensions/kafka/KafkaMonitorTask.java
@@ -16,6 +16,7 @@

package com.appdynamics.extensions.kafka;

import org.apache.kafka.clients.admin.*;
import com.appdynamics.extensions.AMonitorTaskRunnable;
import com.appdynamics.extensions.MetricWriteHelper;
import com.appdynamics.extensions.TasksExecutionServiceProvider;
@@ -25,19 +26,22 @@
import com.appdynamics.extensions.util.CryptoUtils;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.LoggerFactory;

import javax.management.remote.JMXConnector;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.*;
import java.util.regex.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class KafkaMonitorTask implements AMonitorTaskRunnable {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaMonitorTask.class);
private MonitorContextConfiguration configuration;
private Map<String, String> kafkaServer;
private Map<String, ?> kafkaServer;
private MetricWriteHelper metricWriteHelper;
private String displayName;
private JMXConnectionAdapter jmxAdapter;
@@ -57,9 +61,10 @@ public void onTaskComplete () {

public void run () {
try {
logger.info("Starting Kafka Monitoring task for Kafka server: {} ", this.kafkaServer.get(Constants.DISPLAY_NAME));
logger.debug("Starting Kafka Monitoring task for Kafka server: {} ", this.kafkaServer.get(Constants.DISPLAY_NAME));
populateAndPrintMetrics();
logger.info("Completed Kafka Monitoring task for Kafka server: {}",
populateAndPrintLagMetrics();
logger.debug("Completed Kafka Monitoring task for Kafka server: {}",
this.kafkaServer.get(Constants.DISPLAY_NAME));
} catch (Exception e) {
logger.error("Exception occurred while collecting metrics for: {} {}",
@@ -71,10 +76,8 @@ public void populateAndPrintMetrics () {
try {
BigDecimal connectionStatus = openJMXConnection();
metricWriteHelper.printMetric(this.configuration.getMetricPrefix() +
Constants.METRIC_SEPARATOR + this.displayName + Constants.METRIC_SEPARATOR + "kafka.server" +
Constants.METRIC_SEPARATOR + "HeartBeat",
connectionStatus.toString(),
Constants.AVERAGE, Constants.AVERAGE, Constants.INDIVIDUAL);
Constants.METRIC_SEPARATOR + this.displayName + Constants.METRIC_SEPARATOR + "kafka.server" +
Constants.METRIC_SEPARATOR + "HeartBeat", connectionStatus.toString(), Constants.AVERAGE, Constants.AVERAGE, Constants.INDIVIDUAL);
if(connectionStatus.equals(BigDecimal.ONE)) {
List<Map<String, ?>> mBeansListFromConfig = (List<Map<String, ?>>) configuration.getConfigYml()
.get(Constants.MBEANS);
@@ -96,6 +99,60 @@ public void populateAndPrintMetrics () {
}
}
}
public void populateAndPrintLagMetrics () {
try {

String topicRegex = (String) this.kafkaServer.get(Constants.TOPIC);
String host = this.kafkaServer.get(Constants.HOST) + ":" + this.kafkaServer.get(Constants.CONSUMER_PORT); // plain concatenation tolerates a numeric consumer_port from config.yml

Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, host);
AdminClient kafkaAdmin = AdminClient.create(properties);

List<String> groupIds = kafkaAdmin.listConsumerGroups().all().get().stream().map(s -> s.groupId()).collect(Collectors.toList());
Set<String> topics = kafkaAdmin.listTopics().names().get();
ConsumerGroupLag cgl = new ConsumerGroupLag();
Map<TopicPartition, Long> lagSet = new ConcurrentHashMap<>();

logger.debug("AA Scanning Groups to check for lags");
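// NOTE: probe consumers are created for every matching (topic, group) pair in the loops below, so cost grows with topics x groups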

for (final String topicName : topics) {
logger.debug("AA ReviewingTopic:" + topicName);
boolean isMatch = Pattern.matches(topicRegex, topicName);
if (isMatch) {
for (final String groupId : groupIds) {
logger.debug("AA ReviewingTopic:" + topicName + " GroupName:" + groupId);
Map<TopicPartition, ConsumerGroupLag.PartionOffsets> lag = cgl.getConsumerGroupOffsets(host, topicName, groupId);
for (Map.Entry<TopicPartition, ConsumerGroupLag.PartionOffsets> entry : lag.entrySet()) {
ConsumerGroupLag.PartionOffsets offsets = entry.getValue();
if (lagSet.containsKey(entry.getKey())) {
logger.debug("AA Updating TopicPartition:" + entry.getKey().toString() + " CurrLag:" + lagSet.get(entry.getKey()) + " AddingLag:" + offsets.getLag());
lagSet.put(entry.getKey(), (long) (lagSet.get(entry.getKey()) + offsets.getLag()));
} else {
logger.debug("AA Inserting TopicPartition:" + entry.getKey().toString() + " CurrLag:NONE AddingLag:" + offsets.getLag());
lagSet.put(entry.getKey(), (long) offsets.getLag());
}
}
}
}
}

logger.debug("AA DONE Scanning Groups. Writing Metrics. ");
for (Map.Entry<TopicPartition, Long> entry : lagSet.entrySet()) {
logger.debug("AA METRIC WRITE Topic:" + entry.getKey().topic() + ", Partition:" + entry.getKey().partition() + ", Lag:" + entry.getValue());

metricWriteHelper.printMetric(this.configuration.getMetricPrefix() +
Constants.METRIC_SEPARATOR + this.displayName + Constants.METRIC_SEPARATOR + "kafka.lag" +
Constants.METRIC_SEPARATOR + entry.getKey().topic() + Constants.METRIC_SEPARATOR + entry.getKey().partition() +
Constants.METRIC_SEPARATOR + "lag", String.valueOf(entry.getValue()), Constants.AVERAGE, Constants.AVERAGE, Constants.INDIVIDUAL);
}
logger.debug("AA DONE Writing Metrics. Closing connection ");
kafkaAdmin.close();

} catch (Exception e) {
logger.error("Error while getting Lag metrics: {} {}", this.kafkaServer.get(Constants.DISPLAY_NAME), e);
}
}

private BigDecimal openJMXConnection () {
try {
@@ -124,23 +181,23 @@

private Map<String, String> buildRequestMap () {
Map<String, String> requestMap = new HashMap<>();
requestMap.put(Constants.HOST, this.kafkaServer.get(Constants.HOST));
requestMap.put(Constants.PORT, this.kafkaServer.get(Constants.PORT));
requestMap.put(Constants.DISPLAY_NAME, this.kafkaServer.get(Constants.DISPLAY_NAME));
requestMap.put(Constants.SERVICE_URL, this.kafkaServer.get(Constants.SERVICE_URL));
requestMap.put(Constants.USERNAME, this.kafkaServer.get(Constants.USERNAME));
requestMap.put(Constants.HOST, (String)this.kafkaServer.get(Constants.HOST));
requestMap.put(Constants.PORT, (String)this.kafkaServer.get(Constants.PORT));
requestMap.put(Constants.DISPLAY_NAME, (String)this.kafkaServer.get(Constants.DISPLAY_NAME));
requestMap.put(Constants.SERVICE_URL, (String)this.kafkaServer.get(Constants.SERVICE_URL));
requestMap.put(Constants.USERNAME, (String)this.kafkaServer.get(Constants.USERNAME));
requestMap.put(Constants.PASSWORD, getPassword());
return requestMap;
}

private String getPassword () {
String password = this.kafkaServer.get(Constants.PASSWORD);
String password = (String)this.kafkaServer.get(Constants.PASSWORD);
Map<String, ?> configMap = configuration.getConfigYml();
java.util.Map<String, String> cryptoMap = Maps.newHashMap();
cryptoMap.put(Constants.PASSWORD, password);
if (configMap.containsKey(Constants.ENCRYPTION_KEY)) {
String encryptionKey = configMap.get(Constants.ENCRYPTION_KEY).toString();
String encryptedPassword = this.kafkaServer.get(Constants.ENCRYPTED_PASSWORD);
String encryptedPassword = (String)this.kafkaServer.get(Constants.ENCRYPTED_PASSWORD);
if (!Strings.isNullOrEmpty(encryptionKey) && !Strings.isNullOrEmpty(encryptedPassword)) {
cryptoMap.put(Constants.ENCRYPTED_PASSWORD, encryptedPassword);
cryptoMap.put(Constants.ENCRYPTION_KEY, encryptionKey);
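
As a design note on the new lag path: the containsKey/put pair that accumulates lag per partition could be collapsed into a single Map.merge call. A minimal, behavior-equivalent sketch against the same lagSet map:

lagSet.merge(entry.getKey(), offsets.getLag(), Long::sum);

Assuming Constants.METRIC_SEPARATOR is "|" (the separator AppDynamics extensions conventionally use), a server with metric prefix Custom Metrics|Kafka and display name Local Kafka would publish partition 0 of topic orders under Custom Metrics|Kafka|Local Kafka|kafka.lag|orders|0|lag.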
src/main/java/com/appdynamics/extensions/kafka/Constants.java
@@ -14,6 +14,8 @@ public class Constants {

public static final String PORT = "port";

public static final String CONSUMER_PORT = "consumer_port";

public static final String USERNAME = "username";

public static final String PASSWORD = "password";
@@ -38,4 +40,11 @@

public static final String INDIVIDUAL = "INDIVIDUAL";

public static final String CONSUMERGROUPS = "consumerGroups";

public static final String GROUPID = "group_id";

public static final String TOPIC = "topic";


}
(diff for the fifth changed file did not load)
