Skip to content

Commit

Permalink
[fix][admin] Report earliest msg in partitioned backlog (#19465)
Browse files Browse the repository at this point in the history
Co-authored-by: tison <wander4096@gmail.com>
  • Loading branch information
2 people authored and Technoboy- committed Jun 15, 2023
1 parent e5e853b commit d2300e4
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;

import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName;
import static org.mockito.Mockito.spy;
Expand All @@ -36,6 +37,7 @@
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -1895,13 +1897,21 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();

long start1 = 0;
long start2 = 0;
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.enableBatching(false)
.create();

for (int i = 0; i < 10; i++) {
if (i == 0) {
start1 = Clock.systemUTC().millis();
}
if (i == 5) {
start2 = Clock.systemUTC().millis();
}
if (i > 4) {
producer.newMessage()
.value("message-1".getBytes(StandardCharsets.UTF_8))
Expand All @@ -1912,22 +1922,26 @@ public void testBacklogNoDelayedForPartitionedTopic() throws PulsarClientExcepti
}
}
// wait until the message add to delay queue.
long finalStart1 = start1;
Awaitility.await().untilAsserted(() -> {
TopicStats topicStats = admin.topics().getPartitionedStats(topic, false, true, true, true);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 440);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 5);
assertTrue(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() >= finalStart1);
});

for (int i = 0; i < 5; i++) {
consumer.acknowledge(consumer.receive());
}
// Wait the ack send.
Awaitility.await().untilAsserted(() -> {
long finalStart2 = start2;
Awaitility.await().timeout(1, MINUTES).untilAsserted(() -> {
TopicStats topicStats2 = admin.topics().getPartitionedStats(topic, false, true, true, true);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklog(), 5);
assertEquals(topicStats2.getSubscriptions().get(subName).getBacklogSize(), 223);
assertEquals(topicStats2.getSubscriptions().get(subName).getMsgBacklogNoDelayed(), 0);
assertTrue(topicStats2.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() >= finalStart2);
});

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1361,6 +1361,8 @@ default PartitionedTopicStats getPartitionedStats(String topic, boolean perParti
* Set to true to get precise backlog, Otherwise get imprecise backlog.
* @param subscriptionBacklogSize
* Whether to get backlog size for each subscription.
* @param getEarliestTimeInBacklog
* Whether to get the earliest time in backlog.
* @return a future that can be used to track when the partitioned topic statistics are returned
*/
CompletableFuture<PartitionedTopicStats> getPartitionedStatsAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public void reset() {
consumersAfterMarkDeletePosition.clear();
nonContiguousDeletedMessagesRanges = 0;
nonContiguousDeletedMessagesRangesSerializedSize = 0;
earliestMsgPublishTimeInBacklog = 0L;
delayedMessageIndexSizeInBytes = 0;
subscriptionProperties.clear();
filterProcessedMsgCount = 0;
Expand Down Expand Up @@ -221,6 +222,17 @@ public SubscriptionStatsImpl add(SubscriptionStatsImpl stats) {
this.consumersAfterMarkDeletePosition.putAll(stats.consumersAfterMarkDeletePosition);
this.nonContiguousDeletedMessagesRanges += stats.nonContiguousDeletedMessagesRanges;
this.nonContiguousDeletedMessagesRangesSerializedSize += stats.nonContiguousDeletedMessagesRangesSerializedSize;
if (this.earliestMsgPublishTimeInBacklog != 0 && stats.earliestMsgPublishTimeInBacklog != 0) {
this.earliestMsgPublishTimeInBacklog = Math.min(
this.earliestMsgPublishTimeInBacklog,
stats.earliestMsgPublishTimeInBacklog
);
} else {
this.earliestMsgPublishTimeInBacklog = Math.max(
this.earliestMsgPublishTimeInBacklog,
stats.earliestMsgPublishTimeInBacklog
);
}
this.delayedMessageIndexSizeInBytes += stats.delayedMessageIndexSizeInBytes;
this.subscriptionProperties.putAll(stats.subscriptionProperties);
this.filterProcessedMsgCount += stats.filterProcessedMsgCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ public void reset() {
this.lastOffloadFailureTimeStamp = 0;
this.lastOffloadSuccessTimeStamp = 0;
this.publishRateLimitedTimes = 0L;
this.earliestMsgPublishTimeInBacklogs = 0L;
this.delayedMessageIndexSizeInBytes = 0;
this.compaction.reset();
this.ownerBroker = null;
Expand Down Expand Up @@ -315,6 +316,17 @@ public TopicStatsImpl add(TopicStats ts) {
}
}
}
if (earliestMsgPublishTimeInBacklogs != 0 && ((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs != 0) {
earliestMsgPublishTimeInBacklogs = Math.min(
earliestMsgPublishTimeInBacklogs,
((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs
);
} else {
earliestMsgPublishTimeInBacklogs = Math.max(
earliestMsgPublishTimeInBacklogs,
((TopicStatsImpl) ts).earliestMsgPublishTimeInBacklogs
);
}
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data.stats;

import static org.testng.Assert.assertEquals;
import org.testng.annotations.Test;

public class SubscriptionStatsImplTest {

@Test
public void testReset() {
SubscriptionStatsImpl stats = new SubscriptionStatsImpl();
stats.earliestMsgPublishTimeInBacklog = 1L;
stats.reset();
assertEquals(stats.earliestMsgPublishTimeInBacklog, 0L);

}

@Test
public void testAdd_EarliestMsgPublishTimeInBacklogs_Earliest() {
SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
stats1.earliestMsgPublishTimeInBacklog = 10L;

SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
stats2.earliestMsgPublishTimeInBacklog = 20L;

SubscriptionStatsImpl aggregate = stats1.add(stats2);
assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 10L);
}

@Test
public void testAdd_EarliestMsgPublishTimeInBacklogs_First0() {
SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
stats1.earliestMsgPublishTimeInBacklog = 0L;

SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
stats2.earliestMsgPublishTimeInBacklog = 20L;

SubscriptionStatsImpl aggregate = stats1.add(stats2);
assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 20L);
}

@Test
public void testAdd_EarliestMsgPublishTimeInBacklogs_Second0() {
SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
stats1.earliestMsgPublishTimeInBacklog = 10L;

SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
stats2.earliestMsgPublishTimeInBacklog = 0L;

SubscriptionStatsImpl aggregate = stats1.add(stats2);
assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 10L);
}

@Test
public void testAdd_EarliestMsgPublishTimeInBacklogs_Zero() {
SubscriptionStatsImpl stats1 = new SubscriptionStatsImpl();
stats1.earliestMsgPublishTimeInBacklog = 0L;

SubscriptionStatsImpl stats2 = new SubscriptionStatsImpl();
stats2.earliestMsgPublishTimeInBacklog = 0L;

SubscriptionStatsImpl aggregate = stats1.add(stats2);
assertEquals(aggregate.earliestMsgPublishTimeInBacklog, 0L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.policies.data.stats;

import static org.testng.Assert.assertEquals;
import org.testng.annotations.Test;

public class TopicStatsImplTest {

@Test
public void testReset() {
TopicStatsImpl stats = new TopicStatsImpl();
stats.earliestMsgPublishTimeInBacklogs = 1L;
stats.reset();
assertEquals(stats.earliestMsgPublishTimeInBacklogs, 0L);
}

@Test
public void testAdd_EarliestMsgPublishTimeInBacklogs_Earliest() {
TopicStatsImpl stats1 = new TopicStatsImpl();
stats1.earliestMsgPublishTimeInBacklogs = 10L;

TopicStatsImpl stats2 = new TopicStatsImpl();
stats2.earliestMsgPublishTimeInBacklogs = 20L;

TopicStatsImpl aggregate = stats1.add(stats2);
assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 10L);
}

@Test
public void testAdd_EarliestMsgPublishTimeInBacklogs_First0() {
TopicStatsImpl stats1 = new TopicStatsImpl();
stats1.earliestMsgPublishTimeInBacklogs = 0L;

TopicStatsImpl stats2 = new TopicStatsImpl();
stats2.earliestMsgPublishTimeInBacklogs = 20L;

TopicStatsImpl aggregate = stats1.add(stats2);
assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 20L);
}

@Test
public void testAdd_EarliestMsgPublishTimeInBacklogs_Second0() {
TopicStatsImpl stats1 = new TopicStatsImpl();
stats1.earliestMsgPublishTimeInBacklogs = 10L;

TopicStatsImpl stats2 = new TopicStatsImpl();
stats2.earliestMsgPublishTimeInBacklogs = 0L;

TopicStatsImpl aggregate = stats1.add(stats2);
assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 10L);
}

@Test
public void testAdd_EarliestMsgPublishTimeInBacklogs_Zero() {
TopicStatsImpl stats1 = new TopicStatsImpl();
stats1.earliestMsgPublishTimeInBacklogs = 0L;

TopicStatsImpl stats2 = new TopicStatsImpl();
stats2.earliestMsgPublishTimeInBacklogs = 0L;

TopicStatsImpl aggregate = stats1.add(stats2);
assertEquals(aggregate.earliestMsgPublishTimeInBacklogs, 0L);
}
}

0 comments on commit d2300e4

Please sign in to comment.