From 6404cc15dd8ca6e95c2741bf97794c38a2d16d9c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 24 Sep 2024 03:50:01 +0800 Subject: [PATCH] [fix] Key_Shared mode consumption latency when low traffic (#23340) Co-authored-by: Lari Hotari (cherry picked from commit 4ce0c752cc4b2d6dccb818ab0ffa854e82e42b85) --- conf/broker.conf | 4 ++-- conf/standalone.conf | 4 ++-- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 4 ++-- .../persistent/PersistentDispatcherMultipleConsumers.java | 6 ++++-- 4 files changed, 10 insertions(+), 8 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 12c97d63068b1..4d6f54a5e6676 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -470,12 +470,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff # delay. This parameter sets the initial backoff delay in milliseconds. -dispatcherRetryBackoffInitialTimeInMs=100 +dispatcherRetryBackoffInitialTimeInMs=1 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff # delay. This parameter sets the maximum backoff delay in milliseconds. -dispatcherRetryBackoffMaxTimeInMs=1000 +dispatcherRetryBackoffMaxTimeInMs=10 # Precise dispatcher flow control according to history message number of each entry preciseDispatcherFlowControl=false diff --git a/conf/standalone.conf b/conf/standalone.conf index 98e2c5e946478..10ca74a0b0e6b 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -286,12 +286,12 @@ dispatcherReadFailureBackoffMandatoryStopTimeInMs=0 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff # delay. This parameter sets the initial backoff delay in milliseconds. -dispatcherRetryBackoffInitialTimeInMs=100 +dispatcherRetryBackoffInitialTimeInMs=1 # On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered # out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff # delay. This parameter sets the maximum backoff delay in milliseconds. -dispatcherRetryBackoffMaxTimeInMs=1000 +dispatcherRetryBackoffMaxTimeInMs=10 # Precise dispatcher flow control according to history message number of each entry preciseDispatcherFlowControl=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 834de2b77e919..867c2c2461e3c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1198,14 +1198,14 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered " + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff " + "delay. This parameter sets the initial backoff delay in milliseconds.") - private int dispatcherRetryBackoffInitialTimeInMs = 100; + private int dispatcherRetryBackoffInitialTimeInMs = 1; @FieldContext( category = CATEGORY_POLICIES, doc = "On Shared and KeyShared subscriptions, if all available messages in the subscription are filtered " + "out and not dispatched to any consumer, message dispatching will be rescheduled with a backoff " + "delay. This parameter sets the maximum backoff delay in milliseconds.") - private int dispatcherRetryBackoffMaxTimeInMs = 1000; + private int dispatcherRetryBackoffMaxTimeInMs = 10; @FieldContext( dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 0f8404043c37d..3c5d79476b3cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -696,10 +696,12 @@ private synchronized void handleSendingMessagesAndReadingMore(ReadType readType, boolean triggerReadingMore = sendMessagesToConsumers(readType, entries, needAcquireSendInProgress); int entriesDispatched = lastNumberOfEntriesDispatched; updatePendingBytesToDispatch(-totalBytesSize); + if (entriesDispatched > 0) { + // Reset the backoff when we successfully dispatched messages + retryBackoff.reset(); + } if (triggerReadingMore) { if (entriesDispatched > 0) { - // Reset the backoff when we successfully dispatched messages - retryBackoff.reset(); // Call readMoreEntries in the same thread to trigger the next read readMoreEntries(); } else if (entriesDispatched == 0) {