From d2300e4ea6c628fd90382e55b2bb0b38e9366f24 Mon Sep 17 00:00:00 2001 From: Elliot West Date: Thu, 15 Jun 2023 04:43:24 +0100 Subject: [PATCH] [fix][admin] Report earliest msg in partitioned backlog (#19465) Co-authored-by: tison --- .../pulsar/broker/admin/AdminApi2Test.java | 16 +++- .../apache/pulsar/client/admin/Topics.java | 2 + .../data/stats/SubscriptionStatsImpl.java | 12 +++ .../policies/data/stats/TopicStatsImpl.java | 12 +++ .../data/stats/SubscriptionStatsImplTest.java | 82 +++++++++++++++++++ .../data/stats/TopicStatsImplTest.java | 81 ++++++++++++++++++ 6 files changed, 204 insertions(+), 1 deletion(-) create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java create mode 100644 pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index e1800349bbbb7..71892d009313e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.mockito.Mockito.spy; @@ -36,6 +37,7 @@ import java.lang.reflect.Field; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.time.Clock; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -1895,6 +1897,8 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti .acknowledgmentGroupTime(0, TimeUnit.SECONDS) .subscribe(); + long start1 = 0; + long start2 = 0; @Cleanup Producer producer = client.newProducer() .topic(topic) @@ -1902,6 +1906,12 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti .create(); for (int i = 0; i < 10; i++) { + if (i == 0) { + start1 = Clock.systemUTC().millis(); + } + if (i == 5) { + start2 = Clock.systemUTC().millis(); + } if (i > 4) { producer.newMessage() .value("message-1".getBytes(StandardCharsets.UTF_8)) @@ -1912,22 +1922,26 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti } } // wait until the message add to delay queue. + long finalStart1 = start1; Awaitility.await().untilAsserted(() -> { TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true, true); assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10); assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440); assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5); + assertTrue(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() >= finalStart1); }); for (int i = 0; i < 5; i++) { consumer.acknowledge(consumer.receive()); } // Wait the ack send. - Awaitility.await().untilAsserted(() -> { + long finalStart2 = start2; + Awaitility.await().timeout(1, MINUTES).untilAsserted(() -> { TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, false, true, true, true); assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5); assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223); assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0); + assertTrue(topicStats2.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() >= finalStart2); }); } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index f599e2566bffc..156d67e4e58b3 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1361,6 +1361,8 @@ default PartitionedTopicStats getPartitionedStats(String topic, boolean perParti * Set to true to get precise backlog, Otherwise get imprecise backlog. * @param subscriptionBacklogSize * Whether to get backlog size for each subscription. + * @param getEarliestTimeInBacklog + * Whether to get the earliest time in backlog. * @return a future that can be used to track when the partitioned topic statistics are returned */ CompletableFuture getPartitionedStatsAsync( diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java index fb75fecca3363..d77764e679da0 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImpl.java @@ -177,6 +177,7 @@ public void reset() { consumersAfterMarkDeletePosition.clear(); nonContiguousDeletedMessagesRanges = 0; nonContiguousDeletedMessagesRangesSerializedSize = 0; + earliestMsgPublishTimeInBacklog = 0L; delayedMessageIndexSizeInBytes = 0; subscriptionProperties.clear(); filterProcessedMsgCount = 0; @@ -221,6 +222,17 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) { this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition); this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges; this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize; + if (this.earliestMsgPublishTimeInBacklog != 0 && stats.earliestMsgPublishTimeInBacklog != 0) { + this.earliestMsgPublishTimeInBacklog = Math.min( + this.earliestMsgPublishTimeInBacklog, + stats.earliestMsgPublishTimeInBacklog + ); + } else { + this.earliestMsgPublishTimeInBacklog = Math.max( + this.earliestMsgPublishTimeInBacklog, + stats.earliestMsgPublishTimeInBacklog + ); + } this.delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes; this.subscriptionProperties.putAll(stats.subscriptionProperties); this.filterProcessedMsgCount += stats.filterProcessedMsgCount; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java index ae2438709a017..3bf43dbf41f0a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImpl.java @@ -216,6 +216,7 @@ public void reset() { this.lastOffloadFailureTimeStamp = 0; this.lastOffloadSuccessTimeStamp = 0; this.publishRateLimitedTimes = 0L; + this.earliestMsgPublishTimeInBacklogs = 0L; this.delayedMessageIndexSizeInBytes = 0; this.compaction.reset(); this.ownerBroker = null; @@ -315,6 +316,17 @@ public TopicStatsImpl add(TopicStats ts) { } } } + if (earliestMsgPublishTimeInBacklogs != 0 && ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs != 0) { + earliestMsgPublishTimeInBacklogs = Math.min( + earliestMsgPublishTimeInBacklogs, + ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs + ); + } else { + earliestMsgPublishTimeInBacklogs = Math.max( + earliestMsgPublishTimeInBacklogs, + ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs + ); + } return this; } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java new file mode 100644 index 0000000000000..8a4b5da9edd20 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/SubscriptionStatsImplTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data.stats; + +import static org.testng.Assert.assertEquals; +import org.testng.annotations.Test; + +public class SubscriptionStatsImplTest { + + @Test + public void testReset() { + SubscriptionStatsImpl stats = new SubscriptionStatsImpl(); + stats.earliestMsgPublishTimeInBacklog = 1L; + stats.reset(); + assertEquals(stats.earliestMsgPublishTimeInBacklog, 0L); + + } + + @Test + public void testAdd_EarliestMsgPublishTimeInBacklogs_Earliest() { + SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl(); + stats1.earliestMsgPublishTimeInBacklog = 10L; + + SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl(); + stats2.earliestMsgPublishTimeInBacklog = 20L; + + SubscriptionStatsImpl aggregate = stats1.add(stats2); + assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 10L); + } + + @Test + public void testAdd_EarliestMsgPublishTimeInBacklogs_First0() { + SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl(); + stats1.earliestMsgPublishTimeInBacklog = 0L; + + SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl(); + stats2.earliestMsgPublishTimeInBacklog = 20L; + + SubscriptionStatsImpl aggregate = stats1.add(stats2); + assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 20L); + } + + @Test + public void testAdd_EarliestMsgPublishTimeInBacklogs_Second0() { + SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl(); + stats1.earliestMsgPublishTimeInBacklog = 10L; + + SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl(); + stats2.earliestMsgPublishTimeInBacklog = 0L; + + SubscriptionStatsImpl aggregate = stats1.add(stats2); + assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 10L); + } + + @Test + public void testAdd_EarliestMsgPublishTimeInBacklogs_Zero() { + SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl(); + stats1.earliestMsgPublishTimeInBacklog = 0L; + + SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl(); + stats2.earliestMsgPublishTimeInBacklog = 0L; + + SubscriptionStatsImpl aggregate = stats1.add(stats2); + assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 0L); + } +} \ No newline at end of file diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java new file mode 100644 index 0000000000000..09cef4c4d0f82 --- /dev/null +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/stats/TopicStatsImplTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.policies.data.stats; + +import static org.testng.Assert.assertEquals; +import org.testng.annotations.Test; + +public class TopicStatsImplTest { + + @Test + public void testReset() { + TopicStatsImpl stats = new TopicStatsImpl(); + stats.earliestMsgPublishTimeInBacklogs = 1L; + stats.reset(); + assertEquals(stats.earliestMsgPublishTimeInBacklogs, 0L); + } + + @Test + public void testAdd_EarliestMsgPublishTimeInBacklogs_Earliest() { + TopicStatsImpl stats1 = new TopicStatsImpl(); + stats1.earliestMsgPublishTimeInBacklogs = 10L; + + TopicStatsImpl stats2 = new TopicStatsImpl(); + stats2.earliestMsgPublishTimeInBacklogs = 20L; + + TopicStatsImpl aggregate = stats1.add(stats2); + assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 10L); + } + + @Test + public void testAdd_EarliestMsgPublishTimeInBacklogs_First0() { + TopicStatsImpl stats1 = new TopicStatsImpl(); + stats1.earliestMsgPublishTimeInBacklogs = 0L; + + TopicStatsImpl stats2 = new TopicStatsImpl(); + stats2.earliestMsgPublishTimeInBacklogs = 20L; + + TopicStatsImpl aggregate = stats1.add(stats2); + assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 20L); + } + + @Test + public void testAdd_EarliestMsgPublishTimeInBacklogs_Second0() { + TopicStatsImpl stats1 = new TopicStatsImpl(); + stats1.earliestMsgPublishTimeInBacklogs = 10L; + + TopicStatsImpl stats2 = new TopicStatsImpl(); + stats2.earliestMsgPublishTimeInBacklogs = 0L; + + TopicStatsImpl aggregate = stats1.add(stats2); + assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 10L); + } + + @Test + public void testAdd_EarliestMsgPublishTimeInBacklogs_Zero() { + TopicStatsImpl stats1 = new TopicStatsImpl(); + stats1.earliestMsgPublishTimeInBacklogs = 0L; + + TopicStatsImpl stats2 = new TopicStatsImpl(); + stats2.earliestMsgPublishTimeInBacklogs = 0L; + + TopicStatsImpl aggregate = stats1.add(stats2); + assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 0L); + } +} \ No newline at end of file