diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 39e14401cba2b..b7fae4bc00e69 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -150,9 +150,15 @@ private static void parseMetricsToPrometheusMetrics(Collection metrics, continue; } } else { - stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ') - .write(getTypeStr(metricType)).write('\n'); - stream.write(entry.getKey().replace("brk_", "pulsar_")) + + + String name = entry.getKey(); + if (!names.contains(name)) { + stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ') + .write(getTypeStr(metricType)).write('\n'); + names.add(name); + } + stream.write(name.replace("brk_", "pulsar_")) .write("{cluster=\"").write(cluster).write('"'); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index 96e396f8646fb..b5e780ec8167e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -471,10 +471,14 @@ public void testManagedLedgerCacheStats() throws Exception { public void testManagedLedgerStats() throws Exception { Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); Producer p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create(); + Producer p3 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic1").create(); + Producer p4 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic2").create(); for (int i = 0; i < 10; i++) { String message = "my-message-" + i; p1.send(message.getBytes()); p2.send(message.getBytes()); + p3.send(message.getBytes()); + p4.send(message.getBytes()); } ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); @@ -487,18 +491,59 @@ public void testManagedLedgerStats() throws Exception { System.out.println(e.getKey() + ": " + e.getValue()) ); + Map typeDefs = new HashMap(); + Map metricNames = new HashMap(); + + Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)"); + Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+"); + + Splitter.on("\n").split(metricsStr).forEach(line -> { + if (line.isEmpty()) { + return; + } + if (line.startsWith("#")) { + // Check for duplicate type definitions + Matcher typeMatcher = typePattern.matcher(line); + checkArgument(typeMatcher.matches()); + String metricName = typeMatcher.group(1); + String type = typeMatcher.group(2); + + // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md + // "Only one TYPE line may exist for a given metric name." + if (!typeDefs.containsKey(metricName)) { + typeDefs.put(metricName, type); + } else { + fail("Duplicate type definition found for TYPE definition " + metricName); + } + // From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md + // "The TYPE line for a metric name must appear before the first sample is reported for that metric name." + if (metricNames.containsKey(metricName)) { + fail("TYPE definition for " + metricName + " appears after first sample"); + } + } else { + Matcher metricMatcher = metricNamePattern.matcher(line); + checkArgument(metricMatcher.matches()); + String metricName = metricMatcher.group(1); + metricNames.put(metricName, metricName); + } + }); + List cm = (List) metrics.get("pulsar_ml_AddEntryBytesRate"); - assertEquals(cm.size(), 1); + assertEquals(cm.size(), 2); assertEquals(cm.get(0).tags.get("cluster"), "test"); - assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + String ns = cm.get(0).tags.get("namespace"); + assertEquals(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"), true); cm = (List) metrics.get("pulsar_ml_AddEntryMessagesRate"); - assertEquals(cm.size(), 1); + assertEquals(cm.size(), 2); assertEquals(cm.get(0).tags.get("cluster"), "test"); - assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + ns = cm.get(0).tags.get("namespace"); + assertEquals(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"), true); p1.close(); p2.close(); + p3.close(); + p4.close(); } @Test