From f4b5dbf8b2e17d040014df4a4760160ac1967534 Mon Sep 17 00:00:00 2001 From: Paul Gier Date: Fri, 18 Nov 2022 22:55:39 -0600 Subject: [PATCH] [fix][broker] Correctly set byte and message out totals per subscription (#18451) Signed-off-by: Paul Gier Fixes #15819 The existing code calculates the pulsar_out_bytes_total and pulsar_out_messages_total per subscription metrics by adding the values from the currently connected consumers. This produces incorrect values as soon as one or more of the consumers disconnects from the subscription. This changes these two metrics to directly use the subscription stats for these values, and match the output of `pulsar-admin topic stats`. Signed-off-by: Paul Gier Fixes #15819 ### Motivation The prometheus metrics for pulsar_out_bytes_total and pulsar_out_messages_total should never decrease, and they should match the output seen when using pulsar-admin. ### Modifications Changed the calculation of pulsar_out_bytes_total and pulsar_out_messages_total to directly use the subscription stats instead of calculating these values by summing the values of the currently connected consumers. ### Verifying this change - [X] Make sure that the change passes the CI checks. Added a unit test to cover this case. ### Does this pull request potentially affect one of the following parts: *If the box was checked, please highlight the changes* - [ ] Dependencies (add or upgrade a dependency) - [ ] The public API - [ ] The schema - [ ] The default values of configurations - [ ] The threading model - [ ] The binary protocol - [ ] The REST endpoints - [ ] The admin CLI options - [ ] Anything that affects deployment ### Documentation - [ ] `doc` - [ ] `doc-required` - [X] `doc-not-needed` - [ ] `doc-complete` ### Matching PR in forked repository PR in forked repository: https://github.com/pgier/pulsar/pull/2 (cherry picked from commit c03e33e22fd7bfe16eea32139b9691776b2c8977) --- .../prometheus/NamespaceStatsAggregator.java | 4 +- .../broker/stats/PrometheusMetricsTest.java | 82 +++++++++++++++++++ 2 files changed, 84 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index bfb8140877b8e..7bcd1f150be89 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -110,6 +110,8 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl AggregatedSubscriptionStats subsStats) { stats.subscriptionsCount++; stats.msgBacklog += subscriptionStats.msgBacklog; + subsStats.bytesOutCounter = subscriptionStats.bytesOutCounter; + subsStats.msgOutCounter = subscriptionStats.msgOutCounter; subsStats.msgBacklog = subscriptionStats.msgBacklog; subsStats.msgDelayed = subscriptionStats.msgDelayed; subsStats.msgRateExpired = subscriptionStats.msgRateExpired; @@ -128,8 +130,6 @@ private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subsStats.msgRateOut += cStats.msgRateOut; subsStats.messageAckRate += cStats.messageAckRate; subsStats.msgThroughputOut += cStats.msgThroughputOut; - subsStats.bytesOutCounter += cStats.bytesOutCounter; - subsStats.msgOutCounter += cStats.msgOutCounter; if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) { subsStats.blockedSubscriptionOnUnackedMsgs = true; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index c5ecb8d5bf6a6..897f29e0df3b6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -353,6 +353,88 @@ public void testPerTopicStats() throws Exception { c2.close(); } + /** + * Test that the total message and byte counts for a topic are not reset when a consumer disconnects. + * + * @throws Exception + */ + @Test + public void testPerTopicStatsReconnect() throws Exception { + Producer p1 = pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/my-topic1").create(); + + Consumer c1 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + final int messages = 5; + final int pulsarMessageOverhead = 31; // Number of extra bytes pulsar adds to each message + final int messageSizeBytes = "my-message-n".getBytes().length + pulsarMessageOverhead; + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + } + + for (int i = 0; i < messages; i++) { + c1.acknowledge(c1.receive()); + } + + c1.close(); + + for (int i = 0; i < messages; i++) { + String message = "my-message-" + i; + p1.send(message.getBytes()); + } + + Consumer c2 = pulsarClient.newConsumer() + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("test") + .subscribe(); + + for (int i = 0; i < messages; i++) { + c2.acknowledge(c2.receive()); + } + + p1.close(); + c2.close(); + + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsGenerator.generate(pulsar, true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap metrics = parseMetrics(metricsStr); + + metrics.entries().forEach(e -> { + System.out.println(e.getKey() + ": " + e.getValue()); + }); + + List cm = (List) metrics.get("pulsar_in_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_in_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + + cm = (List) metrics.get("pulsar_out_bytes_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messageSizeBytes * messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + + cm = (List) metrics.get("pulsar_out_messages_total"); + assertEquals(cm.size(), 1); + assertEquals(cm.get(0).value, (messages * 2)); + assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1"); + assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns"); + assertEquals(cm.get(0).tags.get("subscription"), "test"); + } + @Test public void testPerTopicExpiredStat() throws Exception { String ns = "prop/ns-abc1";