Skip to content

Commit

Permalink
[fix][broker] Correctly set byte and message out totals per subscription
Browse files Browse the repository at this point in the history
Fixes #15819

The existing code calculates the pulsar_out_bytes_total and pulsar_out_messages_total
per subscription metrics by adding the values from the currently connected consumers.
This produces incorrect values as soon as one or more of the consumers disconnects
from the subscription.

This changes these two metrics to directly use the subscription stats for these
values, and match the output of `pulsar-admin topic stats`.  It also changes
these metrics to be defined as `counter` type.

Signed-off-by: Paul Gier <paul.gier@datastax.com>
  • Loading branch information
pgier committed Nov 14, 2022
1 parent fdf86d3 commit 4afc9c4
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl
AggregatedSubscriptionStats subsStats) {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;
subsStats.bytesOutCounter = subscriptionStats.bytesOutCounter;
subsStats.msgOutCounter = subscriptionStats.msgOutCounter;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
Expand All @@ -133,8 +135,6 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.messageAckRate += cStats.messageAckRate;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PrometheusMetricStreams {
* @param labelsAndValuesArray varargs of label and label value
*/
void writeSample(String metricName, Number value, String... labelsAndValuesArray) {
SimpleTextOutputStream stream = initGaugeType(metricName);
SimpleTextOutputStream stream = initMetricType(metricName);
stream.write(metricName).write('{');
for (int i = 0; i < labelsAndValuesArray.length; i += 2) {
stream.write(labelsAndValuesArray[i]).write("=\"").write(labelsAndValuesArray[i + 1]).write('\"');
Expand All @@ -65,11 +65,19 @@ void releaseAll() {
metricStreamMap.clear();
}

private SimpleTextOutputStream initGaugeType(String metricName) {
private SimpleTextOutputStream initMetricType(String metricName) {
String metricTypeDef = String.format("# TYPE %s %s\n", metricName, MetricType(metricName));
return metricStreamMap.computeIfAbsent(metricName, s -> {
SimpleTextOutputStream stream = new SimpleTextOutputStream(PulsarByteBufAllocator.DEFAULT.directBuffer());
stream.write("# TYPE ").write(metricName).write(" gauge\n");
stream.write(metricTypeDef);
return stream;
});
}

private String MetricType(String metricName) {
if (metricName.endsWith("_total")) {
return "counter";
}
return "gauge";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,88 @@ public void testPerTopicStats() throws Exception {
c2.close();
}

/**
* Test that the total message and byte counts for a topic are not reset when a consumer disconnects.
*
* @throws Exception
*/
@Test
public void testPerTopicStatsReconnect() throws Exception {
Producer<byte[]> p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create();

Consumer<byte[]> c1 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("test")
.subscribe();

final int messages = 5;
final int pulsarMessageOverhead = 31; // Number of extra bytes pulsar adds to each message
final int messageSizeBytes = "my-message-n".getBytes().length + pulsarMessageOverhead;

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
}

for (int i = 0; i < messages; i++) {
c1.acknowledge(c1.receive());
}

c1.close();

for (int i = 0; i < messages; i++) {
String message = "my-message-" + i;
p1.send(message.getBytes());
}

Consumer<byte[]> c2 = pulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/my-topic1")
.subscriptionName("test")
.subscribe();

for (int i = 0; i < messages; i++) {
c2.acknowledge(c2.receive());
}

p1.close();
c2.close();

ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, Metric> metrics = parseMetrics(metricsStr);

metrics.entries().forEach(e -> {
System.out.println(e.getKey() + ": " + e.getValue());
});

List<Metric> cm = (List<Metric>) metrics.get("pulsar_in_bytes_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_in_messages_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");

cm = (List<Metric>) metrics.get("pulsar_out_bytes_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(0).tags.get("subscription"), "test");

cm = (List<Metric>) metrics.get("pulsar_out_messages_total");
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).value, (messages * 2));
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(0).tags.get("subscription"), "test");
}

@Test
public void testPerTopicExpiredStat() throws Exception {
String ns = "prop/ns-abc1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ public void canWriteSampleWithLabels() {
"Cluster and Namespace metric line missing");
}

@Test
public void canWriteSampleCounter() {
underTest.writeSample("my_metric_total", 123);

String actual = writeToString();

assertTrue(actual.startsWith("# TYPE my_metric_total counter"), "Counter type line missing");
assertTrue(actual.contains("my_metric_total{} 123"), "Metric line missing");
}

private String writeToString() {
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer();
try {
Expand Down

0 comments on commit 4afc9c4

Please sign in to comment.