@@ -2218,29 +2218,31 @@ public BacklogQuotaManager getBacklogQuotaManager() {
22182218 public void monitorBacklogQuota () {
22192219 long startTimeMillis = System .currentTimeMillis ();
22202220 forEachPersistentTopic (topic -> {
2221- if (topic .isSizeBacklogExceeded ()) {
2222- getBacklogQuotaManager ().handleExceededBacklogQuota (topic ,
2223- BacklogQuota .BacklogQuotaType .destination_storage , false );
2224- } else {
2225- topic .checkTimeBacklogExceeded ().thenAccept (isExceeded -> {
2226- if (isExceeded ) {
2227- getBacklogQuotaManager ().handleExceededBacklogQuota (topic ,
2228- BacklogQuota .BacklogQuotaType .message_age ,
2229- pulsar .getConfiguration ().isPreciseTimeBasedBacklogQuotaCheck ());
2230- } else {
2231- if (log .isDebugEnabled ()) {
2232- log .debug ("quota not exceeded for [{}]" , topic .getName ());
2221+ topic .updateOldPositionInfo ().thenAccept (__ -> {
2222+ if (topic .isSizeBacklogExceeded ()) {
2223+ getBacklogQuotaManager ().handleExceededBacklogQuota (topic ,
2224+ BacklogQuota .BacklogQuotaType .destination_storage , false );
2225+ } else {
2226+ topic .checkTimeBacklogExceeded (false ).thenAccept (isExceeded -> {
2227+ if (isExceeded ) {
2228+ getBacklogQuotaManager ().handleExceededBacklogQuota (topic ,
2229+ BacklogQuota .BacklogQuotaType .message_age ,
2230+ pulsar .getConfiguration ().isPreciseTimeBasedBacklogQuotaCheck ());
2231+ } else {
2232+ if (log .isDebugEnabled ()) {
2233+ log .debug ("quota not exceeded for [{}]" , topic .getName ());
2234+ }
22332235 }
2234- }
2235- }).exceptionally (throwable -> {
2236- log .error ("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota" ,
2236+ });
2237+ }
2238+ }).whenComplete ((unused , throwable ) -> {
2239+ if (throwable != null ) {
2240+ log .error ("Error when checkBacklogQuota({}) in monitorBacklogQuota" ,
22372241 topic .getName (), throwable );
2238- return null ;
2239- }).whenComplete ((unused , throwable ) -> {
2240- backlogQuotaCheckDuration .observe (
2241- MILLISECONDS .toSeconds (System .currentTimeMillis () - startTimeMillis ));
2242- });
2243- }
2242+ }
2243+ backlogQuotaCheckDuration .observe (
2244+ MILLISECONDS .toSeconds (System .currentTimeMillis () - startTimeMillis ));
2245+ });
22442246 });
22452247 }
22462248
0 commit comments