Skip to content

Commit

Permalink
remove duplicated broker prometheus metrics type (#8995)
Browse files Browse the repository at this point in the history
### Motivation

If there are multiple topics from different namespaces, the broker prometheus metrics will print out duplicated `# TYPE` definition for pulsar_ml_AddEntryBytesRate and other managed ledger metrics.

In fact, this problem can be verified by `promtool` https://github.com/prometheus/prometheus#building-from-source

On the broker, run this command to check validity of Pulsar broker metric format.
`curl localhost:8080/metrics/ | ~/go/bin/promtool check metrics`

### Modifications

To prevent duplicated metrics type definition, the definition is now tracked and only printed out once. It leverages the existing metrics name Set already defined under parseMetricsToPrometheusMetrics() in PrometheusMetricsGenerator.java

### Verifying this change

- [ x] Make sure that the change passes the CI checks.

This change added tests and can be verified as follows:

Added two topics under new namespaces to trigger conditions that duplicated prometheus type could happen previously under testManagedLedgerStats() of PrometheusMetricsTest.java. Updated test cases checks this duplicated type problem.

(cherry picked from commit 7319819)
  • Loading branch information
zzzming authored and codelipenghui committed Dec 23, 2020
1 parent df884b8 commit 257f60a
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,15 @@ private static void parseMetricsToPrometheusMetrics(Collection<Metrics> metrics,
continue;
}
} else {
stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
.write(getTypeStr(metricType)).write('\n');
stream.write(entry.getKey().replace("brk_", "pulsar_"))


String name = entry.getKey();
if (!names.contains(name)) {
stream.write("# TYPE ").write(entry.getKey().replace("brk_", "pulsar_")).write(' ')
.write(getTypeStr(metricType)).write('\n');
names.add(name);
}
stream.write(name.replace("brk_", "pulsar_"))
.write("{cluster=\"").write(cluster).write('"');
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,10 +471,14 @@ public void testManagedLedgerCacheStats() throws Exception {
public void testManagedLedgerStats() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();
Producer<byte[]> p2 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic2").create();
Producer<byte[]> p3 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic1").create();
Producer<byte[]> p4 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns2/my-topic2").create();
for (int i = 0; i < 10; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
p2.send(message.getBytes());
p3.send(message.getBytes());
p4.send(message.getBytes());
}

ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
Expand All @@ -487,18 +491,59 @@ public void testManagedLedgerStats() throws Exception {
System.out.println(e.getKey() + ": " + e.getValue())
);

Map<String, String> typeDefs = new HashMap<String, String>();
Map<String, String> metricNames = new HashMap<String, String>();

Pattern typePattern = Pattern.compile("^#\\s+TYPE\\s+(\\w+)\\s+(\\w+)");
Pattern metricNamePattern = Pattern.compile("^(\\w+)\\{.+");

Splitter.on("\n").split(metricsStr).forEach(line -> {
if (line.isEmpty()) {
return;
}
if (line.startsWith("#")) {
// Check for duplicate type definitions
Matcher typeMatcher = typePattern.matcher(line);
checkArgument(typeMatcher.matches());
String metricName = typeMatcher.group(1);
String type = typeMatcher.group(2);

// From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
// "Only one TYPE line may exist for a given metric name."
if (!typeDefs.containsKey(metricName)) {
typeDefs.put(metricName, type);
} else {
fail("Duplicate type definition found for TYPE definition " + metricName);
}
// From https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md
// "The TYPE line for a metric name must appear before the first sample is reported for that metric name."
if (metricNames.containsKey(metricName)) {
fail("TYPE definition for " + metricName + " appears after first sample");
}
} else {
Matcher metricMatcher = metricNamePattern.matcher(line);
checkArgument(metricMatcher.matches());
String metricName = metricMatcher.group(1);
metricNames.put(metricName, metricName);
}
});

List<Metric> cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryBytesRate");
assertEquals(cm.size(), 1);
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).tags.get("cluster"), "test");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
String ns = cm.get(0).tags.get("namespace");
assertEquals(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"), true);

cm = (List<Metric>) metrics.get("pulsar_ml_AddEntryMessagesRate");
assertEquals(cm.size(), 1);
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).tags.get("cluster"), "test");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
ns = cm.get(0).tags.get("namespace");
assertEquals(ns.equals("my-property/use/my-ns") || ns.equals("my-property/use/my-ns2"), true);

p1.close();
p2.close();
p3.close();
p4.close();
}

@Test
Expand Down

0 comments on commit 257f60a

Please sign in to comment.