From 5b9dff35f6c40f757abd6ca8ea6a650b996628a4 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 1 Aug 2023 22:37:37 +0800 Subject: [PATCH] [fix] [ml] fix discontinuous ledger deletion (#20898) ### Motivation - The task `trim ledgers` runs in the thread `BkMainThreadPool.choose(ledgerName)` - The task `write entries to BK` runs in the thread `BkMainThreadPool.choose(ledgerId)` So the two tasks above may run concurrently/ The task `trim ledgers` work as the flow below: - find the ledgers which are no longer to read, the result is `{Ledgers before the slowest read}`. - check if the `{Ledgers before the slowest read}` is out of retention policy, the result is `{Ledgers to be deleted}`. - if the create time of the ledger is lower than the earliest retention time, mark it should be deleted - if after deleting this ledger, the rest ledgers are still larger than the retention size, mark it should be deleted - delete the`{Ledgers to be deleted}` **(Highlight)** There is a scenario that causes the task `trim ledgers` did discontinuous ledger deletion, resulting consume messages discontinuous: - context: - ledgers: `[{id=1, size=100}, {id=2,size=100}]` - retention size: 150 - no cursor there - Check `ledger 1`, skip by retention check `(200 - 100) < 150` - One in-flight writing is finished, the `calculateTotalSizeWrited()` would return `300` now. - Check `ledger 2`, retention check `(300 - 100) > 150`, mark the ledger-2 should be deleted. - Delete the `ledger 2`. - Create a new consumer. It will receive messages from `[ledger-1, ledegr-3]`, but the `ledger-2` will be skipped. ### Modifications Once the retention constraint has been met, break the loop. (cherry picked from commit 782e91fe327efe2c9c9107d6c679c2837d43935b) (cherry picked from commit b87c0fb01bc6bab195987775221f5985faf68399) --- .../mledger/impl/ManagedLedgerImpl.java | 60 ++++++++++++++----- .../pulsar/common/util/LazyLoadableValue.java | 42 +++++++++++++ 2 files changed, 86 insertions(+), 16 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 10405134be939..61606423b4e27 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -138,6 +138,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.LazyLoadableValue; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.metadata.api.Stat; import org.slf4j.Logger; @@ -2570,15 +2571,13 @@ private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInS } } - private boolean hasLedgerRetentionExpired(long ledgerTimestamp) { - return config.getRetentionTimeMillis() >= 0 - && clock.millis() - ledgerTimestamp > config.getRetentionTimeMillis(); + private boolean hasLedgerRetentionExpired(long retentionTimeMs, long ledgerTimestamp) { + return retentionTimeMs >= 0 && clock.millis() - ledgerTimestamp > retentionTimeMs; } - private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) { + private boolean isLedgerRetentionOverSizeQuota(long retentionSizeInMB, long totalSizeOfML, long sizeToDelete) { // Handle the -1 size limit as "infinite" size quota - return config.getRetentionSizeInMB() >= 0 - && TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte; + return retentionSizeInMB >= 0 && totalSizeOfML - sizeToDelete >= retentionSizeInMB * MegaByte; } boolean isOffloadedNeedsDelete(OffloadContext offload, Optional offloadPolicies) { @@ -2675,6 +2674,11 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { } long slowestReaderLedgerId = -1; + final LazyLoadableValue slowestNonDurationLedgerId = + new LazyLoadableValue(() -> getTheSlowestNonDurationReadPosition().getLedgerId()); + final long retentionSizeInMB = config.getRetentionSizeInMB(); + final long retentionTimeMs = config.getRetentionTimeMillis(); + final long totalSizeOfML = TOTAL_SIZE_UPDATER.get(this); if (!cursors.hasDurableCursors()) { // At this point the lastLedger will be pointing to the // ledger that has just been closed, therefore the +1 to @@ -2697,7 +2701,10 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { long totalSizeToDelete = 0; // skip ledger if retention constraint met - for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) { + Iterator ledgerInfoIterator = + ledgers.headMap(slowestReaderLedgerId, false).values().iterator(); + while (ledgerInfoIterator.hasNext()){ + LedgerInfo ls = ledgerInfoIterator.next(); // currentLedger can not be deleted if (ls.getLedgerId() == currentLedger.getId()) { if (log.isDebugEnabled()) { @@ -2717,8 +2724,9 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { } totalSizeToDelete += ls.getSize(); - boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(totalSizeToDelete); - boolean expired = hasLedgerRetentionExpired(ls.getTimestamp()); + boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(retentionSizeInMB, totalSizeOfML, + totalSizeToDelete); + boolean expired = hasLedgerRetentionExpired(retentionTimeMs, ls.getTimestamp()); if (log.isDebugEnabled()) { log.debug( "[{}] Checking ledger {} -- time-old: {} sec -- " @@ -2735,14 +2743,19 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { } ledgersToDelete.add(ls); } else { - if (ls.getLedgerId() < getTheSlowestNonDurationReadPosition().getLedgerId()) { - // once retention constraint has been met, skip check - if (log.isDebugEnabled()) { - log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, - ls.getLedgerId()); - } - invalidateReadHandle(ls.getLedgerId()); + // once retention constraint has been met, skip check + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId()); } + releaseReadHandleIfNoLongerRead(ls.getLedgerId(), slowestNonDurationLedgerId.getValue()); + break; + } + } + + while (ledgerInfoIterator.hasNext()) { + LedgerInfo ls = ledgerInfoIterator.next(); + if (!releaseReadHandleIfNoLongerRead(ls.getLedgerId(), slowestNonDurationLedgerId.getValue())) { + break; } } @@ -2828,6 +2841,21 @@ public void operationFailed(MetaStoreException e) { } } + /** + * @param ledgerId the ledger handle which maybe will be released. + * @return if the ledger handle was released. + */ + private boolean releaseReadHandleIfNoLongerRead(long ledgerId, long slowestNonDurationLedgerId) { + if (ledgerId < slowestNonDurationLedgerId) { + if (log.isDebugEnabled()) { + log.debug("[{}] Ledger {} no longer needs to be read, close the cached readHandle", name, ledgerId); + } + invalidateReadHandle(ledgerId); + return true; + } + return false; + } + protected void doDeleteLedgers(List ledgersToDelete) { PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; // Update metadata diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java new file mode 100644 index 0000000000000..063d434a64fc0 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/LazyLoadableValue.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.util; + +import java.util.function.Supplier; + +/*** + * Used to lazy load a value, only calculate it when used. Not thread-safety. + */ +public class LazyLoadableValue { + + private Supplier loader; + + private T value; + + public LazyLoadableValue(Supplier loader) { + this.loader = loader; + } + + public T getValue() { + if (value == null) { + value = loader.get(); + } + return value; + } +}