diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index aebc844..ef00b82 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
- java: [ '8', '11' ]
+ java: ['11']
steps:
- uses: actions/checkout@v2
- name: Set up JDK
diff --git a/pom.xml b/pom.xml
index 2a284b5..82744b2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
${project.build.directory}/dist
2.10.0
- 1.8
+ 11
2.6.2
UTF-8
@@ -204,23 +204,16 @@
-            <groupId>org.codehaus.mojo</groupId>
-            <artifactId>findbugs-maven-plugin</artifactId>
-            <version>3.0.5</version>
-            <configuration>
-              <effort>Max</effort>
-              <threshold>Low</threshold>
-              <xmlOutput>true</xmlOutput>
-            </configuration>
-            <executions>
-              <execution>
-                <id>analyze-compile</id>
-                <goals>
-                  <goal>check</goal>
-                </goals>
-                <phase>compile</phase>
-              </execution>
-            </executions>
+            <groupId>com.github.spotbugs</groupId>
+            <artifactId>spotbugs-maven-plugin</artifactId>
+            <version>4.5.2.0</version>
+            <dependencies>
+              <dependency>
+                <groupId>com.github.spotbugs</groupId>
+                <artifactId>spotbugs</artifactId>
+                <version>4.5.3</version>
+              </dependency>
+            </dependencies>
com.github.ekryd.sortpom
@@ -292,7 +285,7 @@
-
+
@@ -392,7 +385,7 @@
-
+
@@ -405,7 +398,7 @@
-
+
@@ -459,10 +452,10 @@
package
-
+
-
+
diff --git a/src/main/java/com/salesforce/mirus/metrics/MirrorJmxReporter.java b/src/main/java/com/salesforce/mirus/metrics/MirrorJmxReporter.java
index edbf972..7b87876 100644
--- a/src/main/java/com/salesforce/mirus/metrics/MirrorJmxReporter.java
+++ b/src/main/java/com/salesforce/mirus/metrics/MirrorJmxReporter.java
@@ -1,13 +1,17 @@
package com.salesforce.mirus.metrics;
+import com.google.common.collect.Sets;
import com.salesforce.mirus.MirusSourceConnector;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.*;
+import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -15,11 +19,27 @@ public class MirrorJmxReporter extends AbstractMirusJmxReporter {
private static final Logger logger = LoggerFactory.getLogger(MirrorJmxReporter.class);
+ public static final Map<Long, String> LATENCY_BUCKETS =
+ Map.of(
+ TimeUnit.MINUTES.toMillis(0),
+ "0m",
+ TimeUnit.MINUTES.toMillis(5),
+ "5m",
+ TimeUnit.MINUTES.toMillis(10),
+ "10m",
+ TimeUnit.MINUTES.toMillis(30),
+ "30m",
+ TimeUnit.MINUTES.toMillis(60),
+ "60m",
+ TimeUnit.HOURS.toMillis(12),
+ "12h");
+
private static MirrorJmxReporter instance = null;
private static final String SOURCE_CONNECTOR_GROUP = MirusSourceConnector.class.getSimpleName();
private static final Set<String> TOPIC_TAGS = new HashSet<>(Collections.singletonList("topic"));
+ private static final Set<String> TOPIC_BUCKET_TAGS = Sets.newHashSet("topic", "bucket");
private static final MetricNameTemplate REPLICATION_LATENCY =
new MetricNameTemplate(
@@ -38,16 +58,25 @@ public class MirrorJmxReporter extends AbstractMirusJmxReporter {
"replication-latency-ms-avg", SOURCE_CONNECTOR_GROUP,
"Average time it takes records to replicate from source to target cluster.", TOPIC_TAGS);
+ protected static final MetricNameTemplate HISTOGRAM_LATENCY =
+ new MetricNameTemplate(
+ "replication-latency-histogram",
+ SOURCE_CONNECTOR_GROUP,
+ "Cumulative histogram counting records delivered per second with latency exceeding a set of fixed bucket thresholds.",
+ TOPIC_BUCKET_TAGS);
+
// Map of topics to their metric objects
private final Map<String, Sensor> topicSensors;
private final Set topicPartitionSet;
+ private final Map<String, TreeMap<Long, Sensor>> histogramLatencySensors;
private MirrorJmxReporter() {
- super(new Metrics());
+ super(new Metrics(new MetricConfig(), new ArrayList<>(0), Time.SYSTEM, true));
metrics.sensor("replication-latency");
topicSensors = new HashMap<>();
topicPartitionSet = new HashSet<>();
+ histogramLatencySensors = new HashMap<>();
logger.info("Initialized MirrorJMXReporter");
}
@@ -73,6 +102,15 @@ public synchronized void addTopics(List<TopicPartition> topicPartitions) {
.filter(topic -> !topicSensors.containsKey(topic))
.collect(Collectors.toMap(topic -> topic, this::createTopicSensor)));
topicPartitionSet.addAll(topicPartitions);
+
+ for (TopicPartition topicPartition : topicPartitions) {
+ TreeMap<Long, Sensor> bucketSensors = new TreeMap<>();
+ String topic = topicPartition.topic();
+ LATENCY_BUCKETS.forEach(
+ (edgeMillis, bucketName) ->
+ bucketSensors.put(edgeMillis, createHistogramSensor(topic, bucketName)));
+ histogramLatencySensors.put(topic, bucketSensors);
+ }
}
/**
@@ -104,6 +142,7 @@ public synchronized void removeTopics(List<TopicPartition> topicPartitions) {
topic -> {
metrics.removeSensor(replicationLatencySensorName(topic));
topicSensors.remove(topic);
+ histogramLatencySensors.remove(topic);
});
}
@@ -112,6 +151,24 @@ public synchronized void recordMirrorLatency(String topic, long millis) {
if (sensor != null) {
sensor.record((double) millis);
}
+
+ TreeMap<Long, Sensor> bucketSensors = histogramLatencySensors.get(topic);
+ for (Map.Entry<Long, Sensor> sensorEntry : bucketSensors.entrySet()) {
+ long edgeMillis = sensorEntry.getKey();
+ Sensor bucketSensor = sensorEntry.getValue();
+ if (millis >= edgeMillis) {
+ if (bucketSensor.hasExpired()) {
+ String bucket = LATENCY_BUCKETS.get(edgeMillis);
+ // explicitly replace the expired sensor with a new one
+ metrics.removeSensor(histogramLatencySensorName(topic, bucket));
+ bucketSensor = createHistogramSensor(topic, bucket);
+ }
+ bucketSensor.record(1);
+ } else {
+ // bucket sensors are sorted by edgeMillis
+ break;
+ }
+ }
}
private Sensor createTopicSensor(String topic) {
@@ -127,7 +184,32 @@ private Sensor createTopicSensor(String topic) {
return sensor;
}
+ private Sensor createHistogramSensor(String topic, String bucket) {
+ Map<String, String> tags = new LinkedHashMap<>();
+ tags.put("topic", topic);
+ tags.put("bucket", bucket);
+
+ // bucket sensor will be expired after 5 mins if inactive
+ // this is to prevent inactive bucket sensors from reporting too many zero value metrics
+ Sensor sensor =
+ metrics.sensor(
+ histogramLatencySensorName(topic, bucket),
+ null,
+ TimeUnit.MINUTES.toSeconds(5),
+ Sensor.RecordingLevel.INFO,
+ null);
+ sensor.add(
+ metrics.metricInstance(HISTOGRAM_LATENCY, tags),
+ new Rate(TimeUnit.SECONDS, new WindowedSum()));
+
+ return sensor;
+ }
+
private String replicationLatencySensorName(String topic) {
return topic + "-" + "replication-latency";
}
+
+ private String histogramLatencySensorName(String topic, String bucket) {
+ return topic + "-" + bucket + "-" + "histogram-latency";
+ }
}
diff --git a/src/test/java/com/salesforce/mirus/metrics/MirrorJmxReporterTest.java b/src/test/java/com/salesforce/mirus/metrics/MirrorJmxReporterTest.java
new file mode 100644
index 0000000..7fa23c6
--- /dev/null
+++ b/src/test/java/com/salesforce/mirus/metrics/MirrorJmxReporterTest.java
@@ -0,0 +1,59 @@
+package com.salesforce.mirus.metrics;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MirrorJmxReporterTest {
+
+ private MirrorJmxReporter mirrorJmxReporter;
+ private Metrics metrics;
+ private final String TEST_TOPIC = "TestTopic";
+
+ @Before
+ public void setUp() throws Exception {
+ mirrorJmxReporter = MirrorJmxReporter.getInstance();
+ metrics = mirrorJmxReporter.metrics;
+ }
+
+ @Test
+ public void updateLatencyMetrics() {
+ TopicPartition topicPartition = new TopicPartition(TEST_TOPIC, 1);
+ mirrorJmxReporter.addTopics(List.of(topicPartition));
+
+ mirrorJmxReporter.recordMirrorLatency(TEST_TOPIC, 500);
+
+ Map<String, String> tags = new LinkedHashMap<>();
+ tags.put("topic", TEST_TOPIC);
+ tags.put("bucket", "0m");
+ Object value =
+ metrics
+ .metrics()
+ .get(
+ metrics.metricName(
+ MirrorJmxReporter.HISTOGRAM_LATENCY.name(),
+ MirrorJmxReporter.HISTOGRAM_LATENCY.group(),
+ MirrorJmxReporter.HISTOGRAM_LATENCY.description(),
+ tags))
+ .metricValue();
+ Assert.assertTrue((double) value > 0);
+
+ tags.put("bucket", "12h");
+ value =
+ metrics
+ .metrics()
+ .get(
+ metrics.metricName(
+ MirrorJmxReporter.HISTOGRAM_LATENCY.name(),
+ MirrorJmxReporter.HISTOGRAM_LATENCY.group(),
+ MirrorJmxReporter.HISTOGRAM_LATENCY.description(),
+ tags))
+ .metricValue();
+ Assert.assertTrue((double) value == 0);
+ }
+}