Skip to content

Commit

Permalink
Sync topicPublishRateLimiter update
Browse files Browse the repository at this point in the history
  • Loading branch information
AnonHxy committed May 14, 2022
1 parent 58c82a7 commit cdaac52
Showing 1 changed file with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
protected volatile boolean schemaValidationEnforced = false;

protected volatile PublishRateLimiter topicPublishRateLimiter;
private final Object topicPublishRateLimiterLock = new Object();

protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;

Expand Down Expand Up @@ -1143,32 +1144,34 @@ protected boolean isExceedMaximumMessageSize(int size, PublishContext publishCon
* update topic publish dispatcher for this topic.
*/
public void updatePublishDispatcher() {
PublishRate publishRate = topicPolicies.getPublishRate().get();
if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
log.info("Enabling publish rate limiting {} ", publishRate);
if (!preciseTopicPublishRateLimitingEnable) {
this.brokerService.setupTopicPublishRateLimiterMonitor();
}
synchronized (topicPublishRateLimiterLock) {
PublishRate publishRate = topicPolicies.getPublishRate().get();
if (publishRate.publishThrottlingRateInByte > 0 || publishRate.publishThrottlingRateInMsg > 0) {
log.info("Enabling publish rate limiting {} ", publishRate);
if (!preciseTopicPublishRateLimitingEnable) {
this.brokerService.setupTopicPublishRateLimiterMonitor();
}

if (this.topicPublishRateLimiter == null
if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
() -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
}
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
this.topicPublishRateLimiter.update(publishRate);
}
} else {
this.topicPublishRateLimiter.update(publishRate);
}
} else {
log.info("Disabling publish throttling for {}", this.topic);
if (topicPublishRateLimiter != null) {
topicPublishRateLimiter.close();
log.info("Disabling publish throttling for {}", this.topic);
if (topicPublishRateLimiter != null) {
topicPublishRateLimiter.close();
}
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerReadForPublishRateLimiting();
}
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerReadForPublishRateLimiting();
}
}

Expand Down

0 comments on commit cdaac52

Please sign in to comment.