diff --git a/conf/broker.conf b/conf/broker.conf index 30e79ebc9f090..d117d679c8532 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -453,6 +453,12 @@ entryFilterNames= # The directory for all the entry filter implementations entryFiltersDirectory= +# Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, +# only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and +# subscription level. When enabled, messages filtered out due to entry filter logic are counted towards +# each relevant rate limit. +dispatchThrottlingForFilteredEntriesEnabled=false + # Whether allow topic level entry filters policies overrides broker configuration. allowOverrideEntryFilters=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index a6e9a556820a6..8c883045e66c5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1050,6 +1050,16 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se ) private boolean dispatcherDispatchMessagesInSubscriptionThread = true; + @FieldContext( + dynamic = false, + category = CATEGORY_SERVER, + doc = "Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, " + + "only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and " + + "subscription level. When enabled, messages filtered out due to entry filter logic are counted towards " + + "each relevant rate limit." + ) + private boolean dispatchThrottlingForFilteredEntriesEnabled = false; + // <-- dispatcher read settings --> @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 29710067a61d4..df02bbd85d470 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -107,6 +107,9 @@ public int filterEntriesForConsumer(Optional optMetadataArray long totalBytes = 0; int totalChunkedMessages = 0; int totalEntries = 0; + int filteredMessageCount = 0; + int filteredEntryCount = 0; + long filteredBytesCount = 0; final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters); List entriesToFiltered = hasFilter ? new ArrayList<>() : null; List entriesToRedeliver = hasFilter ? new ArrayList<>() : null; @@ -135,6 +138,9 @@ public int filterEntriesForConsumer(Optional optMetadataArray // FilterResult will be always `ACCEPTED` when there is No Filter // dont need to judge whether `hasFilter` is true or not. this.filterRejectedMsgs.add(entryMsgCnt); + filteredEntryCount++; + filteredMessageCount += entryMsgCnt; + filteredBytesCount += metadataAndPayload.readableBytes(); entry.release(); continue; } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) { @@ -143,6 +149,9 @@ public int filterEntriesForConsumer(Optional optMetadataArray // FilterResult will be always `ACCEPTED` when there is No Filter // dont need to judge whether `hasFilter` is true or not. this.filterRescheduledMsgs.add(entryMsgCnt); + filteredEntryCount++; + filteredMessageCount += entryMsgCnt; + filteredBytesCount += metadataAndPayload.readableBytes(); entry.release(); continue; } @@ -231,6 +240,11 @@ public int filterEntriesForConsumer(Optional optMetadataArray } + if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) { + acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount, + filteredMessageCount, filteredBytesCount); + } + sendMessageInfo.setTotalMessages(totalMessages); sendMessageInfo.setTotalBytes(totalBytes); sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages); @@ -243,6 +257,19 @@ private void individualAcknowledgeMessageIfNeeded(Position position, Map + rateLimiter.tryDispatchPermit(permits, totalBytesSent)); + topic.getDispatchRateLimiter().ifPresent(rateLimter -> + rateLimter.tryDispatchPermit(permits, totalBytesSent)); + getRateLimiter().ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(permits, totalBytesSent)); + } + } + /** * Determine whether the number of consumers on the subscription reaches the threshold. * @return diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 02d2e725379b6..15b42fedd38ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -684,7 +684,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis totalBytesSent += sendMessageInfo.getTotalBytes(); } - acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent); + acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); if (entriesToDispatch > 0) { if (log.isDebugEnabled()) { @@ -700,23 +700,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis return true; } - private void acquirePermitsForDeliveredMessages(long totalEntries, long totalMessagesSent, long totalBytesSent) { - // acquire message-dispatch permits for already delivered messages - long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent; - if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { - if (topic.getBrokerDispatchRateLimiter().isPresent()) { - topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent); - } - if (topic.getDispatchRateLimiter().isPresent()) { - topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent); - } - - if (dispatchRateLimiter.isPresent()) { - dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent); - } - } - } - private boolean sendChunkedMessagesToConsumers(ReadType readType, List entries, MessageMetadata[] metadataArray) { @@ -775,7 +758,7 @@ private boolean sendChunkedMessagesToConsumers(ReadType readType, totalBytesSent += sendMessageInfo.getTotalBytes(); } - acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent); + acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); return numConsumers.get() == 0; // trigger a new readMoreEntries() call } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index accab20d2daed..3ba7a82aa5e35 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -221,23 +221,8 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List e redeliveryTracker, epoch) .addListener(future -> { if (future.isSuccess()) { - int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size() - : sendMessageInfo.getTotalMessages(); - // acquire message-dispatch permits for already delivered messages - if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { - if (topic.getBrokerDispatchRateLimiter().isPresent()) { - topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, - sendMessageInfo.getTotalBytes()); - } - - if (topic.getDispatchRateLimiter().isPresent()) { - topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, - sendMessageInfo.getTotalBytes()); - } - dispatchRateLimiter.ifPresent(rateLimiter -> - rateLimiter.tryDispatchPermit(permits, - sendMessageInfo.getTotalBytes())); - } + acquirePermitsForDeliveredMessages(topic, cursor, entries.size(), + sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes()); // Schedule a new read batch operation only after the previous batch has been written to the socket. topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 024ed8581ef1c..5eb553106e679 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -296,19 +296,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } // acquire message-dispatch permits for already delivered messages - if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { - long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent; - if (topic.getBrokerDispatchRateLimiter().isPresent()) { - topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent); - } - if (topic.getDispatchRateLimiter().isPresent()) { - topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent); - } - - if (dispatchRateLimiter.isPresent()) { - dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent); - } - } + acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); stuckConsumers.clear(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java index b129995a8cc47..cba15b0631006 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import io.netty.buffer.ByteBuf; @@ -29,11 +30,14 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; @@ -60,8 +64,9 @@ public class AbstractBaseDispatcherTest { @BeforeMethod public void setup() throws Exception { this.svcConfig = mock(ServiceConfiguration.class); + when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(true); this.subscriptionMock = mock(PersistentSubscription.class); - this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig); + this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, null); } @Test @@ -89,17 +94,24 @@ public void testFilterEntriesForConsumerOfEntryFilter() throws Exception { EntryFilter.FilterResult.REJECT); Map entryFilters = Map.of("key", mockFilter); when(mockTopic.getEntryFilters()).thenReturn(entryFilters); + DispatchRateLimiter subscriptionDispatchRateLimiter = mock(DispatchRateLimiter.class); - this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig); + this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, + subscriptionDispatchRateLimiter); List entries = new ArrayList<>(); - entries.add(EntryImpl.create(1, 2, createMessage("message1", 1))); + Entry e = EntryImpl.create(1, 2, createMessage("message1", 1)); + long expectedBytePermits = e.getLength(); + entries.add(e); SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size()); - // - int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null); + + ManagedCursor cursor = mock(ManagedCursor.class); + + int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, cursor, false, null); assertEquals(size, 0); + verify(subscriptionDispatchRateLimiter).tryDispatchPermit(1, expectedBytePermits); } @Test @@ -201,9 +213,18 @@ private ByteBuf createDelayedMessage(String message, int sequenceId) { private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher { + private final Optional dispatchRateLimiter; + protected AbstractBaseDispatcherTestHelper(Subscription subscription, - ServiceConfiguration serviceConfig) { + ServiceConfiguration serviceConfig, + DispatchRateLimiter rateLimiter) { super(subscription, serviceConfig); + dispatchRateLimiter = Optional.ofNullable(rateLimiter); + } + + @Override + public Optional getRateLimiter() { + return dispatchRateLimiter; } @Override