diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 30de90ac1a7e1..c3a62a3a3f785 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -384,14 +384,6 @@ private void initializeDispatchRateLimiterIfNeeded() { this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC)); } - // dispatch rate limiter for each subscription - subscriptions.forEach((name, subscription) -> { - Dispatcher dispatcher = subscription.getDispatcher(); - if (dispatcher != null) { - dispatcher.initializeDispatchRateLimiterIfNeeded(); - } - }); - // dispatch rate limiter for each replicator replicators.forEach((name, replicator) -> replicator.initializeDispatchRateLimiterIfNeeded()); @@ -2409,19 +2401,7 @@ public CompletableFuture onPoliciesUpdate(Policies data) { producer.checkPermissionsAsync().thenRun(producer::checkEncryption))); return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> { - List> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size()); - subscriptions.forEach((subName, sub) -> { - List> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size()); - sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync())); - subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> { - Dispatcher dispatcher = sub.getDispatcher(); - if (dispatcher != null) { - dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate); - } - })); - }); - - return FutureUtil.waitForAll(subscriptionCheckFutures).thenCompose((___) -> { + return updateSubscriptionsDispatcherRateLimiter().thenCompose((___) -> { replicators.forEach((name, replicator) -> replicator.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate) ); @@ -3050,17 +3030,7 @@ public void onUpdate(TopicPolicies policies) { dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate); - List> consumerCheckFutures = new ArrayList<>(); - subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(consumer -> { - consumerCheckFutures.add(consumer.checkPermissionsAsync().thenRun(() -> { - Dispatcher dispatcher = sub.getDispatcher(); - if (dispatcher != null) { - dispatcher.updateRateLimiter(); - } - })); - })); - - FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> { + updateSubscriptionsDispatcherRateLimiter().thenRun(() -> { updatePublishDispatcher(); updateSubscribeRateLimiter(); replicators.forEach((name, replicator) -> replicator.getRateLimiter() @@ -3083,6 +3053,21 @@ public void onUpdate(TopicPolicies policies) { }); } + private CompletableFuture updateSubscriptionsDispatcherRateLimiter() { + List> subscriptionCheckFutures = new ArrayList<>((int) subscriptions.size()); + subscriptions.forEach((subName, sub) -> { + List> consumerCheckFutures = new ArrayList<>(sub.getConsumers().size()); + sub.getConsumers().forEach(consumer -> consumerCheckFutures.add(consumer.checkPermissionsAsync())); + subscriptionCheckFutures.add(FutureUtil.waitForAll(consumerCheckFutures).thenRun(() -> { + Dispatcher dispatcher = sub.getDispatcher(); + if (dispatcher != null) { + dispatcher.updateRateLimiter(); + } + })); + }); + return FutureUtil.waitForAll(subscriptionCheckFutures); + } + private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) { synchronized (dispatchRateLimiterLock) { if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {