From 5bbca7cbe3d4989d0590d6d36904d9db8e5500ac Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 21 Nov 2024 11:22:15 +0800 Subject: [PATCH 1/3] [improve][broker] Decouple pulsar_storage_backlog_age_seconds metric from backlogQuota policy --- .../pulsar/broker/service/BrokerService.java | 44 +++-- .../service/persistent/PersistentTopic.java | 179 ++++++++---------- .../service/persistent/SystemTopic.java | 2 +- .../service/BacklogQuotaManagerTest.java | 112 +++++++++++ 4 files changed, 217 insertions(+), 120 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 77cd52f4558ea..6afa1ae32fbcb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2245,29 +2245,31 @@ public BacklogQuotaManager getBacklogQuotaManager() { public void monitorBacklogQuota() { long startTimeMillis = System.currentTimeMillis(); forEachPersistentTopic(topic -> { - if (topic.isSizeBacklogExceeded()) { - getBacklogQuotaManager().handleExceededBacklogQuota(topic, - BacklogQuota.BacklogQuotaType.destination_storage, false); - } else { - topic.checkTimeBacklogExceeded().thenAccept(isExceeded -> { - if (isExceeded) { - getBacklogQuotaManager().handleExceededBacklogQuota(topic, - BacklogQuota.BacklogQuotaType.message_age, - pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()); - } else { - if (log.isDebugEnabled()) { - log.debug("quota not exceeded for [{}]", topic.getName()); + topic.updateOldPositionInfo().thenAccept(__ -> { + if (topic.isSizeBacklogExceeded()) { + getBacklogQuotaManager().handleExceededBacklogQuota(topic, + BacklogQuota.BacklogQuotaType.destination_storage, false); + } else { + topic.checkTimeBacklogExceeded(false).thenAccept(isExceeded -> { + if (isExceeded) { + getBacklogQuotaManager().handleExceededBacklogQuota(topic, + BacklogQuota.BacklogQuotaType.message_age, + pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()); + } else { + if (log.isDebugEnabled()) { + log.debug("quota not exceeded for [{}]", topic.getName()); + } } - } - }).exceptionally(throwable -> { - log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota", + }); + } + }).whenComplete((unused, throwable) -> { + if (throwable != null) { + log.error("Error when checkBacklogQuota({}) in monitorBacklogQuota", topic.getName(), throwable); - return null; - }).whenComplete((unused, throwable) -> { - backlogQuotaCheckDuration.observe( - MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeMillis)); - }); - } + } + backlogQuotaCheckDuration.observe( + MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeMillis)); + }); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 651d12373628b..bf69ae786ca08 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -295,14 +295,14 @@ protected TopicStatsHelper initialValue() { PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater( PersistentTopic.class, PersistentTopicAttributes.class, "persistentTopicAttributes"); - private volatile TimeBasedBacklogQuotaCheckResult timeBasedBacklogQuotaCheckResult; - private static final AtomicReferenceFieldUpdater + private volatile OldestPositionInfo oldestPositionInfo; + private static final AtomicReferenceFieldUpdater TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater( PersistentTopic.class, - TimeBasedBacklogQuotaCheckResult.class, - "timeBasedBacklogQuotaCheckResult"); + OldestPositionInfo.class, + "oldestPositionInfo"); @Value - private static class TimeBasedBacklogQuotaCheckResult { + private static class OldestPositionInfo { Position oldestCursorMarkDeletePosition; String cursorName; long positionPublishTimestampInMillis; @@ -2634,12 +2634,11 @@ public CompletableFuture asyncGetStats(GetStatsOptions stats.backlogQuotaLimitSize = getBacklogQuota(BacklogQuotaType.destination_storage).getLimitSize(); stats.backlogQuotaLimitTime = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); - TimeBasedBacklogQuotaCheckResult backlogQuotaCheckResult = timeBasedBacklogQuotaCheckResult; stats.oldestBacklogMessageAgeSeconds = getBestEffortOldestUnacknowledgedMessageAgeSeconds(); - stats.oldestBacklogMessageSubscriptionName = (backlogQuotaCheckResult == null) + stats.oldestBacklogMessageSubscriptionName = (oldestPositionInfo == null) || !hasBacklogs(getStatsOptions.isGetPreciseBacklog()) ? null - : backlogQuotaCheckResult.getCursorName(); + : oldestPositionInfo.getCursorName(); stats.compaction.reset(); mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> { @@ -3425,7 +3424,7 @@ public CompletableFuture checkBacklogQuotaExceeded(String producerName, Ba return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy)); } if (backlogQuotaType == BacklogQuotaType.message_age) { - return checkTimeBacklogExceeded().thenCompose(isExceeded -> { + return checkTimeBacklogExceeded(true).thenCompose(isExceeded -> { if (isExceeded) { log.debug("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName); @@ -3466,16 +3465,15 @@ public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() { if (!hasBacklogs(false)) { return 0; } - TimeBasedBacklogQuotaCheckResult result = timeBasedBacklogQuotaCheckResult; - if (result == null) { + if (oldestPositionInfo == null) { return -1; } else { return TimeUnit.MILLISECONDS.toSeconds( - Clock.systemUTC().millis() - result.getPositionPublishTimestampInMillis()); + Clock.systemUTC().millis() - oldestPositionInfo.getPositionPublishTimestampInMillis()); } } - private void updateResultIfNewer(TimeBasedBacklogQuotaCheckResult updatedResult) { + private void updateResultIfNewer(OldestPositionInfo updatedResult) { TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER.updateAndGet(this, existingResult -> { if (existingResult == null @@ -3489,74 +3487,46 @@ private void updateResultIfNewer(TimeBasedBacklogQuotaCheckResult updatedResult) } - /** - * @return determine if backlog quota enforcement needs to be done for topic based on time limit - */ - public CompletableFuture checkTimeBacklogExceeded() { + public CompletableFuture updateOldPositionInfo() { TopicName topicName = TopicName.get(getName()); - int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); - if (log.isDebugEnabled()) { - log.debug("[{}] Time backlog quota = [{}]. Checking if exceeded.", topicName, backlogQuotaLimitInSecond); - } - - // If backlog quota by time is not set - if (backlogQuotaLimitInSecond <= 0) { - return CompletableFuture.completedFuture(false); - } - ManagedCursorContainer managedCursorContainer = (ManagedCursorContainer) ledger.getCursors(); CursorInfo oldestMarkDeleteCursorInfo = managedCursorContainer.getCursorWithOldestPosition(); // If we have no durable cursor since `ledger.getCursors()` only managed durable cursors - if (oldestMarkDeleteCursorInfo == null - || oldestMarkDeleteCursorInfo.getPosition() == null) { + if (oldestMarkDeleteCursorInfo == null || oldestMarkDeleteCursorInfo.getPosition() == null) { if (log.isDebugEnabled()) { - log.debug("[{}] No durable cursor found. Skipping time based backlog quota check." - + " Oldest mark-delete cursor info: {}", topicName, oldestMarkDeleteCursorInfo); + log.debug("[{}] No durable cursor found. Skip update old position info.", topicName); } - return CompletableFuture.completedFuture(false); + return CompletableFuture.completedFuture(null); } Position oldestMarkDeletePosition = oldestMarkDeleteCursorInfo.getPosition(); - - TimeBasedBacklogQuotaCheckResult lastCheckResult = timeBasedBacklogQuotaCheckResult; - if (lastCheckResult != null - && oldestMarkDeletePosition.compareTo(lastCheckResult.getOldestCursorMarkDeletePosition()) == 0) { - + OldestPositionInfo lastOldestPositionInfo = oldestPositionInfo; + if (lastOldestPositionInfo != null + && oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition()) == 0) { // Same position, but the cursor causing it has changed? - if (!lastCheckResult.getCursorName().equals(oldestMarkDeleteCursorInfo.getCursor().getName())) { - final TimeBasedBacklogQuotaCheckResult updatedResult = new TimeBasedBacklogQuotaCheckResult( - lastCheckResult.getOldestCursorMarkDeletePosition(), + if (!lastOldestPositionInfo.getCursorName().equals(oldestMarkDeleteCursorInfo.getCursor().getName())) { + updateResultIfNewer(new OldestPositionInfo( + lastOldestPositionInfo.getOldestCursorMarkDeletePosition(), oldestMarkDeleteCursorInfo.getCursor().getName(), - lastCheckResult.getPositionPublishTimestampInMillis(), - oldestMarkDeleteCursorInfo.getVersion()); - - updateResultIfNewer(updatedResult); + lastOldestPositionInfo.getPositionPublishTimestampInMillis(), + oldestMarkDeleteCursorInfo.getVersion())); if (log.isDebugEnabled()) { - log.debug("[{}] Time-based backlog quota check. Updating cached result for position {}, " - + "since cursor causing it has changed from {} to {}", + log.debug("[{}] Updating cached old position info {}, " + + "since cursor causing it has changed from {} to {}", topicName, oldestMarkDeletePosition, - lastCheckResult.getCursorName(), + lastOldestPositionInfo.getCursorName(), oldestMarkDeleteCursorInfo.getCursor().getName()); } } - - long entryTimestamp = lastCheckResult.getPositionPublishTimestampInMillis(); - boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp); - if (log.isDebugEnabled()) { - log.debug("[{}] Time based backlog quota check. Using cache result for position {}. " - + "Entry timestamp: {}, expired: {}", - topicName, oldestMarkDeletePosition, entryTimestamp, expired); - } - return CompletableFuture.completedFuture(expired); + return CompletableFuture.completedFuture(null); } - if (brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) { if (!hasBacklogs(true)) { - return CompletableFuture.completedFuture(false); + return CompletableFuture.completedFuture(null); } - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture future = new CompletableFuture<>(); // Check if first unconsumed message(first message after mark delete position) // for slowest cursor's has expired. Position position = ledger.getNextValidPosition(oldestMarkDeletePosition); @@ -3566,34 +3536,16 @@ public CompletableFuture checkTimeBacklogExceeded() { public void readEntryComplete(Entry entry, Object ctx) { try { long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer()); - updateResultIfNewer( - new TimeBasedBacklogQuotaCheckResult( - oldestMarkDeleteCursorInfo.getPosition(), - oldestMarkDeleteCursorInfo.getCursor().getName(), - entryTimestamp, - oldestMarkDeleteCursorInfo.getVersion())); - - boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp); - if (log.isDebugEnabled()) { - log.debug("[{}] Time based backlog quota check. Oldest unacked entry read from BK. " - + "Oldest entry in cursor {}'s backlog: {}. " - + "Oldest mark-delete position: {}. " - + "Quota {}. Last check result position [{}]. " - + "Expired: {}, entryTimestamp: {}", - topicName, - oldestMarkDeleteCursorInfo.getCursor().getName(), - position, - oldestMarkDeletePosition, - backlogQuotaLimitInSecond, - lastCheckResult.getOldestCursorMarkDeletePosition(), - expired, - entryTimestamp); - } - future.complete(expired); + new OldestPositionInfo( + oldestMarkDeleteCursorInfo.getPosition(), + oldestMarkDeleteCursorInfo.getCursor().getName(), + entryTimestamp, + oldestMarkDeleteCursorInfo.getVersion())); + future.complete(null); } catch (Exception e) { - log.error("[{}][{}] Error deserializing message for backlog check", topicName, e); - future.complete(false); + log.error("[{}][{}] Error deserializing message for update old position", topicName, e); + future.completeExceptionally(e); } finally { entry.release(); } @@ -3601,36 +3553,67 @@ public void readEntryComplete(Entry entry, Object ctx) { @Override public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - log.error("[{}][{}] Error reading entry for precise time based backlog check", + log.error("[{}][{}] Error reading entry for precise update old position", topicName, exception); - future.complete(false); + future.completeExceptionally(exception); } }, null); return future; } else { + if (!hasBacklogs(false)) { + return CompletableFuture.completedFuture(null); + } try { - if (!hasBacklogs(false)) { - return CompletableFuture.completedFuture(false); - } EstimateTimeBasedBacklogQuotaCheckResult checkResult = estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition); if (checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp() != null) { updateResultIfNewer( - new TimeBasedBacklogQuotaCheckResult( - oldestMarkDeleteCursorInfo.getPosition(), - oldestMarkDeleteCursorInfo.getCursor().getName(), - checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp(), - oldestMarkDeleteCursorInfo.getVersion())); + new OldestPositionInfo( + oldestMarkDeleteCursorInfo.getPosition(), + oldestMarkDeleteCursorInfo.getCursor().getName(), + checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp(), + oldestMarkDeleteCursorInfo.getVersion())); } - return CompletableFuture.completedFuture(checkResult.isTruncateBacklogToMatchQuota()); + return CompletableFuture.completedFuture(null); } catch (Exception e) { - log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e); - return CompletableFuture.completedFuture(false); + log.error("[{}][{}] Error reading entry for update old position", topicName, e); + return CompletableFuture.failedFuture(e); } } } + /** + * @return determine if backlog quota enforcement needs to be done for topic based on time limit + */ + public CompletableFuture checkTimeBacklogExceeded(boolean shouldUpdateOldPositionInfo) { + TopicName topicName = TopicName.get(getName()); + int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime(); + + if (log.isDebugEnabled()) { + log.debug("[{}] Time backlog quota = [{}]. Checking if exceeded.", topicName, backlogQuotaLimitInSecond); + } + CompletableFuture updateFuture = shouldUpdateOldPositionInfo ? updateOldPositionInfo() + : CompletableFuture.completedFuture(null); + return updateFuture.thenCompose(__ -> { + if (backlogQuotaLimitInSecond <= 0) { + return CompletableFuture.completedFuture(false); + } + if (oldestPositionInfo == null) { + return CompletableFuture.completedFuture(false); + } + if (!hasBacklogs(brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck())) { + return CompletableFuture.completedFuture(false); + } + long entryTimestamp = oldestPositionInfo.getPositionPublishTimestampInMillis(); + boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp); + return CompletableFuture.completedFuture(expired); + }).exceptionally(e -> { + log.error("[{}][{}] Error checking time backlog exceeded", topicName, e); + return false; + }); + } + private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck( Position markDeletePosition) throws ExecutionException, InterruptedException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java index 8feb432a08001..a26255c9f8bad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SystemTopic.java @@ -51,7 +51,7 @@ public boolean isSizeBacklogExceeded() { } @Override - public CompletableFuture checkTimeBacklogExceeded() { + public CompletableFuture checkTimeBacklogExceeded(boolean shouldUpdateOldPositionInfo) { return CompletableFuture.completedFuture(false); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 56f9f4f91246e..445e9f07a4251 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -618,6 +618,118 @@ public void backlogsStatsPreciseWithNoBacklog() throws PulsarAdminException, Pul config.setExposePreciseBacklogInPrometheus(false); } + @Test + public void backlogsAgeMetricsPreciseWithoutBacklogQuota() throws Exception { + config.setPreciseTimeBasedBacklogQuotaCheck(true); + final String namespace = "prop/ns-quota"; + assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>()); + + try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) + .statsInterval(0, SECONDS).build()) { + final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID(); + + final String subName1 = "c1"; + final String subName2 = "c2"; + final int numMsgs = 4; + + Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1) + .acknowledgmentGroupTime(0, SECONDS) + .subscribe(); + Consumer consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2) + .acknowledgmentGroupTime(0, SECONDS) + .subscribe(); + Producer producer = createProducer(client, topic1); + + byte[] content = new byte[1024]; + for (int i = 0; i < numMsgs; i++) { + Thread.sleep(3000); // Guarantees if we use wrong message in age, to show up in failed test + producer.send(content); + } + + String c1MarkDeletePositionBefore = + admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition; + + // Move subscription 1, one message, such that subscription 2 is the oldest + // S2 S1 + // 0 1 + Message oldestMessage = consumer1.receive(); + consumer1.acknowledge(oldestMessage); + log.info("Subscription 1 moved 1 message. Now subscription 2 is the oldest. Oldest message:"+ + oldestMessage.getMessageId()); + + c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore); + waitForQuotaCheckToRunTwice(); + + Metrics metrics = prometheusMetricsClient.getMetrics(); + TopicStats topicStats = getTopicStats(topic1); + + long expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - oldestMessage.getPublishTime()); + assertThat(topicStats.getOldestBacklogMessageAgeSeconds()) + .isCloseTo(expectedMessageAgeSeconds, within(1L)); + + Metric backlogAgeMetric = + metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds", + Pair.of("topic", topic1)); + assertThat(backlogAgeMetric.tags).containsExactly( + entry("cluster", CLUSTER_NAME), + entry("namespace", namespace), + entry("topic", topic1)); + assertThat((long) backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(2L)); + } + config.setPreciseTimeBasedBacklogQuotaCheck(false); + } + + @Test + public void backlogsAgeMetricsNoPreciseWithoutBacklogQuota() throws Exception { + config.setPreciseTimeBasedBacklogQuotaCheck(false); + final String namespace = "prop/ns-quota"; + assertEquals(admin.namespaces().getBacklogQuotaMap(namespace), new HashMap<>()); + + try (PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) + .statsInterval(0, SECONDS).build()) { + final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID(); + + final String subName1 = "c1"; + final int numMsgs = 5; + + Consumer consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1) + .acknowledgmentGroupTime(0, SECONDS) + .subscribe(); + Producer producer = createProducer(client, topic1); + + byte[] content = new byte[1024]; + for (int i = 0; i < numMsgs; i++) { + Thread.sleep(3000); // Guarantees if we use wrong message in age, to show up in failed test + producer.send(content); + } + + Message oldestMessage = consumer1.receive(); + consumer1.acknowledge(oldestMessage); + log.info("Moved subscription 1, by 1 message"); + + // Unload topic to trigger the ledger close + unloadAndLoadTopic(topic1, producer); + long unloadTime = System.currentTimeMillis(); + waitForQuotaCheckToRunTwice(); + + Metrics metrics = prometheusMetricsClient.getMetrics(); + TopicStats topicStats = getTopicStats(topic1); + + long expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - unloadTime); + assertThat(topicStats.getOldestBacklogMessageAgeSeconds()) + .isCloseTo(expectedMessageAgeSeconds, within(1L)); + + Metric backlogAgeMetric = + metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds", + Pair.of("topic", topic1)); + assertThat(backlogAgeMetric.tags).containsExactly( + entry("cluster", CLUSTER_NAME), + entry("namespace", namespace), + entry("topic", topic1)); + assertThat((long) backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(2L)); + } + } + private long getReadEntries(String topic1) { return ((PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get()) .getManagedLedger().getStats().getEntriesReadTotalCount(); From e2a102f3b87765d0185e971797e43e6c6d922173 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 21 Nov 2024 17:40:45 +0800 Subject: [PATCH 2/3] Optimize code --- .../pulsar/broker/service/persistent/PersistentTopic.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index bf69ae786ca08..1900c5179322f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3489,10 +3489,14 @@ private void updateResultIfNewer(OldestPositionInfo updatedResult) { public CompletableFuture updateOldPositionInfo() { TopicName topicName = TopicName.get(getName()); - ManagedCursorContainer managedCursorContainer = (ManagedCursorContainer) ledger.getCursors(); - CursorInfo oldestMarkDeleteCursorInfo = managedCursorContainer.getCursorWithOldestPosition(); + + if (!(ledger.getCursors() instanceof ManagedCursorContainer managedCursorContainer)) { + return CompletableFuture.failedFuture(new IllegalStateException( + String.format("[%s] No valid cursors found. Skip update old position info.", topicName))); + } // If we have no durable cursor since `ledger.getCursors()` only managed durable cursors + CursorInfo oldestMarkDeleteCursorInfo = managedCursorContainer.getCursorWithOldestPosition(); if (oldestMarkDeleteCursorInfo == null || oldestMarkDeleteCursorInfo.getPosition() == null) { if (log.isDebugEnabled()) { log.debug("[{}] No durable cursor found. Skip update old position info.", topicName); From 44db547b97dfbb1b422c6a0e10251d587d6d5e1d Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Thu, 28 Nov 2024 14:43:20 +0800 Subject: [PATCH 3/3] Address comments --- .../service/persistent/PersistentTopic.java | 37 ++++++++++++------- .../service/BacklogQuotaManagerTest.java | 21 ++++++++--- 2 files changed, 38 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1900c5179322f..eb48ceee72d76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -295,6 +295,7 @@ protected TopicStatsHelper initialValue() { PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater( PersistentTopic.class, PersistentTopicAttributes.class, "persistentTopicAttributes"); + // The topic's oldest position information, if null, indicates that there is no cursor or no backlog. private volatile OldestPositionInfo oldestPositionInfo; private static final AtomicReferenceFieldUpdater TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater( @@ -2636,7 +2637,6 @@ public CompletableFuture asyncGetStats(GetStatsOptions stats.oldestBacklogMessageAgeSeconds = getBestEffortOldestUnacknowledgedMessageAgeSeconds(); stats.oldestBacklogMessageSubscriptionName = (oldestPositionInfo == null) - || !hasBacklogs(getStatsOptions.isGetPreciseBacklog()) ? null : oldestPositionInfo.getCursorName(); @@ -3462,9 +3462,6 @@ public boolean isSizeBacklogExceeded() { @Override public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() { - if (!hasBacklogs(false)) { - return 0; - } if (oldestPositionInfo == null) { return -1; } else { @@ -3495,12 +3492,21 @@ public CompletableFuture updateOldPositionInfo() { String.format("[%s] No valid cursors found. Skip update old position info.", topicName))); } + if (!hasBacklogs(brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck())) { + if (log.isDebugEnabled()) { + log.debug("[{}] No backlog. Update old position info is null", topicName); + } + TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER.set(this, null); + return CompletableFuture.completedFuture(null); + } + // If we have no durable cursor since `ledger.getCursors()` only managed durable cursors CursorInfo oldestMarkDeleteCursorInfo = managedCursorContainer.getCursorWithOldestPosition(); if (oldestMarkDeleteCursorInfo == null || oldestMarkDeleteCursorInfo.getPosition() == null) { if (log.isDebugEnabled()) { - log.debug("[{}] No durable cursor found. Skip update old position info.", topicName); + log.debug("[{}] No durable cursor found. Update old position info is null", topicName); } + TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER.set(this, null); return CompletableFuture.completedFuture(null); } @@ -3527,9 +3533,6 @@ public CompletableFuture updateOldPositionInfo() { return CompletableFuture.completedFuture(null); } if (brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) { - if (!hasBacklogs(true)) { - return CompletableFuture.completedFuture(null); - } CompletableFuture future = new CompletableFuture<>(); // Check if first unconsumed message(first message after mark delete position) // for slowest cursor's has expired. @@ -3546,6 +3549,18 @@ public void readEntryComplete(Entry entry, Object ctx) { oldestMarkDeleteCursorInfo.getCursor().getName(), entryTimestamp, oldestMarkDeleteCursorInfo.getVersion())); + if (log.isDebugEnabled()) { + log.debug("[{}] Precise based update oldest position info. " + + "Oldest unacked entry read from BK. " + + "Oldest entry in cursor {}'s backlog: {}. " + + "Oldest mark-delete position: {}. " + + "EntryTimestamp: {}", + topicName, + oldestMarkDeleteCursorInfo.getCursor().getName(), + position, + oldestMarkDeletePosition, + entryTimestamp); + } future.complete(null); } catch (Exception e) { log.error("[{}][{}] Error deserializing message for update old position", topicName, e); @@ -3564,9 +3579,6 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { }, null); return future; } else { - if (!hasBacklogs(false)) { - return CompletableFuture.completedFuture(null); - } try { EstimateTimeBasedBacklogQuotaCheckResult checkResult = estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition); @@ -3606,9 +3618,6 @@ public CompletableFuture checkTimeBacklogExceeded(boolean shouldUpdateO if (oldestPositionInfo == null) { return CompletableFuture.completedFuture(false); } - if (!hasBacklogs(brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck())) { - return CompletableFuture.completedFuture(false); - } long entryTimestamp = oldestPositionInfo.getPositionPublishTimestampInMillis(); boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp); return CompletableFuture.completedFuture(expired); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 445e9f07a4251..963dc3d26b28b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -526,7 +526,7 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce assertThat(topicStats.getBacklogSize()).isEqualTo(0); assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog()).isEqualTo(0); assertThat(topicStats.getSubscriptions().get(subName2).getMsgBacklog()).isEqualTo(0); - assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0); + assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(-1); assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull(); metrics = prometheusMetricsClient.getMetrics(); @@ -537,7 +537,7 @@ public void backlogsStatsPrecise() throws PulsarAdminException, PulsarClientExce entry("cluster", CLUSTER_NAME), entry("namespace", namespace), entry("topic", topic1)); - assertThat((long) backlogAgeMetric.value).isEqualTo(0); + assertThat((long) backlogAgeMetric.value).isEqualTo(-1); // producer should create success. Producer producer2 = createProducer(client, topic1); @@ -598,7 +598,7 @@ public void backlogsStatsPreciseWithNoBacklog() throws PulsarAdminException, Pul assertThat(topicStats.getBacklogQuotaLimitTime()).isEqualTo(timeLimitSeconds); assertThat(topicStats.getBacklogSize()).isEqualTo(0); assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog()).isEqualTo(0); - assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0); + assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(-1); assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull(); Metric backlogAgeMetric = @@ -608,7 +608,7 @@ public void backlogsStatsPreciseWithNoBacklog() throws PulsarAdminException, Pul entry("cluster", CLUSTER_NAME), entry("namespace", namespace), entry("topic", topic1)); - assertThat((long) backlogAgeMetric.value).isEqualTo(0); + assertThat((long) backlogAgeMetric.value).isEqualTo(-1); // producer should create success. Producer producer2 = createProducer(client, topic1); @@ -830,6 +830,15 @@ public void backlogsStatsNotPrecise() throws PulsarAdminException, PulsarClientE assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isEqualTo(subName2); expectedAge = MILLISECONDS.toSeconds(System.currentTimeMillis() - unloadTime); assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isCloseTo(expectedAge, within(1L)); + + // Unsubscribe consume1 and consumer2 + consumer1.unsubscribe(); + consumer2.unsubscribe(); + waitForQuotaCheckToRunTwice(); + topicStats = getTopicStats(topic1); + assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull(); + assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(-1); + config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER); } } @@ -885,11 +894,11 @@ public void backlogsStatsNotPreciseWithNoBacklog() throws PulsarAdminException, Metrics metrics = prometheusMetricsClient.getMetrics(); assertEquals(topicStats.getSubscriptions().get(subName1).getMsgBacklog(), 0); assertThat(topicStats.getOldestBacklogMessageSubscriptionName()).isNull(); - assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(0); + assertThat(topicStats.getOldestBacklogMessageAgeSeconds()).isEqualTo(-1); Metric backlogAgeMetric = metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds", Pair.of("topic", topic1)); - assertThat(backlogAgeMetric.value).isEqualTo(0); + assertThat(backlogAgeMetric.value).isEqualTo(-1); // producer should create success. Producer producer2 = createProducer(client, topic1);