From 9eb5ef6056b9e432263d13e54977c997faa1ab93 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 17 May 2023 01:03:11 +0800 Subject: [PATCH] [fix] [broker] In Key_Shared mode: remove unnecessary blocking mechanisms to avoid unnecessary blocking --- ...tStickyKeyDispatcherMultipleConsumers.java | 24 +------------------ 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 1a8c6e180a2a2..8f05530f58bfa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -71,17 +71,12 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi */ private final LinkedHashMap recentlyJoinedConsumers; - private final Set stuckConsumers; - private final Set nextStuckConsumers; - PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery()); this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery(); this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); - this.stuckConsumers = new HashSet<>(); - this.nextStuckConsumers = new HashSet<>(); this.keySharedMode = ksm.getKeySharedMode(); switch (this.keySharedMode) { case AUTO_SPLIT: @@ -226,8 +221,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis } } - nextStuckConsumers.clear(); - final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); final Map> consumerStickyKeyHashesMap = new HashMap<>(); @@ -318,14 +311,11 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis // acquire message-dispatch permits for already delivered messages acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent); - stuckConsumers.clear(); - if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) { // This means, that all the messages we've just read cannot be dispatched right now. // This condition can only happen when: // 1. We have consumers ready to accept messages (otherwise the would not haven been triggered) // 2. All keys in the current set of messages are routing to consumers that are currently busy - // and stuck is not caused by stuckConsumers // // The solution here is to move on and read next batch of messages which might hopefully contain // also keys meant for other consumers. @@ -334,10 +324,7 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis // ahead in the stream while the new consumers are not ready to accept the new messages, // therefore would be most likely only increase the distance between read-position and mark-delete // position. - if (!nextStuckConsumers.isEmpty()) { - isDispatcherStuckOnReplays = true; - stuckConsumers.addAll(nextStuckConsumers); - } + isDispatcherStuckOnReplays = true; return true; } else if (currentThreadKeyNumber == 0) { return true; @@ -348,8 +335,6 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, int maxMessages, ReadType readType, Set stickyKeyHashes) { if (maxMessages == 0) { - // the consumer was stuck - nextStuckConsumers.add(consumer); return 0; } if (readType == ReadType.Normal && stickyKeyHashes != null @@ -366,13 +351,6 @@ private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List en // At this point, all the old messages were already consumed and this consumer // is now ready to receive any message if (maxReadPosition == null) { - // stop to dispatch by stuckConsumers - if (stuckConsumers.contains(consumer)) { - if (log.isDebugEnabled()) { - log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer); - } - return 0; - } // The consumer has not recently joined, so we can send all messages return maxMessages; }