diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java index 6eccd029cbaf5..fa3141b9cc627 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java @@ -127,4 +127,10 @@ public interface ManagedLedgerMXBean { long[] getLedgerAddEntryLatencyBuckets(); StatsBuckets getInternalLedgerAddEntryLatencyBuckets(); + + long getNumberOfLedgersMarkedDeletable(); + + long getNumberOfLedgersBeingDeleted(); + + long getNumberOfLedgersExceededMaxRetryCount(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 23fd63ad3baf8..a2d03374d7835 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -30,6 +30,8 @@ import com.google.common.collect.Maps; import com.google.common.collect.Queues; import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import com.google.common.io.BaseEncoding; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.Recycler; @@ -37,6 +39,7 @@ import io.netty.util.ReferenceCountUtil; import java.io.IOException; import java.time.Clock; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -52,6 +55,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import 
java.util.concurrent.ConcurrentSkipListMap; @@ -68,6 +72,7 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -146,6 +151,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected static final int AsyncOperationTimeoutSeconds = 30; + protected static final String DELETABLE_LEDGER_MARKER_KEYWORD = "DL"; + protected static final String DELETABLE_LEDGER_MARKER_PREFIX = DELETABLE_LEDGER_MARKER_KEYWORD + ":"; + protected static final String DELETABLE_OFFLOADED_LEDGER_MARKER_KEYWORD = "DOL"; + protected static final String DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX = + DELETABLE_OFFLOADED_LEDGER_MARKER_KEYWORD + ":"; + protected static final String DELETABLE_LEDGER_PLACEHOLDER = ""; + protected final BookKeeper bookKeeper; protected final String name; private final Map ledgerMetadata; @@ -290,6 +302,12 @@ public enum PositionBound { @VisibleForTesting Map createdLedgerCustomMetadata; + /** + * Retry counter for deletable ledgers. The counter info should not be persisted to the metadata store + * so that it can retry after topic reload. 
+ */ + protected ConcurrentLongHashMap deletableLedgerRetryCounter = new ConcurrentLongHashMap<>(); + public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, final String name) { @@ -2523,7 +2541,25 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { advanceCursorsIfNecessary(ledgersToDelete); PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; + // Update metadata + // Mark deletable ledgers + Set deletableLedgers = Stream + .concat( + ledgersToDelete.stream() + .filter(ls -> !ls.getOffloadContext().getBookkeeperDeleted()), + offloadedLedgersToDelete.stream()) + .map(LedgerInfo::getLedgerId) + .collect(Collectors.toSet()); + + // Mark deletable offloaded ledgers + Set deletableOffloadedLedgers = ledgersToDelete.stream() + .filter(ls -> ls.getOffloadContext().hasUidMsb()) + .map(LedgerInfo::getLedgerId) + .collect(Collectors.toSet()); + + markDeletableLedgers(deletableLedgers, deletableOffloadedLedgers); + for (LedgerInfo ls : ledgersToDelete) { if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) { // this info is relevant because the lastMessageId won't be available anymore @@ -2532,12 +2568,11 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { } invalidateReadHandle(ls.getLedgerId()); - ledgers.remove(ls.getLedgerId()); + entryCache.invalidateAllEntries(ls.getLedgerId()); + NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries()); TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize()); - - entryCache.invalidateAllEntries(ls.getLedgerId()); } for (LedgerInfo ls : offloadedLedgersToDelete) { LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); @@ -2563,16 +2598,10 @@ public void operationComplete(Void result, Stat stat) { metadataMutex.unlock(); trimmerMutex.unlock(); - for (LedgerInfo ls : ledgersToDelete) { - log.info("[{}] Removing ledger {} - size: 
{}", name, ls.getLedgerId(), ls.getSize()); - asyncDeleteLedger(ls.getLedgerId(), ls); - } - for (LedgerInfo ls : offloadedLedgersToDelete) { - log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(), - ls.getSize()); - asyncDeleteLedgerFromBookKeeper(ls.getLedgerId()); - } promise.complete(null); + + // perform actual deletion in background + scheduledExecutor.submit(safeRun(() -> tryRemoveAllDeletableLedgers())); } @Override @@ -2718,28 +2747,35 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { } } - private void asyncDeleteLedgerFromBookKeeper(long ledgerId) { - asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); - } - - private void asyncDeleteLedger(long ledgerId, LedgerInfo info) { - if (!info.getOffloadContext().getBookkeeperDeleted()) { - // only delete if it hasn't been previously deleted for offload - asyncDeleteLedger(ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); - } - + private void asyncDeleteOffloadedLedger(long ledgerId, LedgerInfo info, int retry, DeleteLedgerCallback callback) { if (info.getOffloadContext().hasUidMsb()) { UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb()); cleanupOffloaded(ledgerId, uuid, OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()), OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()), - "Trimming"); + "Trimming", retry, callback); } } private void asyncDeleteLedger(long ledgerId, long retry) { + asyncDeleteLedger(ledgerId, retry, new DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + + } + }, 0, null); + } + + private void asyncDeleteLedger( + long ledgerId, long retry, DeleteLedgerCallback callback, int lastRc, Object lastCtx) { if (retry <= 0) { log.warn("[{}] Failed to delete 
ledger after retries {}", name, ledgerId); + callback.deleteLedgerFailed(createManagedLedgerException(lastRc), lastCtx); return; } bookKeeper.asyncDeleteLedger(ledgerId, (rc, ctx) -> { @@ -2747,13 +2783,16 @@ private void asyncDeleteLedger(long ledgerId, long retry) { log.warn("[{}] Ledger was already deleted {}", name, ledgerId); } else if (rc != BKException.Code.OK) { log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc)); - scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)), + scheduledExecutor.schedule( + safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1, callback, rc, ctx)), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); + return; } else { if (log.isDebugEnabled()) { log.debug("[{}] Deleted ledger {}", name, ledgerId); } } + callback.deleteLedgerComplete(ctx); }, null); } @@ -3147,26 +3186,70 @@ private CompletableFuture completeLedgerInfoForOffloaded(long ledgerId, UU }); } + private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName, + Map offloadDriverMetadata, String cleanupReason) { + cleanupOffloaded( + ledgerId, + uuid, + offloadDriverName, + offloadDriverMetadata, + cleanupReason, + DEFAULT_LEDGER_DELETE_RETRIES, + new DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + + } + }); + } + private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName, /* * TODO: use driver name to * identify offloader */ - Map offloadDriverMetadata, String cleanupReason) { + Map offloadDriverMetadata, String cleanupReason, int retry, DeleteLedgerCallback callback) { + if (retry <= 0) { + log.warn("[{}] Failed to delete offloaded ledger after retries {} / {}", name, ledgerId, uuid); + callback.deleteLedgerFailed( + new ManagedLedgerException("Failed to delete offloaded ledger after retries"), null); + return; + } + 
log.info("[{}] Cleanup offload for ledgerId {} uuid {} because of the reason {}.", name, ledgerId, uuid.toString(), cleanupReason); Map metadataMap = Maps.newHashMap(); metadataMap.putAll(offloadDriverMetadata); metadataMap.put("ManagedLedgerName", name); - Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10), - Retries.NonFatalPredicate, - () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, metadataMap), - scheduledExecutor, name).whenComplete((ignored, exception) -> { - if (exception != null) { - log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})", - name, ledgerId, cleanupReason, exception); - } - }); + try { + config.getLedgerOffloader() + .deleteOffloaded(ledgerId, uuid, metadataMap) + .whenComplete((ignored, exception) -> { + if (exception != null) { + log.warn("[{}] Error cleaning up offload for {}, (cleanup reason: {})", + name, ledgerId, cleanupReason, exception); + scheduledExecutor.schedule( + safeRun(() -> cleanupOffloaded( + ledgerId, + uuid, + offloadDriverName, + offloadDriverMetadata, + cleanupReason, + retry - 1, + callback)), + DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS); + return; + } + callback.deleteLedgerComplete(null); + }); + } catch (Exception e) { + log.warn("[{}] Failed to cleanup offloaded ledgers.", name, e); + } } /** @@ -4049,6 +4132,208 @@ public CompletableFuture getManagedLedgerInternalSta return statFuture; } + /** + * During the execution of this method, lock {@code metadataMutex} needs to be held + * because the {@code propertiesMap} would be updated (not thread-safe). 
+ * @param deletableLedgerIds + */ + private void markDeletableLedgers(Collection deletableLedgerIds, + Collection deletableOffloadedLedgerIds) { + for (Long ledgerId : deletableLedgerIds) { + final String deletableLedgerMarker = DELETABLE_LEDGER_MARKER_PREFIX + ledgerId; + propertiesMap.put(deletableLedgerMarker, DELETABLE_LEDGER_PLACEHOLDER); + mbean.addLedgerMarkedDeletableCounter(); + } + for (Long ledgerId : deletableOffloadedLedgerIds) { + final String deletableOffloadedLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId; + // Offload context info is required in ledger cleanup, therefore the serialized info object + // is kept in the propertiesMap until the ledger deletion is done + final String offloadedLedgerInfo = BaseEncoding.base64().encode(ledgers.get(ledgerId).toByteArray()); + propertiesMap.put(deletableOffloadedLedgerMarker, offloadedLedgerInfo); + mbean.addLedgerMarkedDeletableCounter(); + } + } + + private Set getAllDeletableLedgers(String prefix) { + Set deletableLedgers = propertiesMap.keySet().stream() + .filter(k -> k.startsWith(prefix)) + .map(k -> { + Long ledgerId = Long.parseLong(k.substring(prefix.length())); + if (deletableLedgerRetryCounter.containsKey(ledgerId) + && deletableLedgerRetryCounter.get(ledgerId).get() >= DEFAULT_LEDGER_DELETE_RETRIES) { + log.error("[{}] Cannot delete ledger:{} after {} reties and now stop retrying on this broker", + name, ledgerId, DEFAULT_LEDGER_DELETE_RETRIES); + return null; + } + return ledgerId; + }) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + if (!deletableLedgers.isEmpty()) { + return deletableLedgers; + } + return Sets.newHashSet(); + } + + private Set getAllDeletableLedgers() { + return getAllDeletableLedgers(DELETABLE_LEDGER_MARKER_PREFIX); + } + + private Set getAllDeletableOffloadedLedgers() { + return getAllDeletableLedgers(DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX); + } + + /** + * During the execution of this method, lock {@code metadataMutex} needs to be held 
+ * because the {@code propertiesMap} would be updated (not thread-safe). + */ + private void removeAllDeletableLedgers() { + Set deletableLedgers = getAllDeletableLedgers(); + Set deletableOffloadedLedgers = getAllDeletableOffloadedLedgers(); + final CountDownLatch counter = new CountDownLatch(deletableLedgers.size() + deletableOffloadedLedgers.size()); + + Set finishedDeletedLedgers = ConcurrentHashMap.newKeySet(); + Set finishedDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet(); + Set timeoutDeletedLedgers = ConcurrentHashMap.newKeySet(); + + Set succeedDeletedLedgers = ConcurrentHashMap.newKeySet(); + Set failDeletedLedgers = ConcurrentHashMap.newKeySet(); + + Set succeedDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet(); + Set failDeletedOffloadedLedgers = ConcurrentHashMap.newKeySet(); + + for (Long deletableLedger : deletableLedgers) { + asyncDeleteLedger(deletableLedger, DEFAULT_LEDGER_DELETE_RETRIES, + new DeleteLedgerCallback() { + @Override + public void deleteLedgerComplete(Object ctx) { + counter.countDown(); + finishedDeletedLedgers.add(deletableLedger); + succeedDeletedLedgers.add(deletableLedger); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}] Failed to delete bookkeeper ledger:{} due to", + name, deletableLedger, exception); + counter.countDown(); + finishedDeletedLedgers.add(deletableLedger); + failDeletedLedgers.add(deletableLedger); + } + }, 0, null); + } + + for (Long deletableOffloadedLedger : deletableOffloadedLedgers) { + final String deletableOffloadedLedgerMarker = + DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + deletableOffloadedLedger; + + try { + final LedgerInfo deletableOffloadedLedgerInfo = LedgerInfo.parseFrom( + BaseEncoding.base64().decode(propertiesMap.get(deletableOffloadedLedgerMarker))); + asyncDeleteOffloadedLedger(deletableOffloadedLedger, deletableOffloadedLedgerInfo, + DEFAULT_LEDGER_DELETE_RETRIES, + new DeleteLedgerCallback() { + @Override + 
public void deleteLedgerComplete(Object ctx) { + counter.countDown(); + finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); + succeedDeletedOffloadedLedgers.add(deletableOffloadedLedger); + } + + @Override + public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}] Failed to delete offloaded ledger:{} due to", + name, deletableOffloadedLedger, exception); + counter.countDown(); + finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); + failDeletedOffloadedLedgers.add(deletableOffloadedLedger); + } + }); + } catch (Exception e) { + log.warn("[{}] Failed to retrieve offloaded ledger info of {} due to", + name, deletableOffloadedLedger, e); + counter.countDown(); + finishedDeletedOffloadedLedgers.add(deletableOffloadedLedger); + failDeletedOffloadedLedgers.add(deletableOffloadedLedger); + } + } + + try { + if (!counter.await(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS)) { + for (Long deletableLedger : deletableLedgers) { + if (!finishedDeletedLedgers.contains(deletableLedger)) { + log.warn("[{}] Failed to delete ledger:{} due to operation timeout={}s", + name, deletableLedger, AsyncOperationTimeoutSeconds); + timeoutDeletedLedgers.add(deletableLedger); + } + } + for (Long deletableOffloadedLedger : deletableOffloadedLedgers) { + if (!finishedDeletedOffloadedLedgers.contains(deletableOffloadedLedger)) { + log.warn("[{}] Failed to delete offloaded ledger:{} due to operation timeout={}s", + name, deletableOffloadedLedger, AsyncOperationTimeoutSeconds); + timeoutDeletedLedgers.add(deletableOffloadedLedger); + } + } + } + + // remove markers after deleting ledgers + for (Long ledgerId : succeedDeletedLedgers) { + final String deletableLedgerMarker = DELETABLE_LEDGER_MARKER_PREFIX + ledgerId; + propertiesMap.remove(deletableLedgerMarker); + if (deletableLedgerRetryCounter.containsKey(ledgerId)) { + deletableLedgerRetryCounter.remove(ledgerId); + } + mbean.addLedgerDeletedAfterMarkedCounter(); + } + for (Long 
ledgerId : succeedDeletedOffloadedLedgers) { + final String deletableLedgerMarker = DELETABLE_OFFLOADED_LEDGER_MARKER_PREFIX + ledgerId; + propertiesMap.remove(deletableLedgerMarker); + if (deletableLedgerRetryCounter.containsKey(ledgerId)) { + deletableLedgerRetryCounter.remove(ledgerId); + } + mbean.addLedgerDeletedAfterMarkedCounter(); + } + + // update retry count to track whether the max limit is reached + Set allFailedLedgers = new HashSet<>(); + allFailedLedgers.addAll(failDeletedLedgers); + allFailedLedgers.addAll(failDeletedOffloadedLedgers); + allFailedLedgers.addAll(timeoutDeletedLedgers); + + if (allFailedLedgers.isEmpty()) { + log.info("[{}] ledgers: {} and offloaded ledgers: {} are deleted successfully.", + name, deletableLedgers, deletableOffloadedLedgers); + } else { + for (Long failDeletedLedger : allFailedLedgers) { + deletableLedgerRetryCounter + .computeIfAbsent(failDeletedLedger, k -> new AtomicInteger()).incrementAndGet(); + } + } + + if (log.isDebugEnabled()) { + log.debug("[{}] Successfully delete bookkeeper ledgers: {} and offloaded ledgers: {}. " + + "Failed to delete bookkeeper ledgers: {} and offloaded ledgers: {}. 
" + + "Timeout ledgers: {}", name, + succeedDeletedLedgers, succeedDeletedOffloadedLedgers, + failDeletedLedgers, failDeletedOffloadedLedgers, + timeoutDeletedLedgers); + } + } catch (Exception e) { + // Avoid modifying the existing meta-information so that + // it can trigger a retry in the next check + log.error("[{}] Failed to update metadata after ledger deletion", name); + } + } + + private void tryRemoveAllDeletableLedgers() { + if (!metadataMutex.tryLock()) { + scheduledExecutor.schedule(safeRun(() -> tryRemoveAllDeletableLedgers()), 100, TimeUnit.MILLISECONDS); + } else { + removeAllDeletableLedgers(); + metadataMutex.unlock(); + } + } + public CompletableFuture> getEnsemblesAsync(long ledgerId) { LedgerInfo ledgerInfo = ledgers.get(ledgerId); if (ledgerInfo != null && ledgerInfo.hasOffloadContext()) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java index f13aa2aa2e82c..dfb63dd8bcd13 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java @@ -50,6 +50,9 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean { private final LongAdder cursorLedgerCreateOp = new LongAdder(); private final LongAdder cursorLedgerDeleteOp = new LongAdder(); + private final LongAdder ledgerMarkedDeletableCounter = new LongAdder(); + private final LongAdder ledgerDeletedAfterMarkedCounter = new LongAdder(); + // addEntryLatencyStatsUsec measure total latency including time entry spent while waiting in queue private final StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC); // ledgerAddEntryLatencyStatsUsec measure latency to persist entry into ledger @@ -174,6 +177,14 @@ public void endCursorLedgerDeleteOp() { 
cursorLedgerDeleteOp.decrement(); } + public void addLedgerMarkedDeletableCounter() { + ledgerMarkedDeletableCounter.increment(); + } + + public void addLedgerDeletedAfterMarkedCounter() { + ledgerDeletedAfterMarkedCounter.increment(); + } + @Override public String getName() { return managedLedger.getName(); @@ -319,4 +330,20 @@ public PendingBookieOpsStats getPendingBookieOpsStats() { return result; } + @Override + public long getNumberOfLedgersMarkedDeletable() { + return ledgerMarkedDeletableCounter.longValue(); + } + + @Override + public long getNumberOfLedgersBeingDeleted() { + return ledgerDeletedAfterMarkedCounter.longValue(); + } + + @Override + public long getNumberOfLedgersExceededMaxRetryCount() { + return managedLedger.deletableLedgerRetryCounter.values().stream() + .filter(c -> c.get() >= ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES) + .count(); + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 982b91486a0ef..0af38d0503a58 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2103,6 +2103,46 @@ public void testRetentionSize() throws Exception { }); } + @Test + public void testTwoPhraseDeletion() throws Exception { + @Cleanup("shutdown") + ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(0); + config.setMaxEntriesPerLedger(1); + config.setRetentionTime(1, TimeUnit.SECONDS); + config.setMaximumRolloverTime(1, TimeUnit.SECONDS); + + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open("two_phrase_deletion", config); + ManagedCursor c1 = ml.openCursor("testCursor1"); + ml.addEntry("m1".getBytes()); + ml.addEntry("m2".getBytes()); + 
c1.skipEntries(2, IndividualDeletedEntries.Exclude); + // let current ledger close + ml.rollCurrentLedgerIfFull(); + + // the closed and expired ledger should be deleted + Awaitility.await() + .pollDelay(1500, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertTrue(ml.getLedgersInfoAsList().size() <= 1); + assertEquals(ml.getTotalSize(), 0); + }); + // delete the expired ledger + ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null)); + + // check metrics within each phase + Awaitility.await() + .pollInterval(100, TimeUnit.MILLISECONDS) + .atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> { + assertEquals(ml.mbean.getNumberOfLedgersMarkedDeletable(), 2); + assertEquals(ml.mbean.getNumberOfLedgersBeingDeleted(), 2); + assertEquals(ml.mbean.getNumberOfLedgersExceededMaxRetryCount(), 0); + }); + ml.close(); + } + @Test public void testTimestampOnWorkingLedger() throws Exception { ManagedLedgerConfig conf = new ManagedLedgerConfig(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java index 8a9dd4a3da6cd..b7a070bb11038 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerMetrics.java @@ -128,6 +128,16 @@ private List aggregate(Map> ledgersByD statsPeriodSeconds); populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_MarkDeleteRate", lStats.getMarkDeleteRate()); + + // collect metrics for ledger deletion + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_NumberOfLedgersMarkedDeletable", + (double) lStats.getNumberOfLedgersMarkedDeletable()); + + populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_NumberOfLedgersBeingDeleted", + (double) lStats.getNumberOfLedgersBeingDeleted()); + + 
populateAggregationMapWithSum(tempAggregatedMetricsMap, "brk_ml_NumberOfLedgersExceededMaxRetryCount", + (double) lStats.getNumberOfLedgersExceededMaxRetryCount()); } // SUM up collections of each metrics