File tree Expand file tree Collapse file tree 3 files changed +17
-18
lines changed
pulsar-broker/src/main/java/org/apache/pulsar/broker/service Expand file tree Collapse file tree 3 files changed +17
-18
lines changed Original file line number Diff line number Diff line change @@ -1715,14 +1715,22 @@ public synchronized void monitorBacklogQuota() {
1715
1715
if (persistentTopic .isSizeBacklogExceeded ()) {
1716
1716
getBacklogQuotaManager ().handleExceededBacklogQuota (persistentTopic ,
1717
1717
BacklogQuota .BacklogQuotaType .destination_storage , false );
1718
- } else if (persistentTopic .isTimeBacklogExceeded ()) {
1719
- getBacklogQuotaManager ().handleExceededBacklogQuota (persistentTopic ,
1720
- BacklogQuota .BacklogQuotaType .message_age ,
1721
- pulsar .getConfiguration ().isPreciseTimeBasedBacklogQuotaCheck ());
1722
1718
} else {
1723
- if (log .isDebugEnabled ()) {
1724
- log .debug ("quota not exceeded for [{}]" , topic .getName ());
1725
- }
1719
+ persistentTopic .checkTimeBacklogExceeded ().thenAccept (isExceeded -> {
1720
+ if (isExceeded ) {
1721
+ getBacklogQuotaManager ().handleExceededBacklogQuota (persistentTopic ,
1722
+ BacklogQuota .BacklogQuotaType .message_age ,
1723
+ pulsar .getConfiguration ().isPreciseTimeBasedBacklogQuotaCheck ());
1724
+ } else {
1725
+ if (log .isDebugEnabled ()) {
1726
+ log .debug ("quota not exceeded for [{}]" , topic .getName ());
1727
+ }
1728
+ }
1729
+ }).exceptionally (throwable -> {
1730
+ log .error ("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota" ,
1731
+ persistentTopic .getName (), throwable );
1732
+ return null ;
1733
+ });
1726
1734
}
1727
1735
}
1728
1736
});
Original file line number Diff line number Diff line change @@ -2561,15 +2561,6 @@ public boolean isSizeBacklogExceeded() {
2561
2561
/**
2562
2562
* @return determine if backlog quota enforcement needs to be done for topic based on time limit
2563
2563
*/
2564
- public boolean isTimeBacklogExceeded () {
2565
- try {
2566
- return checkTimeBacklogExceeded ().get ();
2567
- } catch (Throwable e ) {
2568
- log .error ("[{}] checkTimeBacklogExceeded failed." , topic , e );
2569
- return false ;
2570
- }
2571
- }
2572
-
2573
2564
public CompletableFuture <Boolean > checkTimeBacklogExceeded () {
2574
2565
TopicName topicName = TopicName .get (getName ());
2575
2566
int backlogQuotaLimitInSecond = getBacklogQuota (BacklogQuotaType .message_age ).getLimitTime ();
Original file line number Diff line number Diff line change @@ -36,8 +36,8 @@ public boolean isSizeBacklogExceeded() {
36
36
}
37
37
38
38
@ Override
39
- public boolean isTimeBacklogExceeded () {
40
- return false ;
39
+ public CompletableFuture < Boolean > checkTimeBacklogExceeded () {
40
+ return CompletableFuture . completedFuture ( false ) ;
41
41
}
42
42
43
43
@ Override
You can’t perform that action at this time.
0 commit comments