Skip to content

Commit

Permalink
rebase code for apache#12294
Browse files Browse the repository at this point in the history
  • Loading branch information
wangjialing committed Oct 21, 2021
1 parent 02ba0a9 commit 99a776f
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getBrokerDispatchRateLimiter().isPresent()) {
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent,
totalBytesSent);
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> e
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getBrokerDispatchRateLimiter().isPresent()) {
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(
sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes());
}

if (topic.getDispatchRateLimiter().isPresent()) {
Expand Down

0 comments on commit 99a776f

Please sign in to comment.