From 6a8f0c135686073aa9448de2cdeca3c3ed644e49 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 17 May 2024 10:39:07 +0800 Subject: [PATCH] [improve] [broker] Do not call cursor.isCursorDataFullyPersistable if disabled dispatcherPauseOnAckStatePersistentEnabled --- ...PersistentDispatcherMultipleConsumers.java | 9 ++++ ...SubscriptionPauseOnAckStatPersistTest.java | 50 +++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 49a19c0fe3138..f20750fa0c20d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -1087,6 +1087,15 @@ public void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion) { @Override public boolean checkAndResumeIfPaused() { boolean paused = blockedDispatcherOnCursorDataCanNotFullyPersist == TRUE; + // Calling "cursor.isCursorDataFullyPersistable()" will loop the collection "individualDeletedMessages". It is + // not a light method. + // If never enabled "dispatcherPauseOnAckStatePersistentEnabled", skip the following checks to improve + // performance. + if (!paused && !topic.isDispatcherPauseOnAckStatePersistentEnabled()){ + // "true" means no need to pause. + return true; + } + // Enabled "dispatcherPauseOnAckStatePersistentEnabled" before. boolean shouldPauseNow = !cursor.isCursorDataFullyPersistable() && topic.isDispatcherPauseOnAckStatePersistentEnabled(); // No need to change. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java index 9a4de8ecf21cc..36c36735c067e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionPauseOnAckStatPersistTest.java @@ -23,8 +23,12 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; @@ -38,6 +42,7 @@ import org.apache.pulsar.common.policies.data.TopicPolicies; import org.awaitility.Awaitility; import org.awaitility.reflect.WhiteboxImpl; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -549,4 +554,49 @@ public void testMultiConsumersPauseOnAckStatPersistNotAffectReplayRead(Subscript c4.close(); admin.topics().delete(tpName, false); } + + @Test(dataProvider = "multiConsumerSubscriptionTypes") + public void testNeverCallCursorIsCursorDataFullyPersistableIfDisabledTheFeature(SubscriptionType subscriptionType) + throws Exception { + final String tpName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + final String mlName = TopicName.get(tpName).getPersistenceNamingEncoding(); + final String subscription = "s1"; + final int msgSendCount = 100; + // Inject a injection to record the counter of calling "cursor.isCursorDataFullyPersistable". + final ManagedLedgerImpl ml = (ManagedLedgerImpl) pulsar.getBrokerService().getManagedLedgerFactory().open(mlName); + final ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor(subscription); + final ManagedCursorImpl spyCursor = Mockito.spy(cursor); + AtomicInteger callingIsCursorDataFullyPersistableCounter = new AtomicInteger(); + Mockito.doAnswer(invocation -> { + callingIsCursorDataFullyPersistableCounter.incrementAndGet(); + return invocation.callRealMethod(); + }).when(spyCursor).isCursorDataFullyPersistable(); + final ManagedCursorContainer cursors = WhiteboxImpl.getInternalState(ml, "cursors"); + final ManagedCursorContainer activeCursors = WhiteboxImpl.getInternalState(ml, "activeCursors"); + cursors.removeCursor(cursor.getName()); + activeCursors.removeCursor(cursor.getName()); + cursors.add(spyCursor, null); + activeCursors.add(spyCursor, null); + + // Pub & Sub. + Consumer c1 = pulsarClient.newConsumer(Schema.STRING).topic(tpName).subscriptionName(subscription) + .isAckReceiptEnabled(true).subscriptionType(subscriptionType).subscribe(); + Producer p1 = pulsarClient.newProducer(Schema.STRING).topic(tpName).enableBatching(false).create(); + for (int i = 0; i < msgSendCount; i++) { + p1.send(Integer.valueOf(i).toString()); + } + for (int i = 0; i < msgSendCount; i++) { + Message m = c1.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(m); + c1.acknowledge(m); + } + // Verify: the counter of calling "cursor.isCursorDataFullyPersistable". + // In expected the counter should be "0", to avoid flaky, verify it is less than 5. + Assert.assertTrue(callingIsCursorDataFullyPersistableCounter.get() < 5); + + // cleanup. + p1.close(); + c1.close(); + admin.topics().delete(tpName, false); + } }