From 0d3c41ef61007c5da41db01df96161786e1229c7 Mon Sep 17 00:00:00 2001 From: Xiaoyu Hou Date: Sat, 14 May 2022 14:15:05 +0800 Subject: [PATCH] Sync topicPublishRateLimiter update --- .../pulsar/broker/service/AbstractTopic.java | 41 ++++++++++--------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 02c2c3d2f4445..ea9abb5539f3b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -111,6 +111,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener 0 || publishRate.publishThrottlingRateInMsg > 0) { - log.info("Enabling publish rate limiting {} ", publishRate); - if (!preciseTopicPublishRateLimitingEnable) { - this.brokerService.setupTopicPublishRateLimiterMonitor(); - } + synchronized (topicPublishRateLimiterLock) { + PublishRate publishRate = topicPolicies.getPublishRate().get(); + if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) { + log.info("Enabling publish rate limiting {} ", publishRate); + if (!preciseTopicPublishRateLimitingEnable) { + this.brokerService.setupTopicPublishRateLimiterMonitor(); + } - if (this.topicPublishRateLimiter == null + if (this.topicPublishRateLimiter == null || this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) { - // create new rateLimiter if rate-limiter is disabled - if (preciseTopicPublishRateLimitingEnable) { - this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, + // create new rateLimiter if rate-limiter is disabled + if (preciseTopicPublishRateLimitingEnable) { + this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor()); + } else { + this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate); + } } else { - this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate); + this.topicPublishRateLimiter.update(publishRate); } } else { - this.topicPublishRateLimiter.update(publishRate); - } - } else { - log.info("Disabling publish throttling for {}", this.topic); - if (topicPublishRateLimiter != null) { - topicPublishRateLimiter.close(); + log.info("Disabling publish throttling for {}", this.topic); + if (topicPublishRateLimiter != null) { + topicPublishRateLimiter.close(); + } + this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER; + enableProducerReadForPublishRateLimiting(); } - this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER; - enableProducerReadForPublishRateLimiting(); } }