diff --git a/conf/broker.conf b/conf/broker.conf index d7fc1b7527e5d..426ddd158f59f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -262,6 +262,9 @@ dispatchThrottlingOnNonBacklogConsumerEnabled=true # Max number of entries to read from bookkeeper. By default it is 100 entries. dispatcherMaxReadBatchSize=100 +# Max size in bytes of entries to read from bookkeeper. By default it is 5MB. +dispatcherMaxReadSizeBytes=5242880 + # Min number of entries to read from bookkeeper. By default it is 1 entries. # When there is an error occurred on reading entries from bookkeeper, the broker # will backoff the batch size to this minimum number." diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index af18d1d28c5fd..af11a7e29b9ae 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -522,6 +522,14 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int dispatcherMaxReadBatchSize = 100; + // <-- dispatcher read settings --> + @FieldContext( + dynamic = true, + category = CATEGORY_SERVER, + doc = "Max size in bytes of entries to read from bookkeeper. By default it is 5MB." + ) + private int dispatcherMaxReadSizeBytes = 5 * 1024 * 1024; + @FieldContext( dynamic = true, category = CATEGORY_SERVER, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index c573a1f89defd..621c6d2cbaa18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -354,7 +354,8 @@ public void readMoreEntries() { consumerList.size()); } havePendingRead = true; - cursor.asyncReadEntriesOrWait(messagesToRead, this, ReadType.Normal); + cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), this, + ReadType.Normal); } else { log.debug("[{}] Cannot schedule next read until previous one is done", name); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 5643c5b3005b2..867c12b253516 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -450,10 +450,11 @@ protected void readMoreEntries(Consumer consumer) { log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead); } havePendingRead = true; + if (consumer.readCompacted()) { topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); } else { - cursor.asyncReadEntriesOrWait(messagesToRead, this, consumer); + cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), this, consumer); } } else { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 12fb8777a01bb..6503df3fa3fdc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -77,6 +77,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat private Optional dispatchRateLimiter = Optional.empty(); private int readBatchSize; + private final int readMaxSizeBytes; private final int producerQueueThreshold; @@ -119,6 +120,7 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String readBatchSize = Math.min( producerQueueSize, topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize()); + readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes(); producerQueueThreshold = (int) (producerQueueSize * 0.9); this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); @@ -138,6 +140,7 @@ public PersistentReplicator(PersistentTopic topic, String replicatorName, String readBatchSize = Math.min( producerQueueSize, topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize()); + readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes(); producerQueueThreshold = (int) (producerQueueSize * 0.9); this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); @@ -285,7 +288,7 @@ protected void readMoreEntries() { log.debug("[{}][{} -> {}] Schedule read of {} messages", topicName, localCluster, remoteCluster, messagesToRead); } - cursor.asyncReadEntriesOrWait(messagesToRead, this, null); + cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, null); } else { if (log.isDebugEnabled()) { log.debug("[{}][{} -> {}] Not scheduling read due to pending read. Messages To Read {}", topicName,