Skip to content

Commit

Permalink
[fix] [ml] fix discontinuous ledger deletion (apache#20898)
Browse files Browse the repository at this point in the history
### Motivation

- The task `trim ledgers` runs in the thread `BkMainThreadPool.choose(ledgerName)`
- The task `write entries to BK` runs in the thread `BkMainThreadPool.choose(ledgerId)`

So the two tasks above may run concurrently/

The task `trim ledgers` work as the flow below:
- find the ledgers which are no longer to read, the result is `{Ledgers before the slowest read}`.
- check if the `{Ledgers before the slowest read}` is out of retention policy, the result is `{Ledgers to be deleted}`.
  - if the create time of the ledger is lower than the earliest retention time, mark it should be deleted
  - if after deleting this ledger, the rest ledgers are still larger than the retention size, mark it should be deleted
- delete the`{Ledgers to be deleted}`

**(Highlight)** There is a scenario that causes the task `trim ledgers` did  discontinuous ledger deletion, resulting consume messages discontinuous:
- context:
  - ledgers: `[{id=1, size=100}, {id=2,size=100}]`
  - retention size: 150
  - no cursor there
- Check `ledger 1`, skip by retention check `(200 - 100) < 150`
- One in-flight writing is finished, the `calculateTotalSizeWrited()` would return `300` now.
- Check `ledger 2`, retention check `(300 - 100) > 150`, mark the ledger-2 should be deleted.
- Delete the `ledger 2`.
- Create a new consumer. It will receive messages from `[ledger-1, ledegr-3]`, but the `ledger-2` will be skipped.

### Modifications

Once the retention constraint has been met, break the loop.

(cherry picked from commit 782e91f)
(cherry picked from commit b87c0fb)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Apr 23, 2024
1 parent 7352c8f commit 5b9dff3
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.LazyLoadableValue;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.Stat;
import org.slf4j.Logger;
Expand Down Expand Up @@ -2570,15 +2571,13 @@ private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInS
}
}

private boolean hasLedgerRetentionExpired(long ledgerTimestamp) {
return config.getRetentionTimeMillis() >= 0
&& clock.millis() - ledgerTimestamp > config.getRetentionTimeMillis();
private boolean hasLedgerRetentionExpired(long retentionTimeMs, long ledgerTimestamp) {
return retentionTimeMs >= 0 && clock.millis() - ledgerTimestamp > retentionTimeMs;
}

private boolean isLedgerRetentionOverSizeQuota(long sizeToDelete) {
private boolean isLedgerRetentionOverSizeQuota(long retentionSizeInMB, long totalSizeOfML, long sizeToDelete) {
// Handle the -1 size limit as "infinite" size quota
return config.getRetentionSizeInMB() >= 0
&& TOTAL_SIZE_UPDATER.get(this) - sizeToDelete >= config.getRetentionSizeInMB() * MegaByte;
return retentionSizeInMB >= 0 && totalSizeOfML - sizeToDelete >= retentionSizeInMB * MegaByte;
}

boolean isOffloadedNeedsDelete(OffloadContext offload, Optional<OffloadPolicies> offloadPolicies) {
Expand Down Expand Up @@ -2675,6 +2674,11 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
}

long slowestReaderLedgerId = -1;
final LazyLoadableValue<Long> slowestNonDurationLedgerId =
new LazyLoadableValue(() -> getTheSlowestNonDurationReadPosition().getLedgerId());
final long retentionSizeInMB = config.getRetentionSizeInMB();
final long retentionTimeMs = config.getRetentionTimeMillis();
final long totalSizeOfML = TOTAL_SIZE_UPDATER.get(this);
if (!cursors.hasDurableCursors()) {
// At this point the lastLedger will be pointing to the
// ledger that has just been closed, therefore the +1 to
Expand All @@ -2697,7 +2701,10 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {

long totalSizeToDelete = 0;
// skip ledger if retention constraint met
for (LedgerInfo ls : ledgers.headMap(slowestReaderLedgerId, false).values()) {
Iterator<LedgerInfo> ledgerInfoIterator =
ledgers.headMap(slowestReaderLedgerId, false).values().iterator();
while (ledgerInfoIterator.hasNext()){
LedgerInfo ls = ledgerInfoIterator.next();
// currentLedger can not be deleted
if (ls.getLedgerId() == currentLedger.getId()) {
if (log.isDebugEnabled()) {
Expand All @@ -2717,8 +2724,9 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
}

totalSizeToDelete += ls.getSize();
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(totalSizeToDelete);
boolean expired = hasLedgerRetentionExpired(ls.getTimestamp());
boolean overRetentionQuota = isLedgerRetentionOverSizeQuota(retentionSizeInMB, totalSizeOfML,
totalSizeToDelete);
boolean expired = hasLedgerRetentionExpired(retentionTimeMs, ls.getTimestamp());
if (log.isDebugEnabled()) {
log.debug(
"[{}] Checking ledger {} -- time-old: {} sec -- "
Expand All @@ -2735,14 +2743,19 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture<?> promise) {
}
ledgersToDelete.add(ls);
} else {
if (ls.getLedgerId() < getTheSlowestNonDurationReadPosition().getLedgerId()) {
// once retention constraint has been met, skip check
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name,
ls.getLedgerId());
}
invalidateReadHandle(ls.getLedgerId());
// once retention constraint has been met, skip check
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
}
releaseReadHandleIfNoLongerRead(ls.getLedgerId(), slowestNonDurationLedgerId.getValue());
break;
}
}

while (ledgerInfoIterator.hasNext()) {
LedgerInfo ls = ledgerInfoIterator.next();
if (!releaseReadHandleIfNoLongerRead(ls.getLedgerId(), slowestNonDurationLedgerId.getValue())) {
break;
}
}

Expand Down Expand Up @@ -2828,6 +2841,21 @@ public void operationFailed(MetaStoreException e) {
}
}

/**
* @param ledgerId the ledger handle which maybe will be released.
* @return if the ledger handle was released.
*/
private boolean releaseReadHandleIfNoLongerRead(long ledgerId, long slowestNonDurationLedgerId) {
if (ledgerId < slowestNonDurationLedgerId) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} no longer needs to be read, close the cached readHandle", name, ledgerId);
}
invalidateReadHandle(ledgerId);
return true;
}
return false;
}

protected void doDeleteLedgers(List<LedgerInfo> ledgersToDelete) {
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util;

import java.util.function.Supplier;

/***
* Used to lazy load a value, only calculate it when used. Not thread-safety.
*/
public class LazyLoadableValue<T> {

private Supplier<T> loader;

private T value;

public LazyLoadableValue(Supplier<T> loader) {
this.loader = loader;
}

public T getValue() {
if (value == null) {
value = loader.get();
}
return value;
}
}

0 comments on commit 5b9dff3

Please sign in to comment.