Skip to content

Commit

Permalink
[improve][broker] Decouple pulsar_storage_backlog_age_seconds metric …
Browse files Browse the repository at this point in the history
…with backlogQuota check (apache#23619)
  • Loading branch information
shibd authored Nov 28, 2024
1 parent c50fa56 commit 963be2c
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2245,29 +2245,31 @@ public BacklogQuotaManager getBacklogQuotaManager() {
public void monitorBacklogQuota() {
long startTimeMillis = System.currentTimeMillis();
forEachPersistentTopic(topic -> {
if (topic.isSizeBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.destination_storage, false);
} else {
topic.checkTimeBacklogExceeded().thenAccept(isExceeded -> {
if (isExceeded) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.message_age,
pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
} else {
if (log.isDebugEnabled()) {
log.debug("quota not exceeded for [{}]", topic.getName());
topic.updateOldPositionInfo().thenAccept(__ -> {
if (topic.isSizeBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.destination_storage, false);
} else {
topic.checkTimeBacklogExceeded(false).thenAccept(isExceeded -> {
if (isExceeded) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.message_age,
pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
} else {
if (log.isDebugEnabled()) {
log.debug("quota not exceeded for [{}]", topic.getName());
}
}
}
}).exceptionally(throwable -> {
log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota",
});
}
}).whenComplete((unused, throwable) -> {
if (throwable != null) {
log.error("Error when checkBacklogQuota({}) in monitorBacklogQuota",
topic.getName(), throwable);
return null;
}).whenComplete((unused, throwable) -> {
backlogQuotaCheckDuration.observe(
MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeMillis));
});
}
}
backlogQuotaCheckDuration.observe(
MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeMillis));
});
});
}

Expand Down
Loading

0 comments on commit 963be2c

Please sign in to comment.