diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java index 7ae0215486dc9..c23bc1cbe53bd 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Consumer.java @@ -62,6 +62,7 @@ public class Consumer { private final long consumerId; private final String consumerName; private final Rate msgOut; + private final Rate msgRedeliver; // Represents how many messages we can safely send to the consumer without // overflowing its receiving queue. The consumer will use Flow commands to @@ -90,6 +91,7 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str this.maxUnackedMessages = maxUnackedMessages; this.cnx = cnx; this.msgOut = new Rate(); + this.msgRedeliver = new Rate(); this.appId = appId; stats = new ConsumerStats(); @@ -357,8 +359,10 @@ private boolean shouldBlockConsumerOnUnackMsgs() { public void updateRates() { msgOut.calculateRate(); + msgRedeliver.calculateRate(); stats.msgRateOut = msgOut.getRate(); stats.msgThroughputOut = msgOut.getValueRate(); + stats.msgRateRedeliver = msgRedeliver.getRate(); } public ConsumerStats getStats() { @@ -452,6 +456,11 @@ public void redeliverUnacknowledgedMessages() { subscription.redeliverUnacknowledgedMessages(this); flowConsumerBlockedPermits(this); if (pendingAcks != null) { + int totalRedeliveryMessages = 0; + for (Integer batchSize : pendingAcks.values()) { + totalRedeliveryMessages += batchSize; + } + msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages); pendingAcks.clear(); } @@ -474,6 +483,7 @@ public void redeliverUnacknowledgedMessages(List messageIds) { blockedConsumerOnUnackedMsgs = false; subscription.redeliverUnacknowledgedMessages(this, pendingPositions); + msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages); int numberOfBlockedPermits = Math.min(totalRedeliveryMessages, permitsReceivedWhileConsumerBlocked.get()); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java index e51b9d929b62b..7daf48de9acb5 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java @@ -575,6 +575,8 @@ public PersistentSubscriptionStats getStats() { subStats.consumers.add(consumerStats); subStats.msgRateOut += consumerStats.msgRateOut; subStats.msgThroughputOut += consumerStats.msgThroughputOut; + subStats.msgRateRedeliver += consumerStats.msgRateRedeliver; + subStats.unackedMessages += consumerStats.unackedMessages; }); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index e90219ec6190d..823295813d0e8 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -817,6 +817,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats subscriptions.forEach((subscriptionName, subscription) -> { double subMsgRateOut = 0; double subMsgThroughputOut = 0; + double subMsgRateRedeliver = 0; + long subUnackedMessages = 0; // Start subscription name & consumers try { @@ -834,6 +836,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats ConsumerStats consumerStats = consumer.getStats(); subMsgRateOut += consumerStats.msgRateOut; subMsgThroughputOut += consumerStats.msgThroughputOut; + subMsgRateRedeliver += consumerStats.msgRateRedeliver; + subUnackedMessages += consumerStats.unackedMessages; // Populate consumer specific stats here destStatsStream.startObject(); @@ -844,6 +848,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats destStatsStream.writePair("connectedSince", consumerStats.connectedSince); destStatsStream.writePair("msgRateOut", consumerStats.msgRateOut); destStatsStream.writePair("msgThroughputOut", consumerStats.msgThroughputOut); + destStatsStream.writePair("msgRateRedeliver", consumerStats.msgRateRedeliver); destStatsStream.endObject(); } @@ -855,6 +860,8 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats destStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate()); destStatsStream.writePair("msgRateOut", subMsgRateOut); destStatsStream.writePair("msgThroughputOut", subMsgThroughputOut); + destStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver); + destStatsStream.writePair("unackedMessages", subUnackedMessages); destStatsStream.writePair("type", subscription.getTypeString()); // Close consumers diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java index 4288b7e2a400d..cb2c4a5113d22 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java @@ -185,6 +185,96 @@ public void testBrokerServicePersistentTopicStats() throws Exception { assertEquals(subStats.msgBacklog, 0); } + @Test + public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { + final String topicName = "persistent://prop/use/ns-abc/successSharedTopic"; + final String subName = "successSharedSub"; + + PersistentTopicStats stats; + PersistentSubscriptionStats subStats; + + ConsumerConfiguration conf = new ConsumerConfiguration(); + conf.setSubscriptionType(SubscriptionType.Shared); + Consumer consumer = pulsarClient.subscribe(topicName, subName, conf); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + assertNotNull(topicRef); + + rolloverPerIntervalStats(); + stats = topicRef.getStats(); + subStats = stats.subscriptions.values().iterator().next(); + + // subscription stats + assertEquals(stats.subscriptions.keySet().size(), 1); + assertEquals(subStats.msgBacklog, 0); + assertEquals(subStats.consumers.size(), 1); + + Producer producer = pulsarClient.createProducer(topicName); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + rolloverPerIntervalStats(); + stats = topicRef.getStats(); + subStats = stats.subscriptions.values().iterator().next(); + + // publisher stats + assertEquals(subStats.msgBacklog, 10); + assertEquals(stats.publishers.size(), 1); + assertTrue(stats.publishers.get(0).msgRateIn > 0.0); + assertTrue(stats.publishers.get(0).msgThroughputIn > 0.0); + assertTrue(stats.publishers.get(0).averageMsgSize > 0.0); + + // aggregated publish stats + assertEquals(stats.msgRateIn, stats.publishers.get(0).msgRateIn); + assertEquals(stats.msgThroughputIn, stats.publishers.get(0).msgThroughputIn); + double diff = stats.averageMsgSize - stats.publishers.get(0).averageMsgSize; + assertTrue(Math.abs(diff) < 0.000001); + + // consumer stats + assertTrue(subStats.consumers.get(0).msgRateOut > 0.0); + assertTrue(subStats.consumers.get(0).msgThroughputOut > 0.0); + assertEquals(subStats.msgRateRedeliver, 0.0); + assertEquals(subStats.consumers.get(0).unackedMessages, 10); + + // aggregated consumer stats + assertEquals(subStats.msgRateOut, subStats.consumers.get(0).msgRateOut); + assertEquals(subStats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut); + assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver); + assertEquals(stats.msgRateOut, subStats.consumers.get(0).msgRateOut); + assertEquals(stats.msgThroughputOut, subStats.consumers.get(0).msgThroughputOut); + assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver); + assertEquals(subStats.unackedMessages, subStats.consumers.get(0).unackedMessages); + + consumer.redeliverUnacknowledgedMessages(); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + rolloverPerIntervalStats(); + stats = topicRef.getStats(); + subStats = stats.subscriptions.values().iterator().next(); + assertTrue(subStats.msgRateRedeliver > 0.0); + assertEquals(subStats.msgRateRedeliver, subStats.consumers.get(0).msgRateRedeliver); + + Message msg; + for (int i = 0; i < 10; i++) { + msg = consumer.receive(); + consumer.acknowledge(msg); + } + consumer.close(); + Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); + + rolloverPerIntervalStats(); + stats = topicRef.getStats(); + subStats = stats.subscriptions.values().iterator().next(); + + assertEquals(subStats.msgBacklog, 0); + } + @Test public void testBrokerStatsMetrics() throws Exception { final String topicName = "persistent://prop/use/ns-abc/newTopic"; diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/ConsumerStats.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/ConsumerStats.java index 8633b68796553..3b29dc5acd099 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/ConsumerStats.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/ConsumerStats.java @@ -26,6 +26,9 @@ public class ConsumerStats { /** Total throughput delivered to the consumer. bytes/s */ public double msgThroughputOut; + /** Total rate of messages redelivered by this consumer. msg/s */ + public double msgRateRedeliver; + /** Name of the consumer */ public String consumerName; @@ -45,6 +48,7 @@ public ConsumerStats add(ConsumerStats stats) { checkNotNull(stats); this.msgRateOut += stats.msgRateOut; this.msgThroughputOut += stats.msgThroughputOut; + this.msgRateRedeliver += stats.msgRateRedeliver; this.availablePermits += stats.availablePermits; this.unackedMessages += stats.unackedMessages; return this; diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/PersistentSubscriptionStats.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/PersistentSubscriptionStats.java index 01a96ab465b9d..50329e2ee6d78 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/PersistentSubscriptionStats.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/policies/data/PersistentSubscriptionStats.java @@ -31,9 +31,15 @@ public class PersistentSubscriptionStats { /** Total throughput delivered on this subscription. bytes/s */ public double msgThroughputOut; + /** Total rate of messages redelivered on this subscription. msg/s */ + public double msgRateRedeliver; + /** Number of messages in the subscription backlog */ public long msgBacklog; + /** Number of unacknowledged messages for the subscription */ + public long unackedMessages; + /** whether this subscription is Exclusive or Shared or Failover */ public SubType type; @@ -50,7 +56,9 @@ public PersistentSubscriptionStats() { public void reset() { msgRateOut = 0; msgThroughputOut = 0; + msgRateRedeliver = 0; msgBacklog = 0; + unackedMessages = 0; msgRateExpired = 0; consumers.clear(); } @@ -61,7 +69,9 @@ public PersistentSubscriptionStats add(PersistentSubscriptionStats stats) { checkNotNull(stats); this.msgRateOut += stats.msgRateOut; this.msgThroughputOut += stats.msgThroughputOut; + this.msgRateRedeliver += stats.msgRateRedeliver; this.msgBacklog += stats.msgBacklog; + this.unackedMessages += stats.unackedMessages; this.msgRateExpired += stats.msgRateExpired; if (this.consumers.size() != stats.consumers.size()) { for (int i = 0; i < stats.consumers.size(); i++) {